Simpler conduit core

October 10, 2013

By Michael Snoyman

NOTE: I strongly recommend reading this post on School of Haskell, since it makes extensive use of active code.

In my last blog post, I made the case that automatic termination is the cause of a lot of problems in both pipes and conduit. In this blog post, I'd like to:

  1. Derive a simple core datatype that does not have automatic termination.

  2. Discuss the desired behavior of this datatype.

  3. Assess what convenience functionality we lost along the way, and see if we can get it back with an added layer on top of the core.

Note that this blog post is meant to demonstrate an idea; it's not to be taken as a working implementation. There are many details that still need to be worked out. I'm hoping that presenting my thoughts in this manner will give enough of a basis for an informed discussion.

Deriving the core datatype

I want to start as simple as possible. Let's begin with the core datatype from pipes 1.0. This leaves out any extra complexity we don't want to deal with in this process, like leftovers, finalizers, or bidirectionality.

data Pipe i o m r
    = Pure r
    | M (m (Pipe i o m r))
    | Yield (Pipe i o m r) o
    | Await (i -> Pipe i o m r)

Quick recap: M is for performing monadic actions. Yield passes a value downstream, and gives up control of the flow of execution to downstream. If downstream meanwhile returns Pure, the current Pipe will never get control of execution again. Similarly, Await asks for a value from upstream, and will likewise terminate if upstream returns Pure.

Let's implement the identity pipe in terms of the raw constructors:

idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)
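
To make the roles of Await and Yield a bit more concrete, here is a minimal sketch written against the same raw constructors. Note that mapP and runList are hypothetical helpers introduced only for this illustration (they are not part of pipes): mapP applies a function to each value passing through, and runList is a tiny pure driver that feeds a list to a Pipe and collects whatever it yields.

```haskell
import Data.Functor.Identity (Identity, runIdentity)

data Pipe i o m r
    = Pure r
    | M (m (Pipe i o m r))
    | Yield (Pipe i o m r) o
    | Await (i -> Pipe i o m r)

idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)

-- Hypothetical helper: await a value, yield the transformed value, repeat.
mapP :: Monad m => (a -> b) -> Pipe a b m r
mapP f = Await (\a -> Yield (mapP f) (f a))

-- Hypothetical driver: answer each Await from the list and collect
-- every yielded value. It stops when the Pipe returns or when the
-- Pipe awaits and the input list is empty.
runList :: Pipe i o Identity r -> [i] -> [o]
runList (Pure _) _ = []
runList (M m) is = runList (runIdentity m) is
runList (Yield next o) is = o : runList next is
runList (Await _) [] = []
runList (Await k) (i:is) = runList (k i) is

main :: IO ()
main = do
    print (runList idP [1, 2, 3 :: Int])
    print (runList (mapP (* 2)) [1, 2, 3 :: Int])
```

Notice that runList has to decide on its own what to do when the input runs out, because an Await in this datatype has no way of being told that upstream is finished.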

We'll also need some kind of function to compose two pipes together. The type for this function is:

fuse :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r

You can experiment with some sample code here:

import Control.Monad

data Pipe i o m r
    = Pure r
    | M (m (Pipe i o m r))
    | Yield (Pipe i o m r) o
    | Await (i -> Pipe i o m r)

fuse :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
fuse _ (Pure r) = Pure r
fuse up (M m) = M (liftM (fuse up) m)
fuse up (Yield down o) = Yield (fuse up down) o
fuse up0 (Await down) =
    go up0
  where
    go (Pure r) = Pure r
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse up (down b)
    go (Await up) = Await (go . up)

(>->) = fuse

idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)

consume :: Monad m => Int -> Pipe i o m [i]
consume =
    go id
  where
    go front 0 = Pure (front [])
    go front count = Await $ \i -> go (front . (i:)) (count - 1)

yieldMany :: Monad m => [o] -> Pipe i o m r
yieldMany [] = error "FIXME"
yieldMany (o:os) = Yield (yieldMany os) o

runPipe :: Monad m => Pipe () () m r -> m r
runPipe (Pure r) = return r
runPipe (M m) = m >>= runPipe
runPipe (Await f) = runPipe (f ())
runPipe (Yield f ()) = runPipe f

main = runPipe (yieldMany [1..] >-> idP >-> consume 10) >>= print

In this blog post, I want to modify the core datatype to allow for non-termination. We'll start with the Await side of the equation, and let the identity pipe guide our design for the other constructors. (Note: we're going to target simplicity here, not efficiency. Efficiency can be addressed another time.)

Await and Yield: add Maybe

The simplest way to allow for non-termination is to modify Await to include a Maybe wrapper:

Await (Maybe i -> Pipe i o m r)

This means that, when awaiting a response from upstream, we can be informed via Nothing that no values are available. However, our idP no longer compiles. That's because we're getting a Maybe i, but Yield expects an i. Let's fix this by modifying our Yield constructor as well:

Yield (Pipe i o m r) (Maybe o)

Now our original idP compiles again. This change also allows us to write a fold, e.g.:

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f =
    loop
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure r

Finalization

However, we still haven't actually removed automatic termination: as soon as one Pipe terminates, the whole pipeline terminates. The practical consequence is that prompt finalization is still not achieved. Try out the following example:

import Control.Monad

data Pipe i o m r
    = Pure r
    | M (m (Pipe i o m r))
    | Yield (Pipe i o m r) (Maybe o)
    | Await (Maybe i -> Pipe i o m r)

fuse :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
fuse _ (Pure r) = Pure r
fuse up (M m) = M (liftM (fuse up) m)
fuse up (Yield down o) = Yield (fuse up down) o
fuse up0 (Await down) =
    go up0
  where
    go (Pure r) = Pure r
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse up (down b)
    go (Await up) = Await (go . up)

(>->) :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
(>->) = fuse

idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)

consume :: Monad m => Int -> Pipe i o m [i]
consume =
    go id
  where
    go front 0 = Pure (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure (front [])
-- show Data producer with finalization
yieldMany :: [o] -> Pipe i o IO r
yieldMany [] = M (putStrLn "Finalization" >> return (Yield (yieldMany []) Nothing))
yieldMany (o:os) = Yield (yieldMany os) (Just o)
-- /show

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f =
    loop
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure r

runPipe :: Monad m => Pipe () () m r -> m r
runPipe (Pure r) = return r
runPipe (M m) = m >>= runPipe
runPipe (Await f) = runPipe (f Nothing)
runPipe (Yield f _) = runPipe f

-- show When does the finalizer get called?
main = do
    runPipe (yieldMany [1..10] >-> idP >-> fold (+) 0) >>= print
    runPipe (yieldMany [1..10] >-> idP >-> consume 5) >>= print
-- /show

If the input stream is fully consumed, then Finalization is printed. Otherwise, it's not. We need to change our semantics so that we don't exit until all Pipes exit.

Downstream is already notified when upstream is done producing data, via Nothing getting passed with Yield. We need a similar mechanism on the upstream side. In this case, we want it to be a Maybe result value. This value needs to be present as soon as the Pipe begins execution. To allow for this, we're going to refactor our type a bit as follows:

data Step i o m r
    = Pure r
    | M (m (Step i o m r))
    | Yield (Pipe i o m r) (Maybe o)
    | Await (Maybe i -> Step i o m r)

type Pipe i o m r = Maybe r -> Step i o m r

Now a Pipe is notified if downstream has already completed execution. I'd like to focus on one important distinction in the Step type's constructors: whereas M and Await represent the next thing to be done via a Step value, Yield represents it with a Pipe value. The reason for this distinction is that, when you call Yield, downstream has a chance to continue processing, and may provide a return value if it hadn't provided one previously. In the M and Await cases, downstream never gets a chance to provide a new result value.

Let's think about the identity pipe. Its semantics should be that, if downstream is done processing, it's also done processing. If downstream is not done processing, it should await a new value from upstream and yield it downstream. This turns out to be pretty easy to implement:

idP :: Monad m => Pipe i i m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
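
The same two-case pattern carries over to other simple transformers. As a minimal sketch, assuming the Step and Pipe definitions above: mapP and runList below are hypothetical helpers written only for this illustration. runList plays the role of a downstream that keeps running until it sees the end of the stream and then reports the result r0.

```haskell
import Data.Functor.Identity (Identity, runIdentity)

data Step i o m r
    = Pure r
    | M (m (Step i o m r))
    | Yield (Pipe i o m r) (Maybe o)
    | Await (Maybe i -> Step i o m r)

type Pipe i o m r = Maybe r -> Step i o m r

-- Hypothetical helper: like idP, but transforms each value.
-- If downstream is done, we are done too; otherwise await one
-- value (or end of stream) and pass it along.
mapP :: Monad m => (a -> b) -> Pipe a b m r
mapP _ (Just r) = Pure r
mapP f Nothing = Await (\ma -> Yield (mapP f) (fmap f ma))

-- Hypothetical driver: feeds the list as upstream and collects
-- yielded values. When the Pipe yields Nothing (end of stream),
-- the driver acts as a finished downstream and hands back r0.
runList :: r -> [i] -> Pipe i o Identity r -> ([o], r)
runList r0 is0 p = go is0 (p Nothing)
  where
    go _ (Pure r) = ([], r)
    go is (M m) = go is (runIdentity m)
    go is (Yield next (Just o)) =
        let (os, r) = go is (next Nothing) in (o : os, r)
    go is (Yield next Nothing) = go is (next (Just r0))
    go [] (Await k) = go [] (k Nothing)
    go (i:is) (Await k) = go is (k (Just i))

main :: IO ()
main = print (runList 'x' [1, 2, 3 :: Int] (mapP (+ 1)))
```

The point of the sketch is the first clause of mapP: the Pipe only terminates once it has been handed a downstream result, never on its own.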

Below is our full running example.

import Control.Monad

data Step i o m r
    = Pure r
    | M (m (Step i o m r))
    | Yield (Pipe i o m r) (Maybe o)
    | Await (Maybe i -> Step i o m r)

type Pipe i o m r = Maybe r -> Step i o m r

-- show Fusion is a bit more complicated now
fuse :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m
      => (Maybe r -> Step a b m r)
      -> Step b c m r
      -> Step a c m r
fuse' up0 (Pure r) =
    go $ up0 $ Just r
  where
    go (Pure r') = Pure r'
    go (M m) = M (liftM go m)
    go (Yield up _) = go $ up $ Just r
    go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
    go $ up0 Nothing
  where
    -- It's easy to get this next clause wrong.
    -- We need to make sure that we give downstream
    -- a chance to finish processing, not just terminate
    -- immediately.
    go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)
-- /show

(>->) :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
(>->) = fuse

idP :: Monad m => Pipe i i m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r

-- show Consumers can just ignore downstream results
consume :: Monad m => Int -> Pipe i o m [i]
consume count0 _ =
    go id count0
  where
    go front 0 = Pure (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure (front [])
-- /show

-- show Finalization
-- Note that we only finalize once downstream completes.
-- We could instead finalize as soon as our input is empty,
-- which would allow for more promptness. Try implementing that
-- change. Make sure that the finalizer only gets called once!
yieldMany :: [o] -> Pipe i o IO r
yieldMany _ (Just r) = M (putStrLn "Finalization" >> return (Pure r))
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
-- /show

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f r0 _ =
    loop r0
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure r

runPipe :: Monad m => Pipe () () m r -> m r
runPipe f = runStep (f Nothing)

runStep :: Monad m => Step () () m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f

main = do
    runPipe (yieldMany [1..10] >-> idP >-> fold (+) 0) >>= print
    runPipe (yieldMany [1..10] >-> idP >-> consume 5) >>= print
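
For the exercise mentioned in the yieldMany comment, here is one possible sketch rather than a definitive answer. It assumes the same Step, Pipe and fuse definitions as the example above. The names yieldManyPrompt, finalize and finished are hypothetical; the finalizer is passed in as an argument so that it can be any monadic action.

```haskell
import Control.Monad (liftM)

data Step i o m r
    = Pure r
    | M (m (Step i o m r))
    | Yield (Pipe i o m r) (Maybe o)
    | Await (Maybe i -> Step i o m r)

type Pipe i o m r = Maybe r -> Step i o m r

-- Fusion, unchanged from the example above.
fuse :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m => Pipe a b m r -> Step b c m r -> Step a c m r
fuse' up0 (Pure r) = go (up0 (Just r))
  where
    go (Pure r') = Pure r'
    go (M m) = M (liftM go m)
    go (Yield up _) = go (up (Just r))
    go (Await up) = Await (go . up)
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) = go (up0 Nothing)
  where
    go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)

(>->) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
(>->) = fuse

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f r0 _ = loop r0
  where
    loop r = Await $ \mi -> case mi of
        Just i -> loop $! f r i
        Nothing -> Pure r

consume :: Monad m => Int -> Pipe i o m [i]
consume count0 _ = go id count0
  where
    go front 0 = Pure (front [])
    go front count = Await $ \mi -> case mi of
        Just i -> go (front . (i:)) (count - 1)
        Nothing -> Pure (front [])

runPipe :: Monad m => Pipe () () m r -> m r
runPipe p = runStep (p Nothing)

runStep :: Monad m => Step () () m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await k) = runStep (k Nothing)
runStep (Yield p _) = runPipe p

-- Hypothetical variant of yieldMany: run the finalizer either when
-- downstream finishes early or as soon as the list is exhausted.
yieldManyPrompt :: Monad m => m () -> [o] -> Pipe i o m r
yieldManyPrompt finalize = go
  where
    go _ (Just r) = M (finalize >> return (Pure r))
    go [] Nothing = M (finalize >> return (Yield finished Nothing))
    go (o:os) Nothing = Yield (go os) (Just o)
    -- Once finalized, only signal end of stream and never finalize again.
    finished (Just r) = Pure r
    finished Nothing = Yield finished Nothing

main :: IO ()
main = do
    let fin = putStrLn "Finalization"
    runPipe (yieldManyPrompt fin [1 .. 10] >-> fold (+) 0) >>= print
    runPipe (yieldManyPrompt fin [1 .. 10] >-> consume 5) >>= print
```

The separate finished state is what keeps the finalizer from running twice: after the list is exhausted, the producer switches to a Pipe that only signals end of stream and waits for the downstream result.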

That's it, we've moved from automatic termination to manual termination. We can now fold, get prompt finalization, and even modify result values from downstream. Let's play with a few more changes.

Draining upstream

Consider writing a Pipe that takes precisely 5 values from upstream and passes them downstream. If downstream finishes early, it still takes those 5 values. This is not currently possible in pipes or conduit. Let's try it out in our new framework:

import Control.Monad

data Step i o m r
    = Pure r
    | M (m (Step i o m r))
    | Yield (Pipe i o m r) (Maybe o)
    | Await (Maybe i -> Step i o m r)

type Pipe i o m r = Maybe r -> Step i o m r

fuse :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m
      => (Maybe r -> Step a b m r)
      -> Step b c m r
      -> Step a c m r
fuse' up0 (Pure r) =
    go $ up0 $ Just r
  where
    go (Pure r') = Pure r'
    go (M m) = M (liftM go m)
    go (Yield up _) = go $ up $ Just r
    go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
    go $ up0 Nothing
  where
    go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)

(>->) :: Monad m
     => Pipe a b m r
     -> Pipe b c m r
     -> Pipe a c m r
(>->) = fuse

idP :: Monad m => Pipe i i m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r

consume :: Monad m => Int -> Pipe i o m [i]
consume count0 _ =
    go id count0
  where
    go front 0 = Pure (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure (front [])

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f r0 _ =
    loop r0
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure r

runPipe :: Monad m => Pipe () () m r -> m r
runPipe f = runStep (f Nothing)

-- show
takeExactly :: Monad m => Int -> Pipe i i m r
takeExactly 0 (Just r) = Pure r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi
-- We can optimize the above a bit by skipping
-- extra Awaits, give it a shot.

yieldMany :: Show o => [o] -> Pipe i o IO r
yieldMany rest (Just r) = M $ do
    putStrLn $ "Finalization: " ++ show rest
    return (Pure r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)

main = runPipe (yieldMany [1..10] >-> takeExactly 5 >-> fold (+) 0) >>= print
-- /show

runStep :: Monad m => Step () () m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f

Downstream result type

There's a nice generalization we can provide here. There's no reason that the upstream and downstream result types need to be the same. In fact, if we just add an extra type parameter (d for downstream result), the rest of our code can remain the same. Here's an example:

import Control.Monad

-- show
data Step i o d m r
    = Pure r
    | M (m (Step i o d m r))
    | Yield (Pipe i o d m r) (Maybe o)
    | Await (Maybe i -> Step i o d m r)

type Pipe i o d m r = Maybe d -> Step i o d m r

-- Notice the way that a->b->c flows downstream.
-- On the other hand, the result values x->y->z
-- "bubble up" from downstream.
fuse :: Monad m
     => Pipe a b y m z
     -> Pipe b c x m y
     -> Pipe a c x m z
-- /show
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m
      => (Maybe y -> Step a b y m z)
      -> Step b c x m y
      -> Step a c x m z
fuse' up0 (Pure r) =
    go $ up0 $ Just r
  where
    go (Pure r') = Pure r'
    go (M m) = M (liftM go m)
    go (Yield up _) = go $ up $ Just r
    go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
    go $ up0 Nothing
  where
    go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)

(>->) :: Monad m
     => Pipe a b y m z
     -> Pipe b c x m y
     -> Pipe a c x m z
(>->) = fuse

-- show
-- Identity keeps the same stream value (i) and result
-- value (r) for both upstream and downstream.
idP :: Monad m => Pipe i i r m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
-- /show

-- show
-- Consumers can just ignore the downstream result.
consume :: Monad m => Int -> Pipe i o d m [i]
consume count0 _ =
    go id count0
  where
    go front 0 = Pure (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure (front [])
-- /show

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m r
fold f r0 _ =
    loop r0
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure r

runPipe :: Monad m => Pipe () () d m r -> m r
runPipe f = runStep (f Nothing)

runStep :: Monad m => Step () () d m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f

-- show
-- Producers can simply return the downstream result
-- as their own result value.
takeExactly :: Monad m => Int -> Pipe i i r m r
takeExactly 0 (Just r) = Pure r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi

yieldMany :: Show o => [o] -> Pipe i o r IO r
yieldMany rest (Just r) = M $ do
    putStrLn $ "Finalization: " ++ show rest
    return (Pure r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
-- /show

main = runPipe (yieldMany [1..10] >-> takeExactly 5 >-> fold (+) 0) >>= print

However, we can also use this functionality to modify the result type. For example, we may want to return any unused values from our input list in yieldMany. This would look like:

import Control.Monad

data Step i o d m r
    = Pure r
    | M (m (Step i o d m r))
    | Yield (Pipe i o d m r) (Maybe o)
    | Await (Maybe i -> Step i o d m r)

type Pipe i o d m r = Maybe d -> Step i o d m r

fuse :: Monad m
     => Pipe a b y m z
     -> Pipe b c x m y
     -> Pipe a c x m z
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m
      => (Maybe y -> Step a b y m z)
      -> Step b c x m y
      -> Step a c x m z
fuse' up0 (Pure r) =
    go $ up0 $ Just r
  where
    go (Pure r') = Pure r'
    go (M m) = M (liftM go m)
    go (Yield up _) = go $ up $ Just r
    go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
    go $ up0 Nothing
  where
    go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)

(>->) :: Monad m
     => Pipe a b y m z
     -> Pipe b c x m y
     -> Pipe a c x m z
(>->) = fuse

idP :: Monad m => Pipe i i r m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r

consume :: Monad m => Int -> Pipe i o d m [i]
consume count0 _ =
    go id count0
  where
    go front 0 = Pure (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure (front [])

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m r
fold f r0 _ =
    loop r0
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure r

runPipe :: Monad m => Pipe () () d m r -> m r
runPipe f = runStep (f Nothing)

runStep :: Monad m => Step () () d m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f

takeExactly :: Monad m => Int -> Pipe i i r m r
takeExactly 0 (Just r) = Pure r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi

-- show
yieldMany :: Monad m => [o] -> Pipe i o r m ([o], r)
yieldMany rest (Just r) = Pure (rest, r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)

main = runPipe (yieldMany [1..10] >-> takeExactly 5 >-> fold (+) 0) >>= print
-- /show

Leftovers/chunked data

So I promised a solution to leftovers as well. Actually, we can base a solution on that preceding example. When we return a result, we should also return any unconsumed input with it. When we monadically compose two Pipes, we want the leftovers from the first to be injected into the second to be used any time the second awaits. We haven't actually shown a Monad instance yet, and without creating a newtype in place of our type synonym, we can't. But let's play around with some pipeReturn and pipeBind functions:

import Control.Monad

-- show
data Step i o d m r
    = Pure [i] r
    | M (m (Step i o d m r))
    | Yield (Pipe i o d m r) (Maybe o)
    | Await (Maybe i -> Step i o d m r)

type Pipe i o d m r = Maybe ([o], d) -> Step i o d m r

pipeReturn :: Monad m => r -> Pipe i o d m r
pipeReturn r _downRes = Pure [] r

pipeBind :: Monad m
         => Pipe i o d m a
         -> (a -> Pipe i o d m b)
         -> Pipe i o d m b
pipeBind f g mr =
    go $ f mr
  where
    go (Pure is a) = inject is (g a) mr
    go (M m) = M (liftM go m)
    go (Yield p o) = Yield (pipeBind p g) o
    go (Await f) = Await (go . f)

inject :: Monad m => [i] -> Pipe i o d m r -> Pipe i o d m r
inject [] p = p
inject is0 p0 =
    go is0 . p0
  where
    go [] p = p
    go is (Pure is' r) = Pure (is' ++ is) r
    go is (M m) = M (liftM (go is) m)
    go is (Yield p o) = Yield (go is . p) o
    go (i:is) (Await p) = go is (p $ Just i)

peek :: Monad m => Pipe i o d m (Maybe i)
peek _ = Await $ \mi ->
    case mi of
        Nothing -> Pure [] Nothing
        Just i -> Pure [i] (Just i)

peekFold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m (Maybe i, r)
peekFold f r = peek `pipeBind` \a -> fold f r `pipeBind` \b -> pipeReturn (a, b)

main = do
    runPipe (yieldMany [1..10] >-> peek) >>= print
    runPipe (yieldMany [1..10] >-> peekFold (+) 0) >>= print
-- /show

fuse :: Monad m
     => Pipe a b y m z
     -> Pipe b c x m y
     -> Pipe a c x m z
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m
      => (Maybe ([b], y) -> Step a b y m z)
      -> Step b c x m y
      -> Step a c x m z
fuse' up0 (Pure cs z) =
    go $ up0 $ Just (cs, z)
  where
    go (Pure bs y) = Pure bs y
    go (M m) = M (liftM go m)
    go (Yield up _) = go $ up $ Just ([], z)
    go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
    go $ up0 Nothing
  where
    go (Pure bs y) = fuse' (\_ -> Pure bs y) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)

(>->) :: Monad m
     => Pipe a b y m z
     -> Pipe b c x m y
     -> Pipe a c x m z
(>->) = fuse

idP :: Monad m => Pipe i i r m r
idP Nothing = Await (Yield idP)
idP (Just (is, r)) = Pure is r

consume :: Monad m => Int -> Pipe i o d m [i]
consume count0 _ =
    go id count0
  where
    go front 0 = Pure [] (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure [] (front [])

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m r
fold f r0 _ =
    loop r0
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure [] r

runPipe :: Monad m => Pipe () () d m r -> m r
runPipe f = runStep (f Nothing)

runStep :: Monad m => Step () () d m r -> m r
runStep (Pure _ r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f

takeExactly :: Monad m => Int -> Pipe i i r m r
-- We specifically ignore leftovers, since we want to ensure
-- that we consumed exactly the given number of elements
-- from the stream.
takeExactly 0 (Just (_, r)) = Pure [] r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi

yieldMany :: Monad m => [o] -> Pipe i o r m ([o], [o], r)
yieldMany rest (Just (os, r)) = Pure [] (rest, os, r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
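
As mentioned above, a Monad instance needs a newtype in place of the type synonym. Here is a minimal sketch of what that could look like, reusing pipeReturn, pipeBind and inject from the example. The names PipeM, unPipeM, peekM, foldP and feed are hypothetical and exist only for this illustration, and I have not checked the monad laws for this instance. feed is a small driver that answers each Await from a list, so the sketch can run without fuse.

```haskell
import Control.Monad (ap, liftM)
import Data.Functor.Identity (runIdentity)

data Step i o d m r
    = Pure [i] r
    | M (m (Step i o d m r))
    | Yield (Pipe i o d m r) (Maybe o)
    | Await (Maybe i -> Step i o d m r)

type Pipe i o d m r = Maybe ([o], d) -> Step i o d m r

pipeReturn :: Monad m => r -> Pipe i o d m r
pipeReturn r _downRes = Pure [] r

pipeBind :: Monad m
         => Pipe i o d m a -> (a -> Pipe i o d m b) -> Pipe i o d m b
pipeBind f g mr = go (f mr)
  where
    go (Pure is a) = inject is (g a) mr
    go (M m) = M (liftM go m)
    go (Yield p o) = Yield (pipeBind p g) o
    go (Await k) = Await (go . k)

inject :: Monad m => [i] -> Pipe i o d m r -> Pipe i o d m r
inject [] p = p
inject is0 p0 = go is0 . p0
  where
    go [] s = s
    go is (Pure is' r) = Pure (is' ++ is) r
    go is (M m) = M (liftM (go is) m)
    go is (Yield p o) = Yield (go is . p) o
    go (i:is) (Await k) = go is (k (Just i))

-- Hypothetical newtype wrapper so that we can write instances.
newtype PipeM i o d m r = PipeM { unPipeM :: Pipe i o d m r }

instance Monad m => Functor (PipeM i o d m) where
    fmap = liftM

instance Monad m => Applicative (PipeM i o d m) where
    pure = PipeM . pipeReturn
    (<*>) = ap

instance Monad m => Monad (PipeM i o d m) where
    PipeM f >>= g = PipeM (pipeBind f (unPipeM . g))

peekM :: Monad m => PipeM i o d m (Maybe i)
peekM = PipeM $ \_ -> Await $ \mi -> case mi of
    Nothing -> Pure [] Nothing
    Just i -> Pure [i] (Just i)

foldP :: Monad m => (r -> i -> r) -> r -> PipeM i o d m r
foldP f r0 = PipeM $ \_ -> loop r0
  where
    loop r = Await $ \mi -> case mi of
        Just i -> loop $! f r i
        Nothing -> Pure [] r

-- The same peekFold as above, now in do-notation.
peekFold :: Monad m => (r -> i -> r) -> r -> PipeM i o d m (Maybe i, r)
peekFold f r = do
    a <- peekM
    b <- foldP f r
    return (a, b)

-- Hypothetical driver: answer each Await from the list, ignore yields.
feed :: Monad m => [i] -> PipeM i o d m r -> m r
feed is0 (PipeM p) = go is0 (p Nothing)
  where
    go _ (Pure _ r) = return r
    go is (M m) = m >>= go is
    go is (Yield next _) = go is (next Nothing)
    go [] (Await k) = go [] (k Nothing)
    go (i:is) (Await k) = go is (k (Just i))

main :: IO ()
main = print (runIdentity (feed [1 .. 10 :: Int] (peekFold (+) 0)))
```

Because the peeked value is returned as a leftover and re-injected by pipeBind, the fold still sees the first element, so the sum covers the whole input.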

Early termination

So we've done all of this work to get rid of early termination. But in practice, it's often very convenient to have early termination. For example, a common pipes and conduit idiom to create a producer from an infinite list is:

mapM_ yield [1..]

Without some kind of early termination, this will never exit. Fortunately, having early exit from a Monad is a solved problem. We could either layer something like MaybeT in our transformer stack, or add a new constructor to our Step datatype. There are a few complications introduced by this, notably that we have an extra type parameter (t for termination) which must be unified with our result type, but the approach works.

import Control.Monad
-- show
data Step i o d t m r
    = Pure [i] r
    | M (m (Step i o d t m r))
    | Yield (Pipe i o d t m r) (Maybe o)
    | Await (Maybe i -> Step i o d t m r)
    | Stop [i] t

type Pipe i o d t m r = Maybe ([o], d) -> Step i o d t m r

yield :: Monad m => o -> Pipe i o d d m ()
yield _ (Just (_, d)) = Stop [] d
yield o Nothing = Yield (pipeReturn ()) (Just o)

stop :: Monad m => Pipe i o d d m r
stop (Just (_, r)) = Stop [] r
stop Nothing = Yield stop Nothing

pipeMapM_ :: Monad m
          => (a -> Pipe i o d t m ())
          -> [a]
          -> Pipe i o d t m ()
pipeMapM_ _ [] = pipeReturn ()
pipeMapM_ f (x:xs) = f x `pipeBind` (const (pipeMapM_ f xs))

main :: IO ()
main = do
    let producer =
            pipeMapM_ yield [1..] `pipeBind`
            const stop
    runPipe (producer >-> takeExactly 10 >-> fold (+) 0)
        >>= print
-- /show
pipeReturn :: Monad m => r -> Pipe i o d t m r
pipeReturn r _downRes = Pure [] r

pipeBind :: Monad m
         => Pipe i o d t m a
         -> (a -> Pipe i o d t m b)
         -> Pipe i o d t m b
pipeBind f g mr =
    go $ f mr
  where
    go (Pure is a) = inject is (g a) mr
    go (M m) = M (liftM go m)
    go (Yield p o) = Yield (pipeBind p g) o
    go (Await p) = Await (go . p)
    go (Stop is t) = Stop is t

inject :: Monad m => [i] -> Pipe i o d t m r -> Pipe i o d t m r
inject [] p = p
inject is0 p0 =
    go is0 . p0
  where
    go [] p = p
    go is (Pure is' r) = Pure (is' ++ is) r
    go is (M m) = M (liftM (go is) m)
    go is (Yield p o) = Yield (go is . p) o
    go (i:is) (Await p) = go is (p $ Just i)
    go is (Stop is' r) = Stop (is' ++ is) r

peek :: Monad m => Pipe i o d t m (Maybe i)
peek _ = Await $ \mi ->
    case mi of
        Nothing -> Pure [] Nothing
        Just i -> Pure [i] (Just i)

peekFold :: Monad m => (r -> i -> r) -> r -> Pipe i o d t m (Maybe i, r)
peekFold f r = peek `pipeBind` \a -> fold f r `pipeBind` \b -> pipeReturn (a, b)

fuse :: Monad m
     => Pipe a b y z m z
     -> Pipe b c x y m y
     -> Pipe a c x z m z
fuse up down mr = fuse' up (down mr)

fuse' :: Monad m
      => (Maybe ([b], y) -> Step a b y z m z)
      -> Step b c x y m y
      -> Step a c x z m z
fuse' up0 (Pure cs z) =
    go $ up0 $ Just (cs, z)
  where
    go (Pure bs y) = Pure bs y
    go (M m) = M (liftM go m)
    go (Yield up _) = go $ up $ Just ([], z)
    go (Await up) = Await $ \ma -> go $ up ma
    go (Stop bs y) = Stop bs y
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
    go $ up0 Nothing
  where
    go (Pure bs y) = fuse' (\_ -> Pure bs y) (down Nothing)
    go (M m) = M (liftM go m)
    go (Yield up b) = fuse' up (down b)
    go (Await up) = Await (go . up)
    go (Stop bs y) = fuse' (\_ -> Stop bs y) (down Nothing)
fuse' up0 (Stop cs z) = fuse' up0 (Pure cs z)

(>->) :: Monad m
     => Pipe a b y z m z
     -> Pipe b c x y m y
     -> Pipe a c x z m z
(>->) = fuse

idP :: Monad m => Pipe i i r t m r
idP Nothing = Await (Yield idP)
idP (Just (is, r)) = Pure is r

consume :: Monad m => Int -> Pipe i o d t m [i]
consume count0 _ =
    go id count0
  where
    go front 0 = Pure [] (front [])
    go front count = Await $ \mi ->
        case mi of
            Just i -> go (front . (i:)) (count - 1)
            Nothing -> Pure [] (front [])

fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d t m r
fold f r0 _ =
    loop r0
  where
    loop r = Await $ \mi ->
        case mi of
            Just i -> loop $! f r i
            Nothing -> Pure [] r

runPipe :: Monad m => Pipe () () d r m r -> m r
runPipe f = runStep (f Nothing)

runStep :: Monad m => Step () () d r m r -> m r
runStep (Pure _ r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
runStep (Stop _ r) = return r

takeExactly :: Monad m => Int -> Pipe i i r t m r
-- We specifically ignore leftovers, since we want to ensure
-- that we consumed exactly the given number of elements
-- from the stream.
takeExactly 0 (Just (_, r)) = Pure [] r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi

yieldMany :: Monad m => [o] -> Pipe i o r t m ([o], [o], r)
yieldMany rest (Just (os, r)) = Pure [] (rest, os, r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)

Conclusion

The approach I've shown here is not optimized. A faster way of implementing this is to collapse Pipe and Step back into a single data type, and have a dedicated constructor for asking for downstream results. It's also more efficient to skip all of the Maybe wrappers. Instead of Await taking a Maybe i, Await can have two fields: one for when a value is available from upstream, and the other for when no such value is available.
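
To see why the two-field form of Await loses nothing, here is a small sketch showing that a function from Maybe i carries the same information as a pair of continuations. The names AwaitMaybe, AwaitFields, toFields and fromFields are hypothetical and are not taken from any conduit branch.

```haskell
-- The representation used throughout this post: one continuation
-- that inspects a Maybe.
type AwaitMaybe i s = Maybe i -> s

-- Hypothetical two-field representation: one continuation for
-- "a value is available", one next step for "upstream is done".
data AwaitFields i s = AwaitFields (i -> s) s

toFields :: AwaitMaybe i s -> AwaitFields i s
toFields k = AwaitFields (k . Just) (k Nothing)

fromFields :: AwaitFields i s -> AwaitMaybe i s
fromFields (AwaitFields more done) = maybe done more

main :: IO ()
main = do
    let k :: Maybe Int -> String
        k = maybe "end of input" show
        k' = fromFields (toFields k)
    putStrLn (k' (Just 3))
    putStrLn (k' Nothing)
```

Converting in either direction and back gives a continuation that behaves the same, so the choice between the two is purely about avoiding the allocation and pattern match on the Maybe wrapper.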

I've implemented these ideas in a highly experimental branch of conduit. A quick translation from our approach is:

  • Stop is called Terminate.
  • Await has been deconstructed into two fields.
  • Instead of Yield providing a Maybe o, it provides an o. The functionality provided by yielding Nothing is now handled by the Empty constructor.
  • Check is used for checking for downstream results.
  • Yield and Empty are also able to check for downstream results. In the case of Empty, we are guaranteed that downstream must provide a result. In the case of Yield, we include the extra field as an optimization, since composing Yield and Check is otherwise so common.

I've been able to replicate the current conduit API on top of this. The full test suite passes, including some tests for identity and associativity that failed previously. And the benchmarks show this approach as comparable to previous versions.

This approach seems very promising to me, but I also think we need to take time to analyze it properly. I look forward to hearing what the rest of the community thinks.
