2016-03-28 42 views
0

我有一个kafka流与avro消息使用confluent.io kafka包。这对java应用程序来说工作正常。但我现在试图在JavaScript中阅读这些消息。超出范围索引this.buf.utf8Slice

我一直在尝试使用kafka-node + avsc包将缓冲区数组中的消息解码为字符串,并使用该模式。我知道confluent将前5个字节作为模式Id的魔术字节(0)+4个字节。

所以我切片的缓冲区删除这些字节,并尝试发送到avsc解码。但是,我得到一个错误

return this.buf.utf8Slice(pos, pos + len);

RangeError: out of range index at RangeError (native) at Tap.readString (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\utils.js:452:19) at StringType._read (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\types.js:612:58)

也尝试手动解码此留下大量的非utf8字符和我丢失数据的方式。

示例代码:

consumer.on('message', function(message) { 
     var val = message.value.slice(4); 
     sails.log.info('val buffer', val, val.length); 
     sails.log.info('hex',val.toString('hex')); 
     var type = avro.parse({"type":"record", 
     "name":"StatusEvent", 
     "fields":[{"name":"ApplicationUUID","type":"string"}, 
     {"name":"StatusUUID","type":"string"}, 
     {"name":"Name","type":"string"}, 
     {"name":"ChangedBy","type":"string"}, 
     {"name":"ChangedByUUID","type":"string"}, 
     {"name":"ChangedAt","type":"long"}] 
     }); 

     var decodedValue = type.fromBuffer(val); 
     sails.log.info('Decoded', decodedValue); 
    }); 

回答

1

slice(4)应该是slice(5)(否则你只跳过4开出5头字节)。您也许能够找到有用的信息here

+0

我以为切片从零开始。现在这是一个精确度损失错误,我认为这是模式中的很长一段时间。如果我将其更改为字符串,则错误将更改为截断的缓冲区...呃。我知道这些数据中没有空值。 – KnowHowSolutions

+0

切换多头字符串和字符串将不起作用。要修复精确度损失错误,您应该使用“大整数”库;这里是步骤︰https://github.com/mtth/avsc/wiki/Advanced-usage#custom-long-types – mtth

+0

基本上这已经表明我另一个问题,我已经知道,尚未修复。我有C#蜱进入avro,我真的不想要。首先解决它。感谢您的帮助。 – KnowHowSolutions