2015-11-17 47 views
0

有什么方法可以同步使用EasyNetQ从RabbitMQ消耗原始字节消息吗?如何使用EasyNetQ同步消耗来自RabbitMQ的原始字节消息?

我需要保证按顺序处理和确认来自未以EasyNetQ格式发布的系统的消息。我知道在单个线程的消费者运行,但IAdvancedBus接口只提供了一个方法以消耗原料的消息:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage); 

Task返回类型意味着消费者是异步运行的回调,因此可以处理这些消息出来订单。

如果没有,更改代码以支持此任何想法?我会做的接口方法:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage); 

RabbitAdvancedBus实现它,但我不知道在哪里的代码会去准确。

回答

0

我收到的是,在EasyNetQ谷歌集团工作的响应:

要执行同步,你可以这样做:

bus.Advanced.Consume(queue, (bytes, properties, info) => 
{ 
    // do your synchronous work..... 
    return Task.CompletedTask; 
}); 

或添加扩展名:

using System; 
using System.Threading.Tasks; 
using EasyNetQ; 
using EasyNetQ.Consumer; 
using EasyNetQ.Loggers; 
using EasyNetQ.Topology; 

namespace ConsoleApplication4 
{ 
    public static class RabbitAdvancedBusConsumeExtension 
    { 
     public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage) 
    { 
     return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info))); 
    } 

    public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure) 
    { 
     return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure); 
    } 

    private static Task ExecuteSynchronously(Action action) 
    { 
     var tcs = new TaskCompletionSource<object>(); 
     try 
     { 
      action(); 
      tcs.SetResult(null); 
     } 
     catch (Exception e) 
     { 
      tcs.SetException(e); 
     } 
     return tcs.Task; 
    } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger())); 

     var queue = bus.Advanced.QueueDeclare(); 
     bus.Advanced.Consume(queue, (bytes, properties, info) => 
     { 
      // ..... 
     }); 
    } 
} 
} 

更新:此功能在版本0.52.0.410中添加:

https://github.com/EasyNetQ/EasyNetQ/pull/505

0

这是一个有趣的问题。我本人不是EasyNetQ专家,也许别人会来,并给你一个更好的答案。 但是我已经熟悉了EasyNetQ code base大约一年,并且在我看来,弄清接线消费者(以及消费者被调用时)发生的事情是很棘手的。

我首先想指出的是,只是通过更改方法的签名,并不保证邮件按顺序处理。看,例如在此实现你的建议的接口:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage) 
{ 
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) => 
    { 
     onMessage(bytes, properties, info); 
     return new Task(() => { }); 
    }; 
    Consume(queue, taskWrapper); 
} 

它调用原始Consume方法,我们真的不知道以后会发生什么,对不对?

如果我在哪里在你的鞋,我会做以下事情之一:(!它不是那么靠谱)

  1. 使用Official RabbitMq Client和消费信息,形成有
  2. 也许看看RawRabbit,我一直致力于使用vNext标准的RabbitMq之上的薄层。它仅支持消费消息的异步签名,但编写同步实现Subscriber.cs(使用像AsyncEx这样的同步库)不应该很困难。
  3. 更改业务逻辑的建模。我不确定这是否适用于您的情况,但通常情况下,如果每个消息都按正确的顺序处理是关键任务,则应该以某种方式对其进行建模,以便消耗方法可以验证此消息是否接下来一致。 (另外,我不认为EasyNetQ保证消息序列,所以你可能想要为每个新版本的框架验证它)。

希望这会有所帮助!

+0

感谢您的考虑。当我说“更改方法签名”时,我的意思是在接口上添加一个方法,使其清楚地明确代码是同步执行的(然后在类中实现)。我收到了Google小组的回复(见我的回答) – Ralphie

相关问题