2017-01-19 78 views
0

我正在使用Masstransit和RabbitMQ发布事件(没有消费者,只使用发布者),而此刻我正在尝试创建集成测试以验证消息已发布,如果是这样,我想检查它是否是正确的消息。要做到这一点,我创建一个消费者消费队列中的消息,并将其与我的预期进行比较。这里的问题是,我不能消费这个消息。该活动已成功发布,但无法获取消息。试图从MassTransit消费者获取消息

这是负责连接消费者

public class ServiceBusHelper 
{ 

    private IBusControl bus; 
    private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"]; 
    private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"]; 
    private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"]; 
    private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"]; 

    public ConnectHandle HandleObserver { get; set; } 

    public void ConnectRabbitMQ() 
    { 
     bus = Bus.Factory.CreateUsingRabbitMq(cfg => 
     { 
      var host = cfg.Host(
       new Uri(serviceBusEndpoint), 
       h => 
       { 
        h.Username(serviceBusUsername); 
        h.Password(serviceBusPassword); 
       }); 

      cfg.ReceiveEndpoint(
      serviceBusQueueName, 
      e => 
      { 

       e.Consumer<ServiceBusEventsHelper>(); 

      }); 
     }); 

     //Observer 
     var observer = new PublishedObserverHelper(); 
     HandleObserver = bus.ConnectPublishObserver(observer); 

    } 


} 

类,这是会消耗消息

public class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent> 
{ 
    public ITransportCreatedEvent Result { get; set; } 


    public async Task Consume(ConsumeContext<ITransportCreatedEvent> context) 
    { 
     Result = await Task.FromResult(context.Message); 

    } 


} 

和在测试方法的类我有此

  ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper(); 
     ServiceBusHelper busHelper = new ServiceBusHelper(); 

     try 
     { 
      busHelper.ConnectRabbitMQ(); 
      transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest); 
      handle = busHelper.HandleObserver; 

      var eventResponse = eventHelper.Result;// Allways NULL 
     } 
     catch (Exception ex) 
     { 
      Assert.Fail(ex.GetDetailMessage()); 
     } 

我试图得到这样的消息结果

var eventResponse = eventHelper.Result;// Allways NULL 

但是全部为空。

有人可以帮我吗?

我有一个服务,方法之一是CreateTransportAsync(),以及我称之为内发布

public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request){ 

    . 
    . 
    . 
    await this.RaiseTransportCreatedEvent(transportResponseDto); 
    } 

    private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto) 
    { 
     var evt = CreateTransportEvent(transportResponseDto); 
     await this.serviceBus.Publish(evt).ConfigureAwait(false); 
    } 

public class ServiceBus<T> : IServiceBus<T> where T : class 
{ 
    private readonly IBus bus; 

    public ServiceBus(IBus bus) 
    { 
     this.bus = bus; 
    } 

    public Task Publish(T evt) 
    { 
     return bus.Publish(evt, evt.GetType()); 
    } 
} 

这是我的发布事件,和它的作品。现在我试图测试是否所有这些都与另一个解决方案中的集成测试一起工作,其中我试图创建一个使用者从队列中消费消息。然后,我想验证该消息(将其与json文件中的假消息进行比较)以查看是否一切正常。问题是我无法得到这个消息,我不知道发生了什么。 P.S:我真的不明白你试图在一点要说3. 感谢

回答

0
  1. 您需要通过调用bus.Start()启动总线什么。你不这样做,所以没有收到任何东西。
  2. 目前还不清楚transportClient.CreateTransportAsync做什么。谁在发布消息?
  3. 消费者根据消费的消息被实例化。你在你的“测试”中做了什么 - 你实例化一个消费者的实例并且继续引用这个实例。然后你在某处发送消息。 MassTransit会为您的客户创建一个新实例,您会更新该字段,然后处理该实例。但是您正在检查您最初创建的实例的Result,该实例从未收到任何消息。它将永远是null
  4. 从发布者向消费者传递消息需要时间。您在初始化公交车后尝试检查结果。我相当肯定你永远不会得到这么快,即使你修复(1),(2)和(3)

我不确定你到底想要测试什么。使用MassTransit发布和使用消息可处理所有传输。你可以从Github获得任何样本,构建它,运行它并看到它工作。

还有一些RabbitMQ运输测试,显示如何创建这样的东西。例如,检查ConsumerBind_Specs.cs文件。

此外,如果您想使用某个现有的消费者实例,则可以按照文档Connecting an existing consumer instance中所述将此实例连接到总线。使用e.Instance而不是e.Consumer将使您的测试工作适当地等待Consume方法完成。但是,这并不是真正流行的方法,因为您真的想将消费者范围限制为仅处理一个消息。

+0

感谢您的回复!我不知道为什么bus.StartAsync()消失,但它在那里,问题不是这样。我想,就像你说的那样,问题是我试图快速访问响应,而我不知道如何等待它。我走了告诉你更多的代码,看看你是否可以帮助我 – EGM

+0

@EGM如果你想更新你的问题 - 做到这一点,不要为此创建一个“答案”。本质上,你正在检查一个永远不会更新的对象的属性 - 见第3点。我试图更详细地解释它。 –

相关问题