我有一个在Azure中配置的EventHub,也是一个用于读取数据的使用者组。它在一些日子里工作正常。突然间,我看到传入数据有所延迟(大约3天)。我使用Windows服务来使用服务器中的数据。每分钟有大约500条传入消息。任何人都可以帮我解决这个问题吗?从EventHub获取数据被延迟
回答
可能是您正在处理它们的项目太慢。因此,要完成的工作会增加,你会落后。
要获得一些见解,你是在事件流,你可以使用这样的代码:
private void LogProgressRecord(PartitionContext context)
{
if (namespaceManager == null)
return;
var currentSeqNo = context.Lease.SequenceNumber;
var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
var delta = lastSeqNo - currentSeqNo;
logWriter.Write(
$"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
EventLevel.Informational);
}
的namespaceManager是建立这样的:
namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");
我把这种记录方法该CloseAsync
方法:
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
LogProgressRecord(context);
return Task.CompletedTask;
}
logWriter
只是一些loggi ng类我曾经写信息到blob存储。
现在输出像
最后处理seqnr消息分区3:32780931的在consumergroup '遥测' 32823804(滞后:42873)
所以当滞后是你可以非常高正在处理很久以前发生的事件。在这种情况下,您需要扩大/缩小处理器。
如果您发现滞后,您应该测量处理给定数量的物品需要多长时间。然后,您可以尝试优化性能并查看是否有所改进。我们这样做了:
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
try
{
stopwatch.Restart();
// process items here
stopwatch.Stop();
await CheckPointAsync(context);
logWriter.Write(
$"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
EventLevel.Informational);
}
}
谢谢Peter宝贵的时间..我没有任何昂贵的数据处理器操作。我只需使用EF将传入记录插入到平坦表格中。我刚刚检查了滞后现象,每个分区超过100000(有4个分区)。是否有可能运行我的Windows服务的多个实例并弥补滞后? – vishnu
是的,但请注意,根据数据库的不同,EF /数据库可能无法处理负载。每秒500条消息并不多。你应该测量你的操作时间。查看更新的答案。 –
是的。但是我有一个25的批量更新。可能25是一个小数字,我会检查。在这两者之间,消费者群体的活跃听众数是否有限制?因为我还计划制作更多的实例。在VS中,如果我尝试运行已经运行的用户组一段时间,它会引发致命错误。 – vishnu
- 1. 获取数据包的延迟(TCP \ UDP)
- 2. 延迟从特定数据库读取
- 3. 从指定时间读取EventHub数据
- 4. 从gPhone获取传感器数据时是否存在延迟?
- 5. 延迟从服务获取数据以避免空指针
- 6. 延迟mchanize从网站获取响应
- 7. 从ajax延迟响应中获取jsonObject
- 8. 从Kurento获取rtsp流的延迟
- 9. 获取更新无延迟
- 10. 倒是被延迟
- 11. jQuery:.text()被延迟
- 12. 从TCPClient读取时延迟
- 13. 延迟读取TCP套接字数据
- 14. UDP数据读取不正确(延迟)
- 15. 从Plist延迟加载数据
- 16. 从html数据设置jquery延迟
- 17. 延迟获取映射为非延迟的关联
- 18. Primefaces数据表多选模式 - 延迟加载获取
- 19. 如何获取数据帧中的延迟列值?
- 20. 在jQuery中获取来自延迟调用的JSON数据
- 21. 延迟数据库查找?
- 22. AsyncSocket延迟发送数据
- 23. 延迟填充session.upload_progress数据
- 24. 缺少数据的延迟
- 25. 如何从csv文件读取数据时减少延迟?
- 26. 使用PHP延迟时间后从网站提取数据
- 27. SOA复合不从查询中提取数据,处理延迟
- 28. Ajax问题:延迟从使用innerHTML的web服务获取数据,请指导
- 29. 从dbpool.runQuery获取延迟,而不是使用Twisted和Oracle的数据
- 30. NSCombox的comboBoxSelectionDidChange值被延迟
你是如何从eventhub读取数据的?你使用IEventProcessor实例吗? –
@PeterBons是Peter,我正在使用IEventProcessor实例。 – vishnu