2014-10-01 43 views
1

我正在实现一个线程的机制,让它有一个包含消息的队列。该队列使用java.util.concurrentLinkedBlockingQueue构建。我想要达到的是如下所示。Clojure等待没有旋转的条件

Thread with mailbox: 
defn work: 
    * do some stuff 
    * Get the head of the queue (a message): 
     - if it is "hello": 
      <do some stuff> 
      <recur work fn> 
     - if it is "bye": 
      <do some stuff> 
     - if it is none of the above, add the message to the back of queue 
      and restart from "Get the head of the queue" 
    * <reaching this point implies terminating the thread> 

,我试图用的是* Get the head of the queue缠循环,使用条件检查邮件,并在:else分支添加到队列中,如果没有匹配的条款来实现我最初的想法。其缺点是在cond的条款的任何主体中呼叫recur将总是重复该循环,而使用recur(例如,如在hello的情况下)意味着重复该功能(即,work)。所以这不是一个选择。另一个缺点是,如果这种信息需要很长时间才能到达,线程将无限期地旋转并吃掉资源。

我有(但尚未实施)的下一个想法是使用未来。该计划如下。

* Get all the matches I have to match (i.e., "hello" and "bye") 
* Start a future and pass it the list of messages: 
    * While the queue does not contain any of the messages 
     recur 
    * when found, return the first element that matches. 
* Wait for the future to deliver. 
* if it is "hello": 
    <do some stuff> 
    <recur work fn> 
    if it is "bye": 
    <do some stuff> 

在做这种方式,我得到几乎我想要的东西:

  1. 在接受"hello""bye"块,直到我有一张。
  2. 我可以让条款无限期数量相匹配的消息
  3. 我已经提取的循环行为成future阻止,这 有很好的副作用,每次我评价我cond 我敢肯定我有一个匹配的消息,不必担心重试。

我真正想要的一件事,但无法想象如何实现,就是在这种情况下的未来不会旋转。就目前而言,它会无限期地消耗穿越队列的宝贵CPU资源,而从不收到它正在寻找的消息可能是完全正常的。

也许放弃LinkedBlockedQueue并将其换算为具有方法的数据结构是有意义的,例如getEither(List<E> oneOfThese),该方法阻塞,直到其中一个元素可用。

我有一个其他想法,这是我可能用Java做的一种方式,如果队列中没有任何元素在队列中,那么调用wait()时会有上述getEither()操作。当其他线程在队列中放入消息时,我可以调用notify(),以便每个线程都会根据他想要的消息列表检查队列。

下面的代码工作正常。但是,它有纺纱问题。这基本上是我想要实现的一个非常基本的例子。

(def queue (ref '())) 

(defn contains-element [elements collection] 
    (some (zipmap elements (repeat true)) collection)) 

(defn has-element 
    [col e] 
    (some #(= e %) col)) 

(defn find-first 
     [f coll] 
     (first (filter f coll))) 

; This function is blocking, which is what I want. 
; However, it spins and thus used a LOT of cpu, 
; whit is *not* what I want.. 
(defn get-either 
    [getthese queue] 
    (dosync 
    (let [match (first (filter #(has-element getthese %) @queue)) 
      newlist (filter #(not= match %) @queue)] 

     (if (not (nil? match)) 
     (do (ref-set queue newlist) 
      match) 
     (Thread/sleep 500) 
     (recur))))) 

(defn somethread 
    [iwantthese] 
    (let [element (get-either iwantthese queue) 
     wanted (filter #(not= % element) iwantthese)] 
    (println (str "I got " element)) 
    (Thread/sleep 500) 
    (recur wanted))) 

(defn test 
    [] 
    (.start (Thread. (fn [] (somethread '(3 4 5))))) 

    (dosync (alter queue #(cons 1 %))) 
    (println "Main: added 1") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 2 %))) 
    (println "Main: added 2") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 3 %))) 
    (println "Main: added 3") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 4 %))) 
    (println "Main: added 4") 
    (Thread/sleep 1000) 

    (dosync (alter queue #(cons 5 %))) 
    (println "Main: added 5")   
) 

任何提示?

(万一有人注意到,没错,这就是像演员和目的是Clojure中的学术目的的实现)

回答

1

您需要2个队列而不是1个:传入队列和“死信”队列。

  1. A “线程” 应该从传入队列中的阻挡方式(LinkedBlockingQueue.take(),core.async/<!或使用药剂)读取。
  2. 如果消息不匹配任何条款:
    1. 运行子句工作:
      1. 放置消息的死队列的末尾
      2. 转到1
    2. 如果消息子句匹配
    3. 对于死亡队列中的每条消息,匹配子句,删除匹配的消息。
    4. 去1

见下面的两种实现方法。

代理

代理是非常相似的演员,“唯一”的区别是,你发送的数据/信息演员,但你发送功能剂。一种可能的实现方式是:

(defn create-actor [behaviour] 
    (agent {:dead-queue [] 
      :behaviour behaviour})) 

dead-queue将包含与任何子句不匹配的消息。这基本上是你的“队列结束”。 behaviour应该是一些match-fn的map/vector来运行。在我的具体实现中,我选择了一个地图,其中键元件匹配和值是fn到新的项目相匹配时运行:

(def actor (create-actor {3 println 
          4 (partial println "Got a ") 
          5 #(println "Got a " %)})) 

你可能会需要更复杂的behaviour数据结构。唯一重要的是要知道元素是否被处理,所以你知道元素是否必须去死队列。

要发送消息给演员:

(defn push [actor message] 
    (send actor 
     (fn [state new-message] 
      (if-let [f (get-in state [:behaviour new-message])] 
      (do 
       (f new-message) 
       state) 
      (update-in state [:dead-queue] conj new-message))) 
     message)) 

所以,如果有对behaviour匹配,该消息被立即处理。如果不是,则存储在死队列中。如果您希望behaviours不是纯函数,您可以在处理新消息后尝试匹配/处理死队列中的所有消息。在这个示例实现中,这是不可能的。

我们可以改变的演员behaviour给就死了队列中的消息有机会进行处理:

(defn change-behaviour [actor behaviour] 
    (send actor 
     (fn [state new-behaviour] 
      (let [to-process (filter new-behaviour (:dead-queue state)) 
       new-dead-queue (vec (remove (set to-process) (:dead-queue state)))] 
      (doseq [old-message to-process 
        :let [f (get new-behaviour old-message)]] 
       (f old-message)) 
      {:behaviour new-behaviour 
      :dead-queue new-dead-queue})) 
     conds)) 

,并用它的一个例子:

(push actor 4) 
(push actor 18) 
(push actor 1) 
(push actor 18) 
(push actor 5) 
(change-behaviour actor {18 (partial println "There was an")}) 

而基于core.async的相同解决方案:

(defn create-actor [behaviour] 
    (let [queue (async/chan)] 
    (async/go-loop [dead-queue [] 
        behaviour behaviour] 
    (let [[type val] (async/<! queue)] 
     (if (= type :data) 
     (if-let [f (get behaviour val)] 
      (do 
      (f val) 
      (recur dead-queue behaviour)) 
      (recur (conj dead-queue val) behaviour)) 
     (let [to-process (filter val dead-queue) 
       new-dead-queue (vec (remove (set to-process) dead-queue))] 
      (doseq [old-msg to-process 
        :let [f (get val old-msg)]] 
      (f old-msg)) 
      (recur new-dead-queue val))))) 
    queue)) 

(defn push [actor message] 
    (async/go 
    (async/>! actor [:data message]))) 

(defn change-behaviour [actor behaviour] 
    (async/go 
    (async/>! actor [:behaviour behaviour]))) 
0

你有没有考虑过使用core.async?它以轻量级的方式提供您所需要的。

+0

是的,我曾看过。但是我为了研究目的在Clojure中实现了actor模型。所以我真的需要我自己的实现。快速查看从core.async传递的消息告诉我,不可能通过过滤器缓存消息并取出我想要的消息。例如,从所有消息中取出满足谓词p的第一条消息。 – 2014-10-01 20:59:17