Is it possible to create pipes that get all values that have been sent downstream in a certain time period? I'm implementing a server where the protocol allows me to concatenate outgoing packets and compress them together, so I'd like to effectively "empty out" the queue of downstream ByteStrings every 100ms, mappend them together, and yield the result to the next pipe, which does the compression.
Here's a solution using pipes-concurrency. You give it any Input and it will periodically drain the input of all values:
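
    import Control.Applicative ((<|>))
    import Control.Concurrent (threadDelay)
    import Data.Foldable (forM_)
    import Pipes
    import Pipes.Concurrent

    -- | Block until at least one value is available, then take everything
    -- else that is already buffered without blocking.  Returns Nothing once
    -- the Input is exhausted.
    drainAll :: Input a -> STM (Maybe [a])
    drainAll i = do
        ma <- recv i
        case ma of
            Nothing -> return Nothing
            Just a  -> loop (a:)
      where
        -- diffAs is a difference list, so appending stays cheap
        loop diffAs = do
            -- recv retries on an empty buffer; fall back to Nothing instead
            ma <- recv i <|> return Nothing
            case ma of
                Nothing -> return (Just (diffAs []))
                Just a  -> loop (diffAs . (a:))

    -- | Sleep, then yield everything buffered in the Input as one list;
    -- repeat until the Input is exhausted.
    bucketsEvery :: Int -> Input a -> Producer [a] IO ()
    bucketsEvery microseconds i = loop
      where
        loop = do
            lift $ threadDelay microseconds
            ma <- lift $ atomically $ drainAll i
            forM_ ma $ \a -> do
                yield a
                loop
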
This gives you much greater control over how you consume elements from upstream: the type of Buffer you select when you build the Input determines how values are queued between drains.
If you're new to pipes-concurrency, you can read the tutorial, which explains how to use spawn, Buffer, and Input.
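To tie this back to the question, here is a minimal sketch of how the pieces might be wired together for ByteString packets. The names `batched` and `demo` are hypothetical helpers made up for illustration, `drainAll` and `bucketsEvery` are repeated from the answer only so the sketch compiles on its own, and the lower-case `unbounded` buffer assumes a reasonably recent pipes-concurrency (older releases spell it `Unbounded`). The demo sends its packets and seals the mailbox before draining, so the result does not depend on timing.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import Data.Foldable (forM_)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Repeated from the answer so this sketch is self-contained.
drainAll :: Input a -> STM (Maybe [a])
drainAll i = do
    ma <- recv i
    case ma of
        Nothing -> return Nothing
        Just a  -> loop (a:)
  where
    loop diffAs = do
        ma <- recv i <|> return Nothing
        case ma of
            Nothing -> return (Just (diffAs []))
            Just a  -> loop (diffAs . (a:))

bucketsEvery :: Int -> Input a -> Producer [a] IO ()
bucketsEvery microseconds i = loop
  where
    loop = do
        lift $ threadDelay microseconds
        ma <- lift $ atomically $ drainAll i
        forM_ ma $ \a -> do
            yield a
            loop

-- Hypothetical helper: mappend each bucket into a single packet, ready to
-- hand to a compression stage further downstream.
batched :: Int -> Input ByteString -> Producer ByteString IO ()
batched microseconds input = bucketsEvery microseconds input >-> P.map mconcat

-- Hypothetical demo: queue three packets, seal the mailbox, then collect
-- the batched output using a 100ms interval as in the question.
demo :: Buffer ByteString -> IO [ByteString]
demo buffer = do
    (output, input, seal) <- spawn' buffer
    forM_ (map BC.pack ["hello ", "world", "!"]) $ \packet ->
        atomically (send output packet)
    atomically seal
    P.toListM (batched 100000 input)
```

Calling `demo unbounded` should return a single concatenated packet, since all three ByteStrings are already buffered when the first drain runs. Passing a different Buffer, such as `newest 2`, changes what is still queued at drain time, which is the kind of control the answer refers to. In a real server you would presumably fork a thread that feeds the Output with `toOutput` and attach the compression pipe after `batched`.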