2015-09-14 49 views
0

我正在使用clj-kafka,我试图在REPL中为它创建一个core.async接口。Clojure - core.async接口为apache kafka

我收到一些消息,但我的结构感觉不对:我无法停止接收消息,或者必须再次启动go例程以接收更多消息。

这里是我的尝试:

(defn consume [topic] 
    (let [consume-chan (chan)] 
    (with-resource [c (consumer config)] 
     shutdown 
     (go (doseq [m (messages c "test")] 
        (>! chan message) ;; should I check the return value? 
        ))) 
    consume-chan)) ;; is it the right place to return a channel ? 


    (def consume-chan (consume "test")) 
    ;;(close! consume-chan) 

    (go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already 

    (def cons-ch (go 
       (with-resource [c (consumer config)] 
        shutdown 
        (doseq [m (messages c "test")] 
        (>! consume-chan m))))) ;; should I check something here ? 

    ;;(close! cons-ch) 

    (def go-ch 
    (go-loop [] 
     (if-let [km (<! consume-chan)] 
     (do (println "Got a value in this loop:" km) 
       (recur)) 
     (do (println "Stop recurring - channel closed"))))) 

    ;;(close! go-ch) 

如何使用消息的一个core.async接口一个懒惰的序列?

回答

0

这里是我会做什么:

  • >!<!回报为零,如果该通道是封闭的,所以要确保退出循环当这种情况发生 - 这样你可以很容易地从外部结束循环通过关闭频道。

  • 使用try/catch检查go块内的异常,并将任何异常作为返回值使其不会丢失。

  • 检查读取值的例外情况,以捕获通道内的任何内容。

  • go块返回一个通道,块内代码的返回值(比如上面的例外)将放在通道上。检查这些渠道的例外情况,可能会重新抛出。

现在,您可以写这样的信道:

(defn write-seq-to-channel 
    [channel 
    values-seq] 
    (a/go 
    (try 
     (loop [values values-seq] 
     (when (seq values) 
      (when (a/>! channel (first values)) 
      (recur (rest values))))) 
     (catch Throwable e 
     e)))) 

,你这样写的:

(defn read-from-channel-and-print 
    [channel] 
    (a/go 
    (try 
     (loop [] 
     (let [value (a/<! channel)] 
      (when value 
      (when (instance? Throwable value) 
       (throw value)) 
      (println "Value read:" value) 
      (recur)))) 
     (catch Throwable e 
     e)))) 

现在,您将有两个通道,所以使用类似alts!alts!!检查您的循环退出。完成后关闭频道。