Introduction to Functional Parsing in Python

First of all, I should tell you why parsers are so beautifully expressed in a functional framework: a lot of grammars have an inherently recursive structure. Take as an example a structure of nested lists:

[a,[b,c],d,[[e]]]

From such an example we can expect to see recursion playing a role in parsing. In BNF this looks like:

<expr>   ::= <list> | <symbol>
<list>   ::= '[' <seq> ']'
<seq>    ::= <expr> | <expr> ',' <seq>
<symbol> ::= 'a-z' | <symbol>

Note: if you find this notation confusing, you may want to take a look at this video.

Notice the definition of a <list> depends on <seq> depends on <expr> depends on <list>. Seen like this, even <seq> and <symbol> are recursive in the BNF; although in this case the (simple) recursion expresses the sequential nature of these grammars. In the case of sequences, we may replace the recursion with an (imperative) for-loop and be done with it. In almost every real word case however, we deal with more involved recursions like the nested list example. Functional parsers often have definitions that lie closer to the BNF notation shown above. Functional languages in which these parsers are implemented often have the features (either tail-call elimination or lazy evaluation) to handle these definitions without too much loss of performance.

Tail recursion in BNF

Let’s expand the definition of <list> by replacing <seq> with its definition (We’ll use parentheses and ellipses in free but predictable manner).

<list> = '[' (<expr> | <expr> ',' <seq>) ']'
<list> = '[' ((<list> | <symbol>) | (<list> | <symbol>) ',' <seq>) ']'

We may now choose a path in this expression. A possible <list> looks like:

<list> =? '[' <list> ',' <seq> ']'

We see <list> reappearing in a position that is not at the tail-end of the BNF expression. Contrast this to <symbol>:

<symbol> = 'a-z' 'a-z' ... ('a-z' | <symbol>)

No matter how we expand the expression, the recursion is always at the end. In evaluating a tail-recursion, the recursion can always be converted to a (more efficient) loop.

Because Python is neither lazy nor tail-recursive, we need to do a bit more work. We use a trampoline to effect tail recursion.

Functional parsers (or parser-combinators) are a method of writing and combining parsers. They allow you to build complex parsers from the ground up using a set of basic primitives. The structure is outlined in the beautiful Functional Pearl by Hutton and Meijer [@monadic-parsing].

Since monads are not really a thing in Python, we’ll have to somehow translate these concepts. In its most basic form, a parser is a function that takes a string and returns a parsed object together with the rest of the string. Graham Hutton puts it in a rhyme, which I changed slightly to suit our context:

A parser for things
Is a function from strings
To options of pairs
Of things and strings

The optional part in our case means: may throw an exception instead.

First parsers

[2]:
from typing import Optional, Union, Callable, Any, TypeVar, Generic

T = TypeVar("T")
Parser = Callable[[str], tuple[T, str]]

Failures

If the parser fails, a Failure is raised.

[3]:
from dataclasses import dataclass

class Failure(Exception):
    def __init__(self, msg: str):
        self.msg = msg

    def __str__(self):
        return self.msg


class EndOfInput(Failure):
    def __init__(self):
        super().__init__("End of input.")


class Expected(Failure):
    def __init__(self, inp: str, msg: str):
        self.msg = msg
        self.inp = inp

    def __str__(self):
        return f"{self.msg}, got: {self.inp}"

We start with an example of a function that parses a positive integer. Here we use a regular expression to parse the integer itself. From this first parser we’ll expand out to write something that can parse an arbitrarily nested list of integers.

[4]:
import re

def integer(inp: str) -> tuple[int, str]:
    if m := re.match("-?[1-9][0-9]*", inp):
        result = int(m[0])
        rest = inp[m.end(0):]
        return (result, rest)
    else:
        raise Expected(inp, "a number")

Let’s test this:

[5]:
integer("42")
[5]:
(42, '')

Note that the second return value is the part of the input that is remaining after our parser is done.

[6]:
integer("89, 20, 40")
[6]:
(89, ', 20, 40')
[7]:
integer("hello")
Expected: a number, got: hello

Now say we want to parse two integers. We need to be able to parse some whitespace.

[8]:
def whitespace(inp: str) -> tuple[str, str]:
    m = re.match("\s+", inp)
    if not m:
        raise Expected(inp, "whitespace")
    return (m[0], inp[m.end(0):])
[9]:
whitespace("    abcd")
[9]:
('    ', 'abcd')
[10]:
whitespace("abcd")
Expected: whitespace, got: abcd

In many cases whitespace is optional. We can write another parser that catches the exception for us.

[11]:
def optional(p: Parser[T]) -> Parser[Optional[T]]:
    def optional_p(inp: str) -> tuple[Optional[T], str]:
        try:
            (x, inp) = p(inp)
        except Failure:
            return (None, inp)
        return (x, inp)
    return optional_p
[12]:
optional(whitespace)("abcd")
[12]:
(None, 'abcd')

Let us try to parse a pair of integers now.

[13]:
def twice(p: Parser[T]) -> Parser[tuple[T, T]]:
    def twiced(inp: str) -> tuple[tuple[T, T], str]:
        (a, inp) = p(inp)
        (_, inp) = whitespace(inp)
        (b, inp) = p(inp)
        return ((a, b), inp)
    return twiced
[14]:
twice(integer)("1 2 3 4")
[14]:
((1, 2), ' 3 4')

We could also go for a comma separated list of integers. First we need to parse a comma.

[15]:
def char(allowed: str) -> Parser[str]:
    def char_p(inp: str) -> tuple[str, str]:
        if not inp or inp[0] not in allowed:
            raise Expected(inp, f"one of \"{allowed}\"")
        return (inp[0], inp[1:])
    return char_p
[16]:
char("abc")("abbcaabd")
[16]:
('a', 'bbcaabd')
[17]:
char("abc")("d")
Expected: one of "abc", got: d
[18]:
comma = char(",")
comma(", 20, 40")
[18]:
(',', ' 20, 40')
[19]:
comma("89, 20, 40")
Expected: one of ",", got: 89, 20, 40

It will be convenient to have a function that parses a thing and then also consumes the whitespace after it.

[20]:
def tokenize(p: Parser[T]) -> Parser[T]:
    def tokenized(inp: str) -> tuple[T, str]:
        (x, inp) = p(inp)
        (_, inp) = optional(whitespace)(inp)
        return (x, inp)
    return tokenized

Notice that the definition of tokenize no longer contains any raw string processing! Also, see how the whitespace is stripped from our input!

[21]:
comma = tokenize(char(","))
comma(",     ######")
[21]:
(',', '######')

We now want to parse a list of integers, separated by commas. We can try to do this recursively.

[22]:
U = TypeVar("U")

def sep_by(p: Parser[T], sep: Parser[U]) -> Parser[list[T]]:
    def sep_byed(inp: str) -> tuple[list[T], str]:
        # parse an integer
        (x, inp) = p(inp)
        try:
            # parse a comma
            (_, _inp) = sep(inp)
            # parse the rest of the list
            (rest, inp) = sep_byed(_inp)
        except Failure:
            return ([x], inp)
        return ([x] + rest, inp)
    return sep_byed
[23]:
sep_by(integer, comma)("1, 2, 3, 4, abcde")
[23]:
([1, 2, 3, 4], ', abcde')

This implementation is not so effecient, and because it uses recursion we run the risc of stack overflow. We can write the inner function in an imperative style to be more efficient here. Later on we will see that we can write recursive parsers safely, but even then, seeing that this is Python and not Haskell, it can be advantageous to rewrite some core parsers imperatively. The important bit is that the framework still stands: the outer world doesn’t see your trash.

[24]:
def sep_by(p: Parser[T], sep: Parser[U]) -> Parser[list[T]]:
    def sep_byed(inp: str) -> tuple[list[T], str]:
        result = []
        inp_bc = inp
        while True:
            try:
                (x, inp_bc) = p(inp)
                result.append(x)
                (_, inp) = sep(inp_bc)
            except Failure:
                return (result, inp_bc)
    return sep_byed
[25]:
sep_by(integer, comma)("1, 2, 3, 4, abcde")
[25]:
([1, 2, 3, 4], ', abcde')

Many and some

many and some are bread and butter of parser combinators. An important fact to note here is that many will always succeed. Having nested many parsers may result in infinite loops.

[26]:
def many(p: Parser[T]) -> Parser[list[T]]:
    def manied(inp: str) -> tuple[list[T], str]:
        result = []
        while True:
            try:
                (x, inp) = p(inp)
                result.append(x)
            except Failure:
                return (result, inp)
    return manied
[27]:
def some(p: Parser[T]) -> Parser[list[T]]:
    def somed(inp: str) -> tuple[list[T], str]:
        (x, inp) = p(inp)
        (rest, inp) = many(p)(inp)
        return ([x] + rest, inp)
    return somed

As an example, we can now write a parser for a list of integers, S-expression style.

[28]:
def list_of(p: Parser[T]) -> Parser[list[T]]:
    def listed(inp: str) -> tuple[list[T], str]:
        (_, inp) = tokenize(char("("))(inp)
        (x, inp) = many(p)(inp)
        (_, inp) = tokenize(char(")"))(inp)
        return (x, inp)
    return listed
[29]:
list_of(tokenize(integer))("(1 2 3)")
[29]:
([1, 2, 3], '')

Choices

Another common pattern is when we have several options to parse. For the choice parser we need a new type of Failure, namely one that outlines the different expectations and why they all failed. The choice parser tries each parser in the order they were given; the first parser to succeed gets the go-ahead.

[30]:
class ChoiceFailure(Expected):
    def __init__(self, inp, failures):
        super().__init__(inp, "")
        self.failures = failures

    def __str__(self):
        return "All options failed:\n" + "\n".join(
            "    | " + f.msg for f in self.failures) + \
            f"\ngot: {self.inp}"

def choice(*ps: Parser[T]) -> Parser[T]:
    def choiced(inp: str) -> tuple[T, str]:
        failures = []
        for p in ps:
            try:
                (x, inp) = p(inp)
            except Failure as f:
                failures.append(f)
                continue
            else:
                return (x, inp)
        raise ChoiceFailure(inp, failures)
    return choiced
[31]:
many(choice(char("abc"), char("123")))("12ab3#$*&(*&@")
[31]:
(['1', '2', 'a', 'b', '3'], '#$*&(*&@')
[32]:
some(choice(char("abc"), char("123"), whitespace))("#&*(&@")
ChoiceFailure: All options failed:
    | one of "abc"
    | one of "123"
    | whitespace
got: #&*(&@

Nested structures

We are now ready to parse a nested list structure. Note that the only reason why we put this parser in a function explicitely is to allow the recursion.

[33]:
def nested_list(p: Parser[T]) -> Parser[Any]:
    def nested_listed(inp: str) -> tuple[Any, str]:
        return choice(p, list_of(nested_list(p)))(inp)
    return nested_listed
[34]:
nested_list(tokenize(integer))("(1 2 (3 4  )5 (() (6)))")
[34]:
([1, 2, [3, 4], 5, [[], [6]]], '')

This concludes the basic introduction to parser combinators. What is left is finding ways of writing down parsers in a nicer way. First we’ll have to deal with the pesky problem of recursion.

Sequencing

We have seen several instances where we had to sequence (x, inp) = parse(inp). It would be nice if we could combine parsers in a smarter way, and have them plug together. In other words: we can make these parsers more composable. Suppose we want a function sequence(*ps) that parses each argument in order, and returns the result of the last parser. An imperative solution looks as follows:

[35]:
def sequence(*ps: Parser[Any]) -> Parser[Any]:
    def sequenced(inp: str) -> tuple[Any, str]:
        for p in ps:
            (x, inp) = p(inp)
        return (x, inp)
    return sequenced

Every intermediate result is thrown away. Now suppose we have two parsers, we need the result of the first one, but only if the second one succeeds. We saw this case before when we were parsing lists. We make things a bit easier by omitting commas, so we’re parsing (1 2 3) now.

[36]:
def list_of(p: Parser[T]) -> Parser[list[T]]:
    return sequence(char("("), closed_by(many(p), char(")")))

How would we implement closed_by()?

[37]:
def closed_by(p1: Parser[T], p2: Parser[U]) -> Parser[T]:
    def closed_byed(inp: str) -> tuple[T, str]:
        (x, inp) = p1(inp)
        (_, inp) = p2(inp)
        return (x, inp)
    return closed_byed
[38]:
def value(x: T) -> Parser[T]:
    def valued(inp: str) -> tuple[T, str]:
        return (x, inp)
    return valued

def closed_by(p1: Parser[T], p2: Parser[U]) -> Parser[T]:
    # return bind(p1, lambda x: bind(p2, lambda _: value(x)))
    return p1 >> (lambda x: p2 >> (lambda _: value(x)))
[39]:
def sequence_recur(p: Parser[Any], *ps: Parser[Any]) -> Parser[Any]:
    def sequenced(inp: str) -> tuple[Any, str]:
        (_, inp) = p(inp)
        return sequence_recur(*ps)(inp)

    if ps:
        return sequenced
    else:
        return p


def bind(p: Parser[T], f: Callable[(T,), Parser[U]]) -> Parser[U]:
    def bound(inp: str) -> tuple[U, str]:
        (x, inp) = p(inp)
        return f(x)(inp)
    return bound


def sequence_bind(p: Parser[Any], *ps: Parser[Any]) -> Parser[Any]:
    if ps:
        return bind(p, lambda _: sequence_bind(*ps))
    else:
        return p

Trampolines

Trampolines can be a bit tricky to understand, so we first explain them using the example of a factorial function. The factorial of \(n\), noted \(n! := n (n - 1)!\), also \(0! := 1\).

[40]:
def factorial(n):
    if n == 0:
        return 1
    else:
        return n * factorial(n - 1)
[41]:
factorial(10)
[41]:
3628800
[42]:
factorial(10000)
RecursionError: maximum recursion depth exceeded

How can we elude this recursion? We could write a for-loop, but let’s say we still want to use recursion. After all, when we’re writing parsers recursions can become inevitable: see for instance the nested list example. First what we need to change is to make the function tail-recursive. This is done by adding an accumulator argument.

[43]:
def factorial(n, acc=1):
    if n == 0:
        return acc
    else:
        return factorial(n-1, acc*n)

The difference is that we return the direct result of the recursive function call. In the previous version we still had to multiply the result of the recursion by n. If the return value of a function is the direct result of another (recursive) function call, we may call the function tail-recursive. Written in this way, we can prevent the call from allocating another stack frame.

What we do is delaying the call.

We store the function call in an object of class Trampoline. We may call the compute() method on this class to start a loop. This loop keeps evaluating successive calls, until the result is something else than a Trampoline object.

[44]:
class Trampoline:
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def compute(self):
        cont = self
        while True:
            cont = cont.func(*cont.args, **cont.kwargs)
            if not isinstance(cont, Trampoline):
                return cont

def factorial(n, acc=1):
    if n == 0:
        return acc
    else:
        return Trampoline(factorial, n-1, acc*n)
[45]:
from math import log10
log10(factorial(10000).compute())
[45]:
35659.45427452078

We can make this example a bit more beautiful by providing a decorator.

[46]:
from functools import (wraps, partial)

def trampoline(f):
    return wraps(f)(partial(Trampoline, f))

Now we can keep our old definition!

[47]:
@trampoline
def factorial(n, acc=1):
    if n == 0:
        return acc
    else:
        return factorial(n-1, acc*n)
[48]:
log10(factorial(10000).compute())
[48]:
35659.45427452078

The full parser

These parsers had us chain several (x, inp) = some_parser(inp) together. It would be cool if we had a better syntax to write that. To do this however, we need to wrap our parser functions in a class. Then the question is also: how do we propagate the return value x through this chain? In Haskell this problem is solved with monads. We can try to do the same in Python, especially now that we have trampolines.

There will be two more additions to the scheme outlined above: the cursor and the auxiliary state object.

Cursor

When parsing larger files it can be inefficient to parse a text character by character. Often, we need to pass a certain amount of text to a secondary function that then converts the string into a value for us. Instead of working directly with strings we can use a Cursor class.

In normal parsing mode, we need only to know the position of the cursor within the text; however, on many occasion it is useful to have a cursor object that spans a selection of text being parsed.

[49]:
@dataclass
class Cursor:
    data: bytes
    begin: int
    end: int
    encoding: str = "utf-8"

    def __len__(self):
        """Length of selection."""
        return self.end - self.begin

    def __bool__(self):
        """Returns `True` if there is more input to parse, `False` otherwise."""
        return self.end < len(self.data)

    ...

For the full definition, see the corresponding API documentation.

Auxiliary state

The same way we pass the cursor state through all the parser calls, we can track auxiliary state. The complete function signature of a parser then looks like this:

[50]:
def some_parser(cursor: Cursor, aux: Any) -> tuple[T, Cursor, Any]:
    pass

In the current set of parsers, this state is used as a stack. The stack is initialized with the empty list, then values can be pushed and popped using the push and pop parsers. The pop parser takes an optional transfer argument that maps the content of the popped value.

[51]:
from byteparsing import sequence, push, pop

sequence(push(42), pop()).parse(b"")
[51]:
42
[59]:
from byteparsing.parsers import integer, tokenize

sequence(
    tokenize(integer),
    tokenize(integer) >> push,
    tokenize(integer),
    pop()).parse(b"1 2 3")
[59]:
2

While powerful as a mechanism for maintaining auxiliary state, in many cases, using a named_sequence instead of sequence can make your code a lot more readable.

[62]:
from byteparsing.parsers import named_sequence

named_sequence(
    _1=tokenize(integer),
    result=tokenize(integer),
    _2=tokenize(integer)
).parse(b"1 2 3")
[62]:
{'result': 2}

Another use for the auxiliary variable, is to hide some configuration variable: see Parser grammar: using_config and with_config.

Hide trace backs

For the purpose of this notebook it was better to hide tracebacks on exceptions. Run this cell first, then the rest.

[53]:
import sys
ipython = get_ipython()

def exception_handler(exception_type, exception, traceback):
    print("%s: %s" % (exception_type.__name__, exception), file=sys.stderr)

ipython._showtraceback = exception_handler