2017-04-26 54 views
1

我很新手在rebus。Rebus停止从Azure服务总线获取消息。从100条消息只有5我可以取

我已经从零开始构建了pub/sub示例。现在我正在用户中收到消息。

我面临的问题是我发布了100条消息,然后突然当我启动订阅服务时,它仅获得100条消息中的5条消息。

Windows服务执行5次异步任务,然后熄灭。我做错了什么?

我的用户配置是这样的:

using (var activator = new BuiltinHandlerActivator()) { 
    activator.Register(() => new TestMessageHandler()); 

    Configure.With(activator) 
     .Transport(t => t.UseAzureServiceBus(Constants.connectionString, Constants.subQueue)) 
     .Routing(r => r.TypeBased().MapAssemblyOf<TestMessage>(Constants.pubQueue)‌​) 
     .Start(); 

    activator.Bus.Subscribe<TestMessage>().Wait(120000); 
} 

和我的处理程序是这样的:

public async Task Handle(TestMessage message) { 
    var message = string.Format("name: {1} and source name {2} {0} using the warp as a transport {0}", Environment.NewLine, message.Name , message.SourceName); 

    await Task.Run(() => Logger(message)); 
} 

private void Logger(TestMessage message) { 
    Console.WriteLine(message.ToString(false)); 
} 

从我张贴的代码,是有什么,我做错了什么?

+0

你可以尝试在订户中显示代码吗? – mookid8000

+0

您好,感谢您的快速回复。这是我的代码。 – hfpg2001

+0

使用(var activator = new BuiltinHandlerActivator()) { activator.Register(()=> new TestMessageHandler()); Configure.With(活化剂) .Transport(T => t.UseAzureServiceBus(Constants.connectionString,Constants.subQueue)) .Routing(R => r.TypeBased()。MapAssemblyOf (Constants.pubQueue)) 。开始(); activator.Bus.Subscribe ().Wait(120000); } – hfpg2001

回答

0

从代码

using (var activator = new BuiltinHandlerActivator()) { 
    (...) 
    activator.Bus.Subscribe<TestMessage>().Wait(120000); 
} 

好像你的用户几乎立即处置activator,从而停止总线。

这个假设很好地符合您所遇到的行为,只处理队列中的前几条消息。

您应该在运行应用程序的整个持续时间内保持激活器实例(或者返回IBus,如果您愿意的话),然后在关闭时处置它。

+0

谢谢,@mookid,但我认为等待将等待,直到排队是空的....是不是? – hfpg2001

+0

不,这不是它的工作原理...... Rebus的目的是在你的应用程序运行的整个过程中启动和保持,只要它在运行,它将处理消息,它也是设计的因此即使在队列中存在更多的消息(例如,如果队列中存在更多消息),也可以随时停止它。你想更新你的系统或其他...只是在您重新启动时恢复消息处理 – mookid8000