I'm trying to compute folder size in parallel. Maybe it's naive approach. What I do, is that I give computation of every branch node (directory) to an agent. All leaf nodes have their file sizes added to my-size. Well it doesn't work. :)
'scan' works ok, serially. 'pscan' prints only files from first level.
(def agents (atom []))
(def my-size (atom 0))
(def root-dir (clojure.java.io/file "/"))
(defn scan [listing]
  (doseq [f listing]
    (if (.isDirectory f)
      (scan (.listFiles f))
      (swap! my-size #(+ % (.length f))))))
(defn pscan [listing]
  (doseq [f listing]
    (if (.isDirectory f)
      (let [a (agent (.listFiles f))]
        (do (swap! agents #(conj % a))
            (send-off a pscan)
            (println (.getName f))))
    (swap! my-size #(+ %  (.length f))))))
Do you have any idea, what have i done wrong?
Thanks.
No need to keep state using atoms. Pure functional:
(defn psize [f]
  (if (.isDirectory f)
    (apply + (pmap psize (.listFiles f)))
    (.length f)))
It's not :)
I tried to solve this issue better. I realized that i'm doing blocking I/O operations so pmap doesn't do the job. I was thinking maybe giving chunks of directories (branches) to agents to process it independently would make sense. Looks it does :) Well I haven't benchmarked it yet.
It works, but, there might be some problems with symbolic links on UNIX-like systems.
(def user-dir (clojure.java.io/file "/home/janko/projects/"))
(def root-dir (clojure.java.io/file "/"))
(def run? (atom true))
(def *max-queue-length* 1024)
(def *max-wait-time* 1000)    ;; wait max 1 second then process anything left
(def *chunk-size* 64)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length* ))
(def agents (atom []))
(def size-total (atom 0))
(def a (agent []))
(defn branch-producer [node]
  (if @run?
    (doseq [f node]
      (when (.isDirectory f)
    (do (.put queue f)
        (branch-producer (.listFiles f)))))))
(defn producer [node]
  (future
    (branch-producer node)))
(defn node-consumer [node]
  (if (.isFile node)
    (.length node)
    0))
(defn chunk-length []
  (min (.size queue) *chunk-size*))
(defn compute-sizes [a]
  (doseq [i (map (fn [f] (.listFiles f)) a)]
    (swap! size-total #(+ % (apply + (map node-consumer i))))))
(defn consumer []
  (future
    (while @run?
      (when-let [size (if (zero? (chunk-length))
            false
            (chunk-length))] ;appropriate size of work
      (binding [a (agent [])]                    
        (dotimes [_ size]         ;give us all directories to process
          (when-let [item (.poll queue)]
            (set! a (agent (conj @a item)))))
        (swap! agents #(conj % a))
        (send-off a compute-sizes))
      (Thread/sleep *max-wait-time*)))))
You can start it by typing
    (producer (list user-dir))
    (consumer)
For result type
    @size-total
You can stop it by (there are running futures - correct me if I'm wrong)
    (swap! run? not)
If you find any errors/mistakes, you're welcome to share your ideas!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With