Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Clojure multithreading output streams

I'm sort of a newbie to Clojure. I have multiple threads trying to write to an output stream and If I'm not mistaken sockets and their streams are not thread safe meaning bits can be mixed up if I write to them simultaneously. One of the main benefits of clojure is inbuilt concurrency handling of race conditions. How can I utilize this for my scenario?

I tried looking into atoms, refs and so on. I initially thought declaring the output stream as an atom would work but I'm not too sure, as it seems it avoids changing the atom state simultaneously (using swap!) however i think you can dereference an atom from multiple threads meaning multiple threads will deref the atom holding my output stream and write to it concurrently.

Any advise will be most helpful.

thanks in advance

(defn send-my-data [output data-bytes]
  (try 
    (.write output)
    (.flush output)
    (catch Exception excp
       (println (format "issue %s" (.printStackTrace excp))))

Now all my threads call this function anytime they want to write data to the output stream

like image 747
sqwale Avatar asked Mar 04 '26 12:03

sqwale


2 Answers

Agents are often considered the correct tool for this sort of task. They take a queue of tasks to run on their internal state and run them in the order they where received. They also play nicely with the rest of Clojure's STM. For instance messages sent to an agent form within a transaction are sent exactly once and only when the transaction commits.

user> (let [output-agent (agent "")] 
        (dotimes [x 10] 
          (send output-agent (fn [_] (println "hello" x)))))
nil
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

In this example the action to be taken is an anonymous function that ignores it's input and just prints something.

like image 112
Arthur Ulfeldt Avatar answered Mar 06 '26 02:03

Arthur Ulfeldt


You can use locking if you need to ensure that no other thread is using an object (and want to wait at that point in your code and not do anything in that particular thread until that object is unlocked so you can lock it).

user> (dotimes [i 10] (future (println \h \e \l \l \o)))
hhh h e
nil
 eh  le  l lo 
h e lh he  e ll h  el  ll  lo 
e  e l l oh
l l oo
l  ol

l lo o

 e

o
 l l o

user> (dotimes [i 10] (future (locking *out* (println \h \e \l \l \o))))
h
nil
 e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o
h e l l o

user> 
like image 36
noisesmith Avatar answered Mar 06 '26 03:03

noisesmith