0
我正在使用clj-kafka
,我试图在REPL中为它创建一个core.async
接口。Clojure - core.async接口为apache kafka
我收到一些消息,但我的结构感觉不对:我无法停止接收消息,或者必须再次启动go
例程以接收更多消息。
这里是我的尝试:
(defn consume [topic]
(let [consume-chan (chan)]
(with-resource [c (consumer config)]
shutdown
(go (doseq [m (messages c "test")]
(>! chan message) ;; should I check the return value?
)))
consume-chan)) ;; is it the right place to return a channel ?
(def consume-chan (consume "test"))
;;(close! consume-chan)
(go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already
(def cons-ch (go
(with-resource [c (consumer config)]
shutdown
(doseq [m (messages c "test")]
(>! consume-chan m))))) ;; should I check something here ?
;;(close! cons-ch)
(def go-ch
(go-loop []
(if-let [km (<! consume-chan)]
(do (println "Got a value in this loop:" km)
(recur))
(do (println "Stop recurring - channel closed")))))
;;(close! go-ch)
如何使用消息的一个core.async
接口一个懒惰的序列?