如果使用Samza的OutgoingMessageEnvelope使用这种格式来发送消息:Samza在发送消息时是否自动创建分区?
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
和调用流任务的过程中()方法中这种方法,并希望将传入邮件到适当的分区,将Samza创建你调用该方法时的分区?
E.g.
MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}
如果我称之为内流任务的process()
其中msg
是一个消息实例:
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
// ...
String partition = msg["id"]
String key = msg["key"]
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg));
// ...
这是否会自动创建分区,IDA和美洲开发银行为我(即我需要创造这些分区前我发信息给他们)?我希望能够将消息路由到适当的分区,并且能够使用单独的消息密钥记录压缩。
非常感谢,这是一个非常明确和有用的答案。 – John