我有一个kafka流与avro消息使用confluent.io kafka包。这对java应用程序来说工作正常。但我现在试图在JavaScript中阅读这些消息。超出范围索引this.buf.utf8Slice
我一直在尝试使用kafka-node + avsc包将缓冲区数组中的消息解码为字符串,并使用该模式。我知道confluent将前5个字节作为模式Id的魔术字节(0)+4个字节。
所以我切片的缓冲区删除这些字节,并尝试发送到avsc解码。但是,我得到一个错误
return this.buf.utf8Slice(pos, pos + len);
RangeError: out of range index at RangeError (native) at Tap.readString (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\utils.js:452:19) at StringType._read (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\types.js:612:58)
也尝试手动解码此留下大量的非utf8字符和我丢失数据的方式。
示例代码:
consumer.on('message', function(message) {
var val = message.value.slice(4);
sails.log.info('val buffer', val, val.length);
sails.log.info('hex',val.toString('hex'));
var type = avro.parse({"type":"record",
"name":"StatusEvent",
"fields":[{"name":"ApplicationUUID","type":"string"},
{"name":"StatusUUID","type":"string"},
{"name":"Name","type":"string"},
{"name":"ChangedBy","type":"string"},
{"name":"ChangedByUUID","type":"string"},
{"name":"ChangedAt","type":"long"}]
});
var decodedValue = type.fromBuffer(val);
sails.log.info('Decoded', decodedValue);
});
我以为切片从零开始。现在这是一个精确度损失错误,我认为这是模式中的很长一段时间。如果我将其更改为字符串,则错误将更改为截断的缓冲区...呃。我知道这些数据中没有空值。 – KnowHowSolutions
切换多头字符串和字符串将不起作用。要修复精确度损失错误,您应该使用“大整数”库;这里是步骤︰https://github.com/mtth/avsc/wiki/Advanced-usage#custom-long-types – mtth
基本上这已经表明我另一个问题,我已经知道,尚未修复。我有C#蜱进入avro,我真的不想要。首先解决它。感谢您的帮助。 – KnowHowSolutions