2012-01-19 232 views
1

我正在学习clojure并通过生产者消费者的例子来试用它的并发性和有效性。Clojure生产者消费者

这样做,它感觉非常尴尬,因为不得不使用ref和deref并且还要观察和解除观察。

我试图检查其他代码片段;但是除了使用Java Condition await()和signal()方法以及Java Lock之外,是否还有更好的重新分解方法。我不想在Java中使用任何东西。

这是代码;我想我会在这里犯了许多错误与我的使用...

;a simple producer class 
(ns my.clojure.producer 
    (:use my.clojure.consumer) 
    (:gen-class) 
) 

(def tasklist(ref (list))) ;this is declared as a global variable; to make this 
    ;mutable we need to use the fn ref 


(defn gettasklist[] 
    (deref tasklist) ;we need to use deref fn to return the task list 
) 

(def testagent (agent 0)); create an agent 


(defn emptytasklist[akey aref old-val new-val] 

    (doseq [item (gettasklist)] 
     (println(str "item is") item) 
     (send testagent consume item) 
     (send testagent increment item) 
    ) 
     (. java.lang.Thread sleep 1000) 
    (dosync ; adding a transaction for this is needed to reset 

     (remove-watch tasklist "key123"); removing the watch on the tasklist so that it does not 
             ; go to a recursive call 
     (ref-set tasklist (list)) ; we need to make it as a ref to reassign 
     (println (str "The number of tasks now remaining is=") (count (gettasklist))) 

    ) 
    (add-watch tasklist "key123" emptytasklist) 
) 
(add-watch tasklist "key123" emptytasklist) 

    (defn addtask [task] 
    (dosync ; adding a transaction for this is needed to refset 
     ;(println (str "The number of tasks before") (count (gettasklist))) 
     (println (str "Adding a task") task) 
     (ref-set tasklist (conj (gettasklist) task)) ; we need to make it as a ref to reassign 
     ;(println (str "The number of tasks after") (count (gettasklist))) 
    ) 
) 

这里是消费者代码

(ns my.clojure.consumer 
) 
(defn consume[c item] 

    (println "In the consume method:Item is " c item ) 
    item 
) 
(defn increment [c n] 
    (println "parmeters are" c n) 
    (+ c n) 
) 

这里是测试代码(我使用Maven来运行的Clojure代码使用NetBeans的编辑,因为这是我比较熟悉的从Java未来 - 在文件夹结构和聚甲醛 - https://github.com/alexcpn/clojure-evolve

(ns my.clojure.Testproducer 
     (:use my.clojure.producer) 
     (:use clojure.test) 
     (:gen-class) 
) 

(deftest test-addandcheck 

    (addtask 1) 
    (addtask 2) 
    (is(= 0 (count (gettasklist)))) 
    (println (str "The number of tasks are") (count (gettasklist))) 

) 

如果任何人都可以重构这个掉以轻心,这样我可以阅读和理解那么代码会很棒;否则我想我将有机会了解更多

编辑-1

我想用一个全球性的任务清单,然后提供给其他功能通过取消引用它(DEREF),并再次使其通过可变裁判不是clojure的方式;

因此改变addTask方法直接发送传入任务代理

(defn addtask [task] 
    (dosync ; adding a transaction for this is needed to refset 

     (println (str "Adding a task") task) 

     ;(ref-set tasklist (conj (gettasklist) task)) ; we need to make it as a ref to reassign 
     (def testagent (agent 0)); create an agent 
     (send testagent consume task) 
     (send testagent increment task) 

    ) 

然而,当我

(deftest test-addandcheck 
    (loop [task 0] 
    (when (< task 100) 
     (addtask task) 
     (recur (inc task)))) 

    (is(= 0 (count (gettasklist)))) 
    (println (str "The number of tasks are") (count (gettasklist))) 

) 

之后的某个时候我得到的Java拒绝执行异常测试它 - 这如果您执行Java线程,那很好,因为您可以完全控制。但是从Clojure的这看起来很奇怪,特别是因为你没有选择的线程池stratergy自己

Adding a task 85 
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionExce 
ption 
     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution 
(ThreadPoolExecutor.java:1759) 
     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.jav 
a:767) 
     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.ja 
va:658) 
     at clojure.lang.Agent$Action.execute(Agent.java:56) 
     at clojure.lang.Agent$Action.doRun(Agent.java:95) 
     at clojure.lang.Agent$Action.run(Agent.java:106) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec 
utor.java:885)Adding a task 86 
Adding a task 87 

回答

1

我在Computing Folder Sizes Asynchronously取得与生产者/消费者模式类似的Clojure程序。

我不认为你真的需要使用参考。 你有一个可变状态的单任务列表,你想同步更改。变化相对较快,不依赖于任何其他外部状态,而仅依赖于消费者的变量。

至于与原子有交换!功能可以帮助您按照我了解您的需要的方式进行更改。

你可以看看我的代码Computing folder size我认为它至少可以显示你正确使用原子&代理。我玩了很多,所以它应该是正确的。

希望它有帮助!

问候, 扬

+0

任务列表可以随时由任意数量的生产者线程修改。 –

+0

这是正确的,这是您访问可变状态的方式 - 这很重要。记住你有**一个共享状态**。 原子工作奇妙,当你想确保原子更新到一个单独的状态。 _我没有看到你正在协调任何更新._ 生产者生产数据,消费者从共同共享状态消费数据。 – jppalencar

4

我觉得造型生产&消费者用Clojure将使用lamina channels做最简单的(最有效和)。

+0

谢谢,我相信这是最好的方法,但我不想现在使用任何库,因为我正在学习 –

+0

我想使用全局任务列表并通过取消引用将其提供给其他函数(deref)并且再次让它变成ref,这不是clojure的方式; –

0

我正在看Twitter风暴的Clojure示例。他只是使用LinkedBlockingQueue。它易于使用,并发并且表现良好。当然,它缺乏不可改变的Clojure解决方案的性吸引力,但它会运作良好。

+0

我使用BlockingQueue Future Task ets在Java中完成了多线程程序。而且它很容易和高效。对于我来说,多线程的Clojure作为一个clojure amateaur并没有太大的吸引力,如果我必须为生产者和消费者编写这么多丑陋的代码,那么这么多的ref和deref –

0

我遇到一些使用情况下,我需要的能力:

  • 严格控制工作线程的数量对生产者和消费者都侧
  • 控制“工作的最大尺寸排队”,以限制内存消耗
  • 检测,当所有工作已经完成,这样我就可以关闭工人

我发现Clojure的内置并发功能(虽然惊人的简单和有用的自己的权利)使前两个重点难点。 lamina看起来不错,但我没有看到一种方式,它将解决我的特殊用例,而不是我需要在基于BlockingQueue的实现方面做的额外配件。

所以,我结束了一个简单的clojure库,试图解决我的问题。它基本上只是一个围绕BlockingQueue的封装,它试图隐藏一些Java结构并提供更高级的生产者 - 消费者API。我还不完全满意API;这将有可能演变远一点......但它的操作:

https://github.com/cprice-puppet/freemarket

用法示例:

(def myproducer (producer producer-work-fn num-workers max-work)) 
(def myconsumer (consumer myproducer consumer-work-fn num-workers max-results)) 
(doseq [result (work-queue->seq (:result-queue myconsumer))] 
    (println result)) 

意见/建议/贡献将受到欢迎!