Conduits, part 3: Sinks

December 27, 2011

Michael Snoyman

Conduits released!

Since the last blog post, I've uploaded the conduit packages to Hackage so that everyone can play along at home. Obviously, this code is very new, and user feedback is highly appreciated. In the same vein, it's entirely possible that there will be changes to the API. That said, a huge amount of code has already been migrated over to conduit (besides the packages below, all of WAI, http-conduit, and a number of XML libraries have been ported), so I feel pretty confident that things are mostly stable.

The code is split into a few different packages:

  - conduit: Base package, including the Resource transformer.
  - attoparsec-conduit: Turn attoparsec parsers into Sinks.
  - blaze-builder-conduit: Convert a stream of Builders into a stream of ByteStrings.
  - filesystem-conduit: Traverse folders, and convenience adapters for the system-filepath package.
  - zlib-conduit: Compress and decompress streams of bytes.

Quick Review

This is part 3 in the conduits series. The first two parts were:

  1. The Resource monad transformers
  2. Conduits- an overview, and in-depth coverage of sources

To give a basic overview: you use runResourceT to run a Resource block, which ensures that all allocated resources are freed. Code in the conduit package always lives inside a Resource block, so it gets the same guarantee. Sources produce data, sinks consume data, and conduits transform data. You can fuse conduits into sources, into sinks, or with other conduits. And finally, you can connect a source to a sink to produce a result.

That should be enough information to cover this post, though reading the previous posts, and especially the second one, is highly recommended.

Sinks

A sink consumes a stream of data, and produces a result. A sink must always produce a result, and must always produce a single result. This is encoded in the types themselves.

There is a Monad instance for sink, making it simple to compose multiple sinks together into a larger sink. You can also use the built-in sink functions to perform most of your work. Like sources, you'll rarely need to dive into the inner workings. Let's start off with an example: getting lines from a stream of Chars (we'll assume Unix line endings for simplicity).

import Data.Conduit
import qualified Data.Conduit.List as CL

-- Get a single line from the stream.
sinkLine :: Resource m => Sink Char m String
sinkLine = sinkState
    id -- initial state, nothing at the beginning of the line
    push
    close
  where
    -- On a new line, return the contents up until here
    push front '\n' =
        return $ StateDone Nothing $ front []

    -- Just another character, add it to the front and keep going
    push front char =
        return $ StateProcessing $ front . (char:)

    -- Got an EOF before hitting a newline, just give what we have so far
    close front = return $ front []

-- Get all the lines from the stream, until we hit a blank line or EOF.
sinkLines :: Resource m => Sink Char m [String]
sinkLines = do
    line <- sinkLine
    if null line
        then return []
        else do
            lines <- sinkLines
            return $ line : lines

content :: String
content = unlines
    [ "This is the first line."
    , "Here's the second."
    , ""
    , "After the blank."
    ]

main :: IO ()
main = do
    lines <- runResourceT $ CL.sourceList content $$ sinkLines
    mapM_ putStrLn lines

Running this sample produces the expected output:

This is the first line.
Here's the second.

sinkLine demonstrates usage of the sinkState function, which is very similar to the sourceState function covered in the previous post. It takes three arguments: an initial state, a push function (takes the current state and the next input, and returns either a new state or a final result) and a close function (takes the current state and returns an output). Unlike sourceState, which doesn't need a close function, a sink is required to always return a result.

Our push function has two clauses. When it gets a newline character, it indicates that processing is complete via StateDone. The Nothing indicates that there is no leftover input (we'll discuss that later). It also gives an output of all the characters it has received. The second clause simply appends the new character to the existing state and indicates that we are still working via StateProcessing. The close function returns all characters.
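
To make the push/close protocol concrete, here is a minimal sketch that models it with plain functions. It is not the library's code: StateResult is a simplified stand-in for the result type used with sinkState, there is no ResourceT, and driveState and lineOf are hypothetical names introduced only for illustration.

```haskell
-- Simplified stand-in for the result type used with sinkState
-- (assumption: pure values, no ResourceT).
data StateResult state input output
    = StateDone (Maybe input) output   -- optional leftover input, final output
    | StateProcessing state            -- keep going with the new state

-- Hypothetical driver: feed a list to a push/close pair and return the
-- output together with whatever input was not consumed.
driveState
    :: state
    -> (state -> input -> StateResult state input output)
    -> (state -> output)
    -> [input]
    -> (output, [input])
driveState st _ close [] = (close st, [])
driveState st push close (x:xs) =
    case push st x of
        -- A leftover, if any, goes back on the front of the remaining input.
        StateDone leftover out -> (out, maybe xs (: xs) leftover)
        StateProcessing st'    -> driveState st' push close xs

-- The same push and close logic as sinkLine, run against a plain String.
lineOf :: String -> (String, String)
lineOf = driveState id push close
  where
    push front '\n' = StateDone Nothing (front [])
    push front c    = StateProcessing (front . (c :))
    close front     = front []
```

In this model, lineOf "ab\ncd" evaluates to ("ab", "cd"): the newline ends the line, and the rest of the input is left untouched for whoever reads next.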

sinkLines shows how we can use the monadic interface to produce new sinks. If you replace sinkLine with getLine, this would look like standard code to pull lines from standard input. This familiar interface should make it easy to get up and running quickly.

Types

The types for sinks are just a bit more involved than sources. Let's have a look:

type SinkPush input m output = input -> ResourceT m (SinkResult input m output)
type SinkClose m output = ResourceT m output

data SinkResult input m output =
    Processing (SinkPush input m output) (SinkClose m output)
  | Done (Maybe input) output

data Sink input m output =
    SinkNoData output
  | SinkData
        { sinkPush :: SinkPush input m output
        , sinkClose :: SinkClose m output
        }
  | SinkLift (ResourceT m (Sink input m output))

Whenever a sink is pushed to, it can either say it needs more data (Processing) or say it's all done (Done). When still processing, it must provide updated push and close functions; when done, it returns any leftover input and the output. Fairly straightforward.

The first real "gotcha" is the three constructors for Sink. Why do we need SinkNoData: aren't sinks all about consuming data? The answer is that we need it to efficiently implement our Monad instance. When we use return, we're giving back a value that requires no data in order to compute it. We could model this with the SinkData constructor, with something like:

myReturn a = SinkData (\input -> return (Done (Just input) a)) (return a)

But doing so would force reading in an extra bit of input that we don't need right now, and possibly will never need. (Have a look again at the sinkLines example.) So instead, we have an extra constructor to indicate that no input is required. Likewise, SinkLift is provided in order to implement an efficient MonadTrans instance.
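
The cost of the SinkData-based return can be seen in a small model. The sketch below uses pure, simplified stand-ins for the types above (no ResourceT, no SinkLift). The names returnNoData, returnViaData and pulls are hypothetical and exist only for this illustration.

```haskell
-- Pure, simplified stand-ins for the post's types (assumption: plain
-- values instead of ResourceT actions, and no SinkLift constructor).
data Result i o
    = Processing (i -> Result i o) o   -- next push function, value on close
    | Done (Maybe i) o                 -- optional leftover input, final output

data Sink i o
    = SinkNoData o
    | SinkData (i -> Result i o) o

-- return via the dedicated constructor: needs no input at all.
returnNoData :: o -> Sink i o
returnNoData = SinkNoData

-- return modeled with SinkData: must be pushed to once before it can finish.
returnViaData :: o -> Sink i o
returnViaData a = SinkData (\i -> Done (Just i) a) a

-- Hypothetical driver: run a sink against a list and report the output
-- plus how many elements had to be pulled from the list to get it.
pulls :: Sink i o -> [i] -> (o, Int)
pulls (SinkNoData o) _ = (o, 0)
pulls (SinkData push close) xs = go push close xs 0
  where
    go _ c [] n = (c, n)
    go p _ (y:ys) n =
        case p y of
            Done _ o         -> (o, n + 1)
            Processing p' c' -> go p' c' ys (n + 1)
```

In this model, pulls (returnNoData 'x') "abc" gives ('x', 0), while pulls (returnViaData 'x') "abc" gives ('x', 1): the second version forces one element out of the source even though it hands it straight back as a leftover.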

Sinks: no helpers

Let's try to implement some sinks on the "bare metal", without any helper functions.

import Data.Conduit
import System.IO
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (liftIO)

-- Consume all input and discard it.
sinkNull :: Resource m => Sink a m ()
sinkNull =
    SinkData push close
  where
    push _ignored = return $ Processing push close
    close = return ()

-- Let's stream characters to a file. Here we do need some kind of
-- initialization. We do this by initializing in a push function,
-- and then returning a different push function for subsequent
-- calls. By using withIO, we know that the handle will be closed even
-- if there's an exception.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp =
    SinkData pushInit closeInit
  where
    pushInit char = do
        (releaseKey, handle) <- withIO (openFile fp WriteMode) hClose
        push releaseKey handle char
    closeInit = do
        -- Never opened a file, so nothing to do here
        return ()

    push releaseKey handle char = do
        liftIO $ hPutChar handle char
        return $ Processing (push releaseKey handle) (close releaseKey handle)

    close releaseKey _ = do
        -- Close the file handle as soon as possible.
        release releaseKey
        return ()

-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count =
    SinkData (push 0) (close 0)
  where
    push count _ignored =
        return $ Processing (push count') (close count')
      where
        count' = count + 1

    close count = return count

Nothing is particularly complicated to implement. You should notice a common pattern here: declaring your push and close functions in a where clause, and then using them twice: once for the initial SinkData, and once for the Processing constructor. This can become a bit tedious; that's why we have helper functions.

Sinks: with helpers

Let's rewrite sinkFile and count to take advantage of the helper functions sinkIO and sinkState, respectively.

import Data.Conduit
import System.IO
import Control.Monad.IO.Class (liftIO)

-- We never have to touch the release key directly, sinkIO automatically
-- releases our resource as soon as we return IODone from our push function,
-- or sinkClose is called.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp = sinkIO
    (openFile fp WriteMode)
    hClose
    -- push: notice that we are given the handle and the input
    (\handle char -> do
        liftIO $ hPutChar handle char
        return IOProcessing)
    -- close: we're also given the handle, but we don't use it
    (\_handle -> return ())

-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count = sinkState
    0
    -- The push function gets both the current state and the next input...
    (\state _ignored ->
        -- and it returns the new state
        return $ StateProcessing $ state + 1)
    -- The close function gets the final state and returns the output.
    (\state -> return state)

Nothing dramatic, just slightly shorter, less error-prone code. Using these two helper functions is highly recommended, as it ensures proper resource management and state updating.
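
To see how a helper of this kind removes the repetition, here is a sketch of a sinkState-like function built from the raw constructors. It uses pure, simplified stand-ins for the post's types (no ResourceT, no SinkLift), and sinkStateSketch, countSketch and feed are hypothetical names; the real sinkState may well be implemented differently.

```haskell
-- Pure, simplified stand-ins for the post's types (assumption: plain
-- values instead of ResourceT actions, and no SinkLift constructor).
data Result i o
    = Processing (i -> Result i o) o   -- next push function, value on close
    | Done (Maybe i) o                 -- optional leftover input, final output

data Sink i o
    = SinkNoData o
    | SinkData (i -> Result i o) o

data StateResult s i o
    = StateDone (Maybe i) o
    | StateProcessing s

-- Hypothetical helper in the spirit of sinkState: it threads the state
-- through push and close so callers never rebuild Processing by hand.
sinkStateSketch :: s -> (s -> i -> StateResult s i o) -> (s -> o) -> Sink i o
sinkStateSketch st0 push close = SinkData (step st0) (close st0)
  where
    step st i =
        case push st i of
            StateProcessing st'  -> Processing (step st') (close st')
            StateDone leftover o -> Done leftover o

-- The count sink, written against the sketch helper.
countSketch :: Sink a Int
countSketch = sinkStateSketch 0 (\n _ -> StateProcessing (n + 1)) id

-- Hypothetical driver: push every list element, then close at the end.
feed :: Sink i o -> [i] -> o
feed (SinkNoData o) _ = o
feed (SinkData push close) xs = go push close xs
  where
    go _ c [] = c
    go p _ (y:ys) =
        case p y of
            Done _ o         -> o
            Processing p' c' -> go p' c' ys
```

With these definitions, feed countSketch "hello" evaluates to 5. The "declare push and close, then use them twice" pattern now lives in one place, inside step.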

List functions

As easy as it is to write your own sinks, you'll likely want to take advantage of the built-in sinks available in the Data.Conduit.List module. These provide analogues to common list functions, like folding. (The module also has some Conduits, like map.)

If you're looking for some way to practice with conduits, reimplementing the functions in the List module, both with and without the helper functions, would be a good start.

Let's look at some simple things we can make out of the built-in sinks.

import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad.IO.Class (liftIO)

-- A sum function.
sum' :: Resource m => Sink Int m Int
sum' = CL.fold (+) 0

-- Print every input value to standard output.
printer :: (Show a, ResourceIO m) => Sink a m ()
printer = CL.mapM_ (liftIO . print)

-- Sum up all the values in a stream after the first five.
sumSkipFive :: Resource m => Sink Int m Int
sumSkipFive = do
    CL.drop 5
    CL.fold (+) 0

-- Print each input number and sum the total
printSum :: ResourceIO m => Sink Int m Int
printSum = do
    total <- CL.foldM go 0
    liftIO $ putStrLn $ "Sum: " ++ show total
    return total
  where
    go accum int = do
        liftIO $ putStrLn $ "New input: " ++ show int
        return $ accum + int

Connecting

At the end of the day, we're actually going to want to use our sinks. While we could manually call sinkPush and sinkClose, it's tedious. For example:

main :: IO ()
main = runResourceT $ do
    -- start handles all three Sink constructors, including SinkLift
    res <- start printSum
    liftIO $ putStrLn $ "Got a result: " ++ show res
  where
    start (SinkData push close) = loop [1..10] push close
    start (SinkNoData res) = return res
    start (SinkLift msink) = msink >>= start

    loop [] _push close = close
    loop (x:xs) push close = do
        mres <- push x
        case mres of
            Done _leftover res -> return res
            Processing push' close' -> loop xs push' close'

Instead, the recommended approach is to connect your sink to a source. Not only is this simpler, it's less error prone, and means you have a lot of flexibility in where your data is coming from. To rewrite the example above:

main :: IO ()
main = runResourceT $ do
    res <- CL.sourceList [1..10] $$ printSum
    liftIO $ putStrLn $ "Got a result: " ++ show res

Connecting takes care of testing for the sink constructor (SinkData versus SinkNoData versus SinkLift), pulling from the source, and pushing to/closing the sink.

However, there is one thing I wanted to point out from the long-winded example. On the second to last line, we ignore the leftover value of Done. This brings up the issue of data loss. This is an important topic that has had a lot of thought put into it. Unfortunately, we can't fully cover it yet, as we haven't discussed the main culprit in the drama: Conduits (the type, not the package).

But as a quick note here, the leftover value from the Done constructor is not always ignored. The Monad instance, for example, uses it to pass data from one sink to the next in a binding. And in fact, the real connect operator doesn't always throw away the leftovers. When we cover resumable sources later, we'll see that the leftover value is put back on the buffer so that later sinks reusing the same source can pull it.
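
As a rough illustration of how a monadic bind can pass a leftover along, here is a sketch using pure, simplified stand-ins for the post's types (no ResourceT, no SinkLift). This is not the library's Monad instance, only one plausible way to write it for the simplified model. The names spanSink, consume, digitsThenRest and feed are hypothetical.

```haskell
import Control.Monad (ap, liftM)
import Data.Char (isDigit)

-- Pure, simplified stand-ins for the post's types (assumption: plain
-- values instead of ResourceT actions, and no SinkLift constructor).
data Result i o
    = Processing (i -> Result i o) o   -- next push function, value on close
    | Done (Maybe i) o                 -- optional leftover input, final output

data Sink i o
    = SinkNoData o
    | SinkData (i -> Result i o) o

instance Functor (Sink i) where
    fmap = liftM

instance Applicative (Sink i) where
    pure = SinkNoData
    (<*>) = ap

instance Monad (Sink i) where
    SinkNoData x >>= f = f x
    SinkData push0 close0 >>= f = SinkData (pushB push0) (closeB close0)
      where
        pushB push i =
            case push i of
                Processing push' close' -> Processing (pushB push') (closeB close')
                Done leftover x ->
                    case (f x, leftover) of
                        (SinkNoData y, _)                -> Done leftover y
                        (SinkData push2 close2, Nothing) -> Processing push2 close2
                        -- Hand the leftover to the next sink instead of dropping it.
                        (SinkData push2 _, Just l)       -> push2 l
        closeB x =
            case f x of
                SinkNoData y      -> y
                SinkData _ close2 -> close2

-- Collect inputs while the predicate holds; the first failing input
-- is returned as a leftover rather than consumed.
spanSink :: (i -> Bool) -> Sink i [i]
spanSink p = SinkData (push id) []
  where
    push front i
        | p i       = let front' = front . (i :)
                      in Processing (push front') (front' [])
        | otherwise = Done (Just i) (front [])

-- Collect every remaining input.
consume :: Sink i [i]
consume = SinkData (push id) []
  where
    push front i = let front' = front . (i :)
                   in Processing (push front') (front' [])

digitsThenRest :: Sink Char (String, String)
digitsThenRest = do
    ds <- spanSink isDigit
    rest <- consume
    return (ds, rest)

-- Hypothetical driver: push every list element, then close at the end.
feed :: Sink i o -> [i] -> o
feed (SinkNoData o) _ = o
feed (SinkData push close) xs = go push close xs
  where
    go _ c [] = c
    go p _ (y:ys) =
        case p y of
            Done _ o         -> o
            Processing p' c' -> go p' c' ys
```

In this model, feed digitsThenRest "12ab" evaluates to ("12", "ab"). The 'a' that ended the digit run is not lost, because the bind pushes it into consume as its first input.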

To be continued...

We still have a lot to cover in conduits, though at this point you likely have enough information to get started using them. The next big topic is Conduits. We'll see what they are, and how they combine together with sources and sinks. Finally, we'll try to cover the larger design decisions behind conduits, and some more advanced usages.
