Conduits

December 25, 2011

Michael Snoyman

Conduits are a solution to the streaming data problem. They occupy the same solution space as enumerators, in that we want deterministic resource handling, constant memory usage, and composability. However, conduits have been designed based on extensive experience using enumerators in real-life projects. They aim to solve the same problems in a simpler, more robust way.

This post is not intended to be a detailed comparison between enumerators and conduits, though it will be impossible to avoid some comparison. Instead, the goal is to explain the design decisions and usage of conduits, and give enough information that a reader familiar with both concepts can draw their own conclusions. In addition, the README file of the conduit repo on GitHub provides some of the motivation for conduits.

Note that this article will assume a basic understanding of the Resource monad transformer, which was described in a previous article. Also, conduits have not yet been released to Hackage, though the code in question is mostly stable, and is in fact in production use already.

Goals

We often talk vaguely about having streaming, composable data. Let's give some concrete ideas of what we want out of this library:

  • A unified interface for streaming data from and piping data into various places, whether they be files, sockets, or memory-based.
  • Some technique for modifying this stream of data in some way, such as applying HTTP chunking to a stream of bytes or decoding a stream of bytes into a stream of characters.
  • Deterministic resource management. If we read data from a file, we want the file to be closed immediately after an EOF is reached. We should not need to wait for any finalizer to be called. (This is a common complaint against lazy I/O.)
  • Exception safety. All resources should be freed, even in the presence of exceptions, and even if those exceptions are asynchronous.
  • Any part of the conduit pipeline, including the data consumer, should be able to safely acquire scarce resources and know that they will be released. (The enumerator approach falls short here, and fixing that is one of our main design goals.)
  • Conduits should interoperate well with monad transformer stacks. Almost all frameworks today use a monad stack in some way, and we want easy interoperability.
  • The developer should be free to choose the control flow of the program. (This is another complaint against enumerator, which creates a form of "inversion of control.")
  • Finally, conduits should be simple. The majority of this article should be understandable by almost any Haskell developer. If not, I've failed in either my writing or my coding.

Conduits in Five Minutes

While a good understanding of the lower-level mechanics of conduits is advisable, you can get very far without it. Let's start off with some high-level examples. Don't worry if some of the details seem a bit magical right now. We'll cover everything in the course of this series. Let's start with the terminology, and then some sample code.

Source
A producer of data. The data could be in a file, coming from a socket, or in memory as a list. To access this data, we pull from the source.
Sink
A consumer of data. Basic examples would be a sum function (adding up a stream of numbers fed in), a file sink (which writes all incoming bytes to a file), or a socket. We push data into a sink. When the sink finishes processing (we'll explain that later), it returns some value.
Conduit
A transformer of data. The simplest example is a map function, though there are many others. Like a sink, we push data into a conduit. But instead of returning a single value at the end, a conduit can return any number of outputs each time data is pushed to it.
Fuse
(Thanks to David Mazieres for the term.) A conduit can be fused with a source to produce a new, modified source (the $= operator). For example, you could have a source that reads bytes from a file, and a conduit that decodes bytes into text. If you fuse them together, you would now have a source that reads text from a file. Likewise, a conduit and a sink can fuse into a new sink (=$), and two conduits can fuse into a new conduit (=$=).
Connect
You can connect a source to a sink using the $$ operator. Doing so will pull data from the source and push it to the sink, until either the source or the sink signals that it is "done."

Let's see some examples of conduit code.

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit -- the core library
import qualified Data.Conduit.List as CL -- some list-like functions
import qualified Data.Conduit.Binary as CB -- bytes
import qualified Data.Conduit.Text as CT

import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import Control.Monad.ST (runST)

-- Let's start with the basics: connecting a source to a sink. We'll use the
-- built-in file functions to implement efficient, constant-memory,
-- resource-friendly file copying.
--
-- Two things to note: we use $$ to connect our source to our sink, and then
-- use runResourceT.
copyFile :: FilePath -> FilePath -> IO ()
copyFile src dest = runResourceT $ CB.sourceFile src $$ CB.sinkFile dest


-- The Data.Conduit.List module provides a number of helper functions for
-- creating sources, sinks, and conduits. Let's look at a typical fold: summing
-- numbers.
sumSink :: Resource m => Sink Int m Int
sumSink = CL.fold (+) 0

-- If we want to go a little more low-level, we can code our sink with the
-- sinkState function. This function takes three parameters: an initial state,
-- a push function (receive some more data), and a close function.
sumSink2 :: Resource m => Sink Int m Int
sumSink2 = sinkState
    0 -- initial value

    -- update the state with the new input and
    -- indicate that we want more input
    (\accum i -> return $ StateProcessing (accum + i))
    (\accum -> return accum) -- return the current accum value on close

-- Another common helper function is sourceList. Let's see how we can combine
-- that function with our sumSink to reimplement the built-in sum function.
sum' :: [Int] -> Int
sum' input = runST $ runResourceT $ CL.sourceList input $$ sumSink

-- Since this is Haskell, let's write a source to generate all of the
-- Fibonacci numbers. We'll use sourceState. The state will contain the next
-- two numbers in the sequence. We also need to provide a pull function, which
-- will return the next number and update the state.
fibs :: Resource m => Source m Int
fibs = sourceState
    (0, 1) -- initial state
    (\(x, y) -> return $ StateOpen (y, x + y) x)

-- Suppose we want to get the sum of the first 10 Fibonacci numbers. We can use
-- the isolate conduit to make sure the sum sink only consumes 10 values.
sumTenFibs :: Int
sumTenFibs =
        runST -- runs fine in pure code
      $ runResourceT
      $ fibs
    $= CL.isolate 10 -- fuse the source and conduit into a source
    $$ sumSink

-- We can also fuse the conduit into the sink instead; we just swap a few
-- operators.
sumTenFibs2 :: Int
sumTenFibs2 =
        runST
      $ runResourceT
      $ fibs
    $$ CL.isolate 10
    =$ sumSink

-- Alright, let's make some conduits. Let's turn our numbers into text. Sounds
-- like a job for a map...

intToText :: Int -> Text -- just a helper function
intToText = T.pack . show

textify :: Resource m => Conduit Int m Text
textify = CL.map intToText

-- As before, we can use a helper function, this time conduitState. But here,
-- we don't even need state, so we provide a dummy state value.
textify2 :: Resource m => Conduit Int m Text
textify2 = conduitState
    ()
    (\() input -> return $ StateProducing () [intToText input])
    (\() -> return [])

-- Let's make an unlines conduit that puts a newline on the end of each piece
-- of input. We'll just use CL.map; feel free to write it with conduitState as
-- well for practice.
unlines' :: Resource m => Conduit Text m Text
unlines' = CL.map $ \t -> t `T.append` "\n"

-- And let's write a function that prints the first N fibs to a file. We'll
-- use UTF8 encoding.
writeFibs :: Int -> FilePath -> IO ()
writeFibs count dest =
      runResourceT
    $ fibs
   $= CL.isolate count
   $= textify
   $= unlines'
   $= CT.encode CT.utf8
   $$ CB.sinkFile dest

-- We used the $= operator to fuse the conduits into the sources, producing a
-- single source. We can also do the opposite: fuse the conduits into the sink.
-- We can even combine the two.
writeFibs2 :: Int -> FilePath -> IO ()
writeFibs2 count dest =
      runResourceT
    $ fibs
   $= CL.isolate count
   $= textify
   $$ unlines'
   =$ CT.encode CT.utf8
   =$ CB.sinkFile dest

-- Or we could fuse all those inner conduits into a single conduit...
someIntLines :: ResourceThrow m -- encoding can throw an exception
             => Int
             -> Conduit Int m ByteString
someIntLines count =
      CL.isolate count
  =$= textify
  =$= unlines'
  =$= CT.encode CT.utf8

-- and then use that conduit
writeFibs3 :: Int -> FilePath -> IO ()
writeFibs3 count dest =
      runResourceT
    $ fibs
   $= someIntLines count
   $$ CB.sinkFile dest

main :: IO ()
main = do
    putStrLn $ "Sum of first ten fibs: " ++ show sumTenFibs
    writeFibs 20 "fibs.txt"
    copyFile "fibs.txt" "fibs2.txt"

Source

I think it's simplest to understand sources by looking at the types:

data SourceResult m a = Open (Source m a) a | Closed
data Source m a = Source
    { sourcePull :: ResourceT m (SourceResult m a)
    , sourceClose :: ResourceT m ()
    }

A source has just two operations on it: you can pull data from it, and you can close it (think of closing a file handle). When you pull, you either get some data and a new Source (the source is still open), or nothing (the source is closed). Let's look at some of the simplest sources:

import Prelude hiding (repeat)
import Data.Conduit

-- | Never give any data
eof :: Monad m => Source m a
eof = Source
    { sourcePull = return Closed
    , sourceClose = return ()
    }

-- | Always give the same value
repeat :: Monad m => a -> Source m a
repeat a = Source
    { sourcePull = return $ Open (repeat a) a
    , sourceClose = return ()
    }

These sources are very straightforward, since they always return the same results. Additionally, their sourceClose fields don't do anything. You might think that this is a bug: shouldn't a call to sourcePull return Closed after the source has been closed? This isn't required, since one of the rules of sources is that they can never be reused. In other words:

  • If a Source returns Open, it has provided you with a new Source which you should use in place of the original one.
  • If it returns Closed, then you cannot perform any more operations on it.

Don't worry too much about the invariant. In practice, you will almost never call sourcePull or sourceClose yourself. In fact, you will rarely even write them yourself (that's what sourceState and sourceIO are for). The point is that we can make some assumptions when we implement our sources.
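To make the invariant concrete, here is a minimal sketch of a manual pull loop. It is not the library's code: it assumes a simplified model of Source that runs in plain IO instead of ResourceT m, and the names repeatS and takeFrom are hypothetical helpers made up for this illustration. It also assumes that a consumer which stops early should call sourceClose on the source it is still holding.

```haskell
-- ASSUMPTION: a simplified model of the Source type from this post. It runs
-- in plain IO rather than ResourceT m, so there is no exception-safe cleanup.
data SourceResult a = Open (Source a) a | Closed

data Source a = Source
    { sourcePull  :: IO (SourceResult a)
    , sourceClose :: IO ()
    }

-- | Never give any data.
eof :: Source a
eof = Source { sourcePull = return Closed, sourceClose = return () }

-- | Always give the same value (named repeatS to avoid clashing with Prelude).
repeatS :: a -> Source a
repeatS a = Source
    { sourcePull  = return $ Open (repeatS a) a
    , sourceClose = return ()
    }

-- | Hypothetical helper: pull at most n values from a source.
--
-- * After Open, we continue with the new Source and never touch the old one.
-- * After Closed, we perform no further operations on the source.
-- * If we stop early, we close the source we are still holding.
takeFrom :: Int -> Source a -> IO [a]
takeFrom n src
    | n <= 0    = sourceClose src >> return []
    | otherwise = do
        res <- sourcePull src
        case res of
            Closed      -> return []
            Open src' a -> do
                rest <- takeFrom (n - 1) src'
                return (a : rest)

main :: IO ()
main = do
    xs <- takeFrom 3 (repeatS 'x')
    print xs -- prints "xxx"
    ys <- takeFrom 3 (eof :: Source Char)
    print ys -- prints ""
```

Because every step hands back a fresh Source, an implementation never has to defend against being pulled after it was closed, which is exactly the assumption the rules above let us make.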

State

There is something similar about the two sources mentioned above: they never change. They always return the same value. In other words, they have no state. For almost all serious sources, we'll need some kind of state.

The way we store state in a source is by updating the returned Source value in the Open constructor. This is best seen with an example.

import Data.Conduit
import Control.Monad.Trans.Resource

-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList list = Source
    { sourcePull =
        case list of
            [] -> return Closed -- no more data

            -- This is where we store our state: by returning a new
            -- Source with the rest of the list
            x:xs -> return $ Open (sourceList xs) x
    , sourceClose = return ()
    }

Each time we pull from the source, it checks the input list. If the list is empty, pulling returns Closed, which makes sense. If the list is not empty, pulling returns Open with both the next value in the list, and a new Source value containing the rest of the input list.

sourceState and sourceIO

In addition to being able to manually create Sources, we also have a few convenience functions that allow us to create most sources in a more high-level fashion. sourceState lets you write code similar to how you would use the State monad. You provide an initial state, your pull function is provided with the current state, and it returns a new state and a return value. Let's use this to reimplement sourceList.

import Data.Conduit
import Control.Monad.Trans.Resource

-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList state0 = sourceState
    state0
    pull
  where
    pull [] = return StateClosed
    pull (x:xs) = return $ StateOpen xs x

Notice the usage of the StateClosed and StateOpen constructors. These are very similar to Closed and Open, except that instead of specifying the next Source to be used, you provide the next state (here, the remainder of the list).
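To see how the state constructors relate to Open and Closed, here is a rough sketch of how a function like sourceState could be written on top of the Source record. This is an illustration under assumptions, not the library's implementation: it uses a simplified Source that runs in plain IO instead of ResourceT m, the type name SourceStateResult is my guess at a reasonable name, and drain is a hypothetical helper for the demo.

```haskell
-- ASSUMPTION: simplified model of Source in plain IO (no ResourceT).
data SourceResult a = Open (Source a) a | Closed

data Source a = Source
    { sourcePull  :: IO (SourceResult a)
    , sourceClose :: IO ()
    }

-- Mirrors the StateOpen / StateClosed constructors used above. The type
-- name is an assumption made for this sketch.
data SourceStateResult state a = StateOpen state a | StateClosed

-- | Sketch of sourceState: run the pull function on the current state, then
-- translate StateOpen into Open by building the next Source from the new
-- state. The state lives only in the closure of the returned Source.
sourceState :: state -> (state -> IO (SourceStateResult state a)) -> Source a
sourceState state pull = Source
    { sourcePull = do
        res <- pull state
        case res of
            StateClosed        -> return Closed
            StateOpen state' a -> return $ Open (sourceState state' pull) a
    , sourceClose = return ()
    }

-- | The sourceList definition from above, unchanged apart from the context.
sourceList :: [a] -> Source a
sourceList state0 = sourceState state0 pull
  where
    pull []     = return StateClosed
    pull (x:xs) = return $ StateOpen xs x

-- | Hypothetical helper: pull until Closed, collecting every value.
drain :: Source a -> IO [a]
drain src = do
    res <- sourcePull src
    case res of
        Closed      -> return []
        Open src' a -> fmap (a :) (drain src')

main :: IO ()
main = drain (sourceList [1, 2, 3 :: Int]) >>= print -- prints [1,2,3]
```

The only difference from the hand-written sourceList is that the recursion which rebuilds the Source is factored out into sourceState.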

The other common activity is to perform some I/O allocation (like opening a file), register some cleanup action (closing that file), and provide a function for pulling data from that resource. The conduit package comes with a built-in sourceFile function that gives a stream of ByteStrings. Let's write a wildly inefficient alternative that returns a stream of characters.

import Data.Conduit
import Control.Monad.Trans.Resource
import System.IO
import Control.Monad.IO.Class (liftIO)

sourceFile :: ResourceIO m => FilePath -> Source m Char
sourceFile fp = sourceIO
    (openFile fp ReadMode)
    hClose
    (\h -> liftIO $ do
        eof <- hIsEOF h
        if eof
            then return IOClosed
            else fmap IOOpen $ hGetChar h)

Like sourceState, sourceIO uses its own variants of the Open and Closed constructors (IOOpen and IOClosed). sourceIO does a number of things for us:

  • It registers the cleanup function with the ResourceT transformer, ensuring it gets called even in the presence of exceptions.
  • It sets up the sourceClose record to release the resource immediately.
  • As soon as you return IOClosed, it will release the resource.
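The last two points in the list above can be sketched in a simplified model. Everything here is an assumption made for illustration: Source runs in plain IO instead of ResourceT m, so the first point (registering the cleanup with ResourceT) is deliberately left out and this sketch is not exception safe. The type name SourceIOResult and the helpers demoSource and drain are hypothetical, and allocating on the first pull is a design choice of this sketch.

```haskell
import Data.IORef

-- ASSUMPTION: simplified model of Source in plain IO (no ResourceT).
data SourceResult a = Open (Source a) a | Closed

data Source a = Source
    { sourcePull  :: IO (SourceResult a)
    , sourceClose :: IO ()
    }

data SourceIOResult a = IOOpen a | IOClosed

-- | Sketch of sourceIO: allocate on the first pull, release on IOClosed,
-- and release immediately if the consumer closes the source early.
sourceIO :: IO res -> (res -> IO ()) -> (res -> IO (SourceIOResult a)) -> Source a
sourceIO alloc cleanup pull = Source
    { sourcePull  = alloc >>= pullFrom
    , sourceClose = return () -- nothing has been allocated yet
    }
  where
    opened res = Source
        { sourcePull  = pullFrom res
        , sourceClose = cleanup res -- early close releases the resource
        }
    pullFrom res = do
        r <- pull res
        case r of
            IOClosed -> cleanup res >> return Closed -- release at end of data
            IOOpen a -> return $ Open (opened res) a

-- | Hypothetical demo source: the "resource" is an IORef holding "ab", and
-- allocation and cleanup are recorded in a log instead of touching a file.
demoSource :: IORef [String] -> Source Char
demoSource logRef = sourceIO
    (note "alloc" >> newIORef "ab")
    (\_ -> note "cleanup")
    next
  where
    note msg = modifyIORef logRef (msg :)
    next ref = do
        s <- readIORef ref
        case s of
            []     -> return IOClosed
            (c:cs) -> writeIORef ref cs >> return (IOOpen c)

-- | Hypothetical helper: pull until Closed, collecting every value.
drain :: Source a -> IO [a]
drain src = do
    res <- sourcePull src
    case res of
        Closed      -> return []
        Open src' a -> fmap (a :) (drain src')

main :: IO ()
main = do
    -- Run to completion: cleanup happens as soon as IOClosed is returned.
    log1 <- newIORef []
    xs <- drain (demoSource log1)
    print xs                           -- prints "ab"
    readIORef log1 >>= print . reverse -- prints ["alloc","cleanup"]

    -- Stop after one value: sourceClose releases the resource immediately.
    log2 <- newIORef []
    r <- sourcePull (demoSource log2)
    case r of
        Closed      -> return ()
        Open src' _ -> sourceClose src'
    readIORef log2 >>= print . reverse -- prints ["alloc","cleanup"]
```

What this sketch cannot show is the case where an exception interrupts the consumer between pulls. Covering that case is the job of the ResourceT registration described in the first point.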

Next time

Everyone loves cliffhangers, right? Well, there's too much information to stick into a single blog post, so here are the topics I'm hoping to cover in the rest of this series:

  • Sinks
  • Connecting sources to sinks
  • Conduits
  • The fuse operators
  • The rules of data loss
  • Buffering/resumable sources
  • Using conduits in pure code
  • Building up conduits from sinks
  • Some real-life examples

Remember that conduit has not yet been officially released, and some of the information in this article may change over time. However, the core concepts are mostly solidified now.
