2013-06-01 59 views
8

我为这篇文章的长度提前道歉。我花了相当多的时间缩短了时间,而且这个数字尽可能小。rxjava和clojure异步之谜:期货承诺和代理人,哦我的

我有一个谜,并会感激您的帮助。这个神秘来自我在Clojure写的一个rxjava observer的行为,这个行为来自于在线样本中的几个简单的observable

一个observable同步地发送消息给其观察者的处理程序,并且我所谓的原则性观察者按预期行事。

另一个可观察的异步在另一个线程上通过Clojure future做同样的事情。完全相同的观察员不会捕获发布到其onNext的所有事件;它似乎在尾部丢失了随机数量的消息。

在等待promiseonCompleted的等待期满和等待发送到agent收集器的所有事件到期之间,存在故意竞赛。如果promise获胜,我希望看到falseonCompletedagent中可能的短队列。如果agent获胜,我期望看到trueonCompletedagent的队列中的所有消息。我不希望的结果是true对于onCompleted和从agent短队列。但是,墨菲不睡觉,这正是我所看到的。我不知道垃圾收集是否有问题,或者是一些内部排队到Clojure的STM,或者我的愚蠢,或者其他什么。

我按照自包含表单的顺序显示源代码,以便它可以通过lein repl直接运行。有三个cermonials脱身的办法:第一,leiningen项目文件,project.clj,这对0.9.0版Netflix的rxjava的声明依赖性:

(defproject expt2 "0.1.0-SNAPSHOT" 
    :description "FIXME: write description" 
    :url "http://example.com/FIXME" 
    :license {:name "Eclipse Public License" 
      :url "http://www.eclipse.org/legal/epl-v10.html"} 
    :dependencies [[org.clojure/clojure    "1.5.1"] 
       [com.netflix.rxjava/rxjava-clojure "0.9.0"]] 
    :main expt2.core) 

现在,命名空间和Clojure的要求和Java进口:

(ns expt2.core 
    (:require clojure.pprint) 
    (:refer-clojure :exclude [distinct]) 
    (:import [rx Observable subscriptions.Subscriptions])) 

最后,输出到控制台宏:

(defmacro pdump [x] 
    `(let [x# ~x] 
    (do (println "----------------") 
     (clojure.pprint/pprint '~x) 
     (println "~~>") 
     (clojure.pprint/pprint x#) 
     (println "----------------") 
     x#))) 

最后,我的观察者。我使用agent来收集任何观察者的onNext发送的消息。我使用​​收集潜在的onError。我使用promise作为onCompleted,以便观察者外部的消费者可以等待它。

(defn- subscribe-collectors [obl] 
    (let [;; Keep a sequence of all values sent: 
     onNextCollector  (agent []) 
     ;; Only need one value if the observable errors out: 
     onErrorCollector  (atom nil) 
     ;; Use a promise for 'completed' so we can wait for it on 
     ;; another thread: 
     onCompletedCollector (promise)] 
    (letfn [;; When observable sends a value, relay it to our agent" 
      (collect-next  [item] (send onNextCollector (fn [state] (conj state item)))) 
      ;; If observable errors out, just set our exception; 
      (collect-error  [excp] (reset! onErrorCollector  excp)) 
      ;; When observable completes, deliver on the promise: 
      (collect-completed [ ] (deliver onCompletedCollector true)) 
      ;; In all cases, report out the back end with this: 
      (report-collectors [ ] 
       (pdump 
       ;; Wait for everything that has been sent to the agent 
       ;; to drain (presumably internal message queues): 
       {:onNext  (do (await-for 1000 onNextCollector) 
           ;; Then produce the results: 
           @onNextCollector) 
       ;; If we ever saw an error, here it is: 
       :onError  @onErrorCollector 
       ;; Wait at most 1 second for the promise to complete; 
       ;; if it does not complete, then produce 'false'. 
       ;; I expect if this times out before the agent 
       ;; times out to see an 'onCompleted' of 'false'. 
       :onCompleted (deref onCompletedCollector 1000 false) 
       }))] 
     ;; Recognize that the observable 'obl' may run on another thread: 
     (-> obl 
      (.subscribe collect-next collect-error collect-completed)) 
     ;; Therefore, produce results that wait, with timeouts, on both 
     ;; the completion event and on the draining of the (presumed) 
     ;; message queue to the agent. 
     (report-collectors)))) 

现在,这里是一个同步观察。它沿着观察员的onNext喉咙抽25条消息,然后打电话给他们的onCompleted

(defn- customObservableBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method. 
     ;; Send 25 strings to the observer's onNext: 
     (doseq [x (range 25)] 
     (-> observer (.onNext (str "SynchedValue_" x)))) 
     ; After sending all values, complete the sequence: 
     (-> observer .onCompleted) 
     ; return a NoOpSubsription since this blocks and thus 
     ; can't be unsubscribed (disposed): 
     (Subscriptions/empty)))) 

我们订阅我们的观察员此观察到:

;;; The value of the following is the list of all 25 events: 
(-> (customObservableBlocking) 
    (subscribe-collectors)) 

它能正常工作,而且我们看到控制台

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["SynchedValue_0" 
    "SynchedValue_1" 
    "SynchedValue_2" 
    "SynchedValue_3" 
    "SynchedValue_4" 
    "SynchedValue_5" 
    "SynchedValue_6" 
    "SynchedValue_7" 
    "SynchedValue_8" 
    "SynchedValue_9" 
    "SynchedValue_10" 
    "SynchedValue_11" 
    "SynchedValue_12" 
    "SynchedValue_13" 
    "SynchedValue_14" 
    "SynchedValue_15" 
    "SynchedValue_16" 
    "SynchedValue_17" 
    "SynchedValue_18" 
    "SynchedValue_19" 
    "SynchedValue_20" 
    "SynchedValue_21" 
    "SynchedValue_22" 
    "SynchedValue_23" 
    "SynchedValue_24"], 
:onError nil, 
:onCompleted true} 
---------------- 

这里在随后的结果是一个异步观察到,做完全一样的东西,只有在future的线程上:

(defn- customObservableNonBlocking [] 
    (Observable/create 
    (fn [observer]      ; This is the 'subscribe' method 
     (let [f (future 
       ;; On another thread, send 25 strings: 
       (doseq [x (range 25)] 
        (-> observer (.onNext (str "AsynchValue_" x)))) 
       ; After sending all values, complete the sequence: 
       (-> observer .onCompleted))] 
     ; Return a disposable (unsubscribe) that cancels the future: 
     (Subscriptions/create #(future-cancel f)))))) 

;;; For unknown reasons, the following does not produce all 25 events: 
(-> (customObservableNonBlocking) 
    (subscribe-collectors)) 

但是,惊喜,这里是我们在控制台上看到的:trueonCompleted,暗示promise DID NOT TIME-OUT;但只有一些异步信息。我们看到的实际消息数量因运行而异,这意味着有一些并发现象在发挥作用。线索表示赞赏。

---------------- 
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector), 
:onError @onErrorCollector, 
:onCompleted (deref onCompletedCollector 1000 false)} 
~~> 
{:onNext 
["AsynchValue_0" 
    "AsynchValue_1" 
    "AsynchValue_2" 
    "AsynchValue_3" 
    "AsynchValue_4" 
    "AsynchValue_5" 
    "AsynchValue_6"], 
:onError nil, 
:onCompleted true} 
---------------- 

回答

7

Agent的await-for意味着阻止当前线程,直到所有的动作因此派出 远(从这个线程或代理人)发生的代理商,这意味着它可能你的await后在那里还有一些其他的线程可以发送消息给代理,这就是你的情况。在等待代理结束后,您在地图上的:onNext键中扣除其值,然后等待完成的承诺,在等待后结果为真,但同时其他一些消息被分派到代理被收集到载体中。

您可以通过具有:onCompleted键作为地图的第一个关键这基本上意味着等待完成,然后等待代理堂妹到那个时候没有对代理人没有更多send调用可以作为以后的事情解决了这个已经收到onCompleted。

{:onCompleted (deref onCompletedCollector 1000 false) 
:onNext  (do (await-for 0 onNextCollector) 
           @onNextCollector) 
:onError  @onErrorCollector 
} 
+0

经过验证和测试。 –