2012-10-04 131 views
5

我有一个小型的Clojure消费者/发布者接收消息,处理消息并通过RabbitMQ将消息发送给其他消费者。Clojure消息处理/异步,多线程

我已经定义了一个消息处理程序,它处理单独线程中的消息(与主线程分离)。 从下面的代码可以看出,线程同步接收和发送消息,所有这些都发生在由功能启动的事件循环中。

所以,问题是,什么是“Clojure方法”来创建这些同步消息处理程序的N大小的线程池?我猜非Clojure的方式是通过Java interop手动产生多个线程。

另外,考虑到处理不是CPU密集型的,这会加快消息处理的速度吗?考虑到发布花费的时间多于处理花费的时间,再次将这些消息处理程序设置为异步会更好吗?

最后,我将如何去测量这些竞争方法的性能(我来自Ruby/Javascript世界,并且那里没有多线程)?

注意: 我知道这一切可以通过只是水平缩放和产卵多JVM进程听完消息总线是可以避免的,但由于应用程序将被部署在Heroku上,我想作为使用每个动态/流程中尽可能多的资源。

(defn message-handler 
    [ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message))) 

(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
         (lcm/subscribe ch q-name message-handler :auto-ack true)))))) 

在一个更基本的音符......我将如何去重构这个代码,以支持一个额外的参数注册消息处理程序回调,像这样:

(.start (Thread. (fn [] 
         (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true)))))) 

和然后用参考发布:

(lb/publish ch pub-name "" processed-message))) 

来代替字面:

(lb/publish ch "e.events" "" processed-message))) 

回答

2

对于问题的第二部分时,可以使用局部应用,如下所示:

(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message))) 



(.start 
    (Thread. 
    (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 
1

这是一个非常大的话题,你可能会考虑把这个问题分解成几个不同的问题,但简洁的答案是:use agents

+0

感谢您的提示,就行了。 – neektza