2015-11-03 41 views
2

如果使用Samza的OutgoingMessageEnvelope使用这种格式来发送消息:Samza在发送消息时是否自动创建分区?

public OutgoingMessageEnvelope(SystemStream systemStream, 
           java.lang.Object partitionKey, 
           java.lang.Object key, 
           java.lang.Object message) 
Constructs a new OutgoingMessageEnvelope from specified components. 
Parameters: 
systemStream - Object representing the appropriate stream of which this envelope will be sent on. 
partitionKey - A key representing which partition of the systemStream to send this envelope on. 
key - A deserialized key to be used for the message. 
message - A deserialized message to be sent in this envelope. 

和调用流任务的过程中()方法中这种方法,并希望将传入邮件到适当的分区,将Samza创建你调用该方法时的分区?

E.g.

MessageA = {"id": "idA", "key": "keyA", "body":"some details"} 
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"} 

如果我称之为内流任务的process()其中msg是一个消息实例:

public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { 
    // ... 
    String partition = msg["id"] 
    String key = msg["key"] 
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg)); 
    // ... 

这是否会自动创建分区,IDA和美洲开发银行为我(即我需要创造这些分区前我发信息给他们)?我希望能够将消息路由到适当的分区,并且能够使用单独的消息密钥记录压缩。

回答

3

创建主题时必须指定分区数。你不能动态添加新的分区(当然,你可以可以,但这并不容易,Samza不会自动完成)。如果主题不存在,但默认分区数量,Samza应为您创建新主题。这取决于设置。你可以测试它。

但值msg["id"]未指定分区的名称。该值仅用于计算目标分区的数量。该值被散列为一个数字,然后使用模数进行修整。像这样的东西(有多种算法,这是基本的一个):

partitionID = hash(msg["id"]) % total_number_of_partitions 

而且partitionID始终是一个非负整数。这意味着它实际上有多少个分区并不重要。它总是在一些结束。主要的想法是,如果你有两个消息具有相同的msg["id"],那么这些消息将最终在相同的分区中。这通常是你想要的。

日志压缩将按照您预期的方式工作 - 它将从特定分区中删除具有相同密钥的消息(但是如果您有两个消息具有相同密钥并且具有两个不同分区,则不会被删除)。

仅供参考,您可以使用kafkacat找出分区数量和其他有用的东西。

+0

非常感谢,这是一个非常明确和有用的答案。 – John

相关问题