我正在学习clojure并通过生产者消费者的例子来试用它的并发性和有效性。Clojure生产者消费者
这样做,它感觉非常尴尬,因为不得不使用ref和deref并且还要观察和解除观察。
我试图检查其他代码片段;但是除了使用Java Condition await()和signal()方法以及Java Lock之外,是否还有更好的重新分解方法。我不想在Java中使用任何东西。
这是代码;我想我会在这里犯了许多错误与我的使用...
;a simple producer class
(ns my.clojure.producer
(:use my.clojure.consumer)
(:gen-class)
)
(def tasklist(ref (list))) ;this is declared as a global variable; to make this
;mutable we need to use the fn ref
(defn gettasklist[]
(deref tasklist) ;we need to use deref fn to return the task list
)
(def testagent (agent 0)); create an agent
(defn emptytasklist[akey aref old-val new-val]
(doseq [item (gettasklist)]
(println(str "item is") item)
(send testagent consume item)
(send testagent increment item)
)
(. java.lang.Thread sleep 1000)
(dosync ; adding a transaction for this is needed to reset
(remove-watch tasklist "key123"); removing the watch on the tasklist so that it does not
; go to a recursive call
(ref-set tasklist (list)) ; we need to make it as a ref to reassign
(println (str "The number of tasks now remaining is=") (count (gettasklist)))
)
(add-watch tasklist "key123" emptytasklist)
)
(add-watch tasklist "key123" emptytasklist)
(defn addtask [task]
(dosync ; adding a transaction for this is needed to refset
;(println (str "The number of tasks before") (count (gettasklist)))
(println (str "Adding a task") task)
(ref-set tasklist (conj (gettasklist) task)) ; we need to make it as a ref to reassign
;(println (str "The number of tasks after") (count (gettasklist)))
)
)
这里是消费者代码
(ns my.clojure.consumer
)
(defn consume[c item]
(println "In the consume method:Item is " c item )
item
)
(defn increment [c n]
(println "parmeters are" c n)
(+ c n)
)
这里是测试代码(我使用Maven来运行的Clojure代码使用NetBeans的编辑,因为这是我比较熟悉的从Java未来 - 在文件夹结构和聚甲醛 - https://github.com/alexcpn/clojure-evolve
(ns my.clojure.Testproducer
(:use my.clojure.producer)
(:use clojure.test)
(:gen-class)
)
(deftest test-addandcheck
(addtask 1)
(addtask 2)
(is(= 0 (count (gettasklist))))
(println (str "The number of tasks are") (count (gettasklist)))
)
如果任何人都可以重构这个掉以轻心,这样我可以阅读和理解那么代码会很棒;否则我想我将有机会了解更多
编辑-1
我想用一个全球性的任务清单,然后提供给其他功能通过取消引用它(DEREF),并再次使其通过可变裁判不是clojure的方式;
因此改变addTask方法直接发送传入任务代理
(defn addtask [task]
(dosync ; adding a transaction for this is needed to refset
(println (str "Adding a task") task)
;(ref-set tasklist (conj (gettasklist) task)) ; we need to make it as a ref to reassign
(def testagent (agent 0)); create an agent
(send testagent consume task)
(send testagent increment task)
)
然而,当我
(deftest test-addandcheck
(loop [task 0]
(when (< task 100)
(addtask task)
(recur (inc task))))
(is(= 0 (count (gettasklist))))
(println (str "The number of tasks are") (count (gettasklist)))
)
之后的某个时候我得到的Java拒绝执行异常测试它 - 这如果您执行Java线程,那很好,因为您可以完全控制。但是从Clojure的这看起来很奇怪,特别是因为你没有选择的线程池stratergy自己
Adding a task 85
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionExce
ption
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution
(ThreadPoolExecutor.java:1759)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.jav
a:767)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.ja
va:658)
at clojure.lang.Agent$Action.execute(Agent.java:56)
at clojure.lang.Agent$Action.doRun(Agent.java:95)
at clojure.lang.Agent$Action.run(Agent.java:106)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
utor.java:885)Adding a task 86
Adding a task 87
任务列表可以随时由任意数量的生产者线程修改。 –
这是正确的,这是您访问可变状态的方式 - 这很重要。记住你有**一个共享状态**。 原子工作奇妙,当你想确保原子更新到一个单独的状态。 _我没有看到你正在协调任何更新._ 生产者生产数据,消费者从共同共享状态消费数据。 – jppalencar