
Haskell / Conduit: read file line by line

Scenario: I have a ~900 MB text file that is formatted as follows:

...
Id:   109101
ASIN: 0806978473
  title: The Beginner's Guide to Tai Chi
  group: Book
  salesrank: 672264
  similar: 0
  categories: 3
   |Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|General[16575]
   |Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|Taichi[16583]
   |Books[283155]|Subjects[1000]|Sports[26]|General[11086921]
  reviews: total: 2  downloaded: 2  avg rating: 5
    2000-4-4  cutomer: A191SV1V1MK490  rating: 5  votes:   0  helpful:   0
    2004-7-10  cutomer:  AVXBUEPNVLZVC  rating: 5  votes:   0  helpful:   0
                    (----- empty line ------)    
Id :

and want to parse the information from it.

Problem: As a first step (and because I need it in another context) I want to process the file line by line, collect the "chunks" belonging to one product, and then process each chunk separately with other logic.

So the plan is the following:

  1. Define a source that represents the text file
  2. Define a conduit (?) that takes one line each from that source and...
  3. ... passes it to some other components.

Now, I am trying to adapt the following example:

doStuff = do
  writeFile "input.txt" "This is a \n test." -- FilePath -> String -> IO ()

  runConduitRes                  -- m r
    $ sourceFileBS "input.txt"   -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"     -- FilePath -> ConduitT ByteString o m ()

  readFile "output.txt"
    >>= putStrLn 

So sourceFileBS "input.txt" is of type ConduitT i ByteString m (), that is, a conduit with

  • input type i
  • output type ByteString
  • monad type m
  • result type ().

sinkFile streams all incoming data into the given file. sinkFile "output.txt" is a conduit with input type ByteString.
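To make these types concrete, here is a minimal sketch that spells out the signatures of the two stages and of the fused pipeline. The names `fileSource`, `fileSink`, `pipeline` and `copyFileChunks` are made up for illustration; the signatures are the ones given in the conduit documentation, specialised where noted.

```haskell
import Conduit
import Data.ByteString (ByteString)
import Data.Void (Void)

-- Source: never awaits, so its input type i stays polymorphic;
-- yields ByteString chunks of arbitrary size.
fileSource :: MonadResource m => ConduitT i ByteString m ()
fileSource = sourceFileBS "input.txt"

-- Sink: consumes ByteString chunks and never yields,
-- so its output type o stays polymorphic.
fileSink :: MonadResource m => ConduitT ByteString o m ()
fileSink = sinkFile "output.txt"

-- Fusing both gives a closed pipeline: input (), output Void.
-- This is the shape runConduitRes expects.
pipeline :: ConduitT () Void (ResourceT IO) ()
pipeline = fileSource .| fileSink

-- Hypothetical wrapper that runs the closed pipeline.
copyFileChunks :: IO ()
copyFileChunks = runConduitRes pipeline
```

The point of the sketch is the last signature: only a pipeline whose output type is Void can be handed to runConduitRes.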

What I want now is to process the input source line-by-line, that is, pass on only one line each downstream. In pseudocode:

sourceFile "input.txt"
splitIntoLines
yieldMany (?)
other stuff

How do I do that?

What I currently have is

copyFile = do
  writeFile "input.txt" "This is a \n test." -- FilePath -> String -> IO ()

  runConduitRes                  -- m r
    (lineC $ sourceFileBS "input.txt")   -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"     -- FilePath -> ConduitT ByteString o m ()

  readFile "output.txt"
    >>= putStrLn --

but that gives the following type error:

    * Couldn't match type `bytestring-0.10.8.2:Data.ByteString.Internal.ByteString'
                     with `Void'
      Expected type: ConduitT
                       ()
                       Void
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
        Actual type: ConduitT
                       ()
                       bytestring-0.10.8.2:Data.ByteString.Internal.ByteString
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
    * In the first argument of `runConduitRes', namely
        `(lineC $ sourceFileBS "input.txt")'
      In the first argument of `(.|)', namely
        `runConduitRes (lineC $ sourceFileBS "input.txt")'
      In a stmt of a 'do' block:
        runConduitRes (lineC $ sourceFileBS "input.txt")
          .| sinkFile "output.txt"
   |
28 |     (lineC $ sourceFileBS "input.txt")   -- ConduitT i ByteString m ()  -- by "chunk"
   |      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This makes me believe that the problem now is that the first conduit in line does not have an input type compatible with runConduitRes.

I just can't make sense of it and really need a hint.

Thanks a lot in advance.

asked Jan 23 '26 11:01 by ngmir



1 Answer

I was struggling with this today, and found this question while trying to figure out a similar problem. I was trying to break git logs into chunks for further parsing, like

commit 12345
Author: Me
Date:   Thu Jan 25 13:45:16 2019 -0500

    made some changes

 1 file changed, 10 insertions(+), 0 deletions(-)

commit 54321
Author: Me
...and so on...

The function I needed is almost splitOnUnboundedE from Data.Conduit.Combinators, but I couldn't quite figure out how to write the predicate function there.

I came up with the following conduit, a slight modification of splitOnUnboundedE. It takes a stream of Text values, one line of text per value, and yields lists of lines. I find that a bit easier to think about, though this is surely not an optimal solution.

It will group the lines of text together using a function that takes the next line and returns a Bool indicating if the next line is the start of the next group of text.
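To pin down the intended grouping before any streaming is involved, here is a pure, list-only sketch of the same idea. `groupByStart` is a hypothetical helper, not part of conduit or of the code in this answer, and the sample lines are shortened for illustration.

```haskell
import Data.List (isPrefixOf)

-- Hypothetical pure counterpart of the grouping step: split a list into
-- groups, opening a new group at every element for which isStart holds.
-- Elements before the first start element end up in the first group.
groupByStart :: (a -> Bool) -> [a] -> [[a]]
groupByStart _ [] = []
groupByStart isStart (x : xs) = (x : body) : groupByStart isStart rest
  where
    -- body: everything up to (not including) the next start element
    (body, rest) = break isStart xs

-- Shortened git log lines (sample data for illustration only).
exampleGroups :: [[String]]
exampleGroups =
  groupByStart ("commit" `isPrefixOf`)
    ["commit 12345", "Author: Me", "", "commit 54321", "Author: Me"]
-- exampleGroups == [["commit 12345","Author: Me",""],["commit 54321","Author: Me"]]
```

A list version like this needs the whole input in memory, which is exactly what a conduit avoids for large files.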


import Conduit
import Data.Text (Text)

-- Group incoming lines into chunks. `startNextLine` decides whether a line
-- opens a new group; `ls` holds lines already collected (pass [] initially).
groupLines :: Monad m => (Text -> Bool) -> [Text] -> ConduitT Text [Text] m ()
groupLines startNextLine ls = start
  where
    -- If the stream is empty, return.
    -- Otherwise start accumulating with the first line.
    start = await >>= maybe (return ()) (accumulateLines ls)
    accumulateLines acc nextLine = do
      -- Add nextLine to the current group, then look at the line after it:
      --   * no more input: yield the group and stop;
      --   * the line starts the next group: yield the current group and
      --     begin a fresh one with that line;
      --   * otherwise: keep accumulating.
      -- A group that consists of a single line is yielded on its own.
      let acc' = acc ++ [nextLine]
      nextLine' <- await
      case nextLine' of
        Nothing -> yield acc'
        Just l
          | startNextLine l -> yield acc' >> accumulateLines [] l
          | otherwise -> accumulateLines acc' l

It can be used in a conduit like the following. Just pass the function above a Text -> Bool function that tells the conduit when the next collection of text should start.


{-# LANGUAGE OverloadedStrings #-}

import Conduit
import Data.Text (Text)
import qualified Data.Text as T
import Data.Void (Void)

-- A log entry starts at a line that begins with "commit".
isCommitLine :: Text -> Bool
isCommitLine = T.isPrefixOf "commit"

logParser :: ConduitT () Void (ResourceT IO) ()
logParser =
  sourceFile "logs.txt"
    .| decodeUtf8C
    .| linesUnboundedC
    .| groupLines isCommitLine []
    .| mapC (T.intercalate "\n")
    -- do something with each log entry here --
    .| printC

main :: IO ()
main = runConduitRes logParser
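The same pipeline shape should carry over to the product file from the question: only the predicate changes, since each record there starts with an "Id:" line. Below is a minimal sketch that runs on two shortened in-memory records instead of the real file. `chunksStartingAt` is a hypothetical helper, a compact variant of the grouping idea that accumulates in reverse to avoid repeated `++`; the second record is made-up sample data.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Conduit
import Data.Text (Text)
import qualified Data.Text as T

-- Hypothetical helper: collect lines into chunks, opening a new chunk
-- whenever isStart holds for a line.
chunksStartingAt :: Monad m => (Text -> Bool) -> ConduitT Text [Text] m ()
chunksStartingAt isStart = go []
  where
    -- acc holds the lines of the current chunk in reverse order.
    go acc = await >>= \next -> case next of
      Nothing -> emit acc
      Just l
        | isStart l -> emit acc >> go [l]
        | otherwise -> go (l : acc)
    -- Yield the chunk in original order; skip empty chunks.
    emit [] = return ()
    emit acc = yield (reverse acc)

-- In the question's file, every product record starts with "Id:".
isIdLine :: Text -> Bool
isIdLine = T.isPrefixOf "Id:"

-- Two shortened records, fed from memory (illustrative data only).
sampleChunks :: [[Text]]
sampleChunks =
  runConduitPure $
    yieldMany ["Id:   109101", "ASIN: 0806978473", "", "Id:   109102", "ASIN: 0000000000"]
      .| chunksStartingAt isIdLine
      .| sinkList
```

For the real file, the `yieldMany` stage would presumably be replaced by `sourceFile`, `decodeUtf8C` and `linesUnboundedC`, and `sinkList` by whatever consumes one record at a time, so the 900 MB never sit in memory at once.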

I'm new to Haskell, and strongly suspect this isn't the best way to accomplish this. So if others have better suggestions, I'll be happy to learn! Otherwise, maybe posting this solution here will help somebody down the line.
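On the type error in the question itself, one plausible reading (a sketch, not a definitive diagnosis): function application binds tighter than .|, so in `runConduitRes (lineC $ ...) .| sinkFile ...` only the first stage is passed to runConduitRes, and that stage still outputs ByteString where a closed pipeline needs Void. Also, as far as I can tell from the conduit documentation, lineC runs an inner conduit over a single line rather than splitting the whole stream into lines. Wrapping the entire pipeline with $ and splitting with linesUnboundedAsciiC would look roughly like this; `copyFileByLine` is a hypothetical name.

```haskell
import Conduit

-- Copy a file one line at a time (hypothetical helper for illustration).
copyFileByLine :: FilePath -> FilePath -> IO ()
copyFileByLine from to =
  runConduitRes               -- ($) hands the whole pipeline to runConduitRes
    $ sourceFileBS from       -- ByteString chunks of arbitrary size
    .| linesUnboundedAsciiC   -- one ByteString per line, newline stripped
    .| unlinesAsciiC          -- re-append a newline after each line
    .| sinkFile to            -- consumes ByteString; pipeline output is Void
```

Any per-line processing would go between `linesUnboundedAsciiC` and `unlinesAsciiC`. Calling `copyFileByLine "input.txt" "output.txt"` mirrors the copy example from the question.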

answered Jan 26 '26 02:01 by NateV



