2007-05-31 23:23:42 +02:00
\documentclass[10pt]{article}
\usepackage{fancyvrb}
\usepackage{url}

\DefineVerbatimEnvironment{lua}{Verbatim}{fontsize=\small,commandchars=\@\#\%}
\DefineVerbatimEnvironment{C}{Verbatim}{fontsize=\small,commandchars=\@\#\%}
\DefineVerbatimEnvironment{mime}{Verbatim}{fontsize=\small,commandchars=\$\#\%}
\newcommand{\stick}[1]{\vbox{\setlength{\parskip}{0pt}#1}}
\newcommand{\bl}{\ensuremath{\mathtt{\backslash}}}

\title{Filters, sources, sinks, and pumps\\
  {\large or Functional programming for the rest of us}}
\author{Diego Nehab}

\begin{document}

\maketitle
\begin{abstract}
Certain data processing operations can be implemented in the
form of filters. A filter is a function that can process data
received in consecutive function calls, returning partial
results after each invocation. Examples of operations that can be
implemented as filters include the end-of-line normalization
for text, Base64 and Quoted-Printable transfer content
encodings, the breaking of text into lines, SMTP dot-stuffing,
and many others. Filters become even
more powerful when we allow them to be chained together to
create composite filters. In this context, filters can be seen
as the middle links in a chain of data transformations. Sources and sinks
are the corresponding end points of these chains. A source
is a function that produces data, chunk by chunk, and a sink
is a function that takes data, chunk by chunk. In this
article, we describe the design of an elegant interface for filters,
sources, sinks, and chaining, and illustrate each step
with concrete examples.
\end{abstract}
\section{Introduction}

Within the realm of networking applications, we are often
required to apply transformations to streams of data. Examples
include the end-of-line normalization for text, Base64 and
Quoted-Printable transfer content encodings, breaking text
into lines with a maximum number of columns, SMTP
dot-stuffing, \texttt{gzip} compression, HTTP chunked
transfer coding, and the list goes on.

Many complex tasks require a combination of two or more such
transformations, and therefore a general mechanism for
promoting reuse is desirable. In the process of designing
\texttt{LuaSocket~2.0}, David Burgess and I were forced to deal with
this problem. The solution we reached proved to be very
general and convenient. It is based on the concepts of
filters, sources, sinks, and pumps, which we introduce
below.

\emph{Filters} are functions that can be repeatedly invoked
with chunks of input, successively returning processed
chunks of output. More importantly, the result of
concatenating all the output chunks must be the same as the
result of applying the filter to the concatenation of all
input chunks. In fancier language, filters \emph{commute}
with the concatenation operator. As a result, chunk
boundaries are irrelevant: filters correctly handle input
data no matter how it is split.

A \emph{chain} transparently combines the effect of one or
more filters. The interface of a chain is
indistinguishable from the interface of its components.
This allows a chained filter to be used wherever an atomic
filter is expected. In particular, chains can be
themselves chained to create arbitrarily complex operations.

Filters can be seen as internal nodes in a network through
which data will flow, potentially being transformed many
times along its way. Chains connect these nodes together.
To complete the picture, we need \emph{sources} and
\emph{sinks}. These are the initial and final nodes of the
network, respectively. Less abstractly, a source is a
function that produces new data every time it is called.
Conversely, sinks are functions that give a final
destination to the data they receive. Naturally, sources
and sinks can also be chained with filters to produce
filtered sources and sinks.

Finally, filters, chains, sources, and sinks are all passive
entities: they must be repeatedly invoked in order for
anything to happen. \emph{Pumps} provide the driving force
that pushes data through the network, from a source to a
sink.

In the following sections, we start with a simplified
interface, which we later refine. The evolution we present
is not contrived: it recreates the steps we followed
ourselves as we consolidated our understanding of these
concepts within our application domain.

\subsection{A simple example}

Let us use the end-of-line normalization of text as an
example to motivate our initial filter interface.
Assume we are given text in an unknown end-of-line
convention (including possibly mixed conventions) out of the
commonly found Unix (LF), Mac OS (CR), and DOS (CRLF)
conventions. We would like to be able to write code like the
following:
\begin{quote}
\begin{lua}
@stick#
local input = source.chain(source.file(io.stdin), normalize("\r\n"))
local out = sink.file(io.stdout)
pump.all(input, out)
%
\end{lua}
\end{quote}

This program should read data from the standard input stream
and normalize the end-of-line markers to the canonic CRLF
marker, as defined by the MIME standard. Finally, the
normalized text should be sent to the standard output
stream. We use a \emph{file source} that produces data from
standard input, and chain it with a filter that normalizes
the data. The pump then repeatedly obtains data from the
source, and passes it to the \emph{file sink}, which sends
it to the standard output.

In the code above, the \texttt{normalize} \emph{factory} is a
function that creates our normalization filter. This filter
will replace any end-of-line marker with the canonic
`\verb|\r\n|' marker. The initial filter interface is
trivial: a filter function receives a chunk of input data,
and returns a chunk of processed data. When there are no
more input data left, the caller notifies the filter by invoking
it with a \texttt{nil} chunk. The filter responds by returning
the final chunk of processed data.

Although the interface is extremely simple, the
implementation is not so obvious. A normalization filter
respecting this interface needs to keep some kind of context
between calls. This is because a chunk boundary may lie between
the CR and LF characters marking the end of a line. This
need for contextual storage motivates the use of
factories: each time the factory is invoked, it returns a
filter with its own context so that we can have several
independent filters being used at the same time. For
efficiency reasons, we must avoid the obvious solution of
concatenating all the input into the context before
producing any output.

To that end, we break the implementation into two parts:
a low-level filter, and a factory of high-level filters. The
low-level filter is implemented in C and does not maintain
any context between function calls. The high-level filter
factory, implemented in Lua, creates and returns a
high-level filter that maintains whatever context the low-level
filter needs, but isolates the user from its internal
details. That way, we take advantage of C's efficiency to
perform the hard work, and take advantage of Lua's
simplicity for the bookkeeping.

\subsection{The Lua part of the filter}

Below is the complete implementation of the factory of high-level
end-of-line normalization filters:
\begin{quote}
\begin{lua}
@stick#
function filter.cycle(low, ctx, extra)
  return function(chunk)
    local ret
    ret, ctx = low(ctx, chunk, extra)
    return ret
  end
end
%

@stick#
function normalize(marker)
  return filter.cycle(eol, 0, marker)
end
%
\end{lua}
\end{quote}

The \texttt{normalize} factory simply calls a more generic
factory, the \texttt{cycle} factory. This factory receives a
low-level filter, an initial context, and an extra
parameter, and returns a new high-level filter. Each time
the high-level filter is passed a new chunk, it invokes the
low-level filter with the previous context, the new chunk,
and the extra argument. It is the low-level filter that
does all the work, producing the chunk of processed data and
a new context. The high-level filter then updates its
internal context, and returns the processed chunk of data to
the user. Notice that we take advantage of Lua's lexical
scoping to store the context in a closure between function
calls.

Concerning the low-level filter code, we must first accept
that there is no perfect solution to the end-of-line marker
normalization problem. The difficulty comes from an
inherent ambiguity in the definition of empty lines within
mixed input. However, the following solution works well for
any consistent input, as well as for non-empty lines in
mixed input. It also does a reasonable job with empty lines
and serves as a good example of how to implement a low-level
filter.

The idea is to consider both CR and~LF as end-of-line
\emph{candidates}. We issue a single break if any candidate
is seen alone, or followed by a different candidate. In
other words, CR~CR~and LF~LF each issue two end-of-line
markers, whereas CR~LF~and LF~CR issue only one marker each.
This method correctly handles the Unix, DOS/MIME, VMS, and Mac
OS conventions.
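
To make the candidate rule concrete, here is a minimal pure-Lua sketch of a low-level filter that follows it. It is an illustration only, not the LuaSocket implementation: the local functions \texttt{eol}, \texttt{cycle}, and \texttt{normalize} are redefined here so the example is self-contained, and returning an empty string at the end of data is an assumption of this sketch. The last lines check that splitting the input between CR and LF does not change the result.

```lua
-- Pure-Lua sketch of a low-level end-of-line filter (illustrative only).
-- ctx is 0, or the byte value of a pending end-of-line candidate.
local CR, LF = 13, 10

local function eol(ctx, chunk, marker)
  if not chunk then return "", 0 end   -- end of data: reset the context
  local out = {}
  for i = 1, #chunk do
    local c = chunk:byte(i)
    if c == CR or c == LF then
      if ctx == CR or ctx == LF then
        -- second candidate in a row: a repeated candidate is a new break,
        -- a different one only completes the break already issued
        if c == ctx then out[#out + 1] = marker end
        ctx = 0
      else
        out[#out + 1] = marker
        ctx = c
      end
    else
      out[#out + 1] = string.char(c)
      ctx = 0
    end
  end
  return table.concat(out), ctx
end

-- Same shape as the cycle factory shown above.
local function cycle(low, ctx, extra)
  return function(chunk)
    local ret
    ret, ctx = low(ctx, chunk, extra)
    return ret
  end
end

local function normalize(marker) return cycle(eol, 0, marker) end

-- Feed the same text split in the middle of a CRLF pair, and whole.
local f = normalize("\r\n")
local split = f("a\r") .. f("\nb\n\nc") .. f(nil)
local whole = normalize("\r\n")("a\r\nb\n\nc")
```

Both \texttt{split} and \texttt{whole} hold the same normalized text, which is the commutation property required of filters.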
\subsection{The C part of the filter}

Our low-level filter is divided into two simple functions.
The inner function performs the normalization itself. It takes
each input character in turn, deciding what to output and
how to modify the context. The context tells if the last
processed character was an end-of-line candidate, and if so,
which candidate it was. For efficiency, it uses
Lua's auxiliary library's buffer interface:
\begin{quote}
\begin{C}
@stick#
@#define candidate(c) (c == CR || c == LF)
static int process(int c, int last, const char *marker,
        luaL_Buffer *buffer) {
    if (candidate(c)) {
        if (candidate(last)) {
            if (c == last) luaL_addstring(buffer, marker);
            return 0;
        } else {
            luaL_addstring(buffer, marker);
            return c;
        }
    } else {
        luaL_putchar(buffer, c);
        return 0;
    }
}
%
\end{C}
\end{quote}

The outer function simply interfaces with Lua. It receives the
context and input chunk (as well as an optional
custom end-of-line marker), and returns the transformed
output chunk and the new context:
\begin{quote}
\begin{C}
@stick#
static int eol(lua_State *L) {
    int ctx = luaL_checkint(L, 1);
    size_t isize = 0;
    const char *input = luaL_optlstring(L, 2, NULL, &isize);
    const char *last = input + isize;
    const char *marker = luaL_optstring(L, 3, CRLF);
    luaL_Buffer buffer;
    luaL_buffinit(L, &buffer);
    if (!input) {
        lua_pushnil(L);
        lua_pushnumber(L, 0);
        return 2;
    }
    while (input < last)
        ctx = process(*input++, ctx, marker, &buffer);
    luaL_pushresult(&buffer);
    lua_pushnumber(L, ctx);
    return 2;
}
%
\end{C}
\end{quote}

Notice that if the input chunk is \texttt{nil}, the operation
is considered to be finished. In that case, the loop will
not execute a single time and the context is reset to the
initial state. This allows the filter to be reused many
times.

When designing your own filters, the challenging part is to
decide what will be in the context. For line breaking, for
instance, it could be the number of bytes left in the
current line. For Base64 encoding, it could be a string
with the bytes that remain after the division of the input
into 3-byte atoms. The MIME module in the \texttt{LuaSocket}
distribution has many other examples.
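
As an illustration of a string-valued context, the sketch below groups its input into 3-byte atoms and keeps the leftover bytes in the context between calls, much as a Base64 encoder would. The names \texttt{atoms} and \texttt{cycle} are local to this example, and the bracketed output format is made up purely so that the grouping is visible; none of this is taken from the MIME module.

```lua
-- Low-level filter: the context is the string of bytes that did not yet
-- fill a whole 3-byte atom. Each complete atom is emitted in brackets.
local function atoms(ctx, chunk)
  if not chunk then
    -- end of data: flush the incomplete atom, if any, and reset the context
    if ctx == "" then return "", "" end
    return "[" .. ctx .. "]", ""
  end
  local data = ctx .. chunk
  local whole = #data - #data % 3           -- number of bytes in full atoms
  local out = data:sub(1, whole):gsub("...", "[%0]")
  return out, data:sub(whole + 1)           -- leftover becomes the context
end

-- High-level factory with the same shape as the cycle factory.
local function cycle(low, ctx, extra)
  return function(chunk)
    local ret
    ret, ctx = low(ctx, chunk, extra)
    return ret
  end
end

-- Chunk boundaries fall in the middle of atoms on purpose.
local f = cycle(atoms, "")
local result = f("ab") .. f("cdefg") .. f("h") .. f(nil)
```

Here \texttt{result} is the same string the filter would produce if it were given the whole input in a single chunk.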
\section{Filter chains}

Chains add a lot to the power of filters. For example,
according to the standard for Quoted-Printable encoding,
text must be normalized to a canonic end-of-line marker
prior to encoding. To help specify complex
transformations like this, we define a chain factory that
creates a composite filter from one or more filters. A
chained filter passes data through all its components, and
can be used wherever a primitive filter is accepted.

The chaining factory is very simple. The auxiliary
function~\texttt{chainpair} chains two filters together,
taking special care if the chunk is the last. This is
because the final \texttt{nil} chunk notification has to be
pushed through both filters in turn:
\begin{quote}
\begin{lua}
@stick#
local function chainpair(f1, f2)
  return function(chunk)
    local ret = f2(f1(chunk))
    if chunk then return ret
    else return ret .. f2() end
  end
end
%

@stick#
function filter.chain(...)
  local f = arg[1]
  for i = 2, @#arg do
    f = chainpair(f, arg[i])
  end
  return f
end
%
\end{lua}
\end{quote}

Thanks to the chain factory, we can
define the Quoted-Printable conversion as follows:
\begin{quote}
\begin{lua}
@stick#
local qp = filter.chain(normalize("\r\n"),
  encode("quoted-printable"))
local input = source.chain(source.file(io.stdin), qp)
local out = sink.file(io.stdout)
pump.all(input, out)
%
\end{lua}
\end{quote}
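
The following self-contained sketch shows how the final \texttt{nil} notification travels through a chain. The three toy filters (\texttt{upper}, \texttt{dashes}, and \texttt{trailer}) are hypothetical and exist only for this example; each is assumed to return a string, possibly empty, when called with \texttt{nil}. The local \texttt{chain} function collects its arguments with \texttt{\{...\}} rather than the \texttt{arg} table, which is a choice of this sketch.

```lua
local function chainpair(f1, f2)
  return function(chunk)
    local ret = f2(f1(chunk))
    if chunk then return ret
    else return ret .. f2() end   -- also flush the second filter
  end
end

local function chain(...)
  local fs = {...}
  local f = fs[1]
  for i = 2, #fs do f = chainpair(f, fs[i]) end
  return f
end

-- Toy filters. Each one returns a string for the final nil chunk.
local function upper(chunk)
  return chunk and chunk:upper() or ""
end

local function dashes(chunk)
  return chunk and (chunk:gsub(" ", "-")) or ""
end

-- Passes data through unchanged and emits a trailer at end of data.
local function trailer(chunk)
  return chunk or "<END>"
end

local f = chain(upper, dashes, trailer)
local body = f("hello ") .. f("big world")
local tail = f(nil)
```

Only the last call produces the trailer, because \texttt{trailer} sees \texttt{nil} only when \texttt{chainpair} flushes it.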
\section{Sources, sinks, and pumps}

The filters we introduced so far act as the internal nodes
in a network of transformations. Information flows from node
to node (or rather from one filter to the next) and is
transformed along the way. Chaining filters together is our
way to connect nodes in this network. As the starting point
for the network, we need a source node that produces the
data. At the end of the network, we need a sink node that
gives a final destination to the data.

\subsection{Sources}

A source returns the next chunk of data each time it is
invoked. When there is no more data, it simply returns
\texttt{nil}. In the event of an error, the source can inform the
caller by returning \texttt{nil} followed by an error message.

Below are two simple source factories. The \texttt{empty} source
returns no data, possibly returning an associated error
message. The \texttt{file} source works harder, and
yields the contents of a file in a chunk by chunk fashion:
\begin{quote}
\begin{lua}
@stick#
function source.empty(err)
  return function()
    return nil, err
  end
end
%

@stick#
function source.file(handle, io_err)
  if handle then
    return function()
      local chunk = handle:read(2048)
      if not chunk then handle:close() end
      return chunk
    end
  else return source.empty(io_err or "unable to open file") end
end
%
\end{lua}
\end{quote}
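
A source does not have to be backed by a file. As a further illustration of the interface, here is a sketch of a factory that serves the contents of a string in fixed-size chunks. The name \texttt{string\_source} and its \texttt{size} parameter are assumptions made for this example.

```lua
-- Source factory: yields the string s in chunks of at most `size` bytes.
local function string_source(s, size)
  size = size or 2048
  local pos = 1
  return function()
    if pos > #s then return nil end     -- no more data
    local chunk = s:sub(pos, pos + size - 1)
    pos = pos + size
    return chunk
  end
end

-- Call the source until it reports the end of data.
local src = string_source("hello world", 4)
local chunks = {}
while true do
  local chunk = src()
  if not chunk then break end
  chunks[#chunks + 1] = chunk
end
```

Once the data is exhausted, every further call keeps returning \texttt{nil}.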
\subsection{Filtered sources}

A filtered source passes its data through the
associated filter before returning it to the caller.
Filtered sources are useful when working with
functions that get their input data from a source (such as
the pump in our first example). By chaining a source with one or
more filters, the function can be transparently provided
with filtered data, with no need to change its interface.
Here is a factory that does the job:
\begin{quote}
\begin{lua}
@stick#
function source.chain(src, f)
  return source.simplify(function()
    if not src then return nil end
    local chunk, err = src()
    if not chunk then
      src = nil
      return f(nil)
    else return f(chunk) end
  end)
end
%
\end{lua}
\end{quote}
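
To see what a caller observes when draining a filtered source, consider the sketch below. It is a simplified variant of the factory above that omits the \texttt{source.simplify} wrapper, and it relies on two hypothetical helpers defined only for this example: a string-backed source and a toy filter that upper-cases its input and emits a trailer for the final \texttt{nil} chunk.

```lua
-- Hypothetical string-backed source, used only for this example.
local function string_source(s, size)
  local pos = 1
  return function()
    if pos > #s then return nil end
    local chunk = s:sub(pos, pos + size - 1)
    pos = pos + size
    return chunk
  end
end

-- Toy filter: upper-cases data, emits a trailer at end of data.
local function shout(chunk)
  return chunk and chunk:upper() or "<END>"
end

-- Simplified filtered-source factory (no source.simplify wrapper).
local function chain_source(src, f)
  return function()
    if not src then return nil end
    local chunk, err = src()
    if not chunk then
      src = nil            -- remember that the source is exhausted
      return f(nil)        -- let the filter flush its final output
    else return f(chunk) end
  end
end

local filtered = chain_source(string_source("abcd", 2), shout)
local seen = {}
while true do
  local chunk = filtered()
  if not chunk then break end
  seen[#seen + 1] = chunk
end
```

The caller receives the filtered chunks, then the output of the filter's final flush, and only then \texttt{nil}.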
\subsection{Sinks}

Just as we defined an interface for a data source,
we can also define an interface for a data destination.
We call any function respecting this
interface a \emph{sink}. In our first example, we used a
file sink connected to the standard output.

Sinks receive consecutive chunks of data, until the end of
data is signaled by a \texttt{nil} chunk. A sink can be
notified of an error with an optional extra argument that
contains the error message, following a \texttt{nil} chunk.
If a sink detects an error itself, and
wishes not to be called again, it can return \texttt{nil},
followed by an error message. A return value that
is not \texttt{nil} means the sink will accept more data.

Below are two useful sink factories.
The table factory creates a sink that stores
individual chunks into an array. The data can later be
efficiently concatenated into a single string with Lua's
\texttt{table.concat} library function. The \texttt{null} sink
simply discards the chunks it receives:
\begin{quote}
\begin{lua}
@stick#
function sink.table(t)
  t = t or {}
  local f = function(chunk, err)
    if chunk then table.insert(t, chunk) end
    return 1
  end
  return f, t
end
%

@stick#
local function null()
  return 1
end

function sink.null()
  return null
end
%
\end{lua}
\end{quote}

Naturally, filtered sinks are just as useful as filtered
sources. A filtered sink passes each chunk it receives
through the associated filter before handing it to the
original sink. In the following example, we use a source
that reads from the standard input. The input chunks are
sent to a table sink, which has been coupled with a
normalization filter. The filtered chunks are then
concatenated from the output array, and finally sent to
standard out:
\begin{quote}
\begin{lua}
@stick#
local input = source.file(io.stdin)
local out, t = sink.table()
out = sink.chain(normalize("\r\n"), out)
pump.all(input, out)
io.write(table.concat(t))
%
\end{lua}
\end{quote}
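
The text uses a filtered-sink factory without showing one. Under the simple filter interface described so far, such a factory could look like the sketch below. This is one possible implementation written for this example, not the code shipped with LuaSocket; the names \texttt{chain\_sink}, \texttt{table\_sink}, and \texttt{shout} are hypothetical.

```lua
-- Table sink with the same behavior as the table factory shown above.
local function table_sink(t)
  t = t or {}
  return function(chunk, err)
    if chunk then table.insert(t, chunk) end
    return 1
  end, t
end

-- Filtered-sink factory: filter each chunk, then hand it to the sink.
local function chain_sink(f, snk)
  return function(chunk, err)
    local filtered = f(chunk)
    if filtered ~= nil then
      local ret, snk_err = snk(filtered)
      if not ret then return nil, snk_err end
    end
    -- at end of data, pass the nil notification (and any error) along
    if chunk == nil then return snk(nil, err) end
    return 1
  end
end

-- Toy filter: upper-cases data, emits a trailer at end of data.
local function shout(chunk)
  return chunk and chunk:upper() or "<END>"
end

local out, t = table_sink()
out = chain_sink(shout, out)
out("ab")
out("cd")
out(nil)
local stored = table.concat(t)
```

The wrapped sink stores the filtered chunks followed by the filter's final output, so the caller never needs to know a filter is involved.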
\subsection{Pumps}

Adrian Sietsma noticed that, although not on purpose, our
interface for sources is compatible with Lua iterators.
That is, a source can be neatly used in conjunction
with \texttt{for} loops. Using our file
source as an iterator, we can write the following code:
\begin{quote}
\begin{lua}
@stick#
for chunk in source.file(io.stdin) do
  io.write(chunk)
end
%
\end{lua}
\end{quote}

Loops like this will always be present because everything
we designed so far is passive. Sources, sinks, filters: none
of them can do anything on their own. The operation of
pumping all data a source can provide into a sink is so
common that it deserves its own function:
\begin{quote}
\begin{lua}
@stick#
function pump.step(src, snk)
  local chunk, src_err = src()
  local ret, snk_err = snk(chunk, src_err)
  if chunk and ret then return 1
  else return nil, src_err or snk_err end
end
%

@stick#
function pump.all(src, snk, step)
  step = step or pump.step
  while true do
    local ret, err = step(src, snk)
    if not ret then
      if err then return nil, err
      else return 1 end
    end
  end
end
%
\end{lua}
\end{quote}

The \texttt{pump.step} function moves one chunk of data from
the source to the sink. The \texttt{pump.all} function takes
an optional \texttt{step} function and uses it to pump all the
data from the source to the sink. We can now use everything
we have to write a program that reads a binary file from
disk and stores it in another file, after encoding it to the
Base64 transfer content encoding:
\begin{quote}
\begin{lua}
@stick#
local input = source.chain(
  source.file(io.open("input.bin", "rb")),
  encode("base64"))
local out = sink.chain(
  wrap(76),
  sink.file(io.open("output.b64", "w")))
pump.all(input, out)
%
\end{lua}
\end{quote}

The way we split the filters here is not intuitive, on
purpose. Alternatively, we could have chained the Base64
encode filter and the line-wrap filter together, and then
chained the resulting filter with either the file source or
the file sink. It doesn't really matter. The Base64 and the
line wrapping filters are part of the \texttt{LuaSocket}
distribution.
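
The pump functions also propagate errors from either end of the network. The sketch below copies the two pump functions into local names and runs them against hypothetical sources and sinks (\texttt{list\_source}, \texttt{table\_sink}, and \texttt{limited\_sink} are made up for this example) to show the three possible outcomes: success, a source error, and a sink error.

```lua
local function step(src, snk)
  local chunk, src_err = src()
  local ret, snk_err = snk(chunk, src_err)
  if chunk and ret then return 1
  else return nil, src_err or snk_err end
end

local function all(src, snk)
  while true do
    local ret, err = step(src, snk)
    if not ret then
      if err then return nil, err
      else return 1 end
    end
  end
end

-- Source that yields the items of a list, then nil plus an optional error.
local function list_source(list, err)
  local i = 0
  return function()
    i = i + 1
    if list[i] then return list[i] end
    return nil, err
  end
end

-- Sink that stores every chunk in the table t.
local function table_sink(t)
  return function(chunk, err)
    if chunk then table.insert(t, chunk) end
    return 1
  end
end

-- Sink that refuses data after `max` chunks.
local function limited_sink(max)
  local n = 0
  return function(chunk, err)
    if chunk then
      n = n + 1
      if n > max then return nil, "sink full" end
    end
    return 1
  end
end

local t = {}
local ok1 = all(list_source({"a", "b"}), table_sink(t))
local ok2, err2 = all(list_source({"a"}, "disk error"), table_sink({}))
local ok3, err3 = all(list_source({"a", "b", "c"}), limited_sink(2))
```

In the first call all data reaches the sink, in the second the source's error message is returned, and in the third the sink's own error stops the pump.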
\section{Exploding filters}

Our current filter interface has one flagrant shortcoming.
When David Burgess was writing his \texttt{gzip} filter, he
noticed that a decompression filter can explode a small
input chunk into a huge amount of data. To address this
problem, we decided to change the filter interface and allow
exploding filters to return large quantities of output data
in a chunk by chunk manner.

More specifically, after passing each chunk of input to
a filter, and collecting the first chunk of output, the
user must now loop to receive other chunks from the filter until no
filtered data is left. Within these secondary calls, the
caller passes an empty string to the filter. The filter
responds with an empty string when it is ready for the next
input chunk. In the end, after the user passes a
\texttt{nil} chunk notifying the filter that there is no
more input data, the filter might still have to produce too
much output data to return in a single chunk. The user has
to loop again, now passing \texttt{nil} to the filter each time,
until the filter itself returns \texttt{nil} to notify the
user it is finally done.

Fortunately, it is very easy to modify a filter to respect
the new interface. In fact, the end-of-line translation
filter we presented earlier already conforms to it. The
complexity is encapsulated within the chaining functions,
which must now include a loop. Since these functions only
have to be written once, the user is rarely affected.
Interestingly, the modifications do not have a measurable
negative impact on the performance of filters that do
not need the added flexibility. On the other hand, for a
small price in complexity, the changes make exploding
filters practical.
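
The calling protocol described above can be sketched as follows. The \texttt{explode} factory is a toy stand-in for a decompressor, invented for this example: it repeats every input byte \texttt{n} times but never returns more than \texttt{max} bytes per call. The \texttt{drain} function is a hypothetical driver that follows the two loops of the protocol.

```lua
-- Toy exploding filter: repeats each byte n times, returning at most
-- max bytes per call and keeping the rest pending.
local function explode(n, max)
  local pending = ""
  return function(chunk)
    if chunk and chunk ~= "" then
      local grown = chunk:gsub(".", function(c) return c:rep(n) end)
      pending = pending .. grown
    end
    if pending == "" then
      if chunk == nil then return nil end  -- all output delivered: done
      return ""                            -- ready for the next input chunk
    end
    local out = pending:sub(1, max)
    pending = pending:sub(max + 1)
    return out
  end
end

-- Driver: feeds every input chunk, then signals the end of data.
local function drain(f, chunks)
  local out = {}
  for _, chunk in ipairs(chunks) do
    local piece = f(chunk)
    while piece ~= "" do         -- secondary calls pass an empty string
      out[#out + 1] = piece
      piece = f("")
    end
  end
  local piece = f(nil)
  while piece do                 -- final loop passes nil until nil returns
    out[#out + 1] = piece
    piece = f(nil)
  end
  return out
end

local pieces = drain(explode(3, 4), {"ab", "c"})
local joined = table.concat(pieces)
```

No piece is longer than four bytes, yet the concatenation of all pieces is the complete exploded output.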
\section{A complex example}

The LTN12 module in the \texttt{LuaSocket} distribution
implements the ideas we have described. The MIME
and SMTP modules are especially integrated with LTN12,
and can be used to showcase the expressive power of filters,
sources, sinks, and pumps. Below is an example
of how a user would proceed to define and send a
multipart message, with attachments, using \texttt{LuaSocket}:
\begin{quote}
\begin{mime}
local smtp = require"socket.smtp"
local mime = require"mime"
local ltn12 = require"ltn12"

local message = smtp.message{
  headers = {
    from = "Sicrano <sicrano@example.com>",
    to = "Fulano <fulano@example.com>",
    subject = "A message with an attachment"},
  body = {
    preamble = "Hope you can see the attachment\r\n",
    [1] = {
      body = "Here is our logo\r\n"},
    [2] = {
      headers = {
        ["content-type"] = 'image/png; name="luasocket.png"',
        ["content-disposition"] =
          'attachment; filename="luasocket.png"',
        ["content-description"] = 'LuaSocket logo',
        ["content-transfer-encoding"] = "BASE64"},
      body = ltn12.source.chain(
        ltn12.source.file(io.open("luasocket.png", "rb")),
        ltn12.filter.chain(
          mime.encode("base64"),
          mime.wrap()))}}}

assert(smtp.send{
  rcpt = "<fulano@example.com>",
  from = "<sicrano@example.com>",
  source = message})
\end{mime}
\end{quote}

The \texttt{smtp.message} function receives a table
describing the message, and returns a source. The
\texttt{smtp.send} function takes this source, chains it with the
SMTP dot-stuffing filter, connects a socket sink
with the server, and simply pumps the data. The message is never
assembled in memory. Everything is produced on demand,
transformed in small pieces, and sent to the server in chunks,
including the file attachment that is loaded from disk and
encoded on the fly. It just works.

\section{Conclusions}

In this article, we introduced the concepts of filters,
sources, sinks, and pumps to the Lua language. These are
useful tools for stream processing in general. Sources provide
a simple abstraction for data acquisition. Sinks provide an
abstraction for final data destinations. Filters define an
interface for data transformations. The chaining of
filters, sources and sinks provides an elegant way to create
arbitrarily complex data transformations from simpler
components. Pumps simply move the data through.

\end{document}