\documentclass[10pt]{article} \usepackage{fancyvrb} \usepackage{url} \DefineVerbatimEnvironment{lua}{Verbatim}{fontsize=\small,commandchars=\@\#\%} \DefineVerbatimEnvironment{C}{Verbatim}{fontsize=\small,commandchars=\@\#\%} \DefineVerbatimEnvironment{mime}{Verbatim}{fontsize=\small,commandchars=\$\#\%} \newcommand{\stick}[1]{\vbox{\setlength{\parskip}{0pt}#1}} \newcommand{\bl}{\ensuremath{\mathtt{\backslash}}} \title{Filters, sources, sinks, and pumps\\ {\large or Functional programming for the rest of us}} \author{Diego Nehab} \begin{document} \maketitle \begin{abstract} Certain data processing operations can be implemented in the form of filters. A filter is a function that can process data received in consecutive function calls, returning partial results after each invocation. Examples of operations that can be implemented as filters include the end-of-line normalization for text, Base64 and Quoted-Printable transfer content encodings, the breaking of text into lines, SMTP byte stuffing, and there are many others. Filters become even more powerful when we allow them to be chained together to create composite filters. In this context, filters can be seen as the middle links in a chain of data transformations. Sources an sinks are the corresponding end points of these chains. A source is a function that produces data, chunk by chunk, and a sink is a function that takes data, chunk by chunk. In this chapter, we describe the design of an elegant interface for filters, sources, sinks and chaining, refine it until it reaches a high degree of generality. We discuss implementation challenges, provide practical solutions, and illustrate each step with concrete examples. \end{abstract} \section{Introduction} Within the realm of networking applications, we are often required apply transformations to streams of data. Examples include the end-of-line normalization for text, Base64 and Quoted-Printable transfer content encodings, breaking text into lines with a maximum number of columns, SMTP dot-stuffing, \texttt{gzip} compression, HTTP chunked transfer coding, and the list goes on. Many complex tasks require a combination of two or more such transformations, and therefore a general mechanism for promoting reuse is desirable. In the process of designing LuaSocket 2.0, David Burgess and I were forced to deal with this problem. The solution we reached proved to be very general and convenient. It is based on the concepts of filters, sources, sinks, and pumps, which we introduce below. \emph{Filters} are functions that can be repeatedly invoked with chunks of input, successively returning processed chunks of output. More importantly, the result of concatenating all the output chunks must be the same as the result of applying the filter over the concatenation of all input chunks. In fancier language, filters \emph{commute} with the concatenation operator. As a result, chunk boundaries are irrelevant: filters correctly handle input data no matter how it was originally split. A \emph{chain} transparently combines the effect of one or more filters. The interface of a chain must be indistinguishable from the interface of its components. This allows a chained filter to be used wherever an atomic filter is expected. In particular, chains can be chained themselves to create arbitrarily complex operations. Filters can be seen as internal nodes in a network through which data will flow, potentially being transformed many times along its way. Chains connect these nodes together. To complete the picture, we need \emph{sources} and \emph{sinks}. These are the initial and final nodes of the network, respectively. Less abstractly, a source is a function that produces new data every time it is called. Conversely, sinks are functions that give a final destination to the data they receive. Naturally, sources and sinks can also be chained with filters to produce filtered sources and sinks. Finally, filters, chains, sources, and sinks are all passive entities: they must be repeatedly invoked in order for anything to happen. \emph{Pumps} provide the driving force that pushes data through the network, from a source to a sink. These concepts will become less abstract with examples. In the following sections, we start with a simplified interface, which we refine several times until no obvious shortcomings remain. The evolution we present is not contrived: it recreates the steps we followed ourselves as we consolidated our understanding of these concepts and the applications that benefit from them. \subsection{A concrete example} Let us use the end-of-line normalization of text as an example to motivate our initial filter interface. Assume we are given text in an unknown end-of-line convention (including possibly mixed conventions) out of the commonly found Unix (LF), Mac OS (CR), and DOS (CRLF) conventions. We would like to be able to write code like the following: \begin{quote} \begin{lua} @stick# local in = source.chain(source.file(io.stdin), normalize("\r\n")) local out = sink.file(io.stdout) pump.all(in, out) % \end{lua} \end{quote} This program should read data from the standard input stream and normalize the end-of-line markers to the canonic CRLF marker, as defined by the MIME standard. Finally, the normalized text should be sent to the standard output stream. We use a \emph{file source} that produces data from standard input, and chain it with a filter that normalizes the data. The pump then repeatedly obtains data from the source, and passes it to the \emph{file sink}, which sends it to the standard output. In the code above, the \texttt{normalize} \emph{factory} is a function that creates our normalization filter. This filter will replace any end-of-line marker with the canonic `\verb|\r\n|' marker. The initial filter interface is trivial: a filter function receives a chunk of input data, and returns a chunk of processed data. When there are no more input data left, the caller notifies the filter by invoking it with a \texttt{nil} chunk. The filter responds by returning the final chunk of processed data. Although the interface is extremely simple, the implementation is not so obvious. Any filter respecting this interface needs to keep some kind of context between calls. This is because chunks can for example be broken between the CR and LF characters marking the end of a line. This need for contextual storage is what motivates the use of factories: each time the factory is called, it returns a filter with its own context so that we can have several independent filters being used at the same time. For efficiency reasons, we must avoid the obvious solution of concatenating all the input into the context before producing any output. To that end, we will break the implementation in two parts: a low-level filter, and a factory of high-level filters. The low-level filter will be implemented in C and will not carry any context between function calls. The high-level filter factory, implemented in Lua, will create and return a high-level filter that maintains whatever context the low-level filter needs, but isolates the user from its internal details. That way, we take advantage of C's efficiency to perform the hard work, and take advantage of Lua's simplicity for the bookkeeping. \subsection{The Lua part of the filter} Below is the complete implementation of the factory of high-level end-of-line normalization filters: \begin{quote} \begin{lua} @stick# function filter.cycle(low, ctx, extra) return function(chunk) local ret ret, ctx = low(ctx, chunk, extra) return ret end end % @stick# function normalize(marker) return cycle(eol, 0, marker) end % \end{lua} \end{quote} The \texttt{normalize} factory simply calls a more generic factory, the \texttt{cycle} factory. This factory receives a low-level filter, an initial context, and an extra parameter, and returns the corresponding high-level filter. Each time the high-level filer is passed a new chunk, it invokes the low-level filter passing it the previous context, the new chunk, and the extra argument. The low-level filter in turn produces the chunk of processed data and a new context. The high-level filter then updates its internal context, and returns the processed chunk of data to the user. It is the low-level filter that does all the work. Notice that we take advantage of Lua's lexical scoping to store the context in a closure between function calls. Concerning the low-level filter code, we must first accept that there is no perfect solution to the end-of-line marker normalization problem itself. The difficulty comes from an inherent ambiguity on the definition of empty lines within mixed input. However, the following solution works well for any consistent input, as well as for non-empty lines in mixed input. It also does a reasonable job with empty lines and serves as a good example of how to implement a low-level filter. The idea is to consider both CR and~LF as end-of-line \emph{candidates}. We issue a single break if any candidate is seen alone, or followed by a different candidate. In other words, CR~CR~and LF~LF each issue two end-of-line markers, whereas CR~LF~and LF~CR issue only one marker each. This idea correctly handles the Unix, DOS/MIME, VMS, and Mac OS, as well as other more obscure conventions. \subsection{The C part of the filter} Our low-level filter is divided into two simple functions. The inner function actually does the conversion. It takes each input character in turn, deciding what to output and how to modify the context. The context tells if the last character processed was an end-of-line candidate, and if so, which candidate it was. \begin{quote} \begin{C} @stick# @#define candidate(c) (c == CR || c == LF) static int process(int c, int last, const char *marker, luaL_Buffer *buffer) { if (candidate(c)) { if (candidate(last)) { if (c == last) luaL_addstring(buffer, marker); return 0; } else { luaL_addstring(buffer, marker); return c; } } else { luaL_putchar(buffer, c); return 0; } } % \end{C} \end{quote} The inner function makes use of Lua's auxiliary library's buffer interface for efficiency. The outer function simply interfaces with Lua. It receives the context and the input chunk (as well as an optional custom end-of-line marker), and returns the transformed output chunk and the new context. \begin{quote} \begin{C} @stick# static int eol(lua_State *L) { int ctx = luaL_checkint(L, 1); size_t isize = 0; const char *input = luaL_optlstring(L, 2, NULL, &isize); const char *last = input + isize; const char *marker = luaL_optstring(L, 3, CRLF); luaL_Buffer buffer; luaL_buffinit(L, &buffer); if (!input) { lua_pushnil(L); lua_pushnumber(L, 0); return 2; } while (input < last) ctx = process(*input++, ctx, marker, &buffer); luaL_pushresult(&buffer); lua_pushnumber(L, ctx); return 2; } % \end{C} \end{quote} Notice that if the input chunk is \texttt{nil}, the operation is considered to be finished. In that case, the loop will not execute a single time and the context is reset to the initial state. This allows the filter to be reused many times. When designing your own filters, the challenging part is to decide what will be the context. For line breaking, for instance, it could be the number of bytes left in the current line. For Base64 encoding, it could be a string with the bytes that remain after the division of the input into 3-byte atoms. The MIME module in the LuaSocket distribution has many other examples. \section{Filter chains} Chains add a lot to the power of filters. For example, according to the standard for Quoted-Printable encoding, the text must be normalized into its canonic form prior to encoding, as far as end-of-line markers are concerned. To help specifying complex transformations like these, we define a chain factory that creates a composite filter from one or more filters. A chained filter passes data through all its components, and can be used wherever a primitive filter is accepted. The chaining factory is very simple. All it does is return a function that passes data through all filters and returns the result to the user. The auxiliary function~\texttt{chainpair} can only chain two filters together. In the auxiliary function, special care must be taken if the chunk is the last. This is because the final \texttt{nil} chunk notification has to be pushed through both filters in turn: \begin{quote} \begin{lua} @stick# local function chainpair(f1, f2) return function(chunk) local ret = f2(f1(chunk)) if chunk then return ret else return ret .. f2() end end end % @stick# function filter.chain(...) local f = arg[1] for i = 2, table.getn(arg) do f = chainpair(f, arg[i]) end return f end % \end{lua} \end{quote} Thanks to the chain factory, we can trivially define the Quoted-Printable conversion: \begin{quote} \begin{lua} @stick# local qp = filter.chain(normalize("\r\n"), encode("quoted-printable")) local in = source.chain(source.file(io.stdin), qp) local out = sink.file(io.stdout) pump.all(in, out) % \end{lua} \end{quote} \section{Sources, sinks, and pumps} The filters we introduced so far act as the internal nodes in a network of transformations. Information flows from node to node (or rather from one filter to the next) and is transformed on its way out. Chaining filters together is our way to connect nodes in this network. As the starting point for the network, we need a source node that produces the data. In the end of the network, we need a sink node that gives a final destination to the data. \subsection{Sources} A source returns the next chunk of data each time it is invoked. When there is no more data, it simply returns \texttt{nil}. In the event of an error, the source can inform the caller by returning \texttt{nil} followed by an error message. Below are two simple source factories. The \texttt{empty} source returns no data, possibly returning an associated error message. The \texttt{file} source is more usefule, and yields the contents of a file in a chunk by chunk fashion. \begin{quote} \begin{lua} @stick# function source.empty(err) return function() return nil, err end end % @stick# function source.file(handle, io_err) if handle then return function() local chunk = handle:read(2048) if not chunk then handle:close() end return chunk end else return source.empty(io_err or "unable to open file") end end % \end{lua} \end{quote} \subsection{Filtered sources} It is often useful to chain a source with a filter. A filtered source passes its data through the associated filter before returning it to the caller. Here is a factory that does the job: \begin{quote} \begin{lua} @stick# function source.chain(src, f) return source.simplify(function() if not src then return nil end local chunk, err = src() if not chunk then src = nil return f(nil) else return f(chunk) end end) end % \end{lua} \end{quote} Our motivating example in the introduction chains a source with a filter. Filtered sources are useful when working with functions that get their input data from a source (such as the pump in the example). By chaining a source with one or more filters, the function can be transparently provided with filtered data, with no need to change its interface. \subsection{Sinks} Just as we defined an interface for sources of data, we can also define an interface for a destination for data. We call any function respecting this interface a \emph{sink}. In our first example, we used a file sink connected to the standard output. Sinks receive consecutive chunks of data, until the end of data is notified with a \texttt{nil} chunk. A sink can be notified of an error with an optional extra argument that contains the error message, following a \texttt{nil} chunk. If a sink detects an error itself, and wishes not to be called again, it can return \texttt{nil}, followed by an error message. A return value that is not \texttt{nil} means the source will accept more data. Below are two useful sink factories. The table factory creates a sink that stores individual chunks into an array. The data can later be efficiently concatenated into a single string with Lua's \texttt{table.concat} library function. The \texttt{null} sink simply discards the chunks it receives: \begin{quote} \begin{lua} @stick# function sink.table(t) t = t or {} local f = function(chunk, err) if chunk then table.insert(t, chunk) end return 1 end return f, t end % @stick# local function null() return 1 end function sink.null() return null end % \end{lua} \end{quote} Naturally, filtered sinks are just as useful as filtered sources. A filtered sink passes each chunk it receives through the associated filter before handing it to the original sink. In the following example, we use a source that reads from the standard input. The input chunks are sent to a table sink, which has been coupled with a normalization filter. The filtered chunks are then concatenated from the output array, and finally sent to standard out: \begin{quote} \begin{lua} @stick# local in = source.file(io.stdin) local out, t = sink.table() out = sink.chain(normalize("\r\n"), out) pump.all(in, out) io.write(table.concat(t)) % \end{lua} \end{quote} \subsection{Pumps} Adrian Sietsma noticed that, although not on purpose, our interface for sources is compatible with Lua iterators. That is, a source can be neatly used in conjunction with \texttt{for} loops. Using our file source as an iterator, we can write the following code: \begin{quote} \begin{lua} @stick# for chunk in source.file(io.stdin) do io.write(chunk) end % \end{lua} \end{quote} Loops like this will always be present because everything we designed so far is passive. Sources, sinks, filters: none of them can do anything on their own. The operation of pumping all data a source can provide into a sink is so common that it deserves its own function: \begin{quote} \begin{lua} @stick# function pump.step(src, snk) local chunk, src_err = src() local ret, snk_err = snk(chunk, src_err) return chunk and ret and not src_err and not snk_err, src_err or snk_err end % @stick# function pump.all(src, snk, step) step = step or pump.step while true do local ret, err = step(src, snk) if not ret then return not err, err end end end % \end{lua} \end{quote} The \texttt{pump.step} function moves one chunk of data from the source to the sink. The \texttt{pump.all} function takes an optional \texttt{step} function and uses it to pump all the data from the source to the sink. We can now use everything we have to write a program that reads a binary file from disk and stores it in another file, after encoding it to the Base64 transfer content encoding: \begin{quote} \begin{lua} @stick# local in = source.chain( source.file(io.open("input.bin", "rb")), encode("base64")) local out = sink.chain( wrap(76), sink.file(io.open("output.b64", "w"))) pump.all(in, out) % \end{lua} \end{quote} The way we split the filters here is not intuitive, on purpose. Alternatively, we could have chained the Base64 encode filter and the line-wrap filter together, and then chain the resulting filter with either the file source or the file sink. It doesn't really matter. \section{Exploding filters} Our current filter interface has one flagrant shortcoming. When David Burgess was writing his \texttt{gzip} filter, he noticed that a decompression filter can explode a small input chunk into a huge amount of data. To address this, we decided to change our filter interface to allow exploding filters to return large quantities of output data in a chunk by chunk manner. More specifically, after passing each chunk of input data to a filter and collecting the first chunk of output data, the user must now loop to receive data from the filter until no filtered data is left. Within these secondary calls, the caller passes an empty string to the filter. The filter responds with an empty string when it is ready for the next input chunk. In the end, after the user passes a \texttt{nil} chunk notifying the filter that there is no more input data, the filter might still have to produce too much output data to return in a single chunk. The user has to loop again, this time passing \texttt{nil} each time, until the filter itself returns \texttt{nil} to notify the user it is finally done. Fortunately, it is very easy to modify a filter to respect the new interface. In fact, the end-of-line translation filter we presented earlier already conforms to it. The complexity is encapsulated within the chaining functions, which must now include a loop. Since these functions only have to be written once, the user is not affected. Interestingly, the modifications do not have a measurable negative impact in the the performance of filters that do not need the added flexibility. On the other hand, for a small price in complexity, the changes make exploding filters practical. \section{A complex example} The LTN12 module in the \texttt{LuaSocket} distribution implements the ideas we have described. The MIME and SMTP modules are especially integrated with LTN12, and can be used to showcase the expressive power of filters, sources, sinks, and pumps. Below is an example of how a user would proceed to define and send a multipart message with attachments, using \texttt{LuaSocket}: \begin{quote} \begin{mime} local smtp = require"socket.smtp" local mime = require"mime" local ltn12 = require"ltn12" local message = smtp.message{ headers = { from = "Sicrano ", to = "Fulano ", subject = "A message with an attachment"}, body = { preamble = "Hope you can see the attachment\r\n", [1] = { body = "Here is our logo\r\n"}, [2] = { headers = { ["content-type"] = 'image/png; name="luasocket.png"', ["content-disposition"] = 'attachment; filename="luasocket.png"', ["content-description"] = 'LuaSocket logo', ["content-transfer-encoding"] = "BASE64"}, body = ltn12.source.chain( ltn12.source.file(io.open("luasocket.png", "rb")), ltn12.filter.chain( mime.encode("base64"), mime.wrap()))}}} assert(smtp.send{ rcpt = "", from = "", source = message}) \end{mime} \end{quote} The \texttt{smtp.message} function receives a table describing the message, and returns a source. The \texttt{smtp.send} function takes this source, chains it with the SMTP dot-stuffing filter, creates a connects a socket sink to the server, and simply pumps the data. The message is never assembled in memory. Everything is produced on demand, transformed in small pieces, and sent to the server in chunks, including the file attachment that is loaded from disk and encoded on the fly. It just works. \section{Conclusions} In this article we introduce the concepts of filters, sources, sinks, and pumps to the Lua language. These are useful tools for data processing in general. Sources provide a simple abstraction for data acquisition. Sinks provide an abstraction for final data destinations. Filters define an interface for data transformations. The chaining of filters, sources and sinks provides an elegant way to create arbitrarily complex data transformation from simpler transformations. Pumps simply move the data through. \end{document}