Conduit

Conduits are a solution to the streaming data problem. Oftentimes, laziness allows us to process large amounts of data without pulling all values into memory. However, doing so in the presence of I/O requires us to use lazy I/O. The main downside to lazy I/O is non-determinism: we have no guarantees of when our resource finalizers will be run. For a small application, this may be acceptable, but for a high-load server, we could quickly run out of scarce resources, such as file handles.

Conduits allow us to process large streams of data while still retaining deterministic resource handling. They provide a unified interface for data streams, whether they come from files, sockets, or memory. And when combined with ResourceT, we can safely allocate resources, knowing that they will always be reclaimed, even in the presence of exceptions.

This appendix covers version 0.2 of the conduit package.

Conduits in Five Minutes

While a good understanding of the lower-level mechanics of conduits is advisable, you can get very far without it. Let's start off with some high-level examples. Don't worry if some of the details seem a bit magical right now. We'll cover everything in the course of this appendix. Let's start with the terminology, and then some sample code.

Source

A producer of data. The data could be in a file, coming from a socket, or in memory as a list. To access this data, we pull from the source.

Sink

A consumer of data. Basic examples would be a sum function (adding up a stream of numbers fed in), a file sink (which writes all incoming bytes to a file), or a socket. We push data into a sink. When the sink finishes processing (we'll explain that later), it returns some value.

Conduit

A transformer of data. The simplest example is a map function, though there are many others. Like a sink, we push data into a conduit. But instead of returning a single value at the end, a conduit can return multiple outputs every time it is pushed to.

Fuse

(Thanks to David Mazieres for the term.) A conduit can be fused with a source to produce a new, modified source (the $= operator). For example, you could have a source that reads bytes from a file, and a conduit that decodes bytes into text. If you fuse them together, you would now have a source that reads text from a file. Likewise, a conduit and a sink can fuse into a new sink (=$), and two conduits can fuse into a new conduit (=$=).

Connect

You can connect a source to a sink using the $$ operator. Doing so will pull data from the source and push it to the sink, until either the source or the sink signals that it is "done."

Let's see some examples of conduit code.

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit -- the core library
import qualified Data.Conduit.List as CL -- some list-like functions
import qualified Data.Conduit.Binary as CB -- bytes
import qualified Data.Conduit.Text as CT

import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import Control.Monad.ST (runST)

-- Let's start with the basics: connecting a source to a sink. We'll use the
-- built-in file functions to implement efficient, constant-memory,
-- resource-friendly file copying.
--
-- Two things to note: we use $$ to connect our source to our sink, and then
-- use runResourceT.
copyFile :: FilePath -> FilePath -> IO ()
copyFile src dest = runResourceT $ CB.sourceFile src $$ CB.sinkFile dest


-- The Data.Conduit.List module provides a number of helper functions for
-- creating sources, sinks, and conduits. Let's look at a typical fold: summing
-- numbers.
sumSink :: Resource m => Sink Int m Int
sumSink = CL.fold (+) 0

-- If we want to go a little more low-level, we can code our sink with the
-- sinkState function. This function takes three parameters: an initial state,
-- a push function (receive some more data), and a close function.
sumSink2 :: Resource m => Sink Int m Int
sumSink2 = sinkState
    0 -- initial value

    -- update the state with the new input and
    -- indicate that we want more input
    (\accum i -> return $ StateProcessing (accum + i))
    (\accum -> return accum) -- return the current accum value on close

-- Another common helper function is sourceList. Let's see how we can combine
-- that function with our sumSink to reimplement the built-in sum function.
sum' :: [Int] -> Int
sum' input = runST $ runResourceT $ CL.sourceList input $$ sumSink

-- Since this is Haskell, let's write a source to generate all of the
-- Fibonacci numbers. We'll use sourceState. The state will contain the next
-- two numbers in the sequence. We also need to provide a pull function, which
-- will return the next number and update the state.
fibs :: Resource m => Source m Int
fibs = sourceState
    (0, 1) -- initial state
    (\(x, y) -> return $ StateOpen (y, x + y) x)

-- Suppose we want to get the sum of the first 10 Fibonacci numbers. We can use
-- the isolate conduit to make sure the sum sink only consumes 10 values.
sumTenFibs :: Int
sumTenFibs =
       runST -- runs fine in pure code
     $ runResourceT
     $ fibs
    $= CL.isolate 10 -- fuse the source and conduit into a source
    $$ sumSink

-- We can also fuse the conduit into the sink instead; we just need to swap a
-- few operators.
sumTenFibs2 :: Int
sumTenFibs2 =
       runST
     $ runResourceT
     $ fibs
    $$ CL.isolate 10
    =$ sumSink

-- Alright, let's make some conduits. Let's turn our numbers into text. Sounds
-- like a job for a map...

intToText :: Int -> Text -- just a helper function
intToText = T.pack . show

textify :: Resource m => Conduit Int m Text
textify = CL.map intToText

-- As before, we can use the conduitState helper function. But here, we
-- don't even need state, so we provide a dummy state value.
textify2 :: Resource m => Conduit Int m Text
textify2 = conduitState
    ()
    (\() input -> return $ StateProducing () [intToText input])
    (\() -> return [])

-- Let's make the unlines conduit, which puts a newline on the end of each
-- piece of input. We'll just use CL.map; a conduitState version follows
-- below for practice.
unlines' :: Resource m => Conduit Text m Text
unlines' = CL.map $ \t -> t `T.append` "\n"
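
-- For practice, the same conduit written with conduitState, in the style
-- of textify2 above (just a sketch; the CL.map version is simpler):
unlines2 :: Resource m => Conduit Text m Text
unlines2 = conduitState
    ()
    (\() t -> return $ StateProducing () [t `T.append` "\n"])
    (\() -> return [])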

-- And let's write a function that prints the first N fibs to a file. We'll
-- use UTF8 encoding.
writeFibs :: Int -> FilePath -> IO ()
writeFibs count dest =
      runResourceT
    $ fibs
   $= CL.isolate count
   $= textify
   $= unlines'
   $= CT.encode CT.utf8
   $$ CB.sinkFile dest

-- We used the $= operator to fuse the conduits into the sources, producing a
-- single source. We can also do the opposite: fuse the conduits into the sink. We can even combine the two.
writeFibs2 :: Int -> FilePath -> IO ()
writeFibs2 count dest =
      runResourceT
    $ fibs
   $= CL.isolate count
   $= textify
   $$ unlines'
   =$ CT.encode CT.utf8
   =$ CB.sinkFile dest

-- Or we could fuse all those inner conduits into a single conduit...
someIntLines :: ResourceThrow m -- encoding can throw an exception
             => Int
             -> Conduit Int m ByteString
someIntLines count =
      CL.isolate count
  =$= textify
  =$= unlines'
  =$= CT.encode CT.utf8

-- and then use that conduit
writeFibs3 :: Int -> FilePath -> IO ()
writeFibs3 count dest =
      runResourceT
    $ fibs
   $= someIntLines count
   $$ CB.sinkFile dest

main :: IO ()
main = do
    putStrLn $ "First ten fibs: " ++ show sumTenFibs
    writeFibs 20 "fibs.txt"
    copyFile "fibs.txt" "fibs2.txt"

Structure of this Appendix

The remainder of this appendix covers five major topics in conduits:

  • ResourceT, the underlying technique that allows us to have guaranteed resource deallocation.

  • Sources, our data producers

  • Sinks, our data consumers

  • Conduits, our data transformers

  • Buffering, which allows us to avoid Inversion of Control

The Resource monad transformer

The Resource transformer (ResourceT) plays a vital role in proper resource management in the conduit project. It is included within the conduit package itself. We'll explain ResourceT as its own entity. While some of the design decisions are clearly biased towards conduits, ResourceT should remain a usable tool in its own right.

Goals

What's wrong with the following code?

import System.IO

main = do
    output <- openFile "output.txt" WriteMode
    input  <- openFile "input.txt"  ReadMode
    hGetContents input >>= hPutStr output
    hClose input
    hClose output

If the file input.txt does not exist, then an exception will be thrown when trying to open it. As a result, hClose output will never be called, and we'll have leaked a scarce resource (a file descriptor). In our tiny program, this isn't a big deal, but clearly we can't afford such waste in a long-running, highly active server process.

Fortunately, solving the problem is easy:

import System.IO

main =
    withFile "output.txt" WriteMode $ \output ->
    withFile "input.txt" ReadMode $ \input ->
    hGetContents input >>= hPutStr output

withFile makes sure that the Handle is always closed, even in the presence of exceptions. It also handles asynchronous exceptions. Overall, it's a great approach to use... when you can use it. While often withFile is easy to use, sometimes it can require restructuring our programs. And this restructuring can range from mildly tedious to wildly inefficient.

Let's take enumerators, for example. If you look in the documentation, there is an enumFile function (for reading contents from a file), but no iterFile (for writing contents to a file). That's because the flow of control in an iteratee doesn't allow proper allocation of the Handle. Instead, in order to write to a file, you need to allocate the Handle before entering the Iteratee, e.g.:

import System.IO
import Data.Enumerator
import Data.Enumerator.Binary

main =
    withFile "output.txt" WriteMode $ \output ->
    run_ $ enumFile "input.txt" $$ iterHandle output

This code works fine, but imagine that, instead of simply piping data directly to the file, there was a huge amount of computation that occurred before we needed to use the output handle. We would have allocated a file descriptor long before we needed it, and thereby locked up a scarce resource in our application. Besides this, there are times when we can't allocate the file beforehand, such as when we won't know which file to open until we've read from the input file.

One of the stated goals of conduits is to solve this problem, and it does so via ResourceT. As a result, the above program can be written in conduit as:

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import Data.Conduit.Binary

main = runResourceT $ sourceFile "input.txt" $$ sinkFile "output.txt"

How it Works

There are essentially three base functions on ResourceT, and then a bunch of conveniences thrown on top. The first function is:

register :: IO () -> ResourceT IO ReleaseKey

This function registers a piece of code that is guaranteed to eventually be run. It gives back a ReleaseKey, which is used by the next function:

release :: ReleaseKey -> ResourceT IO ()

Calling release on a ReleaseKey immediately performs the action you previously registered. You may call release on the same ReleaseKey as many times as you like; the first time it is called, it unregisters the action. This means you can safely register an action like a memory free, and have no concerns that it will be called twice.

Eventually, we'll want to exit our special ResourceT. To do so, we use:

runResourceT :: ResourceT IO a -> IO a

This seemingly innocuous function is where all the magic happens. It runs through all of the registered cleanup actions and performs them. It is fully exception safe, meaning the cleanups will be performed in the presence of both synchronous and asynchronous exceptions. And as mentioned before, calling release will unregister an action, so there is no concern of double-freeing.
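
To make these three functions concrete, here's a minimal sketch using the simplified, IO-specialized signatures shown above:

import Control.Monad.Trans.Resource
import Control.Monad.Trans.Class (lift)

main :: IO ()
main = runResourceT $ do
    key <- register $ putStrLn "cleaning up"
    lift $ putStrLn "doing work"
    release key -- prints "cleaning up" right away...
    release key -- ...while this second call is a safe no-op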

Finally, as a convenience, we provide one more function for the common case of allocating a resource and registering a release action:

with :: IO a -- ^ allocate
     -> (a -> IO ()) -- ^ free resource
     -> ResourceT IO (ReleaseKey, a)

So, to rework our first buggy example to use ResourceT, we would write:

import System.IO
import Control.Monad.Trans.Resource
import Control.Monad.Trans.Class (lift)

main = runResourceT $ do
    (releaseO, output) <- with (openFile "output.txt" WriteMode) hClose
    (releaseI, input)  <- with (openFile "input.txt"  ReadMode)  hClose
    lift $ hGetContents input >>= hPutStr output
    release releaseI
    release releaseO

Now there is no concern of any exceptions preventing the releasing of resources. We could skip the release calls if we want to, and in an example this small, it would not make any difference. But for larger applications, where we want processing to continue, this ensures that the Handles are freed as early as possible, keeping our scarce resource usage to a minimum.

Some Type Magic

As alluded to, there's a bit more to ResourceT than simply running in IO. Let's cover some of the things we need from this underlying Monad.

  • Mutable references to keep track of the registered release actions. You might think we could just use a StateT transformer, but then our state wouldn't survive exceptions.

  • We only want to register actions in the base monad. For example, if we have a ResourceT (WriterT [Int] IO) stack, we only want to register IO actions. This makes it easy to lift our stacks around (i.e., add an extra transformer to the middle of an existing stack), and avoids confusing issues about the threading of other monadic side-effects.

  • Some way to guarantee an action is performed, even in the presence of exceptions. This boils down to needing a bracket-like function.

For the first point, we define a new typeclass to represent monads that have mutable references:

class Monad m => HasRef m where
    type Ref m :: * -> *
    newRef' :: a -> m (Ref m a)
    readRef' :: Ref m a -> m a
    writeRef' :: Ref m a -> a -> m ()
    modifyRef' :: Ref m a -> (a -> (a, b)) -> m b
    mask :: ((forall a. m a -> m a) -> m b) -> m b
    mask_ :: m a -> m a
    try :: m a -> m (Either SomeException a)

We have an associated type to signify what the reference type should be. (For fans of fundeps, you'll see in the next section that this has to be an associated type.) Then we provide a number of basic reference operations. Finally, there are some functions to help with exceptions, which are needed to safely implement the functions described in the last section. The instance for IO is very straightforward:

instance HasRef IO where
    type Ref IO = I.IORef
    newRef' = I.newIORef
    modifyRef' = I.atomicModifyIORef
    readRef' = I.readIORef
    writeRef' = I.writeIORef
    mask = E.mask
    mask_ = E.mask_
    try = E.try

However, we have a problem when it comes to implementing the ST instance: there is no way to deal with exceptions in the ST monad. As a result, mask, mask_ and try are given default implementations that do no exception checking. This gives rise to the first word of warning: operations in the ST monad are not exception safe. You should not be allocating scarce resources in ST when using ResourceT. You might be wondering why we bother with ResourceT at all for ST, then. The answer is that there is a lot you can do with conduits without allocating scarce resources, and ST is a great way to do this in a pure way. But more on this later.
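
For illustration, here's a sketch of what the ST instance could look like given the class above (the real instance may differ in detail; note how the exception-related methods degenerate into no-ops):

import Control.Monad.ST (ST)
import Data.STRef

instance HasRef (ST s) where
    type Ref (ST s) = STRef s
    newRef' = newSTRef
    readRef' = readSTRef
    writeRef' = writeSTRef
    modifyRef' ref f = do
        a <- readSTRef ref
        let (a', b) = f a
        writeSTRef ref a'
        return b
    mask f = f id     -- no asynchronous exceptions to mask in ST
    mask_ = id
    try = fmap Right  -- nothing can be caught; assume success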

Now onto point 2: we need some way to deal with this base monad concept. Once again, we use an associated type (explained in the next section). Our solution looks something like:

class (HasRef (Base m), Monad m) => Resource m where
    type Base m :: * -> *

    resourceLiftBase :: Base m a -> m a

But we forgot about point 3: some bracket-like function. So we need one more method in this typeclass:

    resourceBracket_ :: Base m a -> Base m b -> m c -> m c

The reason the first two arguments to resourceBracket_ (allocation and cleanup) live in Base m instead of m is that, in ResourceT, all allocation and cleanup lives in the base monad.

So on top of our HasRef instance for IO, we now need a Resource instance as well. This is similarly straight-forward:

instance Resource IO where
    type Base IO = IO
    resourceLiftBase = id
    resourceBracket_ = E.bracket_

We have similar ST instances, with resourceBracket_ having no exception safety. The final step is dealing with monad transformers. We don't need to provide a HasRef instance, but we do need a Resource instance. The tricky part is providing a valid implementation of resourceBracket_. For this, we use some functions from monad-control:

instance (MonadTransControl t, Resource m, Monad (t m))
        => Resource (t m) where
    type Base (t m) = Base m

    resourceLiftBase = lift . resourceLiftBase
    resourceBracket_ a b c =
        control' $ \run -> resourceBracket_ a b (run c)
      where
        control' f = liftWith f >>= restoreT . return

For any transformer, its base is the base of its inner monad. Similarly, we lift to the base by lifting to the inner monad and then lifting to the base from there. The tricky part is the implementation of resourceBracket_. I will not go into a detailed explanation, as I would simply make a fool of myself.

Definition of ResourceT

We now have enough information to understand the definition of ResourceT:

newtype ReleaseKey = ReleaseKey Int

type RefCount = Int
type NextKey = Int

data ReleaseMap base =
    ReleaseMap !NextKey !RefCount !(IntMap (base ()))

newtype ResourceT m a =
    ResourceT (Ref (Base m) (ReleaseMap (Base m)) -> m a)

We see that ReleaseKey is simply an Int. If you skip a few lines down, this will make sense, since we're using an IntMap to keep track of the registered actions. We also define two type synonyms: RefCount and NextKey. NextKey keeps track of the next key to be assigned, and is incremented each time register is called. We'll touch on RefCount later.

The ReleaseMap is three pieces of information: the next key and the reference count, and then the map of all registered actions. Notice that ReleaseMap takes a type parameter base, which states which monad release actions must live in.

Finally, a ResourceT is essentially a ReaderT that keeps a mutable reference to a ReleaseMap. The reference type is determined by the base of the monad in question, as is the cleanup monad. This is why we need to use associated types.
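
To see how the pieces fit together, here is a hedged sketch of how register could be implemented against these types (register' is illustrative, not the library's exact code): atomically grab the next key, insert the action into the map, and hand the key back.

import qualified Data.IntMap as IntMap

register' :: Resource m => Base m () -> ResourceT m ReleaseKey
register' action = ResourceT $ \istate ->
    resourceLiftBase $ modifyRef' istate $
        \(ReleaseMap key refCount actions) ->
            ( ReleaseMap (key + 1) refCount (IntMap.insert key action actions)
            , ReleaseKey key
            )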

The majority of the rest of the code in the Control.Monad.Trans.Resource module is just providing instances for the ResourceT type.

Other Typeclasses

There are three other typeclasses provided by the module:

ResourceUnsafeIO

Any monad into which we can lift IO actions, with the caveat that this may be considered unsafe. The prime candidate here is ST. Care should be taken to only lift actions which do not acquire scarce resources and which don't "fire the missiles." In other words, all the normal warnings of unsafeIOToST apply.

ResourceThrow

For actions that can throw exceptions. This automatically applies to all IO-based monads. For ST-based monads, you can use the supplied ExceptionT transformer to provide exception-throwing capabilities. Some functions in conduit, for example, will require this (e.g., text decoding).

ResourceIO

A convenience class tying together a bunch of other classes, including the two mentioned above. This is purely for convenience; you could achieve the same effect without this type class, you'd just have to do a lot more typing.

Forking

It would seem that forking a thread would be inherently unsafe with ResourceT, since the parent thread may call runResourceT while the child thread is still accessing some of the allocated resources. This is indeed true, if you use the normal forkIO function.

In order to solve this, ResourceT includes reference counting. When you fork a new thread via resourceForkIO, the RefCount value of the ReleaseMap is incremented. Every time runResourceT is called, the value is decremented. Only when the value hits 0 are all the release actions called.
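
As a hedged usage sketch (assuming resourceForkIO has roughly the type ResourceT m () -> ResourceT m ThreadId, and using the withIO convenience function described below):

import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (liftIO)
import System.IO

main :: IO ()
main = runResourceT $ do
    (_key, h) <- withIO (openFile "shared.txt" WriteMode) hClose
    -- The fork bumps the reference count, so the Handle stays open
    -- until both the parent and the child scopes have finished.
    _ <- resourceForkIO $ liftIO $ hPutStrLn h "from the child"
    liftIO $ hPutStrLn h "from the parent"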

Convenience Exports

In addition to what's been listed so far, there are a few extra functions exported (mostly) for convenience.

  • newRef, writeRef, and readRef wrap up the HasRef versions of the functions and allow them to run in any ResourceT.

  • withIO is essentially a type-restricted version of with, working around some of the nastiness with types you would otherwise run into. In general, you'll want to use withIO when writing IO code.

  • transResourceT lets you modify which monad your ResourceT is running in, assuming it keeps the same base.

    transResourceT :: (Base m ~ Base n)
                   => (m a -> n a)
                   -> ResourceT m a
                   -> ResourceT n a
    transResourceT f (ResourceT mx) = ResourceT (\r -> f (mx r))

Source

I think it's simplest to understand sources by looking at the types:

data SourceResult m a = Open (Source m a) a | Closed
data Source m a = Source
    { sourcePull :: ResourceT m (SourceResult m a)
    , sourceClose :: ResourceT m ()
    }

A source has just two operations on it: you can pull data from it, and you can close it (think of closing a file handle). When you pull, you either get some data and a new Source (the source is still open), or nothing (the source is closed). Let's look at some of the simplest sources:

import Prelude hiding (repeat)
import Data.Conduit

-- | Never give any data
eof :: Monad m => Source m a
eof = Source
    { sourcePull = return Closed
    , sourceClose = return ()
    }

-- | Always give the same value
repeat :: Monad m => a -> Source m a
repeat a = Source
    { sourcePull = return $ Open (repeat a) a
    , sourceClose = return ()
    }

These sources are very straight-forward, since they always return the same results. Additionally, their close records don't do anything. You might think that this is a bug: shouldn't a call to sourcePull return Closed after it's been closed? This isn't required, since one of the rules of sources is that they can never be reused. In other words:

  • If a Source returns Open, it has provided you with a new Source which you should use in place of the original one.

  • If it returns Closed, then you cannot perform any more operations on it.

Don't worry too much about the invariant. In practice, you will almost never call sourcePull or sourceClose yourself. In fact, you will hardly ever write them by hand either (that's what sourceState and sourceIO are for). The point is that we can make some assumptions when we implement our sources.

State

There is something similar about the two sources mentioned above: they never change. They always return the same value. In other words, they have no state. For almost all serious sources, we'll need some kind of state.

The way we store state in a source is by updating the returned Source value in the Open constructor. This is best seen with an example.

import Data.Conduit
import Control.Monad.Trans.Resource

-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList list = Source
    { sourcePull =
        case list of
            [] -> return Closed -- no more data

            -- This is where we store our state: by returning a new
            -- Source with the rest of the list
            x:xs -> return $ Open (sourceList xs) x
    , sourceClose = return ()
    }

Each time we pull from the source, it checks the input list. If the list is empty, pulling returns Closed, which makes sense. If the list is not empty, pulling returns Open with both the next value in the list, and a new Source value containing the rest of the input list.

sourceState and sourceIO

In addition to being able to manually create Sources, we also have a few convenience functions that allow us to create most sources in a more high-level fashion. sourceState lets you write code similar to how you would use the State monad. You provide an initial state, your pull function is provided with the current state, and it returns a new state and a return value. Let's use this to reimplement sourceList.

import Data.Conduit
import Control.Monad.Trans.Resource

-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList state0 = sourceState
    state0
    pull
  where
    pull [] = return StateClosed
    pull (x:xs) = return $ StateOpen xs x

Notice the usage of the StateClosed and StateOpen constructors. These are very similar to Closed and Open, except that instead of specifying the next Source to be used, you provide the next state (here, the remainder of the list).

The other common activity is to perform some I/O allocation (like opening a file), registering some cleanup action (closing that file), and having a function for pulling data from that resource. conduit comes built-in with a sourceFile function that gives a stream of ByteStrings. Let's write a wildly inefficient alternative that returns a stream of characters.

import Data.Conduit
import Control.Monad.Trans.Resource
import System.IO
import Control.Monad.IO.Class (liftIO)

sourceFile :: ResourceIO m => FilePath -> Source m Char
sourceFile fp = sourceIO
    (openFile fp ReadMode)
    hClose
    (\h -> liftIO $ do
        eof <- hIsEOF h
        if eof
            then return IOClosed
            else fmap IOOpen $ hGetChar h)

Like sourceState, it uses a variant on the Open and Closed constructors. sourceIO does a number of things for us:

  • It registers the cleanup function with the ResourceT transformer, ensuring it gets called even in the presence of exceptions.

  • It sets up the sourceClose record to release the resource immediately.

  • As soon as you return IOClosed, it will release the resource.

Sinks

A sink consumes a stream of data, and produces a result. A sink must always produce a result, and must always produce a single result. This is encoded in the types themselves.

There is a Monad instance for sink, making it simple to compose multiple sinks together into a larger sink. You can also use the built-in sink functions to perform most of your work. Like sources, you'll rarely need to dive into the inner workings. Let's start off with an example: getting lines from a stream of Chars (we'll assume Unix line endings for simplicity).

import Data.Conduit
import qualified Data.Conduit.List as CL

-- Get a single line from the stream.
sinkLine :: Resource m => Sink Char m String
sinkLine = sinkState
    id -- initial state, nothing at the beginning of the line
    push
    close
  where
    -- On a new line, return the contents up until here
    push front '\n' =
        return $ StateDone Nothing $ front []

    -- Just another character, add it to the front and keep going
    push front char =
        return $ StateProcessing $ front . (char:)

    -- Got an EOF before hitting a newline, just give what we have so far
    close front = return $ front []

-- Get all the lines from the stream, until we hit a blank line or EOF.
sinkLines :: Resource m => Sink Char m [String]
sinkLines = do
    line <- sinkLine
    if null line
        then return []
        else do
            lines <- sinkLines
            return $ line : lines

content :: String
content = unlines
    [ "This is the first line."
    , "Here's the second."
    , ""
    , "After the blank."
    ]

main :: IO ()
main = do
    lines <- runResourceT $ CL.sourceList content $$ sinkLines
    mapM_ putStrLn lines

Running this sample produces the expected output:

This is the first line.
Here's the second.

sinkLine demonstrates usage of the sinkState function, which is very similar to the sourceState function we just saw. It takes three arguments: an initial state, a push function (takes the current state and the next input, and returns a new state and result), and a close function (takes the current state and returns an output). As opposed to sourceState, which doesn't need a close function, a sink is required to always return a result.

Our push function has two clauses. When it gets a newline character, it indicates that processing is complete via StateDone. The Nothing indicates that there is no leftover input (we'll discuss that later). It also gives an output of all the characters it has received. The second clause simply appends the new character to the existing state and indicates that we are still working via StateProcessing. The close function returns all characters.

sinkLines shows how we can use the monadic interface to produce new sinks. If you replace sinkLine with getLine, this would look like standard code to pull lines from standard input. This familiar interface should make it easy to get up and running quickly.

Types

The types for sinks are just a bit more involved than sources. Let's have a look:

type SinkPush input m output = input -> ResourceT m (SinkResult input m output)
type SinkClose m output = ResourceT m output

data SinkResult input m output =
    Processing (SinkPush input m output) (SinkClose m output)
  | Done (Maybe input) output

data Sink input m output =
    SinkNoData output
  | SinkData
        { sinkPush :: SinkPush input m output
        , sinkClose :: SinkClose m output
        }
  | SinkLift (ResourceT m (Sink input m output))

Whenever a sink is pushed to, it can either say it needs more data (Processing) or say it's all done. When still processing, it must provide updated push and close functions; when done, it returns any leftover input and the output. Fairly straightforward.

The first real "gotcha" is the three constructors for Sink. Why do we need SinkNoData? Aren't sinks all about consuming data? The answer is that we need it to efficiently implement our Monad instance. When we use return, we're giving back a value that requires no data in order to compute it. We could model this with the SinkData constructor, with something like:

myReturn a = SinkData (\input -> return (Done (Just input) a)) (return a)

But doing so would force reading in an extra bit of input that we don't need right now, and possibly will never need. (Have a look again at the sinkLines example.) So instead, we have an extra constructor to indicate that no input is required. Likewise, SinkLift is provided in order to implement an efficient MonadTrans instance.

Sinks: no helpers

Let's try to implement some sinks on the "bare metal", without any helper functions.

import Data.Conduit
import System.IO
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (liftIO)

-- Consume all input and discard it.
sinkNull :: Resource m => Sink a m ()
sinkNull =
    SinkData push close
  where
    push _ignored = return $ Processing push close
    close = return ()

-- Let's stream characters to a file. Here we do need some kind of
-- initialization. We do this by initializing in a push function,
-- and then returning a different push function for subsequent
-- calls. By using withIO, we know that the handle will be closed even
-- if there's an exception.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp =
    SinkData pushInit closeInit
  where
    pushInit char = do
        (releaseKey, handle) <- withIO (openFile fp WriteMode) hClose
        push releaseKey handle char
    closeInit = do
        -- Never opened a file, so nothing to do here
        return ()

    push releaseKey handle char = do
        liftIO $ hPutChar handle char
        return $ Processing (push releaseKey handle) (close releaseKey handle)

    close releaseKey _ = do
        -- Close the file handle as soon as possible.
        release releaseKey

-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count =
    SinkData (push 0) (close 0)
  where
    push count _ignored =
        return $ Processing (push count') (close count')
      where
        count' = count + 1

    close count = return count

Nothing is particularly complicated to implement. You should notice a common pattern here: declaring your push and close functions in a where clause, and then using them twice: once for the initial SinkData, and once for the Processing constructor. This can become a bit tedious; that's why we have helper functions.

Sinks: with helpers

Let's rewrite sinkFile and count to take advantage of the helper functions sinkIO and sinkState, respectively.

import Data.Conduit
import System.IO
import Control.Monad.IO.Class (liftIO)

-- We never have to touch the release key directly; sinkIO automatically
-- releases our resource as soon as we return IODone from our push function
-- or sinkClose is called.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp = sinkIO
    (openFile fp WriteMode)
    hClose
    -- push: notice that we are given the handle and the input
    (\handle char -> do
        liftIO $ hPutChar handle char
        return IOProcessing)
    -- close: we're also given the handle, but we don't use it
    (\_handle -> return ())

-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count = sinkState
    0
    -- The push function gets both the current state and the next input...
    (\state _ignored ->
        -- and it returns the new state
        return $ StateProcessing $ state + 1)
    -- The close function gets the final state and returns the output.
    (\state -> return state)

Nothing dramatic, just slightly shorter, less error-prone code. Using these two helper functions is highly recommended, as it ensures proper resource management and state updating.

List functions

As easy as it is to write your own sinks, you'll likely want to take advantage of the built-in sinks available in the Data.Conduit.List module. These provide analogues to common list functions, like folding. (The module also has some Conduits, like map.)

If you're looking for some way to practice with conduits, reimplementing the functions in the List module, both with and without the helper functions, would be a good start.
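
For instance, here is one way consume (collect the entire stream into a list) could be rewritten with sinkState. This is just a practice sketch, not necessarily the library's own implementation:

import Data.Conduit
import Control.Monad.Trans.Resource

-- The state is a difference list (a function), so appending each new
-- element stays cheap; close applies it to [] to recover the list.
consume' :: Resource m => Sink a m [a]
consume' = sinkState
    id
    (\front input -> return $ StateProcessing (front . (input:)))
    (\front -> return $ front [])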

Let's look at some simple things we can make out of the built-in sinks.

import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad.IO.Class (liftIO)

-- A sum function.
sum' :: Resource m => Sink Int m Int
sum' = CL.fold (+) 0

-- Print every input value to standard output.
printer :: (Show a, ResourceIO m) => Sink a m ()
printer = CL.mapM_ (liftIO . print)

-- Sum up all the values in a stream after the first five.
sumSkipFive :: Resource m => Sink Int m Int
sumSkipFive = do
    CL.drop 5
    CL.fold (+) 0

-- Print each input number and sum the total
printSum :: ResourceIO m => Sink Int m Int
printSum = do
    total <- CL.foldM go 0
    liftIO $ putStrLn $ "Sum: " ++ show total
    return total
  where
    go accum int = do
        liftIO $ putStrLn $ "New input: " ++ show int
        return $ accum + int

Connecting

At the end of the day, we're actually going to want to use our sinks. While we could manually call sinkPush and sinkClose, it's tedious. For example:

main :: IO ()
main = runResourceT $ do
    res <-
        case printSum of
            SinkData push close -> loop [1..10] push close
            SinkNoData res -> return res
    liftIO $ putStrLn $ "Got a result: " ++ show res
  where
    loop [] _push close = close
    loop (x:xs) push close = do
        mres <- push x
        case mres of
            Done _leftover res -> return res
            Processing push' close' -> loop xs push' close'

Instead, the recommended approach is to connect your sink to a source. Not only is this simpler, it's less error prone, and means you have a lot of flexibility in where your data is coming from. To rewrite the example above:

main :: IO ()
main = runResourceT $ do
    res <- CL.sourceList [1..10] $$ printSum
    liftIO $ putStrLn $ "Got a result: " ++ show res

Connecting takes care of testing for the sink constructor (SinkData versus SinkNoData versus SinkLift), pulling from the source, and pushing to/closing the sink.

However, there is one thing I wanted to point out from the long-winded example. On the second to last line, we ignore the leftover value of Done. This brings up the issue of data loss. This is an important topic that has had a lot of thought put into it. Unfortunately, we can't fully cover it yet, as we haven't discussed the main culprit in the drama: Conduits (the type, not the package).

But as a quick note here, the leftover value from the Done constructor is not always ignored. The Monad instance, for example, uses it to pass data from one sink to the next in a binding. And in fact, the real connect operator doesn't always throw away the leftovers: when we cover resumable sources later, we'll see that the leftover value is put back on the buffer so that later sinks reusing an existing source can pull it.

Conduit

This section covers the final major datatype in our package, conduits. While sources produce a stream of data and sinks consume a stream, conduits transform a stream.

Types

As we did previously, let's start off by looking at the types involved.

data ConduitResult input m output =
    Producing (Conduit input m output) [output]
  | Finished (Maybe input) [output]

data Conduit input m output = Conduit
    { conduitPush :: input -> ResourceT m (ConduitResult input m output)
    , conduitClose :: ResourceT m [output]
    }

This should look very similar to what we've seen with sinks. A conduit can be pushed to, in which case it returns a result. A result either indicates that it is still producing data, or that it is finished. When a conduit is closed, it returns some more output.

But let's examine the idiosyncrasies a bit. Like sinks, we can only push one piece of input at a time, and leftover data may be 0 or 1 pieces. However, there are a few changes:

  • When producing (the equivalent of processing for a sink), we can return output. This is because a conduit will produce a new stream of output instead of a single output value at the end of processing.

  • A sink always returns a single output value, while a conduit returns 0 or more outputs (a list). To understand why, consider conduits such as concatMap (producing multiple outputs for one input) and filter (returning 0 or 1 output for each input).

  • We have no special constructor like SinkNoData. That's because we provide no Monad instance for conduits. We'll see later how you can still use a familiar monadic approach to creating conduits.

Overall conduits should seem very similar to what we've covered so far.

Simple conduits

We'll start off by defining some simple conduits that don't have any state.

import Prelude hiding (map, concatMap)
import Data.Conduit

-- A simple conduit that just passes on the data as-is.
passThrough :: Monad m => Conduit input m input
passThrough = Conduit
    { conduitPush = \input -> return $ Producing passThrough [input]
    , conduitClose = return []
    }

-- map values in a stream
map :: Monad m => (input -> output) -> Conduit input m output
map f = Conduit
    { conduitPush = \input -> return $ Producing (map f) [f input]
    , conduitClose = return []
    }

-- map and concatenate
concatMap :: Monad m => (input -> [output]) -> Conduit input m output
concatMap f = Conduit
    { conduitPush = \input -> return $ Producing (concatMap f) $ f input
    , conduitClose = return []
    }
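
The filter conduit mentioned above fits the same mold, returning zero or one outputs per input. Continuing the module above, a minimal sketch:

-- keep only the values satisfying the predicate
filter' :: Monad m => (input -> Bool) -> Conduit input m input
filter' p = Conduit
    { conduitPush = \input -> return $
        Producing (filter' p) (if p input then [input] else [])
    , conduitClose = return []
    }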

Stateful conduits

Of course, not all conduits can be declared without state. Doing so on the bare metal is not too difficult.

import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit
import Control.Monad.Trans.Resource

-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return state

-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return $ Data.List.sort state

But we can do better. Just like sourceState and sinkState, we have conduitState to simplify things.

import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit

-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return state

-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return $ Data.List.sort state

Using conduits

The way Conduits interact with the rest of the package is via fusing. A conduit can be fused into a source, producing a new source, fused into a sink to produce a new sink, or fused with another conduit to produce a new conduit. It's best to just look at the fusion operators.

-- Left fusion: source + conduit = source
($=) :: (Resource m, IsSource src) => src m a -> Conduit a m b -> Source m b

-- Right fusion: conduit + sink = sink
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c

-- Middle fusion: conduit + conduit = conduit
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c

Using these operators is straightforward.

useConduits = do
    runResourceT
          $  CL.sourceList [1..10]
          $= reverse
          $= CL.map show
          $$ CL.consume

    -- equivalent to
    runResourceT
          $  CL.sourceList [1..10]
          $$ reverse
          =$ CL.map show
          =$ CL.consume

    -- and equivalent to
    runResourceT
          $  CL.sourceList [1..10]
          $$ (reverse =$= CL.map show)
          =$ CL.consume

There is in fact one last way of expressing the same idea. I'll leave it as an exercise to the reader to discover it.

It may seem like all these different approaches are redundant. While occasionally you can in fact choose whichever approach you feel like using, in many cases you will need a specific approach. For example:

  • If you have a stream of numbers, and you want to apply a conduit (e.g., map show) to only some of the stream that will be passed to a specific sink, you'll want to use the right fusion operator.

  • If you're reading a file, and want to parse the entire file as textual data, you'll want to use left fusion to convert the entire stream.

  • If you want to create reusable conduits that combine together individual, smaller conduits, you'll use middle fusion.

Data loss

Let's forget about conduits for a moment. Instead, suppose we want to write a program, using plain old lists, that will take a list of numbers, apply some kind of transformation to them, take the first five transformed values and do something with them, and then do something else with the remaining non-transformed values. For example, we want something like:

main = do
    let list = [1..10]
        transformed = map show list
        (begin, end) = splitAt 5 transformed
        untransformed = map read end
    mapM_ putStrLn begin
    print $ sum untransformed

But clearly this isn't a good general solution, since we don't want to have to transform and then untransform every element in the list. For one thing, we may not always have an inverse function. Another issue is efficiency. In this case, we can write something more efficient:

main = do
    let list = [1..10]
        (begin, end) = splitAt 5 list
        transformed = map show begin
    mapM_ putStrLn transformed
    print $ sum end

Note the change: we perform our split before transforming any elements. This works because, with map, we have a 1-to-1 correspondence between the input and output elements. So splitting at 5 before or after mapping show is the same thing. But what happens if we replace map show with something more devious?

deviousTransform =
    concatMap go
  where
    go 1 = [show 1]
    go 2 = [show 2, "two"]
    go 3 = replicate 5 "three"
    go x = [show x]

We no longer have the 1-to-1 correspondence. As a result, we can't use the second method. But it's even worse: we can't use the first method either, since there's no inverse of our deviousTransform.

There's only one solution to the problem that I'm aware of: transform elements one at a time. The final program looks like this:

deviousTransform 1 = [show 1]
deviousTransform 2 = [show 2, "two"]
deviousTransform 3 = replicate 5 "three"
deviousTransform x = [show x]

transform5 :: [Int] -> ([String], [Int])
transform5 list =
    go [] list
  where
    go output (x:xs)
        | newLen >= 5 = (take 5 output', xs)
        | otherwise = go output' xs
      where
        output' = output ++ deviousTransform x
        newLen = length output'

    -- Degenerate case: not enough input to make 5 outputs
    go output [] = (output, [])

main = do
    let list = [1..10]
        (begin, end) = transform5 list
    mapM_ putStrLn begin
    print $ sum end

The final output of this program is

1
2
two
three
three
49

What's important to note is that the number 3 is converted into five copies of the word "three", yet only two of them show up in the output. The rest are discarded by the take 5 call.

This whole exercise is just to demonstrate the issue of data loss in conduits. By forcing conduits to accept only one input at a time, we avoid the issue of transforming too many elements at once. That doesn't mean we don't lose any data: if a conduit produces too much output for the receiving sink to handle, some of it may be lost.

To put all this another way: conduits avoid chunking to get away from data loss. This is not an issue unique to conduits. If you look in the implementation of concatMapM for enumerator, you'll see that it forces elements to be handled one at a time. In conduits, we opted to force the issue at the type level.

SequencedSink

Suppose we want to be able to combine existing conduits and sinks to produce a new, more powerful conduit. For example, we want to write a conduit that takes a stream of numbers and sums up every five. In other words, for the input [1..50], it should result in the sequence [15,40,65,90,115,140,165,190,215,240]. We can definitely do this with the low-level conduit interface.

sum5Raw :: Resource m => Conduit Int m Int
sum5Raw =
    conduitState (0, 0) push close
  where
    push (total, count) input
        | newCount == 5 = return $ StateProducing (0, 0) [newTotal]
        | otherwise     = return $ StateProducing (newTotal, newCount) []
      where
        newTotal = total + input
        newCount = count + 1
    close (total, count)
        | count == 0 = return []
        | otherwise  = return [total]

But this is frustrating, since we already have all the tools we need to do this at a high level! There's the fold sink for adding up the numbers, and the isolate conduit which will only allow up to a certain number of elements to be passed to a sink. Can't we combine these somehow?

The answer is a SequencedSink. The idea is to create a normal Sink, except it returns a special output called a SequencedSinkResponse. This value can emit new output, stop processing data, or transfer control to a new conduit. (See the Haddocks for more information.) Then we can turn this into a Conduit using the sequenceSink function. This function also takes some state value that gets passed through to the sink.

So we can rewrite sum5Raw in a much more high-level manner.

sum5 :: Resource m => Conduit Int m Int
sum5 = sequenceSink () $ \() -> do
    nextSum <- CL.isolate 5 =$ CL.fold (+) 0
    return $ Emit () [nextSum]

All of the () in there are simply the unused state variable being passed around; they can be ignored. Otherwise, we're doing exactly what we want. We fuse isolate to fold to get the sum of the next five elements from the stream. We then emit that value, and start all over again.
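
As a quick sanity check, a sketch assuming the CL import from earlier examples:

main :: IO ()
main = do
    sums <- runResourceT $ CL.sourceList [1..50 :: Int] $$ sum5 =$ CL.consume
    print sums -- [15,40,65,90,115,140,165,190,215,240]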

Let's say we want to modify this slightly. We want to get the first 8 sums, and then pass through the remaining values, multiplied by 2. We can keep track of how many values we've returned in our state, and then use the StartConduit constructor to pass control to the multiply-by-2 conduit next.

sum5Pass :: Resource m => Conduit Int m Int
sum5Pass = sequenceSink 0 $ \count -> do
    if count == 8
        then return $ StartConduit $ CL.map (* 2)
        else do
            nextSum <- CL.isolate 5 =$ CL.fold (+) 0
            return $ Emit (count + 1) [nextSum]

These are obviously very contrived examples, but I hope they make clear the power and simplicity available from this approach.

Buffering

Buffering is one of the unique features of conduits. With buffering, conduits no longer need to control the flow of your application. In some cases, this can lead to simpler code.

Inversion of Control

Buffering was actually one of the main motivations in the creation of the conduit package. To see its importance, we need to consider the approach we've seen so far, which we'll call inversion of control, or IoC.

Suppose you want to count how many newline characters there are in a file. In the standard imperative approach, you would do something like:

  1. Open the file

  2. Pull some data into a buffer

  3. Loop over the values in the buffer, incrementing a counter on each newline character

  4. Return to 2

  5. Close the file

Notice that your code is explicitly calling out to other code and that code is returning control back to your code. You have retained full control of the flow of execution of your program. The conduit approach we've seen so far does not work this way. Instead, you would:

  1. Write a sink that counts newlines and adds the result to an accumulator.

  2. Connect the sink to a source

There's no doubt in my mind that this is an easier approach. You don't have to worry about opening and closing files or pulling data from the file. Instead, the data you need to process is simply presented to you. This is the advantage of IoC: you can focus specifically on your piece of the code.

We use this IoC approach all over Haskell: for example, instead of readMVar and putMVar, you can use withMVar. Don't bother with openFile and closeFile, just use withFile and pass in a function that uses the Handle. Even C has a version of this: why malloc and free when you could just alloca?

Actually, that last one is a huge red herring. Of course you can't just use alloca for everything. alloca only allocates memory locally on the stack, not dynamically on the heap. There's no way to return your allocated memory outside the current function.

But actually, the same restriction applies to the whole family of with functions: you can never return an allocated resource outside of the "block". Usually this works out just fine, but we need to recognize that this is a change in how we structure our programs. Oftentimes, with simple examples, this is a minor change. However, in larger settings this can become very difficult to manage, bordering on impossible at times.

A web server

Let's say we're going to write a web server. We're going to use the following low-level operations:

data Socket
recv    :: Socket -> Int -> IO ByteString -- returns empty when the socket is closed
sendAll :: Socket -> ByteString -> IO ()

We're up to the part where we need to implement the function handleConn that handles an individual connection. It will look something like this:

data Request  -- request headers, HTTP version, etc
data Response -- status code, response headers, response body
type Application = Request -> IO Response
handleConn :: Application -> Socket -> IO ()

What does our handleConn need to do? In broad strokes:

  1. Parse the request line

  2. Parse the request headers

  3. Construct the Request value

  4. Pass Request to the Application and get back a Response

  5. Send the Response over the Socket

We start off by writing steps 1 and 2 manually, without using conduits. We'll do this very simply and just assume three space-separated strings. We end up with something that looks like:

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as S8

-- Socket and recv come from the hypothetical API sketched above.
data RequestLine = RequestLine ByteString ByteString ByteString

parseRequestLine :: Socket -> IO RequestLine
parseRequestLine socket = do
    bs <- recv socket 4096
    let (method:path:version:_ignored) = S8.words bs
    return $ RequestLine method path version

There are two issues here: it doesn't handle the case where there are fewer than three words in the chunk of data, and it throws away any extra data. We can definitely solve both of these issues manually, but it's very tedious. It's much easier to implement this in terms of conduits.

import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL

data RequestLine = RequestLine ByteString ByteString ByteString

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = do
    let space = toEnum $ fromEnum ' '
    let getWord = do
            CB.dropWhile (== space)
            bss <- CB.takeWhile (/= space) =$ CL.consume
            return $ S.concat bss

    method <- getWord
    path <- getWord
    version <- getWord
    return $ RequestLine method path version

This means that our code will automatically be supplied with more data as it comes in, and any extra data will automatically be buffered in the Source, ready for the next time it's used. Now we can easily put our program together, demonstrating the power of the conduits approach:

import Data.ByteString (ByteString)
import Data.Conduit
import Data.Conduit.Network (sourceSocket)
import Control.Monad.IO.Class (liftIO)
import Network.Socket (Socket)

data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers
data Response = Response
type Application = Request -> IO Response

parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined

sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined

handleConn :: Application -> Socket -> IO ()
handleConn app socket = do
    req <- runResourceT $ sourceSocket socket $$ do
        requestLine <- parseRequestLine
        headers <- parseRequestHeaders
        return $ Request requestLine headers
    res <- liftIO $ app req
    liftIO $ sendResponse socket res

Whither the request body?

This is all great, until we realize we can't read the request body. The Application is simply given the Request, and lives in the IO monad. It has no access whatsoever to the incoming stream of data.

There's an easy fix for this actually: have the Application live in the Sink monad. This is the very approach we took with enumerator-based WAI 0.4. However, there are two problems:

  • People find it confusing. What people expect is that the Request value would have a requestBody value of type Source.

  • This makes certain kinds of usage incredibly difficult. For example, trying to write an HTTP proxy combining WAI and http-enumerator proved to be almost impossible.

This is the downside of inversion of control. Our code wants to be in control. It wants to be given something to pull from, something to push to, and run with it. We need some solution to the problem.

The simplest solution would be to just create a new Source and pass that to the Application. Unfortunately, this will cause problems with our buffering. You see, when we connect our source to the parseRequestLine and parseRequestHeaders sinks, it makes a call to recv. If the data received is not enough to cover all of the headers, it issues another call. When it has enough data, it stops. However, odds are that it doesn't stop exactly at the end of the headers. It likely consumes a bit of the request body as well.

If we just create a new source and pass that to the request, it will be missing the beginning of the request body. We need some way to pass that buffered data along.

BufferedSource

And so we finally get to introduce the last data type in conduits: BufferedSource. This is an abstract data type, but all it really does is keep a mutable reference to a buffer and an underlying Source. In order to create one of these, you use the bufferSource function.

bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)

This one little change is what allows us to easily solve our web server dilemma. Instead of connecting a Source to our parsing Sinks, we use a BufferedSource. At the end of each connect operation, any leftover data is put back on the buffer. For our web server case, we can now create a BufferedSource, use that to read the request line and headers, and then pass that same BufferedSource to the application for reading the request body.

Typeclass

We want to be able to connect a buffered source to a sink, just like we would a regular source. We would also like to be able to fuse it to a conduit. In order to make this convenient, conduit has a typeclass, IsSource. There are instances provided for both Source and BufferedSource. Both the connect ($$) and left-fuse ($=) operators use this typeclass.
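
A hedged sketch of the shape of such a class (the real definition in Data.Conduit may differ in its method names and details):

class IsSource src where
    -- how to connect this kind of source to a sink
    connect :: Resource m => src m a -> Sink a m b -> ResourceT m b
    -- how to fuse a conduit into this kind of source
    fuse :: Resource m => src m a -> Conduit a m b -> Source m b

The $$ and $= operators then simply defer to these methods, which is why both Source and BufferedSource can appear on their left-hand side.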

There's one "gotcha" in the BufferedSource instance of this typeclass, so let's explain it. Suppose we want to write a file copy function, without any buffering. This is a fairly standard usage of conduits:

sourceFile input $$ sinkFile output

When this line is run, both the input and output files are opened, the data is copied, and then both files are closed. Let's change this example slightly to use buffering:

bsrc <- bufferSource $ sourceFile input
bsrc $$ isolate 50 =$ sinkFile output1
bsrc $$ sinkFile output2

When is the input file opened and closed? The opening occurs on the first line, when buffering the source. And if we follow the normal rules from sources, the file should be closed after the second line. However, if we did that, we couldn't reuse bsrc for line 3!

So instead, $$ does not close the file. As a result, you can pass a buffered source to as many actions as you want, without concerns that the file handle has been closed out from under you.

This presents one caveat: when you're finished with a buffered source, you should manually call bsourceClose on it. However, as usual, this is merely an optimization, as the source will automatically be closed when runResourceT is called.
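
In code, that eager close is one extra line on the earlier sketch:

bsrc <- bufferSource $ sourceFile input
bsrc $$ isolate 50 =$ sinkFile output1
bsrc $$ sinkFile output2
bsourceClose bsrc -- eagerly release the input file's Handle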

Recapping the web server

So what exactly does our web server look like now?

import Data.ByteString (ByteString)
import Data.Conduit
import Data.Conduit.Network (sourceSocket)
import Control.Monad.IO.Class (liftIO)
import Network.Socket (Socket)

data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers (BufferedSource IO ByteString)
data Response = Response
type Application = Request -> ResourceT IO Response

parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined

parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined

sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined

handleConn :: Application -> Socket -> IO ()
handleConn app socket = runResourceT $ do
    bsrc <- bufferSource $ sourceSocket socket
    requestLine <- bsrc $$ parseRequestLine
    headers <- bsrc $$ parseRequestHeaders
    let req = Request requestLine headers bsrc
    res <- app req
    liftIO $ sendResponse socket res

We've made a few minor changes. Firstly, the Application now lives in the ResourceT IO monad. This isn't strictly necessary, but it's very convenient: the application can now register cleanup actions that will only take place after the response has been fully sent to the client.

But the major changes are in the handleConn function. We now start off by buffering our source. This buffered source is then used twice in our function, and then passed off to the application.
