Conduits, part 4: Conduits

January 4, 2012

By Michael Snoyman

Review and Status

This is part 4 in the conduits series. You can see the previous posts at:

  1. The Resource monad transformer
  2. Sources
  3. Sinks

This part covers the final major datatype in our package, conduits. While sources produce a stream of data and sinks consume a stream, conduits transform a stream.

A quick update on conduit activity: a few of the major enumerator libraries have been converted to conduits (http-conduit and xml-conduit) and released to Hackage. The GitHub versions of WAI (including Warp), Persistent and Yesod have been converted as well.

Not only does the code all work, it's already allowing enhancements we hadn't even thought of. All in all, the change to conduits has been a very pleasant one.

Types

As we did previously, let's start off by looking at the types involved.

data ConduitResult input m output =
    Producing (Conduit input m output) [output]
  | Finished (Maybe input) [output]

data Conduit input m output = Conduit
    { conduitPush :: input -> ResourceT m (ConduitResult input m output)
    , conduitClose :: ResourceT m [output]
    }

This should look very similar to what we've seen with sinks. A conduit can be pushed to, in which case it returns a result. A result either indicates that it is still producing data, or that it is finished. When a conduit is closed, it returns some more output.

But let's examine the idiosyncrasies a bit. Like sinks, we can only push one piece of input at a time, and leftover data may be 0 or 1 pieces. However, there are a few changes:

  • When producing (the equivalent of processing for a sink), we can return output. This is because a conduit will produce a new stream of output instead of producing a single output value at the end of processing.
  • A sink always returns a single output value, while a conduit returns 0 or more outputs (a list). To understand why, consider conduits such as concatMap (produces multiple outputs for one input) and filter (returns 0 or 1 output for each input).
  • We have no special constructor like SinkNoData. That's because we provide no Monad instance for conduits. We'll see later how you can still use a familiar Monadic approach to creating conduits.

Overall conduits should seem very similar to what we've covered so far.
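
To make the shape of these types concrete, here is a minimal sketch of a pure model of them. It is not the library's definition: ResourceT m is stripped out so that pushing is an ordinary function and the code needs nothing beyond base. The names PureConduit, PureResult, filterC, takeC and runList are made up for this illustration.

```haskell
-- A pure, simplified model of the types above (assumption: ResourceT m is
-- dropped entirely). This is an illustration, not the conduit package's code.
data PureResult input output
    = Producing (PureConduit input output) [output]
    | Finished (Maybe input) [output]

data PureConduit input output = PureConduit
    { purePush  :: input -> PureResult input output
    , pureClose :: [output]
    }

-- 0 or 1 outputs per input: keep only elements satisfying the predicate.
filterC :: (input -> Bool) -> PureConduit input input
filterC p = PureConduit
    { purePush  = \input -> Producing (filterC p) [input | p input]
    , pureClose = []
    }

-- Pass through n elements, then finish. The element that arrives after the
-- quota is used up is handed back as leftover.
takeC :: Int -> PureConduit input input
takeC n = PureConduit
    { purePush = \input ->
        if n <= 0
            then Finished (Just input) []
            else Producing (takeC (n - 1)) [input]
    , pureClose = []
    }

-- Hypothetical driver: push a list through one element at a time and collect
-- all output, stopping early if the conduit finishes.
runList :: PureConduit input output -> [input] -> [output]
runList c [] = pureClose c
runList c (x:xs) =
    case purePush c x of
        Producing c' out -> out ++ runList c' xs
        Finished _ out   -> out
```

In GHCi, runList (filterC even) [1..10] evaluates to [2,4,6,8,10], and runList (takeC 3) [1..10] to [1,2,3]. The second case is where Finished and its leftover come into play: the fourth element is pushed, refused, and returned.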

Simple conduits

We'll start off by defining some simple conduits that don't have any state.

import Prelude hiding (map, concatMap)
import Data.Conduit

-- A simple conduit that just passes on the data as-is.
passThrough :: Monad m => Conduit input m input
passThrough = Conduit
    { conduitPush = \input -> return $ Producing passThrough [input]
    , conduitClose = return []
    }

-- map values in a stream
map :: Monad m => (input -> output) -> Conduit input m output
map f = Conduit
    { conduitPush = \input -> return $ Producing (map f) [f input]
    , conduitClose = return []
    }

-- map and concatenate
concatMap :: Monad m => (input -> [output]) -> Conduit input m output
concatMap f = Conduit
    { conduitPush = \input -> return $ Producing (concatMap f) $ f input
    , conduitClose = return []
    }

Stateful conduits

Of course, not all conduits can be declared without state. Threading the state through by hand, on the bare metal, is not too difficult.

import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit
import Control.Monad.Trans.Resource

-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return state

-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return $ Data.List.sort state

But we can do better. Just like sourceState and sinkState, we have conduitState to simplify things.

import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit

-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return state

-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return $ Data.List.sort state
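
If you're curious how a helper like conduitState can sit on top of the raw record, here is a rough sketch in a pure model (no ResourceT, base only). I'm not claiming this is how the library implements it. PureConduit, StateResult, pureConduitState, reverseC, runningTotalC and runList are all names invented for the example.

```haskell
-- Pure stand-ins for the conduit types (assumption: no ResourceT m).
data PureResult input output
    = Producing (PureConduit input output) [output]
    | Finished (Maybe input) [output]

data PureConduit input output = PureConduit
    { purePush  :: input -> PureResult input output
    , pureClose :: [output]
    }

-- What a stateful push function returns: the next state instead of the
-- next conduit.
data StateResult state input output
    = StateProducing state [output]
    | StateFinished (Maybe input) [output]

-- Tie the knot: rebuild the conduit around the new state after every push.
pureConduitState
    :: state
    -> (state -> input -> StateResult state input output)
    -> (state -> [output])
    -> PureConduit input output
pureConduitState state push close = PureConduit
    { purePush = \input ->
        case push state input of
            StateProducing state' out ->
                Producing (pureConduitState state' push close) out
            StateFinished leftover out -> Finished leftover out
    , pureClose = close state
    }

-- Same idea as reverse above: buffer everything, emit on close.
reverseC :: PureConduit input input
reverseC = pureConduitState [] push close
  where
    push state input = StateProducing (input : state) []
    close state = state

-- State and immediate output together: emit the running total on each push.
runningTotalC :: PureConduit Int Int
runningTotalC = pureConduitState 0 push close
  where
    push total input =
        let total' = total + input in StateProducing total' [total']
    close _ = []

-- Hypothetical driver: push a list through and collect all output.
runList :: PureConduit input output -> [input] -> [output]
runList c [] = pureClose c
runList c (x:xs) =
    case purePush c x of
        Producing c' out -> out ++ runList c' xs
        Finished _ out   -> out
```

With this model, runList reverseC [1,2,3] gives [3,2,1] and runList runningTotalC [1,2,3,4] gives [1,3,6,10].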

Using conduits

Conduits interact with the rest of the package via fusing. A conduit can be fused into a source to produce a new source, fused into a sink to produce a new sink, or fused with another conduit to produce a new conduit. It's best to just look at the fusion operators.

-- Left fusion: source + conduit = source
($=) :: (Resource m, IsSource src) => src m a -> Conduit a m b -> Source m b

-- Right fusion: conduit + sink = sink
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c

-- Middle fusion: conduit + conduit = conduit
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c

Using these operators is straightforward.

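import Prelude hiding (reverse)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- `reverse` here is the conduit defined in the previous section.
useConduits = do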
    runResourceT
          $  CL.sourceList [1..10]
          $= reverse
          $= CL.map show
          $$ CL.consume

    -- equivalent to
    runResourceT
          $  CL.sourceList [1..10]
          $$ reverse
          =$ CL.map show
          =$ CL.consume

    -- and equivalent to
    runResourceT
          $  CL.sourceList [1..10]
          $$ (reverse =$= CL.map show)
          =$ CL.consume

There is in fact one last way of expressing the same idea. I'll leave it as an exercise for the reader to discover it.

It may seem like all these different approaches are redundant. While occasionally you can in fact choose whichever approach you feel like using, in many cases you will need a specific approach. For example:

  • If you have a stream of numbers, and you want to apply a conduit (e.g., map show) to only some of the stream that will be passed to a specific sink, you'll want to use the right fusion operator.
  • If you're reading a file, and want to parse the entire file as textual data, you'll want to use left fusion to convert the entire stream.
  • If you want to create reusable conduits that combine together individual, smaller conduits, you'll use middle fusion.
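
To give a feel for what middle fusion has to do, here is a sketch of it over a pure model of conduits (base only, no ResourceT). This is my own simplified illustration, not the implementation of =$= in the package. PureConduit, feed, fuse, mapC, takeC and runList are hypothetical names.

```haskell
-- Pure stand-ins for the conduit types (assumption: no ResourceT m).
data PureResult i o
    = Producing (PureConduit i o) [o]
    | Finished (Maybe i) [o]

data PureConduit i o = PureConduit
    { purePush  :: i -> PureResult i o
    , pureClose :: [o]
    }

-- Feed a batch of values into a conduit, one at a time.
-- Left: still running, with the output so far. Right: it finished early.
feed :: PureConduit b c -> [b] -> Either (PureConduit b c, [c]) [c]
feed c [] = Left (c, [])
feed c (b:bs) =
    case purePush c b of
        Finished _ cs -> Right cs  -- the remaining bs are dropped
        Producing c' cs ->
            case feed c' bs of
                Left (c'', cs') -> Left (c'', cs ++ cs')
                Right cs'       -> Right (cs ++ cs')

-- Middle fusion in the model: upstream output becomes downstream input.
fuse :: PureConduit a b -> PureConduit b c -> PureConduit a c
fuse up down = PureConduit { purePush = push, pureClose = close }
  where
    push a =
        case purePush up a of
            Producing up' bs ->
                case feed down bs of
                    Left (down', cs) -> Producing (fuse up' down') cs
                    Right cs         -> Finished Nothing cs
            Finished leftover bs -> Finished leftover (flush (feed down bs))
    close = flush (feed down (pureClose up))
    -- Upstream is done: keep downstream's output, closing it if still running.
    flush (Left (down', cs)) = cs ++ pureClose down'
    flush (Right cs)         = cs

mapC :: (i -> o) -> PureConduit i o
mapC f = PureConduit
    { purePush  = \i -> Producing (mapC f) [f i]
    , pureClose = []
    }

-- Pass through n elements, then finish.
takeC :: Int -> PureConduit i i
takeC n = PureConduit
    { purePush = \i ->
        if n <= 0 then Finished (Just i) [] else Producing (takeC (n - 1)) [i]
    , pureClose = []
    }

-- Hypothetical driver: push a list through and collect all output.
runList :: PureConduit i o -> [i] -> [o]
runList c [] = pureClose c
runList c (x:xs) =
    case purePush c x of
        Producing c' out -> out ++ runList c' xs
        Finished _ out   -> out
```

For example, runList (fuse (mapC (+ 1)) (mapC show)) [1,2,3] gives ["2","3","4"], and runList (fuse (mapC (* 2)) (takeC 2)) [1..5] gives [2,4]. Note the comment in feed: when the downstream conduit finishes partway through a batch, the rest of that batch has nowhere to go.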

Data loss

Let's forget about conduits for a moment. Instead, suppose we want to write a program, using plain old lists, that will take a list of numbers, apply some kind of transformation to them, take the first five transformed values and do something with them, and then do something else with the remaining non-transformed values. For example, we want something like:

main = do
    let list = [1..10]
        transformed = map show list
        (begin, end) = splitAt 5 transformed
        untransformed = map read end
    mapM_ putStrLn begin
    print $ sum untransformed

But clearly this isn't a good general solution, since we don't want to have to transform and then untransform every element in the list. For one thing, we may not always have an inverse function. Another issue is efficiency. In this case, we can write something more efficient:

main = do
    let list = [1..10]
        (begin, end) = splitAt 5 list
        transformed = map show begin
    mapM_ putStrLn transformed
    print $ sum end

Note the change: we perform our split before transforming any elements. This works because, with map, we have a 1-to-1 correspondence between the input and output elements. So splitting at 5 before or after mapping show is the same thing. But what happens if we replace map show with something more devious?

deviousTransform =
    concatMap go
  where
    go 1 = [show 1]
    go 2 = [show 2, "two"]
    go 3 = replicate 5 "three"
    go x = [show x]

We no longer have the 1-to-1 correspondence. As a result, we can't use the second method. But it's even worse: we can't use the first method either, since there's no inverse of our deviousTransform.

There's only one solution to the problem that I'm aware of: transform elements one at a time. The final program looks like this:

deviousTransform 1 = [show 1]
deviousTransform 2 = [show 2, "two"]
deviousTransform 3 = replicate 5 "three"
deviousTransform x = [show x]

transform5 :: [Int] -> ([String], [Int])
transform5 list =
    go [] list
  where
    go output (x:xs)
        | newLen >= 5 = (take 5 output', xs)
        | otherwise = go output' xs
      where
        output' = output ++ deviousTransform x
        newLen = length output'

    -- Degenerate case: not enough input to make 5 outputs
    go output [] = (output, [])

main = do
    let list = [1..10]
        (begin, end) = transform5 list
    mapM_ putStrLn begin
    print $ sum end

The final output of this program is

1
2
two
three
three
49

What's important to note is that the number 3 is converted into five copies of the word "three", yet only two of them show up in the output. The rest are discarded in the take 5 call.

This whole exercise is just to demonstrate the issue of data loss in conduits. By forcing conduits to accept only one input at a time, we avoid the issue of transforming too many elements at once. That doesn't mean we don't lose any data: if a conduit produces too much output for the receiving sink to handle, some of it may be lost.

To put all this another way: conduits avoid chunking to get away from data loss. This is not an issue unique to conduits. If you look in the implementation of concatMapM for enumerator, you'll see that it forces elements to be handled one at a time. In conduits, we opted to force the issue at the type level.
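
Here is the same experiment replayed against a pure model of a conduit, as a rough sketch (base only, no ResourceT, and not the package's code). The helper takeOutputs is hypothetical: it plays the role of a sink that wants five values, and it reports separately what it kept, what was produced but thrown away, and which inputs were never pushed.

```haskell
-- Pure stand-ins for the conduit types (assumption: no ResourceT m).
data PureResult i o
    = Producing (PureConduit i o) [o]
    | Finished (Maybe i) [o]

data PureConduit i o = PureConduit
    { purePush  :: i -> PureResult i o
    , pureClose :: [o]
    }

concatMapC :: (i -> [o]) -> PureConduit i o
concatMapC f = PureConduit
    { purePush  = \i -> Producing (concatMapC f) (f i)
    , pureClose = []
    }

deviousTransform :: Int -> [String]
deviousTransform 2 = [show 2, "two"]
deviousTransform 3 = replicate 5 "three"
deviousTransform x = [show x]

-- Push inputs one at a time until at least n outputs are available.
-- Result: (outputs kept, outputs produced but lost, inputs never pushed).
takeOutputs :: Int -> PureConduit i o -> [i] -> ([o], [o], [i])
takeOutputs n = go []
  where
    go acc _ rest
        | length acc >= n = done acc rest
    go acc c [] = done (acc ++ pureClose c) []
    go acc c (x:xs) =
        case purePush c x of
            Producing c' out      -> go (acc ++ out) c' xs
            Finished leftover out -> done (acc ++ out) (maybe xs (: xs) leftover)
    done acc rest =
        let (kept, lost) = splitAt n acc in (kept, lost, rest)
```

Evaluating takeOutputs 5 (concatMapC deviousTransform) [1..10] gives (["1","2","two","three","three"], ["three","three","three"], [4,5,6,7,8,9,10]). Because inputs go in one at a time, the untouched inputs 4 through 10 survive intact; the only casualties are the three surplus outputs from the single input 3.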

SequencedSink

Suppose we want to be able to combine existing conduits and sinks to produce a new, more powerful conduit. For example, we want to write a conduit that takes a stream of numbers and sums up every five. In other words, for the input [1..50], it should result in the sequence [15,40,65,90,115,140,165,190,215,240]. We can definitely do this with the low-level conduit interface.

sum5Raw :: Resource m => Conduit Int m Int
sum5Raw =
    conduitState (0, 0) push close
  where
    push (total, count) input
        | newCount == 5 = return $ StateProducing (0, 0) [newTotal]
        | otherwise     = return $ StateProducing (newTotal, newCount) []
      where
        newTotal = total + input
        newCount = count + 1
    close (total, count)
        | count == 0 = return []
        | otherwise  = return [total]

But this is frustrating, since we already have all the tools we need to do this at a high level! There's the fold sink for adding up the numbers, and the isolate conduit which will only allow up to a certain number of elements to be passed to a sink. Can't we combine these somehow?

The answer is a SequencedSink. The idea is to create a normal Sink, except it returns a special output called a SequencedSinkResponse. This value can emit new output, stop processing data, or transfer control to a new conduit. (See the Haddocks for more information.) Then we can turn this into a Conduit using the sequenceSink function. This function also takes some state value that gets passed through to the sink.

So we can rewrite sum5Raw in a much more high-level manner.

sum5 :: Resource m => Conduit Int m Int
sum5 = sequenceSink () $ \() -> do
    nextSum <- CL.isolate 5 =$ CL.fold (+) 0
    return $ Emit () [nextSum]

All of the () in there are simply the unused state variable being passed around, so they can be ignored. Otherwise, we're doing exactly what we want. We fuse isolate to fold to get the sum of the next five elements from the stream. We then emit that value, and start all over again.

Let's say we want to modify this slightly. We want to get the first 8 sums, and then pass through the remaining values, multiplied by 2. We can keep track of how many values we've returned in our state, and then use the StartConduit constructor to pass control to the multiply-by-2 conduit next.

sum5Pass :: Resource m => Conduit Int m Int
sum5Pass = sequenceSink 0 $ \count -> do
    if count == 8
        then return $ StartConduit $ CL.map (* 2)
        else do
            nextSum <- CL.isolate 5 =$ CL.fold (+) 0
            return $ Emit (count + 1) [nextSum]

These are obviously very contrived examples, but I hope they make clear the power and simplicity available from this approach.
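
As a sanity check on what these conduits are meant to output, here is a plain-list sketch of the behaviour described above. It uses only base and does not touch the conduit package, so treat it as a statement of intent, not as a test of sequenceSink (whose handling of the very end of the stream I'm not modelling here). sum5List follows sum5Raw, including the shorter final group; both function names are invented for this example.

```haskell
-- List-level reference for sum5Raw: sum every group of five; a final,
-- shorter group is still summed, and empty input produces no output.
sum5List :: [Int] -> [Int]
sum5List [] = []
sum5List xs = sum chunk : sum5List rest
  where
    (chunk, rest) = splitAt 5 xs

-- List-level reference for the sum5Pass description: the first eight sums
-- of five, then every remaining value multiplied by 2.
sum5PassList :: [Int] -> [Int]
sum5PassList = go (0 :: Int)
  where
    go count xs
        | count == 8 = map (* 2) xs
        | null xs    = []
        | otherwise  = sum chunk : go (count + 1) rest
      where
        (chunk, rest) = splitAt 5 xs
```

For the input [1..50], sum5List gives [15,40,65,90,115,140,165,190,215,240], and sum5PassList gives [15,40,65,90,115,140,165,190] followed by [82,84,86,88,90,92,94,96,98,100], the values 41 through 50 doubled.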

Summary

We're nearing the end of our conduits series. The last remaining major point is buffering and resumable sources. (I would have included it here, but (a) it doesn't exactly fit with the rest of the material and (b) it's 10:30 at night and I want to go to sleep.) In addition, we'll try to cover some real-life use cases for conduits, and give examples of where libraries like http-conduit and the upcoming conduit-based wai can be used together.
