1

这是第一次实现流处理基础设施,我的毒性是storm 1.0.1, kafka 0.9.0和Clojure 1.5。Apache Kafka和Strom Clojure的实现

现在我有一个使用邮件系统(RabbitMQ)的背景,我喜欢它有几个原因。

  1. 易于安装和维护
  2. 尼斯前端Web门户
  3. 持久消息状态被保持在那里我可以开始一个消费者,它知道哪个消息没有被消耗掉。即“恰好一次”

然而它不能达到我想要的吞吐量。

现在已经走过卡夫卡它很大程度上依赖于手动保持偏移(内部在卡夫卡经纪人,Zookeper或外部)

我终于管理Clojure中创建一个喷口与源是所述卡夫卡经纪人这是噩梦。

现在想什么,我的愿望是“恰好一次短信”大多数情况下,并按照卡夫卡documentation状态

所以有效卡夫卡担保的 - 至少一次交付默认和允许用户最多执行一次通过禁止生产者重试并在处理一批消息之前提交它的偏移量来交付。正确的一次交付需要与目标存储系统的合作,但Kafka提供了这种直接实现的抵销。

这是什么转化为clojure卡夫卡喷口,发现它很难概念化。

我可能有几个boltz沿途,但终点是Postgres群集。我是否将偏移量存储在数据库中(听起来像是等待发生的竞赛危险),并且在我的风暴丛集的初始化过程中,我从Postgres获取偏移量?

同样存在将卡夫卡喷口的平行度设置为大于一的数值的危险吗?

我通常使用this作为起点,因为许多事情的例子在Clojure中不可用。对我正在使用的版本进行一些小的调整。 (我的消息不太出来,因为我希望他们,但至少我可以看到他们)

(def ^{:private true 
    :doc "kafka spout config definition"} 
    spout-config (let [cfg (SpoutConfig. (ZkHosts. "127.0.0.1:2181") "test" "/broker" (.toString (UUID/randomUUID)))] 
      ;;(set! (. cfg scheme) (StringScheme.)) depricated 
      (set! (. cfg scheme) (SchemeAsMultiScheme. (StringScheme.)))     
      ;;(.forceStartOffsetTime cfg -2) 
      cfg)) 



(defn mk-topology [] 
(topology 
    {;;"1" (spout-spec sentence-spout) 
    "1" (spout-spec my-kafka-spout :p 1) 
    "2" (spout-spec (sentence-spout-parameterized 
       ["the cat jumped over the door" 
        "greetings from a faraway land"]) 
       :p 2)} 
    {"3" (bolt-spec {"1" :shuffle} 
       split-sentence 
       :p 5) 
    "4" (bolt-spec {"3" ["word"]} 
       word-count 
       :p 1)})) 

回答

1

对于任何分布式系统中这是不可能的,以确保所要做的工作的一部分,将在准确进行工作一旦。在某些情况下,某些事情会失败,并且需要重试(这称为“至少一次”处理)或不重试(这称为“至多一次”处理),尽管您不能完全确定其中间并得到“恰好一次”处理。你可以得到非常接近完全一次处理。

诀窍是,在您的流程结束时,抛出第二个副本,如果您发现工作已完成两次。这是索引进入的地方。当您将结果保存到数据库中时,请查看是否使用比此工作索引更新的索引作为已保存。如果您发现后续工作存在,请将工作抛出并不保存。至于说明文件,那种解释只是对已经做了很多次的人“向前迈进”...

+0

这是有见地的 –