spring - STOMP interceptor

linuxism 2014. 3. 31. 10:57


Selectors not working in STOMP over sockjs spring integration setup

We are using STOMP over SockJS, Spring integration, and ActiveMQ as the message broker. Our two consumers work fine when no selectors are used, but fail when selectors are used. At this point we are scratching our heads and looking for ideas. The selector is based on a header that we add in preSend on the outbound channel. The selector in the test Java class is set up like this: 
        String selector = "hello = 'world'"; 
        MessageConsumer consumer = session.createConsumer(destination, selector); 

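The client-side selector is set up like this: 
        // String literals in a selector need single quotes; unquoted, world is read as a header name. 
        var headers = {'selector': "hello = 'world'"}; 
        var connectCallback = function(frame) { 
            stompClient.subscribe("/topic/receipt", function(frame){console.log(frame);}, headers); 
        }; 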
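
To make the selector semantics concrete, here is a minimal sketch in plain Java of how an equality selector is matched against a message's headers. It is only a simplified model under stated assumptions: `SelectorSketch` and `matches` are hypothetical names, it handles nothing but a single `name = value` comparison, and it is not ActiveMQ's selector parser. It illustrates two points: a quoted right-hand side is a string literal while an unquoted one (as in hello=world) is treated as another header name, and a message that lacks the header never matches.

```java
import java.util.HashMap;
import java.util.Map;

public class SelectorSketch {

    /**
     * Evaluates a selector of the form "name = 'literal'" or "name = otherName"
     * against a message's properties. Hypothetical helper, not ActiveMQ code.
     */
    static boolean matches(String selector, Map<String, String> properties) {
        String[] parts = selector.split("=", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unsupported selector: " + selector);
        }
        String left = properties.get(parts[0].trim());
        String rhs = parts[1].trim();
        String right;
        if (rhs.length() >= 2 && rhs.startsWith("'") && rhs.endsWith("'")) {
            // Quoted: a string literal.
            right = rhs.substring(1, rhs.length() - 1);
        } else {
            // Unquoted: the name of another property.
            right = properties.get(rhs);
        }
        // A comparison involving a missing property is "unknown" and does not match.
        return left != null && right != null && left.equals(right);
    }

    public static void main(String[] args) {
        Map<String, String> withHeader = new HashMap<>();
        withHeader.put("hello", "world");
        Map<String, String> withoutHeader = new HashMap<>();

        System.out.println(matches("hello = 'world'", withHeader));    // true
        System.out.println(matches("hello = 'world'", withoutHeader)); // false
        System.out.println(matches("hello=world", withHeader));        // false
    }
}
```

If the header never reaches the broker, every message looks like `withoutHeader` here, so a consumer with a selector receives nothing even though the same consumer works without one.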

The setup without selectors is as follows: 

Our client side: 
        var socket = new SockJS(this.getUrl()); 
        var stompClient = Stomp.over(socket); 

        // Define the callback before connect() so it is not undefined when passed in. 
        var connectCallback = function(frame) { 
            stompClient.subscribe("/topic/receipt", function(frame){console.log(frame);}); 
            stompClient.send("/app/" + url.join('/'), {"content-type": "text/plain"}, "<message>test message</message>"); 
        }; 

        stompClient.connect('', '', connectCallback, errorCallback); 

On the Spring side, the message broker is configured as follows: 
@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { 

    @Override 
    public void registerStompEndpoints(StompEndpointRegistry registry) { 
        registry.addEndpoint("{product}/{security}/{system}/{tenant}/v1/pub").withSockJS(); 
    } 

    @Override 
    public void configureMessageBroker(MessageBrokerRegistry registry) { 
       //registry.enableSimpleBroker("/topic", "/queue/"); 
       registry.enableStompBrokerRelay("/topic", "/queue/"); 
       registry.setApplicationDestinationPrefixes("/app"); 
    } 

    @Override 
    public void configureClientInboundChannel(ChannelRegistration registration) { 
    } 

    @Override 
    public void configureClientOutboundChannel(ChannelRegistration registration) { 
       registration.setInterceptors(new MyInterceptor()); 
    } 
} 

When a message is published, it goes through a Spring controller before being sent to ActiveMQ: 
    @MessageMapping("{product}/{security}/{system}/{tenant}/v1/pub") 
    @SendTo("/topic/receipt") 
    public String publish(@DestinationVariable("product") String product, 
           @DestinationVariable("security") String security, 
           @DestinationVariable("system") String system, 
           @DestinationVariable("tenant") String tenant, 
           String message) throws Exception 
    { 
        // do some stuff 
        return message; // this return value is sent as the payload of a message to ActiveMQ 
    } 

In the interceptor's preSend, I tried to add tags to the headers/nativeHeaders and ended up with this message being sent to ActiveMQ: 
    @Override 
    public Message<?> preSend(Message<?> message, MessageChannel channel) { 
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message); 
        // Set on the accessor only; they are carried over just on the MESSAGE path below, via toMap(). 
        headerAccessor.setHeader("priority", "5"); 
        headerAccessor.setHeader("tree", "alpha"); 
        if (StompCommand.MESSAGE.equals(headerAccessor.getCommand())) { 
            Map<String, Object> map = headerAccessor.toMap(); 
            map.put("key1", "value1"); 
            // Note: this replaces the existing native headers map rather than adding to it. 
            Map<String, List<String>> nativeHeaders = new HashMap<String, List<String>>(); 
            nativeHeaders.put("hello", Collections.singletonList("world")); 
            map.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); 
            GenericMessage<Object> msg = new GenericMessage<Object>(message.getPayload(), map); 
            System.out.println("==> " + msg); 
            return msg; 
        } 
        return message; 
    } 
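
The interceptor above puts a brand-new map under the native headers key, so any native headers already on the message are dropped. Below is a rough sketch in plain Java of merging one extra native header into the existing ones instead. The class and method names (`NativeHeaderSketch`, `withNativeHeader`) are hypothetical, and the key string "nativeHeaders" is assumed to match the value of NativeMessageHeaderAccessor.NATIVE_HEADERS; it only models the map handling and does not touch Spring itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NativeHeaderSketch {

    // Assumption: same value as NativeMessageHeaderAccessor.NATIVE_HEADERS in Spring.
    static final String NATIVE_HEADERS = "nativeHeaders";

    /** Returns a copy of the headers with one extra native header, keeping the existing ones. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> withNativeHeader(Map<String, Object> headers, String name, String value) {
        Map<String, Object> copy = new HashMap<>(headers);
        Map<String, List<String>> nativeHeaders = new HashMap<>();
        Object existing = headers.get(NATIVE_HEADERS);
        if (existing instanceof Map) {
            // Carry over whatever native headers were already present.
            nativeHeaders.putAll((Map<String, List<String>>) existing);
        }
        nativeHeaders.put(name, Collections.singletonList(value));
        copy.put(NATIVE_HEADERS, nativeHeaders);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, List<String>> original = new HashMap<>();
        original.put("destination", Collections.singletonList("/topic/receipt"));
        Map<String, Object> headers = new HashMap<>();
        headers.put(NATIVE_HEADERS, original);

        Map<String, Object> result = withNativeHeader(headers, "hello", "world");
        // Prints a map containing both the destination and hello entries.
        System.out.println(result.get(NATIVE_HEADERS));
    }
}
```

The input map is left untouched and a copy is returned, which mirrors how preSend returns a new message rather than modifying the one it was given.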

One consumer is on the client side in the connectCallback; the other is the Java class snippet below. 
        Message replyJMSMessage = consumer.receive(); 
        System.out.println(replyJMSMessage); 
        // instanceof is already false for null, so no separate null check is needed 
        if (replyJMSMessage instanceof javax.jms.BytesMessage) 
        { 
            javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) replyJMSMessage; 
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; 
            bytesMessage.readBytes(bytes); 
            System.out.println("Reply Message"); 
            // the reply message 
            String replyMessage = new String(bytes, "UTF-8"); 
            System.out.println("   " + replyMessage); 
        } 




Re: Selectors not working in STOMP over sockjs spring integration setup

artnaseef
254 posts
Did you check some of the basics? For example, look at the message on the broker with the web console or via JMX to ensure the header is set on the message. JMX can also be used to verify the selector set by the client. 


(In reply to the original message above, posted by legolas on Feb 25, 2014, at 2:32 PM.)

Re: Selectors not working in STOMP over sockjs spring integration setup

legolas
2 posts
It looks like the custom header is not included in the message when I look at it via the broker web console. 

In addition, it looks like the message goes through the interceptor before it goes to the controller. As I understand it, when the controller method returns, the return value is sent as the payload of a message to ActiveMQ. How do I set a header or property on the message that gets sent to ActiveMQ from the controller? 

When the message is published, it goes through a Spring controller first before being sent to ActiveMQ: 
    @MessageMapping("{product}/{security}/{system}/{tenant}/v1/pub") 
    @SendTo("/topic/receipt") 
    public String publish(@DestinationVariable("product") String product, 
           @DestinationVariable("security") String security, 
           @DestinationVariable("system") String system, 
           @DestinationVariable("tenant") String tenant, 
           String message) throws Exception 
    { 
        // do some stuff 
        return message; // this return value is sent as the payload of a message to ActiveMQ 
    } 



Source - http://activemq.2283324.n4.nabble.com/Selectors-not-working-in-STOMP-over-sockjs-spring-integration-setup-td4678358.html