我正在实现一个线程的机制,让它有一个包含消息的队列。该队列使用java.util.concurrent
的LinkedBlockingQueue
构建。我想要达到的是如下所示。Clojure等待没有旋转的条件
Thread with mailbox:
defn work:
* do some stuff
* Get the head of the queue (a message):
- if it is "hello":
<do some stuff>
<recur work fn>
- if it is "bye":
<do some stuff>
- if it is none of the above, add the message to the back of queue
and restart from "Get the head of the queue"
* <reaching this point implies terminating the thread>
,我试图用的是* Get the head of the queue
缠循环,使用条件检查邮件,并在:else
分支添加到队列中,如果没有匹配的条款来实现我最初的想法。其缺点是在cond
的条款的任何主体中呼叫recur
将总是重复该循环,而使用recur
(例如,如在hello
的情况下)意味着重复该功能(即,work
)。所以这不是一个选择。另一个缺点是,如果这种信息需要很长时间才能到达,线程将无限期地旋转并吃掉资源。
我有(但尚未实施)的下一个想法是使用未来。该计划如下。
* Get all the matches I have to match (i.e., "hello" and "bye")
* Start a future and pass it the list of messages:
* While the queue does not contain any of the messages
recur
* when found, return the first element that matches.
* Wait for the future to deliver.
* if it is "hello":
<do some stuff>
<recur work fn>
if it is "bye":
<do some stuff>
在做这种方式,我得到几乎我想要的东西:
- 在接受
"hello"
或"bye"
块,直到我有一张。 - 我可以让条款无限期数量相匹配的消息
- 我已经提取的循环行为成
future
阻止,这 有很好的副作用,每次我评价我cond
我敢肯定我有一个匹配的消息,不必担心重试。
我真正想要的一件事,但无法想象如何实现,就是在这种情况下的未来不会旋转。就目前而言,它会无限期地消耗穿越队列的宝贵CPU资源,而从不收到它正在寻找的消息可能是完全正常的。
也许放弃LinkedBlockedQueue
并将其换算为具有方法的数据结构是有意义的,例如getEither(List<E> oneOfThese)
,该方法阻塞,直到其中一个元素可用。
我有一个其他想法,这是我可能用Java做的一种方式,如果队列中没有任何元素在队列中,那么调用wait()
时会有上述getEither()
操作。当其他线程在队列中放入消息时,我可以调用notify()
,以便每个线程都会根据他想要的消息列表检查队列。
例
下面的代码工作正常。但是,它有纺纱问题。这基本上是我想要实现的一个非常基本的例子。
(def queue (ref '()))
(defn contains-element [elements collection]
(some (zipmap elements (repeat true)) collection))
(defn has-element
[col e]
(some #(= e %) col))
(defn find-first
[f coll]
(first (filter f coll)))
; This function is blocking, which is what I want.
; However, it spins and thus used a LOT of cpu,
; whit is *not* what I want..
(defn get-either
[getthese queue]
(dosync
(let [match (first (filter #(has-element getthese %) @queue))
newlist (filter #(not= match %) @queue)]
(if (not (nil? match))
(do (ref-set queue newlist)
match)
(Thread/sleep 500)
(recur)))))
(defn somethread
[iwantthese]
(let [element (get-either iwantthese queue)
wanted (filter #(not= % element) iwantthese)]
(println (str "I got " element))
(Thread/sleep 500)
(recur wanted)))
(defn test
[]
(.start (Thread. (fn [] (somethread '(3 4 5)))))
(dosync (alter queue #(cons 1 %)))
(println "Main: added 1")
(Thread/sleep 1000)
(dosync (alter queue #(cons 2 %)))
(println "Main: added 2")
(Thread/sleep 1000)
(dosync (alter queue #(cons 3 %)))
(println "Main: added 3")
(Thread/sleep 1000)
(dosync (alter queue #(cons 4 %)))
(println "Main: added 4")
(Thread/sleep 1000)
(dosync (alter queue #(cons 5 %)))
(println "Main: added 5")
)
任何提示?
(万一有人注意到,没错,这就是像演员和目的是Clojure中的学术目的的实现)
是的,我曾看过。但是我为了研究目的在Clojure中实现了actor模型。所以我真的需要我自己的实现。快速查看从core.async传递的消息告诉我,不可能通过过滤器缓存消息并取出我想要的消息。例如,从所有消息中取出满足谓词p的第一条消息。 – 2014-10-01 20:59:17