2017-01-24 55 views

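ActiveMQ Spring Stomp: how do I change my existing code to create a durable subscription?

I have built a working notification system in my project. My current code is:

My client (JavaScript):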

let connectWebSocket =() => { 
    socket = new SockJS(context.backend + '/myWebSocketEndPoint'); 
    stompClient = Stomp.over(socket); 
    stompClient.connect({},function (frame) { 
    stompClient.subscribe('/topic/notification', function(response){ 
     alert(response); 
    }); 
    }); 
} 
connectWebSocket(); 

Server (Java with Spring):

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{ 

@Override 
public void configureMessageBroker(MessageBrokerRegistry config) { 
    config.enableSimpleBroker("/topic"); 
} 

@Override 
public void registerStompEndpoints(StompEndpointRegistry registry) { 
    registry.addEndpoint("/myWebSocketEndPoint") 
      .setAllowedOrigins("*") 
      .withSockJS(); 
    } 
} 

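This works. Now I also want notifications to reach users who were offline when the notifications were sent: when they log in, their pending notifications should be delivered automatically. I have to do this with ActiveMQ. I have seen some examples, but I do not really understand them. Could someone show me exactly how to edit my code to implement durable subscriptions? Thanks a lot.

Edit: I have updated my client code: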

let connectWebSocket =() => { 
    let clientId =user.profile.id; 
    socket = new SockJS(context.backend + '/myWebSocketEndPoint'); 
    stompClient = Stomp.over(socket); 
    stompClient.connect({"client-id": clientId},{},function (frame) { 
    stompClient.subscribe('/topic/notification', function(response){ 
     alert(response); 
    },{"activemq.subscriptionName": clientId}); 
    }); 
} 

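But if a notification arrives while a user is offline, it is not delivered to him when he comes back online. I think I have to change my server side.

pom.xml: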

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-all</artifactId> 
    <version>5.14.2</version> 
</dependency> 

EDIT2: With the correct dependencies in pom.xml, I now get an error. I have this configuration:

@Override 
public void configureMessageBroker(MessageBrokerRegistry config) { 
    config.enableStompBrokerRelay("/topic/"); 
} 

But when I run my code, I see this error:

2017/01/24 17:17:15.751 ERROR [org.springframework.boot.SpringApplication:839] Application startup failed 
org.springframework.context.ApplicationContextException: Failed to start bean 'stompBrokerRelayMessageHandler'; nested exception is java.lang.NoClassDefFoundError: reactor/io/codec/Codec 

EDIT3: This is how I send notifications to clients:

@Component 
public class MenuItemNotificationSender { 

@Autowired 
private SimpMessagingTemplate messagingTemplate; 

@Autowired 
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate){ 
    this.messagingTemplate = messagingTemplate; 
} 

public void sendNotification(MenuItemDto menuItem) { 
    messagingTemplate.convertAndSend("/topic/notification", menuItem); 
} 
} 

Answer


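If you use the default AMQ configuration, messages are persisted; that is the default behavior. To deliver messages to users who were offline when the messages were sent, you also need to use durable subscriptions.

Edit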
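
To make the idea of a durable subscription concrete, here is a toy in-memory model in JavaScript. It is not ActiveMQ code and `ToyTopic` is a made-up class; it only sketches the behavior to expect from a broker: the subscription is identified by a client id plus a subscription name, it survives a disconnect, and messages published in the meantime are kept and delivered on reconnect.

```javascript
// Toy model of a durable topic subscription (hypothetical, NOT the ActiveMQ API).
class ToyTopic {
  constructor() {
    // key "clientId:subscriptionName" -> { callback, backlog }
    this.durableSubs = new Map();
  }

  // Register a durable subscription, or re-activate an existing one.
  subscribeDurable(clientId, subscriptionName, callback) {
    const key = `${clientId}:${subscriptionName}`;
    const sub = this.durableSubs.get(key) || { callback: null, backlog: [] };
    sub.callback = callback;
    this.durableSubs.set(key, sub);
    // Deliver everything that arrived while the subscriber was offline.
    while (sub.backlog.length > 0) {
      callback(sub.backlog.shift());
    }
  }

  // The subscriber goes offline; the subscription itself is kept.
  disconnect(clientId, subscriptionName) {
    const sub = this.durableSubs.get(`${clientId}:${subscriptionName}`);
    if (sub) sub.callback = null;
  }

  // Deliver to online subscribers, buffer for offline ones.
  publish(message) {
    for (const sub of this.durableSubs.values()) {
      if (sub.callback) sub.callback(message);
      else sub.backlog.push(message);
    }
  }
}

const topic = new ToyTopic();
const received = [];
topic.subscribeDurable("user-42", "user-42", (m) => received.push(m));
topic.publish("first");
topic.disconnect("user-42", "user-42");
topic.publish("second"); // arrives while offline, kept in the backlog
topic.subscribeDurable("user-42", "user-42", (m) => received.push(m));
console.log(received); // [ 'first', 'second' ]
```

Note that in this model, as with a real broker, nothing is buffered for a user who has never subscribed: the durable subscription has to be registered once before offline messages can be retained for it.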

Persistent Messaging in STOMP

STOMP messages are non-persistent by default. To use persistent messaging add the following STOMP header to all SEND requests: persistent:true. This default is the opposite of that for JMS messages.

To persist the messages you send, on the JS client you need to add the header to the send call:

stompClient.send(destination, {"persistent":"true" }, body); 
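
For reference, this is roughly what such a SEND frame looks like on the wire. The `buildFrame` helper below is hypothetical and written only for illustration (the Stomp client library builds frames for you, and may add headers such as content-length); it follows the STOMP frame layout of command, header lines, a blank line, the body, and a terminating NUL byte.

```javascript
// Hypothetical helper: serialize a STOMP frame as plain text.
// Layout: COMMAND, one "key:value" line per header, a blank line, the body, then NUL.
function buildFrame(command, headers, body) {
  const headerLines = Object.entries(headers).map(([key, value]) => `${key}:${value}`);
  return [command, ...headerLines, "", body].join("\n") + "\0";
}

const frame = buildFrame(
  "SEND",
  { destination: "/topic/notification", persistent: "true" },
  "hello"
);

// JSON.stringify makes the newlines and the trailing NUL visible.
console.log(JSON.stringify(frame));
```

The `persistent:true` line in the header section is what asks the broker to store the message.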

Update MenuItemNotificationSender like this:

public void sendNotification(MenuItemDto menuItem) { 
    Map<String, Object> headers = new HashMap<>(); 
    headers.put("JMSDeliveryMode", 2); 
    headers.put("persistent", "true"); 
    messagingTemplate.convertAndSend("/topic/notification", menuItem, headers); 
} 

Take a look at:

http://activemq.apache.org/how-do-i-make-messages-durable.html

http://activemq.apache.org/how-do-durable-queues-and-topics-work.html

To make a durable subscription with Stomp:

stompClient.connect({"client-id": "my-client-id" }, function (frame) { 

     console.log('Connected: ' + frame); 

     stompClient.subscribe(topic, function (message) { 
     ..... 
     ..... 
     }, {"activemq.subscriptionName": "my-client-id"}); 
    }, function(frame) { 
     console.log("Web socket disconnected"); 
    }); 
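
In a real application the id would come from the logged-in user rather than a literal. A minimal sketch, assuming a stable per-user id is available (the question uses `user.profile.id`); `durableSubscriptionHeaders` is a hypothetical helper name, not part of any library:

```javascript
// Hypothetical helper: build the two header sets a durable subscription needs.
// STOMP header values are strings, so a numeric id is converted explicitly.
function durableSubscriptionHeaders(userId) {
  if (userId === undefined || userId === null || String(userId).trim() === "") {
    throw new Error("a stable, non-empty user id is required");
  }
  const id = String(userId);
  return {
    connect: { "client-id": id },                   // sent with CONNECT
    subscribe: { "activemq.subscriptionName": id }, // sent with SUBSCRIBE
  };
}

const durableHeaders = durableSubscriptionHeaders(42);
console.log(durableHeaders);

// Intended use with the client from the snippet above (not executed here):
// stompClient.connect(durableHeaders.connect, onConnected, onError);
// stompClient.subscribe("/topic/notification", onMessage, durableHeaders.subscribe);
```

The id has to be the same on every login, because the broker uses it to find the existing subscription; a random or per-session value would create a new subscription each time and the stored messages would never be delivered.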

UPDATE

@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer{ 



@Bean(initMethod = "start", destroyMethod = "stop") 
public BrokerService broker() throws Exception { 
    final BrokerService broker = new BrokerService(); 
    //broker.addConnector("tcp://localhost:61616"); 
    broker.addConnector("stomp://localhost:61613"); 
    broker.addConnector("vm://localhost"); 
    PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); 
    File dir = new File(System.getProperty("user.home") + File.separator + "kaha"); 
    if (!dir.exists()) { 
     dir.mkdirs(); 
    } 
    persistenceAdapter.setDirectory(dir); 
    broker.setPersistenceAdapter(persistenceAdapter); 
    broker.setPersistent(true); 
    return broker; 
} 

@Override 
public void configureMessageBroker(MessageBrokerRegistry config) { 
    // relayHost and relayPort only need to be set if ActiveMQ is not running locally on the default STOMP port (61613)
    config.enableStompBrokerRelay("/topic/") 
    .setRelayHost(relayHost) 
    .setRelayPort(relayPort) 
    // user pwd if needed 
    //.setSystemLogin(activeMqLogin) 
    //.setSystemPasscode(activeMqPassword) 
    ; 
} 

@Override 
public void registerStompEndpoints(StompEndpointRegistry registry) { 
    registry.addEndpoint("/myWebSocketEndPoint") 
      .setAllowedOrigins("*") 
      .withSockJS(); 
    } 
} 

Use the Spring Boot parent POM:

<parent> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-parent</artifactId> 
    <version>1.4.3.RELEASE</version> 
    <relativePath /> <!-- lookup parent from repository --> 
</parent> 

Add these dependencies:

<dependency> 
    <groupId>io.projectreactor</groupId> 
    <artifactId>reactor-core</artifactId> 
</dependency> 
<dependency> 
    <groupId>io.projectreactor</groupId> 
    <artifactId>reactor-net</artifactId> 
</dependency> 
<dependency> 
    <groupId>io.projectreactor.spring</groupId> 
    <artifactId>reactor-spring-context</artifactId> 
</dependency> 

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-kahadb-store</artifactId> 
</dependency> 
<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-stomp</artifactId> 
</dependency> 
<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-activemq</artifactId> 
</dependency> 