2017-03-03 126 views
0

我想实现Spring集成而我运行应用程序,我可以接收订阅主题的消息,但如何能在运行时添加更多的主题春MQTT集成

我试图从控制器

以下功能
@RequestMapping("/suscribetest") 
    @ResponseBody 
public String subscribeTest(){ 
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(MqttInboundBeans.class); 
    MqttPahoMessageDrivenChannelAdapter messageChannel = context.getBean("inbound",MqttPahoMessageDrivenChannelAdapter.class); 
    messageChannel.addTopic("test", 2); 
    return ""; 
} 

和下面是我的豆子设置用于订阅主题

@Configuration 
public class MqttInboundBeans { 
    @Autowired 
private UserService service; 


@Bean 
public MessageChannel mqttInputChannel() { 
    return new DirectChannel(); 
} 

@Bean 
public MessageProducer inbound() { 
    MqttPahoMessageDrivenChannelAdapter adapter = 
      new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient", 
              "DATA/#", "LD/#"); 
    adapter.setCompletionTimeout(0); 
    adapter.setConverter(new DefaultPahoMessageConverter()); 
    adapter.setQos(2); 
    adapter.setOutputChannel(mqttInputChannel()); 
    return adapter; 
} 

@Bean 
@ServiceActivator(inputChannel = "mqttInputChannel") 
public MessageHandler handler() { 
    return new MessageHandler() { 

     @Override 
     public void handleMessage(Message<?> message) throws MessagingException { 
      System.out.println(message.getHeaders()+" "+message.getPayload()); 

     } 

    }; 
} 
} 

,我收到以下错误

2017-03-03 17:29:47.846 ERROR 4524 --- [bio-8080-exec-8] o.s.boot.web.support.ErrorPageFilter  : Forwarding to error page from request [/suscribetest] due to exception [Error creating bean with name 'mqttInboundBeans': Unsatisfied dependency expressed through field 'service'; 
nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.ehydromet.service.UserService' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}] 

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'mqttInboundBeans': Unsatisfied dependency expressed through field 'service'; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.ehydromet.service.UserService' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)} 
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:588) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:88) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:366) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1225) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:552) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:759) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:866) ~[spring-context-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:542) ~[spring-context-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:84) ~[spring-context-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at com.ehydromet.springboot.controller.LatestDataController.subscribeTest(LatestDataController.java:72) ~[LatestDataController.class:na] 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_79] 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_79] 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_79] 
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_79] 
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:220) ~[spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:134) ~[spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:621) ~[servlet-api.jar:na] 
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846) ~[spring-webmvc-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:728) ~[servlet-api.jar:na] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:305) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.37] 
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.37] 
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:89) ~[spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.37] 
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.37] 
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197) ~[spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.37] 
    at org.springframework.boot.web.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:117) [spring-boot-1.4.3.RELEASE.jar:1.4.3.RELEASE] 
    at org.springframework.boot.web.support.ErrorPageFilter.access$000(ErrorPageFilter.java:61) [spring-boot-1.4.3.RELEASE.jar:1.4.3.RELEASE] 
    at org.springframework.boot.web.support.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:92) [spring-boot-1.4.3.RELEASE.jar:1.4.3.RELEASE] 
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.boot.web.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:110) [spring-boot-1.4.3.RELEASE.jar:1.4.3.RELEASE] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:222) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:123) [catalina.jar:7.0.37] 
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:472) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171) [catalina.jar:7.0.37] 
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:99) [catalina.jar:7.0.37] 
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:936) [catalina.jar:7.0.37] 
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:118) [catalina.jar:7.0.37] 
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:407) [catalina.jar:7.0.37] 
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1004) [tomcat-coyote.jar:7.0.37] 
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:589) [tomcat-coyote.jar:7.0.37] 
    at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:312) [tomcat-coyote.jar:7.0.37] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_79] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_79] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79] 
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.ehydromet.service.UserService' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)} 
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1474) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1102) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1064) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:585) ~[spring-beans-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    ... 70 common frames omitted 

谢谢。

回答

0

这与MQTT无关,您的MqttInboundBeans对您没有声明的UserService类型的bean具有@Autowired依赖性。

编辑

新增例如:

@SpringBootApplication 
@Controller 
public class So42578862Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So42578862Application.class, args); 
    } 

    @Autowired 
    private MqttPahoMessageDrivenChannelAdapter adapter; 

    @RequestMapping("/subscribeTest") 
    @ResponseBody 
    public String handle() { 
     if (adapter.getTopic().length == 0) { 
      adapter.addTopic("DATA/#", "LD/#"); 
     } 
     return ""; 
    } 

    @Bean 
    public MqttPahoMessageDrivenChannelAdapter inbound() { 
     MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", 
       "testClient", new String[0]); 
     adapter.setCompletionTimeout(0); 
     adapter.setConverter(new DefaultPahoMessageConverter()); 
     adapter.setQos(2); 
     adapter.setOutputChannelName("mqttInputChannel"); 
     return adapter; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "mqttInputChannel") 
    public MessageHandler handler() { 
     return new MessageHandler() { 

      @Override 
      public void handleMessage(Message<?> message) throws MessagingException { 
       System.out.println(message.getHeaders() + " " + message.getPayload()); 

      } 

     }; 
    } 

    @InboundChannelAdapter(channel = "out", poller = @Poller(fixedDelay = "5000")) 
    public String message() { 
     return "foo"; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "out") 
    public MqttPahoMessageHandler outHandler() { 
     MqttPahoMessageHandler outHandler = new MqttPahoMessageHandler("tcp://localhost:1883", "outClient"); 
     outHandler.setDefaultTopic("DATA/foo"); 
     return outHandler; 
    } 

} 
+0

是,我需要它来保存从消息处理程序的消息,有什么建议? – Rawat

+0

如何在运行时添加新的susbscribe主题 – Rawat

+0

'没有符合条件的'com.ehydromet.service.UserService'类型的可用bean意味着它的含义 - 您在任何地方都没有'UserService'' @ Bean'您对此应用程序上下文的配置。 –