2013-12-20 71 views
6

由于MassTransit.UnityIntegration软件包中似乎存在一个错误,最近我遇到了很多问题,主要是由于注册名称未被考虑。多个消费者通过Unity不能在MassTransit中使用相同的消息

举例来说,如果我这样注册我的课:

var container = new UnityContainer() 
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1") 
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3"); 

几行后,我用的是LoadFrom扩展方法来注册的消费者得到的容器是这样的:

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc => 
    { 
     _sbc.UseBinarySerializer(); 
     _sbc.UseControlBus(); 
     _sbc.ReceiveFrom("msmq://localhost/MyQueue"); 
     _sbc.UseMsmq(_x => 
      { 
       _x.UseSubscriptionService("msmq://localhost/mt_subscriptions"); 
       _x.VerifyMsmqConfiguration(); 
      }); 
     _sbc.Subscribe(_s => _s.LoadFrom(container)); 
    }); 

发生的事情是,当相关消息碰到总线时,我的处理程序永远不会被调用。

琢磨了一段时间后,我决定去看看实施和很清楚为什么出现这种情况:

这是LoadFrom方法中的主要代码:

public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container) 
{ 
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>()); 
    if (concreteTypes.Count > 0) 
    { 
     var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container); 

     foreach (Type concreteType in concreteTypes) 
      consumerConfigurator.ConfigureConsumer(concreteType); 
    } 

    ... 

} 

注意它只能找到类型并且不会传递名称的任何信息。这是FindTypes<T>实现:

static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter) 
{ 
    return container.Registrations 
         .Where(r => r.MappedToType.Implements<T>()) 
         .Select(r => r.MappedToType) 
         .Where(filter) 
         .ToList(); 
} 

一些间接性后,这一切都归结到这一点单行线,在UnityConsumerFactory<T>类中,实际上创建了消费者的实例:

var consumer = childContainer.Resolve<T>(); 

这绝对在有多个注册时不能用于Unity,因为在Unity中注册(然后解析)多个实现的唯一方法是在RegisterType调用中给它们一个名称,稍后在Resolve调用上指定该名称。

也许我错过了所有这一切完全基本的东西,错误是在我的部分? MassTransit Unity组件的来源可以在here找到。我没有查看其他容器的代码,因为我不熟悉它们,但我认为这已经以某种方式处理了?我认为在同一个容器内有多个消费者使用相同的消息类型实际上很常见。

在这种特殊情况下,最好不要从容器中的注册中传递Type,而且还要传递用于注册的名称。

更新

好这个问题更加清晰一点,现在特拉维斯花时间来解释它。我应该早点注意到它。

看来我应该直接注册的类型,他们在工厂里面被正确解析,就像这样:

var container = new UnityContainer() 
    .RegisterType<Handler1>() 
    .RegisterType<Handler3>(); 

用这种方法,我也可以省略注册名称,因为现在他们的构建钥匙在容器内部是不同的。

那么,如果这是我们真实的场景,这将完美的工作,但事实并非如此。让我解释我们到底在干什么:

在我们开始使用MassTransit之前,我们已经有了一个用于命令模式的接口,名为ICommandHandler<TCommand>,其中TCommand是系统中命令的基础模型。当我们开始考虑使用服务总线时,从一开始就很清楚,应该可以稍后切换到另一个服务总线实现,而没有太多麻烦。考虑到这一点,我开始在我们的指挥界面上创建一个抽象概念,以表现出MT所期望的消费者之一的行为。这是我想出的:

public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All 
    where T : class, ICommand 
{ 
    private readonly ICommandHandler<T> _commandHandler; 

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler) 
    { 
     _commandHandler = commandHandler; 
    } 

    public void Consume(T _message) 
    { 
     _commandHandler.Handle(_message); 
    } 
} 

这是一个非常简单的适配器类。它接收的ICommandHandler<T>实施,使得它像一个Consumes<T>.All实例。令人遗憾的是MT required message models to be classes,因为我们没有对我们的命令是约束,但是这是一个小的不便,并且我们的where T : class约束添加到我们的接口。

而且,由于我们的处理程序接口是在容器已经注册,这将是注册该适配器实现MT接口,让容器在它注入了真正实现的问题。举例来说,一个更现实的例子(直接从我们的代码库取):

.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder") 
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor") 
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>() 
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>> 
    ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder"))) 
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>> 
    ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor"))) 

命名注册有一个有点令人费解,但需要,因为我们现在有两个消费者对于同样的信息。虽然并不像我们希望的那样干净,我们可以忍受的,因为这促进了从MassTransit特定逻辑的我们的代码了巨大的去耦:适配器类是在单独的程序,由系统最终只形成参考,集装箱登记用途。这似乎是一个非常好的想法,但现在被容器集成类后面的查找逻辑证实不受支持。

请注意,我这里无法注册的具体类,因为在中间的通用适配器类。

更新2:

以下Travis的意见后,我想这个简单的代码也无法正常工作(我不明白为什么,因为它似乎完全有效)。这是一个明确的消费类工厂登记,没有任何集装箱自动化集成:

_sbc.Consume(() => container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder")) 

这种决心调用正确给了我之前注册的CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>实例,它实现了Consumes<ApplicationInstallationCommand>.All,这反过来应该是支持的基本接口之一。在此之后,发布一个ApplicationInstallationCommand,就好像该处理程序无效或类似。

这工作虽然:

_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder")) 

很明显的东西的API在内心深处正在处理的,而不是通用接口上立足自身在各种各样的非通用的方式编译类型。

我的意思是......这是可行的与此,但注册码是越来越令人费解没有明显原因(由于我会考虑作为MT的一部分“非标实施细则”)。也许我只是在抓秸秆?也许这一切都归结为'为什么MT不接受它自己的,已经是通用的界面?'为什么在编译时需要具体类型才能看到它是一个消息处理程序,即使我传递给它的实例在编译时键入为Consumes<X>.All

更新3:

下面特拉维斯讨论后,我决定完全删除该UnityIntegration组装并与认购独立Consumer电话去了。

我创建了一个小的扩展类中我们MassTransit特定组件,以方便的事情:

public static class CommandHandlerEx 
{ 
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler) 
     where T : class, ICommand 
    { 
     return new CommandHandlerToConsumerAdapter<T>(_handler); 
    } 
} 

最后注册这样的处理程序:

var container = new UnityContainer() 
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder") 
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor"); 

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc => 
    { 
     _sbc.UseBinarySerializer(); 
     _sbc.UseControlBus(); 
     _sbc.ReceiveFrom("msmq://localhost/MyQueue"); 
     _sbc.UseMsmq(_x => 
      { 
       _x.UseSubscriptionService("msmq://localhost/mt_subscriptions"); 
       _x.VerifyMsmqConfiguration(); 
      }); 
     _sbc.Subscribe(RegisterConsumers); 
    }); 

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s) 
{ 
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer()); 
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer()); 
} 

使用整个前天之后尝试工作,我强烈建议你远离容器扩展程序集,如果你想从容器中预期的行为和/或如果你想自定义类等(就像我做我们的消息类与MT特定公司DE)2个主要原因:

  1. 逻辑在扩展遍历登记在容器找到消费者类。在我看来,这是可怕的设计。如果某个东西需要容器的实现,它应该在它的接口(或者它们在非Unity条件下的等价物)上调用ResolveResolveAll,而不关心什么是注册以及它们的具体类型是什么。这可能会导致严重的后果,假定容器可以返回未明确注册的类型。幸运的是,这些类不是这种情况,但我们有一个容器扩展,它根据构建密钥自动创建装饰器类型,并且不需要在容器上明确注册。

  2. 消费者注册使用ContainerRegistration实例上的MappedToType属性在容器上调用Resolve。这在任何情况下都是完全错误的,而不仅仅是MassTransit的情况。 Unity中的类型可以注册为映射(如上面的摘录所示,使用FromTo组件)或直接作为单个具体类型注册。在这两种情况下,逻辑应该使用RegisteredType类型从容器中解析。现在它的工作方式是,如果你碰巧注册了处理程序的接口,MT将完全绕过你的注册逻辑并调用具体类型的解决方案,而这可能会导致不可预知的行为,因为你认为它应该是单例就像你注册过的一样,但它最终会成为一个临时对象(默认),例如。

现在回头看看,现在我可以看到,我原来相信它是复杂得多。在这个过程中也有相当多的学习,所以这很好。

更新4:做出最终签入前

昨天我决定要重构整个适配器接近一点。我使用MassTransit的界面模式来创建我的适配器,因为我认为这是一个非常漂亮而干净的语法。

下面是结果:

public sealed class CommandHandlerToConsumerAdapter<T> 
    where T : class, ICommand 
{ 
    public sealed class All : Consumes<T>.All 
    { 
     private readonly ICommandHandler<T> m_commandHandler; 

     public All(ICommandHandler<T> _commandHandler) 
     { 
      m_commandHandler = _commandHandler; 
     } 

     public void Consume(T _message) 
     { 
      m_commandHandler.Handle(_message); 
     } 
    } 
} 

可惜的是这打破,因为在引用的万能库中的实用方法未处理的异常MassTransit的代码,就叫做ToShortTypeName扩展方法。

这里是例外:

ArgumentOutOfRangeException in MassTransit receive

在System.String.Substring(的Int32的startIndex,的Int32长度)
在Magnum.Extensions.ExtensionsToType.ToShortTypeName(类型类型)
在MassTransit.Pipeline.Sinks.ConsumerMessageSink 2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext 1 context)在d:\ BuildAgent-02 \ work \ aa63c4295dfc097 \ src \ MassTransit \ Pipeline \ Sinks \ ConsumerMessageSink.cs中:第51行 at MassTransit.Pipeline.Sinks.InboundConvertMess ageSink`1。 <> c__DisplayClass2。 <> c__DisplayClass4.b__1(IConsumeContext x)的在d:\ BuildAgent-02 \工作\ aa063b4295dfc097 \ SRC \ MassTransit \管道\水槽\ InboundConvertMessageSink.cs:线45 在d MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext上下文) :\ BuildAgent-02 \工作\ aa063b4295dfc097的\ src \ MassTransit \背景\ ServiceBusReceiveContext.cs:行162

回答

4

虽然我不知道统一整合,所有的容器,你必须注册你的消费者为在容器中的具体类型而不是接口。我认为这只是RegisterType<Handler1, Handler1>(),但我不完全确定。

如果您不喜欢容器的LoadFrom扩展名,则无需反复使用它。您始终可以自己解决消费者问题,并在配置中通过_sbc.Consume(() => container.resolve<YourConsumerType>())注册。 LoadFrom扩展仅仅是一个说服人们使用容器的方式。

下面的代码有效,它使用我期望的容器,而不知道你的域更多,一个使用它。如果您想了解消息如何绑定一点点,我会建议使用RabbitMQ,因为您可以轻松地查看结果放在哪里。在这一点上,这远远超出了SO问题,如果你还有其他问题,我会把它带到邮件列表中。

using System; 
using MassTransit; 
using Microsoft.Practices.Unity; 

namespace MT_Unity 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      using (var container = new UnityContainer() 
       .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>() 
       .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>()) 

      using (IServiceBus consumerBus = ServiceBusFactory.New(sbc => 
        { 
         sbc.ReceiveFrom("rabbitmq://localhost/consumer"); 
         sbc.UseRabbitMq(); 


         sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>())); 
        })) 
      using (IServiceBus publisherBus = ServiceBusFactory.New(sbc => 
        { 
         sbc.ReceiveFrom("rabbitmq://localhost/publisher"); 
         sbc.UseRabbitMq(); 
        })) 
      { 
       publisherBus.Publish(new MyCommand()); 

       Console.ReadKey(); 
      } 
     } 
    } 

    public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand 
    { 
     private readonly ICommandHandler<T> _commandHandler; 

     public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler) 
     { 
      _commandHandler = commandHandler; 
     } 

     public void Consume(T message) 
     { 
      _commandHandler.Handle(message); 
     } 
    } 

    public interface ICommand { } 
    public class MyCommand : ICommand { } 

    public interface ICommandHandler<T> where T : class, ICommand 
    { 
     void Handle(T message); 
    } 

    public class MyCommandHandler : ICommandHandler<MyCommand> 
    { 
     public MyCommandHandler() 
     { 

     } 
     public void Handle(MyCommand message) 
     { 
      Console.WriteLine("Handled MyCommand"); 
     } 
    } 

} 
+0

能否详细说明一下?我不明白你在说什么。我在那里做的是将消费。所有接口映射到两个具体类型Handler1和Handler3。这与团结是非常平常的,也许这只是语法混淆而已?顺便说一句,你可能知道谁实际编码了统一集成部分。是否有可能直接与他们联系并/或将他们指向这里?感谢像往常一样的快速回复Travis;) – julealgon

+0

https://github.com/MassTransit/MassTransit/blob/v2.8.0/src/Containers/MassTransit.UnityIntegration/UnityExtensions.cs#L69我们寻找实现'Consumes <>'不是类型''。这是我们如何在所有容器中进行的。所以当你注册时,而不是注册为'Consumes <>。All'注册为它的具体类型。 – Travis

+0

好吧,现在我明白你的意思了,注册具体类型。那么,对我而言,现在这绝对没有意义。在之前我没有意识到你们使用.MappedTo类型来稍后解析容器。我很抱歉,但这似乎完全错误。我周五获得了这些类的代码,并开始修改源代码。今天晚些时候,我可能会完成它对我来说似乎是正确的行为:我会通过注册类型和名称到工厂,并与那里的那些解决。我将编辑该问题以向您说明为什么您的方法在我的上下文中不起作用。 – julealgon

相关问题