这是第一次实现流处理基础设施,我的毒性是storm 1.0.1, kafka 0.9.0和Clojure 1.5。Apache Kafka和Strom Clojure的实现
现在我有一个使用邮件系统(RabbitMQ)的背景,我喜欢它有几个原因。
- 易于安装和维护
- 尼斯前端Web门户
- 持久消息状态被保持在那里我可以开始一个消费者,它知道哪个消息没有被消耗掉。即“恰好一次”
然而它不能达到我想要的吞吐量。
现在已经走过卡夫卡它很大程度上依赖于手动保持偏移(内部在卡夫卡经纪人,Zookeper或外部)
我终于管理Clojure中创建一个喷口与源是所述卡夫卡经纪人这是噩梦。
现在想什么,我的愿望是“恰好一次短信”大多数情况下,并按照卡夫卡documentation状态
所以有效卡夫卡担保的 - 至少一次交付默认和允许用户最多执行一次通过禁止生产者重试并在处理一批消息之前提交它的偏移量来交付。正确的一次交付需要与目标存储系统的合作,但Kafka提供了这种直接实现的抵销。
这是什么转化为clojure卡夫卡喷口,发现它很难概念化。
我可能有几个boltz沿途,但终点是Postgres群集。我是否将偏移量存储在数据库中(听起来像是等待发生的竞赛危险),并且在我的风暴丛集的初始化过程中,我从Postgres获取偏移量?
同样存在将卡夫卡喷口的平行度设置为大于一的数值的危险吗?
我通常使用this作为起点,因为许多事情的例子在Clojure中不可用。对我正在使用的版本进行一些小的调整。 (我的消息不太出来,因为我希望他们,但至少我可以看到他们)
(def ^{:private true
:doc "kafka spout config definition"}
spout-config (let [cfg (SpoutConfig. (ZkHosts. "127.0.0.1:2181") "test" "/broker" (.toString (UUID/randomUUID)))]
;;(set! (. cfg scheme) (StringScheme.)) depricated
(set! (. cfg scheme) (SchemeAsMultiScheme. (StringScheme.)))
;;(.forceStartOffsetTime cfg -2)
cfg))
(defn mk-topology []
(topology
{;;"1" (spout-spec sentence-spout)
"1" (spout-spec my-kafka-spout :p 1)
"2" (spout-spec (sentence-spout-parameterized
["the cat jumped over the door"
"greetings from a faraway land"])
:p 2)}
{"3" (bolt-spec {"1" :shuffle}
split-sentence
:p 5)
"4" (bolt-spec {"3" ["word"]}
word-count
:p 1)}))
这是有见地的 –