Conduits, part 5: Buffering

January 10, 2012

By Michael Snoyman


This is the fifth (and probably final) part in a series on conduits, an approach to handling streams of data. You can see the previous posts at:

  1. The Resource monad transformer
  2. Sources
  3. Sinks
  4. Conduits

This post is mostly about the concept of buffering. We'll also cover a few other miscellaneous topics.

Inversion of Control

Buffering was actually one of the main motivations in the creation of the conduit package. To see its importance, we need to consider the approach we've seen so far, which we'll call inversion of control, or IoC.

Suppose you want to count how many newline characters there are in a file. In the standard imperative approach, you would do something like:

  1. Open the file
  2. Pull some data into a buffer
  3. Loop over the values in the buffer, incrementing a counter on each newline character
  4. Return to 2
  5. Close the file
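
As a rough sketch, the steps above could look like this in Haskell. It assumes the bytestring package and a 4096-byte chunk size; countNewlines is a name made up for this illustration, and exception safety is left out to keep the control flow visible.

```haskell
import qualified Data.ByteString as S
import System.IO

-- Hypothetical helper: count newline bytes by driving the read loop ourselves.
countNewlines :: FilePath -> IO Int
countNewlines path = do
    h <- openBinaryFile path ReadMode          -- 1. open the file
    let loop acc = do
            chunk <- S.hGetSome h 4096         -- 2. pull some data into a buffer
            if S.null chunk
                then return acc                -- an empty chunk means end of file
                else loop $! acc + S.count 10 chunk  -- 3. count '\n' (byte 10), 4. repeat
    total <- loop 0
    hClose h                                   -- 5. close the file
    return total
```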

Notice that your code is explicitly calling out to other code and that code is returning control back to your code. You have retained full control of the flow of execution of your program. The conduit approach we've seen so far does not work this way. Instead, you would:

  1. Write a sink that counts newlines and adds the result to an accumulator.
  2. Connect the sink to a source

There's no doubt in my mind that this is an easier approach. You don't have to worry about opening and closing files or pulling data from the file. Instead, the data you need to process is simply presented to you. This is the advantage of IoC: you can focus specifically on your piece of the code.
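
To make the shape of IoC concrete without depending on the conduit API, here is a minimal sketch using only the bytestring package. The names countStep, foldFileChunks and countNewlinesIoC are hypothetical: the step function plays the role of the sink, and the driver plays the role of the source that owns the file and the loop.

```haskell
import qualified Data.ByteString as S
import System.IO

-- The "sink": a step function that only knows how to update the count.
countStep :: Int -> S.ByteString -> Int
countStep acc chunk = acc + S.count 10 chunk  -- 10 is the byte for '\n'

-- The "source" side (hypothetical driver): it opens the file, runs the read
-- loop, calls back into the step function for every chunk, and closes the file.
foldFileChunks :: (s -> S.ByteString -> s) -> s -> FilePath -> IO s
foldFileChunks step start path =
    withBinaryFile path ReadMode (loop start)
  where
    loop acc h = do
        chunk <- S.hGetSome h 4096
        if S.null chunk
            then return acc
            else (loop $! step acc chunk) h

-- "Connecting" the two: our code never touches the Handle.
countNewlinesIoC :: FilePath -> IO Int
countNewlinesIoC = foldFileChunks countStep 0
```

Note that countStep never sees the Handle, which is exactly why it cannot decide when or how much to read.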

We use this IoC approach all over Haskell: for example, instead of takeMVar and putMVar, you can use withMVar. Don't bother with openFile and closeFile, just use withFile and pass in a function that uses the Handle. Even C has a version of this: why malloc and free when you could just alloca?

Actually, that last one is a huge red herring. Of course you can't just use alloca for everything. alloca only allocates memory locally on the stack, not dynamically on the heap. There's no way to return your allocated memory outside the current function.

But actually, the same restriction applies to the whole family of with functions: you can never return an allocated resource outside of the "block". Usually this works out just fine, but we need to recognize that this is a change in how we structure our programs. With simple examples, this is often a minor change. However, in larger settings this can become very difficult to manage, bordering on impossible at times.
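
A small sketch of that restriction, using only System.IO. The function names are made up for this example; the point is simply that a Handle returned from inside withFile has already been closed by the time the caller receives it.

```haskell
import System.IO

-- Tries to smuggle the Handle out of the withFile block. withFile closes
-- the file when the block exits, so the caller gets a closed Handle.
leakHandle :: FilePath -> IO Handle
leakHandle path = withFile path ReadMode return

-- Reports whether the escaped Handle is still open.
escapedHandleIsOpen :: FilePath -> IO Bool
escapedHandleIsOpen path = do
    h <- leakHandle path
    hIsOpen h
```

Calling escapedHandleIsOpen on an existing file should yield False, since the resource did not survive leaving the block.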

A web server

Let's say we're going to write a web server. We're going to use the following low-level operations:

data Socket
recv    :: Socket -> Int -> IO ByteString -- returns empty when the socket is closed
sendAll :: Socket -> ByteString -> IO ()

We're up to the part where we need to implement the function handleConn that handles an individual connection. It will look something like this:

data Request  -- request headers, HTTP version, etc
data Response -- status code, response headers, response body
type Application = Request -> IO Response
handleConn :: Application -> Socket -> IO ()

What does our handleConn need to do? In broad strokes:

  1. Parse the request line
  2. Parse the request headers
  3. Construct the Request value
  4. Pass Request to the Application and get back a Response
  5. Send the Response over the Socket

We start off by writing steps 1 and 2 manually, without using conduits. We'll do this very simply and just assume three space-separated strings. We end up with something that looks like:

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as S8

data RequestLine = RequestLine ByteString ByteString ByteString

parseRequestLine :: Socket -> IO RequestLine
parseRequestLine socket = do
    bs <- recv socket 4096
    let (method:path:version:ignored) = S8.words bs
    return $ RequestLine method path version

There are two issues here: it doesn't handle the case where there are fewer than three words in the chunk of data, and it throws away any extra data. We can definitely solve both of these issues manually, but it's very tedious. It's much easier to implement this in terms of conduits.

import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

data RequestLine = RequestLine ByteString ByteString ByteString

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = do
    let space = toEnum $ fromEnum ' '
    let getWord = do
            CB.dropWhile (== space)
            bss <- CB.takeWhile (/= space) =$ CL.consume
            return $ S.concat bss

    method <- getWord
    path <- getWord
    version <- getWord
    return $ RequestLine method path version

This means that our code will automatically be supplied with more data as it comes in, and any extra data will automatically be buffered in the Source, ready for the next time it's used. Now we can easily structure our program together, demonstrating the power of the conduits approach:

import Data.ByteString (ByteString)
import Data.Conduit
import Data.Conduit.Network (sourceSocket)
import Control.Monad.IO.Class (liftIO)
import Network.Socket (Socket)

data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers
data Response = Response
type Application = Request -> IO Response

parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined

sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined

handleConn :: Application -> Socket -> IO ()
handleConn app socket = do
    req <- runResourceT $ sourceSocket socket $$ do
        requestLine <- parseRequestLine
        headers <- parseRequestHeaders
        return $ Request requestLine headers
    res <- liftIO $ app req
    liftIO $ sendResponse socket res

Whither the request body?

This is all great, until we realize we can't read the request body. The Application is simply given the Request, and lives in the IO monad. It has no access whatsoever to the incoming stream of data.

There's an easy fix for this actually: have the Application live in the Sink monad. This is the very approach we took with enumerator-based WAI 0.4. However, there are two problems:

  • People find it confusing. What people expect is that the Request value would have a requestBody value of type Source.
  • This makes certain kinds of usage incredibly difficult. For example, trying to write an HTTP proxy combining WAI and http-enumerator proved to be almost impossible.

This is the downside of inversion of control. Our code wants to be in control. It wants to be given something to pull from, something to push to, and run with it. We need some solution to the problem.

The simplest solution would be to just create a new Source and pass that to the Application. Unfortunately, this will cause problems with our buffering. You see, when we connected our source to the parseRequestLine and parseRequestHeaders sinks, it made a call to recv. If the data it received was not enough to cover all of the headers, it issued another call. When it had enough data, it stopped. However, odds are that it didn't stop exactly at the end of the headers. It likely consumed a bit of the request body as well.

If we just create a new source and pass that to the request, it will be missing the beginning of the request body. We need some way to pass that buffered data along.


And so we finally get to introduce the last data type in conduits: BufferedSource. This is an abstract data type, but all it really does is keep a mutable reference to a buffer and an underlying Source. In order to create one of these, you use the bufferSource function.

bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)

This one little change is what allows us to easily solve our web server dilemma. Instead of connecting a Source to our parsing Sinks, we use a BufferedSource. At the end of each connect operation, any leftover data is put back on the buffer. For our web server case, we can now create a BufferedSource, use that to read the request line and headers, and then pass that same BufferedSource to the application for reading the request body.
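
The idea can be modeled in a few lines with nothing but an IORef. This is not the conduit implementation, only a sketch of the concept under simplifying assumptions: a source is modeled as an IO action returning Maybe a chunk, and Buffered, newBuffered, pull, pushBack, fromChunks and readLine are all hypothetical names.

```haskell
import Control.Monad (unless)
import Data.IORef

-- Hypothetical stand-in for BufferedSource: leftover chunks plus a pull action.
data Buffered a = Buffered (IORef [a]) (IO (Maybe a))

-- Wrap a pull action (Nothing signals end of stream) with an empty buffer.
newBuffered :: IO (Maybe a) -> IO (Buffered a)
newBuffered under = do
    ref <- newIORef []
    return (Buffered ref under)

-- Serve buffered leftovers first, then fall back to the underlying source.
pull :: Buffered a -> IO (Maybe a)
pull (Buffered ref under) = do
    buf <- readIORef ref
    case buf of
        (x:xs) -> writeIORef ref xs >> return (Just x)
        []     -> under

-- Put an unconsumed chunk back so the next consumer sees it.
pushBack :: Buffered a -> a -> IO ()
pushBack (Buffered ref _) x = modifyIORef ref (x :)

-- A pull action that hands out the given chunks one at a time.
fromChunks :: [a] -> IO (IO (Maybe a))
fromChunks chunks = do
    ref <- newIORef chunks
    return $ do
        cs <- readIORef ref
        case cs of
            []       -> return Nothing
            (c:rest) -> writeIORef ref rest >> return (Just c)

-- Read up to (and dropping) the first newline, pushing back whatever follows.
readLine :: Buffered String -> IO String
readLine b = go []
  where
    go acc = do
        mchunk <- pull b
        case mchunk of
            Nothing -> return (concat (reverse acc))
            Just chunk ->
                case break (== '\n') chunk of
                    (before, _ : after) -> do
                        unless (null after) $ pushBack b after
                        return (concat (reverse (before : acc)))
                    (_, []) -> go (chunk : acc)
```

With chunks such as "GET / HT" and "TP/1.1\nbody st", readLine consumes both chunks to assemble the request line, and the "body st" remainder stays available to whoever pulls from the same Buffered value next.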


We want to be able to connect a buffered source to a sink, just like we would a regular source. We would also like to be able to fuse it to a conduit. In order to make this convenient, conduit has a typeclass, IsSource. There are instances provided for both Source and BufferedSource. Both the connect ($$) and left-fuse ($=) operators use this typeclass.

There's one "gotcha" in the BufferedSource instance of this typeclass, so let's explain it. Suppose we want to write a file copy function, without any buffering. This is a fairly standard usage of conduits:

sourceFile input $$ sinkFile output

When this line is run, both the input and output files are opened, the data is copied, and then both files are closed. Let's change this example slightly to use buffering:

bsrc <- bufferSource $ sourceFile input
bsrc $$ isolate 50 =$ sinkFile output1
bsrc $$ sinkFile output2

When is the input file opened and closed? The opening occurs on the first line, when buffering the source. And if we follow the normal rules from sources, the file should be closed after the second line. However, if we did that, we couldn't reuse bsrc for line 3!

So instead, $$ does not close the file. As a result, you can pass a buffered source to as many actions as you want, without concerns that the file handle has been closed out from under you.

This presents one caveat: when you're finished with a buffered source, you should manually call bsourceClose on it. However, as usual, this is merely an optimization, as the source will automatically be closed when runResourceT is called.
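
For comparison, here is roughly the same two-stage copy written against a plain Handle rather than the conduit API. It is only an analogy, assuming the bytestring package, and splitCopy is a made-up name: the input stays open across both consumers and is closed once, at the end.

```haskell
import qualified Data.ByteString as S
import System.IO

-- Copy the first 50 bytes of the input to one file and the rest to another,
-- sharing a single open Handle between the two stages.
splitCopy :: FilePath -> FilePath -> FilePath -> IO ()
splitCopy input output1 output2 =
    withBinaryFile input ReadMode $ \h -> do
        -- First consumer: at most 50 bytes. The Handle must stay open afterwards.
        firstPart <- S.hGet h 50
        S.writeFile output1 firstPart
        -- Second consumer: continues from where the first one stopped.
        -- hGetContents reads the remainder strictly and closes the Handle.
        rest <- S.hGetContents h
        S.writeFile output2 rest
```

If the first stage closed the Handle when it finished, the second stage would have nothing to read from, which is the same reason $$ leaves a buffered source open.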

Recapping the web server

So what exactly does our web server look like now?

import Data.ByteString (ByteString)
import Data.Conduit
import Data.Conduit.Network (sourceSocket)
import Control.Monad.IO.Class (liftIO)
import Network.Socket (Socket)

data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers (BufferedSource IO ByteString)
data Response = Response
type Application = Request -> ResourceT IO Response

parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined

sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined

handleConn :: Application -> Socket -> IO ()
handleConn app socket = runResourceT $ do
    bsrc <- bufferSource $ sourceSocket socket
    requestLine <- bsrc $$ parseRequestLine
    headers <- bsrc $$ parseRequestHeaders
    let req = Request requestLine headers bsrc
    res <- app req
    liftIO $ sendResponse socket res

We've made a few minor changes. Firstly, the Application now lives in the ResourceT IO monad. This isn't strictly necessary, but it's very convenient: the application can now register cleanup actions that will only take place after the response has been fully sent to the client.

But the major changes are in the handleConn function. We now start off by buffering our source. This buffered source is then used twice in our function, and then passed off to the application.

That's all folks!

Thanks for making it through this very long series of posts. I hope it's been informative. The next step is to dive into the conduit packages on Hackage. Also, stay tuned in the next few weeks for an all new, all conduit release of Yesod.

My hope is that the simplicity afforded by conduits will not only allow people to become more involved in playing around with code, but will also let them make even more interesting combinations of the existing packages. I'm looking forward to seeing the results.

