2016-11-28 56 views
0

我有一个在Azure中配置的EventHub,也是一个用于读取数据的使用者组。它在一些日子里工作正常。突然间,我看到传入数据有所延迟(大约3天)。我使用Windows服务来使用服务器中的数据。每分钟有大约500条传入消息。任何人都可以帮我解决这个问题吗?从EventHub获取数据被延迟

+0

你是如何从eventhub读取数据的?你使用IEventProcessor实例吗? –

+0

@PeterBons是Peter,我正在使用IEventProcessor实例。 – vishnu

回答

1

可能是您正在处理它们的项目太慢。因此,要完成的工作会增加,你会落后。

要获得一些见解,你是在事件流,你可以使用这样的代码:

private void LogProgressRecord(PartitionContext context) 
{ 
    if (namespaceManager == null) 
     return; 

    var currentSeqNo = context.Lease.SequenceNumber; 
    var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber; 
    var delta = lastSeqNo - currentSeqNo; 

    logWriter.Write(
      $"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})", 
      EventLevel.Informational); 
} 

的namespaceManager是建立这样的:

namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz"); 

我把这种记录方法该CloseAsync方法:

public Task CloseAsync(PartitionContext context, CloseReason reason) 
{ 
    LogProgressRecord(context); 

    return Task.CompletedTask; 
} 

logWriter只是一些loggi ng类我曾经写信息到blob存储。

现在输出像

最后处理seqnr消息分区3:32780931的在consumergroup '遥测' 32823804(滞后:42873)

所以当滞后是你可以非常高正在处理很久以前发生的事件。在这种情况下,您需要扩大/缩小处理器。

如果您发现滞后,您应该测量处理给定数量的物品需要多长时间。然后,您可以尝试优化性能并查看是否有所改进。我们这样做了:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events) 
{ 
     try 
     { 
      stopwatch.Restart(); 

      // process items here 

      stopwatch.Stop(); 

      await CheckPointAsync(context); 

      logWriter.Write(
       $"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.", 
       EventLevel.Informational); 
     } 
} 
+0

谢谢Peter宝贵的时间..我没有任何昂贵的数据处理器操作。我只需使用EF将传入记录插入到平坦表格中。我刚刚检查了滞后现象,每个分区超过100000(有4个分区)。是否有可能运行我的Windows服务的多个实例并弥补滞后? – vishnu

+1

是的,但请注意,根据数据库的不同,EF /数据库可能无法处理负载。每秒500条消息并不多。你应该测量你的操作时间。查看更新的答案。 –

+0

是的。但是我有一个25的批量更新。可能25是一个小数字,我会检查。在这两者之间,消费者群体的活跃听众数是否有限制?因为我还计划制作更多的实例。在VS中,如果我尝试运行已经运行的用户组一段时间,它会引发致命错误。 – vishnu