Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I improve this Clojure Component+async example?

I want to figure out how best to create an async component, or accommodate async code in a Component-friendly way. This is the best I can come up with, and... it just doesn't feel quite right.

The Gist: take words, uppercase them and reverse them, finally print them.

Problem 1: I can't get the system to stop at the end. I expect to see a println of the individual c-chans stopping, but don't.

Problem 2: How do I properly inject deps. into the producer/consumer fns? I mean, they're not components, and I think they should not be components since they have no sensible lifecycle.

Problem 3: How do I idiomatically handle the async/pipeline-creating side-effects named a>b, and b>c? Should a pipeline be a component?

(ns pipelines.core
  (:require [clojure.core.async :as async
             :refer [go >! <! chan pipeline-blocking close!]]
            [com.stuartsierra.component :as component]))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
  (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(defn b>c [b> c>]
  (pipeline-blocking 4
                     c>
                     (map (comp (partial apply str)
                                reverse))
                     b>))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
  (doseq [word ["apple" "banana" "carrot"]]
    (go (>! a> word))))

(defn consumer [c>]
  (go (while true
        (println "Your Word Is: " (<! c>)))))



;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
  (let [c-chan (reify component/Lifecycle
                 (start [this]
                   (println "starting chan: " this)
                   (chan 1))
                 (stop [this]
                   (println "stopping chan: " this)
                   (close! this)))]
    (-> (component/system-map
         :a> c-chan
         :b> c-chan
         :c> c-chan)
        (component/using {}))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_      (reset! system (component/start (pipeline-system {})))
      _      (a>b (:a> @system) (:b> @system))
      _      (b>c (:b> @system) (:c> @system))
      _      (producer (:a> @system))
      _      (consumer (:c> @system))
      _      (component/stop @system)])

EDIT:

I started thinking about the following, but I'm not quite sure if it's closing properly...

(extend-protocol component/Lifecycle
  clojure.core.async.impl.channels.ManyToManyChannel
  (start [this]
    this)
  (stop [this]
    (close! this)))
like image 658
Josh.F Avatar asked Oct 16 '25 19:10

Josh.F


1 Answers

I rewrote your example a little to make it reloadable:

Reloadable Pipeline

(ns pipeline
  (:require [clojure.core.async :as ca :refer [>! <!]]
            [clojure.string :as s]))

(defn upverse [from to]
  (ca/pipeline-blocking 4
                        to
                        (map (comp s/upper-case
                                   s/reverse))
                        from))
(defn produce [ch xs]
  (doseq [word xs]
    (ca/go (>! ch word))))

(defn consume [ch]
  (ca/go-loop []
              (when-let [word (<! ch)]
                (println "your word is:" word)
                (recur))))

(defn start-engine []
  (let [[from to] [(ca/chan) (ca/chan)]]
    (upverse to from)
    (consume from)
    {:stop (fn []
             (ca/close! to)
             (ca/close! from)
             (println "engine is stopped"))
     :process (partial produce to)}))

this way you can just do (start-engine) and use it to process word sequences:

REPL time

boot.user=> (require '[pipeline])

boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine

running with it

boot.user=> ((engine :process) ["apple" "banana" "carrot"])

your word is: TORRAC
your word is: ANANAB
your word is: ELPPA

boot.user=> ((engine :process) ["do" "what" "makes" "sense"])

your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW

stopping it

boot.user=> ((:stop engine))
engine is stopped

;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil

State Management

Depending on how you intend to use this pipeline, a state management framework like Component might not be needed at all: no need to add anything "just in case", starting and stopping the pipeline in this case is a matter of calling two functions.

However in case this pipeline is used within a larger app with more states you could definitely benefit from a state management library.

I am not a fan of Component primarily because it requires a full app buyin (which makes it a framework), but I do respect other people using it.

mount

I would recommend to either not use anything specific in case the app is small: you, for example could compose this pipeline with other pipelines / logic and kick it off from -main, but if the app is any bigger and has more unrelated states, here is all you need to do to add mount to it:

(defstate engine :start (start-engine)
                 :stop ((:stop engine)))

starting pipeline

boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}

running with it

boot.user=> ((engine :process) ["do" "what" "makes" "sense"])

your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW

stopping it

boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}

Here is a gist with a full example that includes build.boot.

You can just download and play with it via boot repl


[EDIT]: to answer the comments

In case you are already hooked on Component, this should get you started:

(defrecord WordEngine []
  component/Lifecycle

  (start [component]
    (merge component (start-engine)))

  (stop [component]
    ((:stop component))
    (assoc component :process nil :stop nil)))

This, on start, would create a WordEngine object that would have a :process method.

You won't be able to call it as you would a normal Clojure function: i.e. from REPL or any namespace just by :requireing it, unless you pass a reference to the whole system around which is not recommended.

So in order to call it, this WordEngine would need to be plugged into a Component system, and injected into yet another Component which can then destructure the :process function and call it.

like image 149
tolitius Avatar answered Oct 18 '25 14:10

tolitius



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!