Concurrent write with OCaml Async

I'm reading data from the network and I'd like to write it to a file as soon as it arrives. The writes are concurrent and non-sequential (think P2P file sharing). In C, I would get a file descriptor for the file (kept for the duration of the program), then use lseek followed by write, and eventually close the fd. These operations could be protected by a mutex in a multithreaded setting (in particular, lseek and write together should be atomic).

I don't really see how to get that behavior in Async. My initial idea is to have something like this.

 let write fd s pos = 
     let posl = Int64.of_int pos in
     Async_unix.Unix_syscalls.lseek fd ~mode:`Set posl
     >>| fun _ -> 
     let wr = Writer.create fd in
     let len = String.length s in
     Writer.write wr s ~pos:0 ~len

Then, the writes are scheduled asynchronously when data is received.

My solution isn't correct. For one thing, the write task needs to be atomic, but it isn't, since two lseek calls can be executed before the first Writer.write. Even if I could schedule the writes sequentially, it wouldn't help, since Writer.write doesn't return a Deferred.t. Any ideas?

BTW, this is a follow-up to a previously answered question.

Nemo asked Dec 20 '25 16:12


1 Answer

The basic approach would be to have a queue of workers, where each worker performs an atomic seek/write operation (see footnote 1). The invariant is that only one worker is running at a time. A more complicated strategy would employ a priority queue, where writes are ordered by some criterion that maximizes throughput, e.g., writes to subsequent positions first. If you observe lots of small writes, you may also implement a more sophisticated buffering strategy: a good idea would be to coalesce them into larger chunks.

But let's start with a simple non-prioritized queue, implemented via Async.Pipe.t. For the positional write, we can't use the Writer interface, as it is designed for buffered sequential writes. So we will use Unix.lseek from Async_unix.Std and the Bigstring.really_write function. really_write is a regular non-asynchronous function, so we need to lift it into the Async interface using the Fd.syscall_in_thread function, e.g.,

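let really_pwrite fd offset bytes = 
  (* Move the file position to [offset] first ... *)
  Unix.lseek fd offset ~mode:`Set >>= fun (_ : int64) ->
  (* ... then run the blocking write in a separate thread. *)
  Fd.syscall_in_thread fd ~name:"really_write" (fun desc -> 
    Bigstring.really_write desc bytes)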

Note: a single write system call writes as many bytes as the system decides, but no more than the length of bytes. So a complete really_pwrite function has to make sure that all the bytes are written and that errors are handled.
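The "write all bytes" requirement boils down to a retry loop around a write that may come up short. Here is a minimal sketch of that loop, using only the standard library. The write argument is a hypothetical stand-in for a single write call that returns the number of bytes it actually wrote; it is not part of Async or Core.

```ocaml
(* [write buf pos len] stands in for one write call (an assumption of this
   sketch): it may write fewer than [len] bytes and returns the count. *)
let rec write_all write buf pos len =
  if len > 0 then begin
    let n = write buf pos len in
    (* Guard against spinning forever if the write makes no progress. *)
    if n <= 0 then failwith "write_all: no progress";
    (* Retry with whatever is left after the short write. *)
    write_all write buf (pos + n) (len - n)
  end
```

In the Async setting the same loop would run inside the function passed to Fd.syscall_in_thread, right after the seek, so that the position set by lseek is consumed by this worker only.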

The overall scheme would include one master thread that owns the file descriptor and accepts write requests from multiple clients via an Async.Pipe. Suppose that each write request is a message of the following type:

 type chunk = {
    offset : int;
    bytes : Bigstring.t;
 }

Then your master thread will look like this:

let process_requests fd requests = 
  Async.Pipe.iter requests ~f:(fun {offset; bytes} -> 
    (* chunk.offset is an int, while Unix.lseek expects an int64 *)
    really_pwrite fd (Int64.of_int offset) bytes)

Here really_pwrite is a function that really writes all the bytes and handles all the errors. You may also use the Async.Pipe.iter' function to presort and coalesce the writes before actually executing the pwrite syscall.
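The presort-and-coalesce step can be sketched as a pure function over a batch of requests. This is only an illustration: the chunk type below is a hypothetical variant that carries a plain string instead of a Bigstring.t so that the code runs with the standard library alone, and coalesce is a name made up for this sketch.

```ocaml
(* Hypothetical request type for this sketch: a plain string instead of
   Bigstring.t, so that only the standard library is needed. *)
type chunk = { offset : int; bytes : string }

(* Sort the batch by offset, then merge every chunk that starts exactly
   where the previous one ends. Gaps and overlaps are left untouched. *)
let coalesce chunks =
  let sorted =
    List.stable_sort (fun a b -> compare a.offset b.offset) chunks in
  let merge acc c =
    match acc with
    | prev :: rest when prev.offset + String.length prev.bytes = c.offset ->
      { prev with bytes = prev.bytes ^ c.bytes } :: rest
    | _ -> c :: acc
  in
  List.rev (List.fold_left merge [] sorted)
```

With a batch taken from the pipe, each merged chunk then needs only one seek and one write instead of one pair per original request.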

One more optimization note. Allocating a bigstring is a rather expensive operation, so you may consider preallocating one big bigstring and serving small chunks from it. This creates a limited resource, so your clients will have to wait until other clients finish their writes and release their chunks. As a result, you will have a throttled system with a limited memory footprint.
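A minimal sketch of such a preallocated pool, using the standard Bigarray module (the same underlying representation as a bigstring). All names here (buf, pool, create_pool, acquire, release) are hypothetical. In this sketch an exhausted pool simply returns None; in an Async program the client would instead wait on a deferred until a slice is released.

```ocaml
(* One-dimensional char bigarray, the representation behind a bigstring. *)
type buf =
  (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

(* The pool is just the list of slices that are currently free. *)
type pool = { mutable free : buf list }

let create_pool ~slots ~slot_size =
  let big =
    Bigarray.Array1.create Bigarray.char Bigarray.c_layout (slots * slot_size)
  in
  (* Array1.sub shares memory with [big]; nothing is copied. *)
  { free =
      List.init slots (fun i ->
        Bigarray.Array1.sub big (i * slot_size) slot_size) }

(* [None] means the pool is exhausted and the caller has to wait. *)
let acquire pool =
  match pool.free with
  | [] -> None
  | b :: rest -> pool.free <- rest; Some b

(* Hand a slice back once its write has completed. *)
let release pool b = pool.free <- b :: pool.free
```

The number of slots bounds both the memory footprint and the number of writes that can be in flight at once, which is the throttling effect described above.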


1) Ideally we should use pwrite, but Jane Street provides only the pwrite_assume_fd_is_nonblocking function, which doesn't release the OCaml runtime while the call to the system pwrite is in progress, and so will actually block the whole system. So we need to use a combination of seek and write. The latter releases the OCaml runtime so that the rest of the program can continue. (Also, given their definition of a nonblocking fd, this function doesn't really make much sense, as only sockets and FIFOs are considered non-blocking, and as far as I know, they do not support the seek operation. I will file an issue on their bug tracker.)

ivg answered Dec 22 '25 10:12


