I want to figure out how best to create an async component, or accommodate async code in a Component-friendly way. This is the best I can come up with, and... it just doesn't feel quite right.
The Gist: take words, uppercase
them and reverse
them, finally print
them.
Problem 1: I can't get the system
to stop at the end. I expect to see a println
of the individual c-chan
s stopping, but don't.
Problem 2: How do I properly inject deps. into the producer
/consumer
fns? I mean, they're not components, and I think they should not be components since they have no sensible lifecycle.
Problem 3: How do I idiomatically handle the async/pipeline
-creating side-effects named a>b
, and b>c
? Should a pipeline
be a component?
(ns pipelines.core
(:require [clojure.core.async :as async
:refer [go >! <! chan pipeline-blocking close!]]
[com.stuartsierra.component :as component]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
(pipeline-blocking 4
b>
(map clojure.string/upper-case)
a>))
(defn b>c [b> c>]
(pipeline-blocking 4
c>
(map (comp (partial apply str)
reverse))
b>))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
(doseq [word ["apple" "banana" "carrot"]]
(go (>! a> word))))
(defn consumer [c>]
(go (while true
(println "Your Word Is: " (<! c>)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
(let [c-chan (reify component/Lifecycle
(start [this]
(println "starting chan: " this)
(chan 1))
(stop [this]
(println "stopping chan: " this)
(close! this)))]
(-> (component/system-map
:a> c-chan
:b> c-chan
:c> c-chan)
(component/using {}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_ (reset! system (component/start (pipeline-system {})))
_ (a>b (:a> @system) (:b> @system))
_ (b>c (:b> @system) (:c> @system))
_ (producer (:a> @system))
_ (consumer (:c> @system))
_ (component/stop @system)])
EDIT:
I started thinking about the following, but I'm not quite sure if it's closing properly...
(extend-protocol component/Lifecycle
clojure.core.async.impl.channels.ManyToManyChannel
(start [this]
this)
(stop [this]
(close! this)))
I rewrote your example a little to make it reloadable:
(ns pipeline
(:require [clojure.core.async :as ca :refer [>! <!]]
[clojure.string :as s]))
(defn upverse [from to]
(ca/pipeline-blocking 4
to
(map (comp s/upper-case
s/reverse))
from))
(defn produce [ch xs]
(doseq [word xs]
(ca/go (>! ch word))))
(defn consume [ch]
(ca/go-loop []
(when-let [word (<! ch)]
(println "your word is:" word)
(recur))))
(defn start-engine []
(let [[from to] [(ca/chan) (ca/chan)]]
(upverse to from)
(consume from)
{:stop (fn []
(ca/close! to)
(ca/close! from)
(println "engine is stopped"))
:process (partial produce to)}))
this way you can just do (start-engine)
and use it to process word sequences:
boot.user=> (require '[pipeline])
boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
your word is: TORRAC
your word is: ANANAB
your word is: ELPPA
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
boot.user=> ((:stop engine))
engine is stopped
;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil
Depending on how you intend to use this pipeline, a state management framework like Component might not be needed at all: no need to add anything "just in case", starting and stopping the pipeline in this case is a matter of calling two functions.
However in case this pipeline is used within a larger app with more states you could definitely benefit from a state management library.
I am not a fan of Component primarily because it requires a full app buyin (which makes it a framework), but I do respect other people using it.
I would recommend to either not use anything specific in case the app is small: you, for example could compose this pipeline with other pipelines / logic and kick it off from -main
, but if the app is any bigger and has more unrelated states, here is all you need to do to add mount to it:
(defstate engine :start (start-engine)
:stop ((:stop engine)))
boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}
Here is a gist with a full example that includes build.boot
.
You can just download and play with it via boot repl
In case you are already hooked on Component, this should get you started:
(defrecord WordEngine []
component/Lifecycle
(start [component]
(merge component (start-engine)))
(stop [component]
((:stop component))
(assoc component :process nil :stop nil)))
This, on start, would create a WordEngine
object that would have a :process
method.
You won't be able to call it as you would a normal Clojure function: i.e. from REPL or any namespace just by :require
ing it, unless you pass a reference to the whole system around which is not recommended.
So in order to call it, this WordEngine
would need to be plugged into a Component system, and injected into yet another Component which can then destructure the :process
function and call it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With