I am trying to construct a Conduit that receives as input ByteStrings (of around 1kb per chunk in size) and produces as output concatenated ByteStrings of 512kb chunks.
This seems like it should be simple to do, but I'm having a lot of trouble, most of the strategies I've tried using have only succeeded in dividing the chunks into smaller chunks, I haven't succeeded in concatenating larger chunks.
I started out trying isolate, then takeExactlyE and eventually conduitVector, but to no avail. Eventually I settled on this:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
where
loop buffer n = do
mchunk <- C.await
case mchunk of
Nothing ->
-- Yield last remaining bytes
when (n < chunkSize) (C.yield buffer)
Just chunk -> do
-- Yield when the buffer has been filled and start over
let buffer' = buffer <> BL.fromStrict chunk
l = B.length chunk
if n <= l
then C.yield buffer' >> loop BL.empty chunkSize
else loop buffer' (n - l)
P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.
However, this seems very verbose given all the conduit functions that deal with chunking[1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!
P.P.S. Is it ok to use lazy bytestring for the buffer as I've done? I'm a bit unclear about the internal representation for bytestring and whether this will help, especially since I'm using BL.length which I guess might evaluate the thunk anyway?
Just to elaborate on Michael's answer and comments, I ended up with this conduit:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base)
=> Int
-> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector
My understanding is that vectorBuilder will pay the cost for concatenating the smaller chunks early on, producing the aggregated chunks as strict bytestrings.
From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") might be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface like a network socket. Here's my best attempt at the "lazy bytestring" version:
import qualified Data.Sequences.Lazy as SL
import qualified Data.Sequences as S
import qualified Data.Conduit.List as CL
-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
=> S.Index lazy
-> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize
How about this?
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
loop
where
loop = do
lbs <- takeCE chunkSize =$= sinkLazy
unless (null lbs) $ do
yield lbs
loop
main :: IO ()
main =
yieldMany ["hello", "there", "world!"]
$$ chunksOfAtLeast 3
=$ mapM_C print
There are lots of other approaches that you could take depending on your goals. If you wanted to have a strict buffer, then using blaze-builder of vectorBuilder would make a lot of sense. But this keeps the same type signature you have already.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With