我正在使用Masstransit和RabbitMQ发布事件(没有消费者,只使用发布者),而此刻我正在尝试创建集成测试以验证消息已发布,如果是这样,我想检查它是否是正确的消息。要做到这一点,我创建一个消费者消费队列中的消息,并将其与我的预期进行比较。这里的问题是,我不能消费这个消息。该活动已成功发布,但无法获取消息。试图从MassTransit消费者获取消息
这是负责连接消费者
public class ServiceBusHelper
{
private IBusControl bus;
private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"];
private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"];
private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"];
private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"];
public ConnectHandle HandleObserver { get; set; }
public void ConnectRabbitMQ()
{
bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(serviceBusEndpoint),
h =>
{
h.Username(serviceBusUsername);
h.Password(serviceBusPassword);
});
cfg.ReceiveEndpoint(
serviceBusQueueName,
e =>
{
e.Consumer<ServiceBusEventsHelper>();
});
});
//Observer
var observer = new PublishedObserverHelper();
HandleObserver = bus.ConnectPublishObserver(observer);
}
}
类,这是会消耗消息
public class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent>
{
public ITransportCreatedEvent Result { get; set; }
public async Task Consume(ConsumeContext<ITransportCreatedEvent> context)
{
Result = await Task.FromResult(context.Message);
}
}
和在测试方法的类我有此
ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper();
ServiceBusHelper busHelper = new ServiceBusHelper();
try
{
busHelper.ConnectRabbitMQ();
transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest);
handle = busHelper.HandleObserver;
var eventResponse = eventHelper.Result;// Allways NULL
}
catch (Exception ex)
{
Assert.Fail(ex.GetDetailMessage());
}
我试图得到这样的消息结果
var eventResponse = eventHelper.Result;// Allways NULL
但是全部为空。
有人可以帮我吗?
我有一个服务,方法之一是CreateTransportAsync(),以及我称之为内发布
public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request){
.
.
.
await this.RaiseTransportCreatedEvent(transportResponseDto);
}
private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto)
{
var evt = CreateTransportEvent(transportResponseDto);
await this.serviceBus.Publish(evt).ConfigureAwait(false);
}
public class ServiceBus<T> : IServiceBus<T> where T : class
{
private readonly IBus bus;
public ServiceBus(IBus bus)
{
this.bus = bus;
}
public Task Publish(T evt)
{
return bus.Publish(evt, evt.GetType());
}
}
这是我的发布事件,和它的作品。现在我试图测试是否所有这些都与另一个解决方案中的集成测试一起工作,其中我试图创建一个使用者从队列中消费消息。然后,我想验证该消息(将其与json文件中的假消息进行比较)以查看是否一切正常。问题是我无法得到这个消息,我不知道发生了什么。 P.S:我真的不明白你试图在一点要说3. 感谢
感谢您的回复!我不知道为什么bus.StartAsync()消失,但它在那里,问题不是这样。我想,就像你说的那样,问题是我试图快速访问响应,而我不知道如何等待它。我走了告诉你更多的代码,看看你是否可以帮助我 – EGM
@EGM如果你想更新你的问题 - 做到这一点,不要为此创建一个“答案”。本质上,你正在检查一个永远不会更新的对象的属性 - 见第3点。我试图更详细地解释它。 –