Selectors not working in STOMP over sockjs spring integration setup

We are using STOMP over sockjs, spring integration and ActiveMQ as the message broker.  The two consumers we have work just fine when no selectors are used, but fail when selectors are used.  At this point we are scratching our heads and are looking for ideas.  The selector is based on a header we are adding in the presend of the outbound channel.  Selector in the test java class is set up as so: 
        String selector = "hello = 'world'"; 
        MessageConsumer consumer = session.createConsumer(destination, selector); 

client side selector is set up as so: 
        var headers = {'selector':'hello=world'}; 
        var connectCallback = function(frame) { 
            stompClient.subscribe("/topic/receipt", function(frame){console.log(frame);}, headers); 

The setup without selectors is as follows: 

Our client side: 
        var socket = new SockJS(this.getUrl()); 
        var stompClient = Stomp.over(socket); 
        stompClient.connect('', '', 
        connectCallback, 
        errorCallback 
        ); 

        var connectCallback = function(frame) { 
            stompClient.subscribe("/topic/receipt", function(frame){console.log(frame);}) 
            stompClient.send("/app/" + url.join('/'), {"content-type": "text/plain"}, "<message>test messaage</message>"); 
        }; 

On the Spring side configuration of the message broker 
@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { 

    @Override 
    public void registerStompEndpoints(StompEndpointRegistry registry) { 
        registry.addEndpoint("{product}/{security}/{system}/{tenant}/v1/pub").withSockJS(); 
    } 

    @Override 
    public void configureMessageBroker(MessageBrokerRegistry registry) { 
       //registry.enableSimpleBroker("/topic", "/queue/"); 
       registry.enableStompBrokerRelay("/topic", "/queue/"); 
       registry.setApplicationDestinationPrefixes("/app"); 
    } 

    @Override 
    public void configureClientInboundChannel(ChannelRegistration registration) { 
    } 

    @Override 
    public void configureClientOutboundChannel(ChannelRegistration registration) { 
       registration.setInterceptors(new MyInterceptor()); 
    } 


When the message is published, it goes through a spring controller first before being sent to ActiveMQ 
    @MessageMapping("{product}/{security}/{system}/{tenant}/v1/pub") 
    @SendTo("/topic/receipt") 
    public String publish( @DestinationVariable ("product") String product, 
           @DestinationVariable("security") String security, 
           @DestinationVariable("system") String system, 
           @DestinationVariable("tenant") String tenant, 
           String message) throws Exception 
    { 
                //do some stuff 

In the interceptor presend, I was trying to add tags to the header/nativeHeaders and came up with this message being sent to ActiveMQ 
    @Override 
    public Message<?> preSend(Message<?> message, MessageChannel channel) { 
       StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message); 
       headerAccessor.setHeader("priority", "5"); 
       headerAccessor.setHeader("tree", "alpha"); 
       if(StompCommand.MESSAGE.equals(headerAccessor.getCommand())) { 
        
               Map<String, Object> map = headerAccessor.toMap(); 
               map.put("key1", "value1"); 
               Map nativeHeaders = new HashMap(); 
               nativeHeaders.put("hello", Collections.singletonList("world")); 
               map.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); 
              GenericMessage msg = new GenericMessage(message.getPayload(), map); 
               System.out.println("==> " + msg); 
               return msg; 
       } 
       else { 
            return message; 
       } 
    } 

One consumer is on the client side in the connectCallback, and the other consumer is the java class snippet below. 
        Message replyJMSMessage = consumer.receive(); 
        System.out.println(replyJMSMessage); 
        if (replyJMSMessage != null && replyJMSMessage instanceof BytesMessage) 
        { 
            javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) replyJMSMessage; 
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; 
            bytesMessage.readBytes(bytes); 
            System.out.println("Reply Message"); 
            // the reply message 
            String replyMessage = new String(bytes, "UTF-8"); 
            System.out.println("   " + replyMessage); 
        } 



Reply | Threaded | More    

Re: Selectors not working in STOMP over sockjs spring integration setup

artnaseef
254 posts
Did you check some of the basics?  For example - looking at the message on the broker with the webconsole or via jmx to ensure the header is set on the message?  Also jmx can be used to verify the selector set by the client. 


Sent from my iPhone

On Feb 25, 2014, at 2:32 PM, "legolas [via ActiveMQ]" <[hidden email]> wrote:

We are using STOMP over sockjs, spring integration and ActiveMQ as the message broker.  The two consumers we have work just fine when no selectors are used, but fail when selectors are used.  At this point we are scratching our heads and are looking for ideas.  The selector is based on a header we are adding in the presend of the outbound channel.  Selector in the test java class is set up as so: 
        String selector = "hello = 'world'"; 
        MessageConsumer consumer = session.createConsumer(destination, selector); 

client side selector is set up as so: 
        var headers = {'selector':'hello=world'}; 
        var connectCallback = function(frame) { 
            stompClient.subscribe("/topic/receipt", function(frame){console.log(frame);}, headers); 

The setup without selectors is as follows: 

Our client side: 
        var socket = new SockJS(this.getUrl()); 
        var stompClient = Stomp.over(socket); 
        stompClient.connect('', '', 
        connectCallback, 
        errorCallback 
        ); 

        var connectCallback = function(frame) { 
            stompClient.subscribe("/topic/receipt", function(frame){console.log(frame);}) 
            stompClient.send("/app/" + url.join('/'), {"content-type": "text/plain"}, "<message>test messaage</message>"); 
        }; 

On the Spring side configuration of the message broker 
@Configuration 
@EnableWebSocketMessageBroker 
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { 

    @Override 
    public void registerStompEndpoints(StompEndpointRegistry registry) { 
        registry.addEndpoint("{product}/{security}/{system}/{tenant}/v1/pub").withSockJS(); 
    } 

    @Override 
    public void configureMessageBroker(MessageBrokerRegistry registry) { 
       //registry.enableSimpleBroker("/topic", "/queue/"); 
       registry.enableStompBrokerRelay("/topic", "/queue/"); 
       registry.setApplicationDestinationPrefixes("/app"); 
    } 

    @Override 
    public void configureClientInboundChannel(ChannelRegistration registration) { 
    } 

    @Override 
    public void configureClientOutboundChannel(ChannelRegistration registration) { 
       registration.setInterceptors(new MyInterceptor()); 
    } 


When the message is published, it goes through a spring controller first before being sent to ActiveMQ 
    @MessageMapping("{product}/{security}/{system}/{tenant}/v1/pub") 
    @SendTo("/topic/receipt") 
    public String publish( @DestinationVariable ("product") String product, 
           @DestinationVariable("security") String security, 
           @DestinationVariable("system") String system, 
           @DestinationVariable("tenant") String tenant, 
           String message) throws Exception 
    { 
                //do some stuff 

In the interceptor presend, I was trying to add tags to the header/nativeHeaders and came up with this message being sent to ActiveMQ 
    @Override 
    public Message<?> preSend(Message<?> message, MessageChannel channel) { 
       StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message); 
       headerAccessor.setHeader("priority", "5"); 
       headerAccessor.setHeader("tree", "alpha"); 
       if(StompCommand.MESSAGE.equals(headerAccessor.getCommand())) { 
        
               Map<String, Object> map = headerAccessor.toMap(); 
               map.put("key1", "value1"); 
               Map nativeHeaders = new HashMap(); 
               nativeHeaders.put("hello", Collections.singletonList("world")); 
               map.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); 
              GenericMessage msg = new GenericMessage(message.getPayload(), map); 
               System.out.println("==> " + msg); 
               return msg; 
       } 
       else { 
            return message; 
       } 
    } 

One consumer is on the client side in the connectCallback, and the other consumer is the java class snippet below. 
        Message replyJMSMessage = consumer.receive(); 
        System.out.println(replyJMSMessage); 
        if (replyJMSMessage != null && replyJMSMessage instanceof BytesMessage) 
        { 
            javax.jms.BytesMessage bytesMessage = (javax.jms.BytesMessage) replyJMSMessage; 
            byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; 
            bytesMessage.readBytes(bytes); 
            System.out.println("Reply Message"); 
            // the reply message 
            String replyMessage = new String(bytes, "UTF-8"); 
            System.out.println("   " + replyMessage); 
        } 






To start a new topic under ActiveMQ - User, email [hidden email] 
To unsubscribe from ActiveMQ - User, click here.
NAML
Reply | Threaded | More    

Re: Selectors not working in STOMP over sockjs spring integration setup

legolas
2 posts
Looks like the custom header is not included in the message when I look at it via the broker webconsole.   

In addition, it looks like it goes through the interceptor before it goes to the controller.  The way I understand it is when it leaves the controller, the return value is sent as a payload in a message to ActiveMQ.  How do I set a header or property in the message that gets sent to ActiveMQ from the controller? 

When the message is published, it goes through a spring controller first before being sent to ActiveMQ 
    @MessageMapping("{product}/{security}/{system}/{tenant}/v1/pub") 
    @SendTo("/topic/receipt") 
    public String publish( @DestinationVariable ("product") String product, 
           @DestinationVariable("security") String security, 
           @DestinationVariable("system") String system, 
           @DestinationVariable("tenant") String tenant, 
           String message) throws Exception 
    { 
                //do some stuff 
               return message // this return value is sent as a payload in a message to ActiveMQ 
     } 
<quote author="artnaseef">
Did you check some of the basics?  For example - looking at the message on the broker with the webconsole or via jmx to ensure the header is set on the message?  Also jmx can be used to verify the selector set by the client. 



출처 - http://activemq.2283324.n4.nabble.com/Selectors-not-working-in-STOMP-over-sockjs-spring-integration-setup-td4678358.html


'Framework & Platform > Spring' 카테고리의 다른 글

spring - spring mvc test  (0) 2014.04.17
spring data - mongodb tip  (0) 2014.04.04
spring - springx2013-websocket  (0) 2014.03.29
spring - WebSocket STOMP  (0) 2014.03.28
spring - WebSocket  (0) 2014.03.27
Posted by linuxism
,


WebSocket Apps
Spring Framework 4 (part 1)

Rossen Stoyanchev


Part 1:

Comprehensive Intro to WebSocket

in Spring Framework 4.0


Part 2:

Building WebSocket Applications

The WebSocket Protocol

How Did We Get Here?

How WebSocket Works

Practical Considerations

Java WebSocket API

Relation To the Servlet API

Server Availability

Spring Framework 4.0

Example




import org.springframework.web.socket.*;


public class MyHandler extends TextWebSocketHandlerAdapter {


  @Override
  public void handleTextMessage(WebSocketSession session,
      TextMessage message) throws Exception {

    session.sendMessage(message);

  }

}

Basic Config


@Configuration
@EnableWebSocket
public class WsConfig implements WebSocketConfigurer {


  @Override
  public void registerWebSocketHandlers(
        WebSocketHandlerRegistry registry) {

    registry.addHandler(new MyHandler(), "/echo");
  }

}

Browser client


var ws = new WebSocket("wss://localhost:8080/myapp/echo");

ws.onopen = function () {
  console.log("opened");
};

ws.onmessage = function (event) {
  console.log('message: ' + event.data);
};

ws.onclose = function (event) {
  console.log('closed:' + event.code);
};

HandshakeInterceptor

Add Interceptor


@Configuration
@EnableWebSocket
public class WsConfig implements WebSocketConfigurer {


  @Override
  public void registerWebSocketHandlers(
        WebSocketHandlerRegistry registry) {

    registry.addHandler(echoHandler(), "/echo")
        .addInterceptors(new MyHandshakeInterceptor());
  }

}

Per-Session Handler

Configure Per-Session Handler


@Configuration
@EnableWebSocket
public class WsConfig implements WebSocketConfigurer {


  @Bean
  public WebSocketHandler snakeHandler() {
    return new PerConnectionWebSocketHandler(
        SnakeWebSocketHandler.class);
  }


  @Override
  public void registerWebSocketHandlers(
        WebSocketHandlerRegistry registry) {

    registry.addHandler(snakeHandler(), "/snake");
  }

}

Deployment Notes

SockJS

SockJS Protocol

SockJS URL Scheme




    GET  /echo

    GET  /echo/info

    POST /echo/<server>/<session>/<transport>

SockJsService

Diagram with SockJS Service

Configure SockJS




@Configuration
@EnableWebSocket
public class WsConfig implements WebSocketConfigurer {


  @Override
  public void registerWebSocketHandlers(
        WebSocketHandlerRegistry registry) {

    registry.addHandler(myHandler(), "/echo").withSockJS();
  }

}

WebSocket Handler

(same as before)




import org.springframework.web.socket.*;


public class MyHandler extends TextWebSocketHandlerAdapter {

  @Override
  public void handleTextMessage(WebSocketSession session,
      TextMessage message) throws Exception {

    session.sendMessage(message);

  }

}

Server Versions

Detecting Client Disconnects

WebSocket/SockJS Sample



https://github.com/rstoyanchev/

spring-websocket-test


Questions?


WebSocket Apps
Spring Framework 4 (part 2)

Rossen Stoyanchev


Non-Trivial Applications

Using a WebSocket API is a bit like

writing a custom Servlet application


Except WebSocket is even lower level

You'll likely have 1 WebSocket handler

for the whole application

Annotations can't help much either...

...not without making assumptions about

what's in a message!

"The basic issue is there isn't enough information in an incoming websocket message for the container to know where to route it if there are multiple methods where it may land."



Danny Coward, JSR-356 lead
In response to question on user mailing list

"The subprotocol attribute of the handshake might serve as a useful place to attach this kind of message description/meta data, or some JSR356 specific headers in the handshake. Of course, the other issue is that the other end is often javascript, which would need to participate in some way in such a scheme."
"These are all things we'll likely look at in the next version."

From next reply on same thread

The Sub-protocol Attribute

Furthermore..

Message Brokers an Option?

We Need a Bit of Both Worlds


Many Approaches Exist



Logos of projects


The Spring Approach




STOMP

STOMP Frame


STOMP Frame Content


Client-to-Server Commands

Server-to-Client Commands

"Destination" Header

Keep in mind:


A server cannot send unsolicited messages

client must subscribe first

Example

Client Sends Message


SEND frame



Example

Client Subscribes To Receive Messages


SUBSCRIBE frame

Example

Client Receives Message


MESSAGE frame

STOMP vs plain WebSocket

STOMP over WebSocket

Spring Framework 4.0

Basic Configuration


@Configuration
@EnableWebSocketMessageBroker
public class Config
    implements WebSocketMessageBrokerConfigurer{


  @Override
  public void registerStompEndpoints(StompEndpointRegistry r){
    r.addEndpoint("/stomp");
  }

  @Override
  public void configureMessageBroker(MessageBrokerConfigurer c){
    c.enableSimpleBroker("/topic/");
    c.setApplicationDestinationPrefixes("/app");
  }

}


Architecture diagram


Handle a Message



@Controller
public class GreetingController {


  @MessageMapping("/greetings")
  public void handle(String greeting) {
    // ...
  }

}

@MessageMapping

Return Values

Send via Return Value



@Controller
public class GreetingController {

  // A message is broadcast to "/topic/greetings"

  @MessageMapping("/greetings")
  public String greet(String greeting) {
      return "[" + getTimestamp() + "]: " + greeting;
  }

}

Send to Different Destination

with @SendTo



@Controller
public class GreetingController {


  @MessageMapping("/greetings")
  @SendTo("/topic/wishes")
  public String greet(String greeting) {
      return "[" + getTimestamp() + "]: " + greeting;
  }

}

Send via SimpMessagingTemplate


@Controller
public class GreetingController {

  @Autowired
  private SimpMessagingTemplate template;


  @RequestMapping(value="/greetings", method=POST)
  public void greet(String greeting) {
    String text = "[" + getTimeStamp() + "]:" + greeting;
    this.template.convertAndSend("/topic/wishes", text);
  }

}

Handle Subscription

(Request-Reply Pattern)


@Controller
public class PortfolioController {


  @SubscribeEvent("/positions")
  public List<Position> getPositions(Principal p) {
    Portfolio portfolio = ...
    return portfolio.getPositions();
  }

}

Plug in Message Broker

Steps to Use Message Broker

Enable "Broker Relay"


@Configuration
@EnableWebSocketMessageBroker
public class Config
    implements WebSocketMessageBrokerConfigurer{


  @Override
  public void configureMessageBroker(MessageBrokerConfigurer c){
    c.enableStompBrokerRelay("/queue/", "/topic/");
    c.setApplicationDestinationPrefixes("/app");
  }

}

Architecture diagram


Authentication

Destination "/user/**"

UserDestinationHandler

Client Subscribes

To "/user/queue/..."


var socket = new SockJS('/myapp/portfolio');
var client = Stomp.over(socket);

client.connect('', '', function(frame) {

  client.subscribe("/user/queue/trade-confirm",function(msg){
    // ...
  });

  client.subscribe("/user/queue/errors",function(msg){
    // ...
  });

}

Send Reply To User



@Controller
public class GreetingController {

  // Message sent to "/user/{username}/queue/greetings"

  @MessageMapping("/greetings")
  @SendToUser
  public String greet(String greeting) {
      return "[" + getTimestamp() + "]: " + greeting;
  }

}

Send Error To User


@Controller
public class GreetingController {



  @MessageExceptionHandler
  @SendToUser("/queue/errors")
  public String handleException(IllegalStateException ex) {
    return ex.getMessage();
  }

}

Send Message To User

via SimpMessagingTemplate



@Service
public class TradeService {

  @Autowired
  private SimpMessagingTemplate template;


  public void executeTrade(Trade trade) {
    String user = trade.getUser();
    String dest = "/queue/trade-confirm";
    TradeResult result = ...
    this.template.convertAndSendToUser(user, dest, result);
  }

}

Architecture diagram



Managing Inactive Queues

(with Full-Featured Broker)

Stock Portfolio App



https://github.com/rstoyanchev/

spring-websocket-portfolio






'Framework & Platform > Spring' 카테고리의 다른 글

spring data - mongodb tip  (0) 2014.04.04
spring - STOMP interceptor  (0) 2014.03.31
spring - WebSocket STOMP  (0) 2014.03.28
spring - WebSocket  (0) 2014.03.27
spring - @Scheduled  (0) 2014.02.11
Posted by linuxism
,


SPRING FRAMEWORK 4.0 M2: WEBSOCKET MESSAGING ARCHITECTURES

Rossen Stoyanchev

Overview

As I wrote previously, a WebSocket API is only the starting point for WebSocket-style messaging applications. Many practical challenges remain. As one Tomcat mailing list user mused recently:

it does seem to me that websockets is still not really "production-ready", (I am not talking about the Tomcat implementation per se, but more generally) … native websockets capability in IE is only available since IE-10 and that solutions which allow this to work in lower IE versions are a bit "iffy" (relying on a diversion through Adobe's FlashPlayer e.g.). (Most of our customers are largish corporations, which are not going to update their browsers, nor open special ports in their firewalls, just to please us).

The first milestone of Spring Framework 4.0 provided server-side support for SockJS, the best and the most comprehensive WebSocket browser fallback options. You will need fallback options in browsers that don't support WebSocket and in situations where network proxies prevent its use. Simply put SockJS enables you to build WebSocket applications today and rely on transparent fallback options when necessary.

Even with fallback options, bigger challenges remain. A socket is a very low-level abstraction and the vast majority of web applications today do not program to sockets. This is why the WebSocket protocol defines a sub-protocol mechanism that essentially enables, and encourages, the use of higher-level protocols over WebSocket, much like we use HTTP over TCP.

The second milestone of Spring Framework 4.0 enables the use of higher-level messaging protocols over WebSocket. To demonstrate this we've put together a sample application.

Stock Portfolio Sample

The Stock Portfolio sample application, available on Github, loads a user's portfolio positions, allows buying and selling shares, consumes price quotes, and displays position updates. It is a reasonably simple application. Yet it handles a number of common tasks that are likely to come up in browser-based messaging applications.

Snapshot of Stock Portfolio Application

So how do we put together an application like that? From HTTP and REST we are used to relying on URLs along with HTTP verbs to express what needs to be done. Here we have a socket and lots of messages. How do you tell who a message is for and what the message means?

Browser and Server exchange messages but what's in the message?

Browser and server must agree on a common message format before such semantics can be expressed. Several protocols exist that can help. We chose STOMP for this milestone due to its simplicity and wide support.

Simple/Streaming Text-Oriented Messaging Protocol (STOMP)

STOMP is a messaging protocol created with simplicity in mind. It is based on frames modelled on HTTP. A frame consists of a command, optional headers, and optional body.

For example the Stock Portfolio application needs to receive stock quotes, so the client sends a SUBSCRIBE frame where the destination header indicates what the client wants to subscribe to:

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

As stock quotes become available, the server sends a MESSAGE frame with a matching destination and subscription id as well as a content-type header and a body:

MESSAGE
subscription:sub-1
message-id:wm2si1tj-4
content-type: application/json
destination:/topic/stocks.PRICE.STOCK.NASDAQ.EMC
 
{\"ticker\":\"EMC\",\"price\":24.19}

To do all that in the browser we use stomp.js and the SockJS client:

var socket = new SockJS('/spring-websocket-portfolio/portfolio');
var client = Stomp.over(socket);
 
var onConnect = function() {
  client.subscribe("/topic/price.stock.*", function(message) {
      // process quote
  });
};
client.connect('guest', 'guest', onConnect);

This is a huge gain already!! We have a standard message format and client-side support.

Now we can move one to the server side.

Message-Broker Solution

One server-side option is a pure message-broker solution where messages are sent directly to a traditional message broker like RabbitMQ, ActiveMQ, etc. Most, if not all brokers, support STOMP over TCP but increasingly they support it over WebSocket too while RabbitMQ goes further and also supports SockJS. Our architecture would look like this:

Browser sends STOMP messages to broker, application connects to broker via AMQP or JMS

This is a robust and scalable solution but arguably not the best fit for the problem at hand. Message brokers have typically been used within the enterprise. Exposing them directly over the web isn't ideal.

If we've learned anything from REST it is that we don't want to expose details about the internals of our system like the database or the domain model.

Furthermore, as a Java developer you want to apply security, validation, and add application logic. In a message-broker solution the application server sits behind the message broker, which is a significant departure from what most web application developer are used to.

This is why a library such as socket.io is popular. It is simple and it targets the needs of web applications. On other hand we must not ignore the capabilities of message brokers to handle messages, they are really good at it and messaging is a hard problem. We need the best of both.

Application and Message-Broker Solution

Another approach is to make the application handle incoming messages and serve as intermediary between web clients and the message broker. Messages from clients can flow to the broker through the application and reversely messages from the broker can flow back to clients through the application. This gives the application a chance to examine the incomingmessage type and "destination" header and decide whether to handle the message or pass it on to the broker.

Browser sends messages to application that in turn sends messages to a message broker

This is the approach we've chosen. To illustrate better here are some scenarios.

Load portfolio positions

  • Client requests portfolio positions
  • The application handles the request by loading and returning the data to the subscription
  • The message broker is not involved in this interaction

Subscribe for stock quotes

  • Client sends subscription request for stock quotes
  • The application passes the message to the message broker
  • The message broker propagates the message to all subscribed clients

Receive stock quotes

  • QuoteService sends stock quote message to the message broker
  • The message broker propagates the message to all subscribed clients

Execute a trade

  • Client sends trade request
  • The application handles it, submits the trade for execution through the TradeService
  • The message broker is not involved in this interaction

Receive position update

  • Trade service sends a position update message to a queue on the message broker
  • The message broker sends the position update to the client
  • Sending messages to a specific user is covered in more detail further below

Strictly speaking the use of a message broker is optional. We provide an out-of-the-box "simple" alternative for getting-started. However the use of a message broker is recommended for scalability and for deployments with multiple application servers.

Code Snippets

Let's see some examples of client and server-side code.

This is portfolio.js requesting portfolio positions:

stompClient.subscribe("/app/positions", function(message) {
  self.portfolio().loadPositions(JSON.parse(message.body));
});

On the server side PortfolioController detects the request and returns portfolio positions demonstrating a request-reply interaction that is very common in web applications. Since we use Spring Security to protect HTTP requests, including the one leading to the WebSocket handshake, the principal method argument below is taken from the user principal Spring Security set on the HttpServletRequest.

@Controller
public class PortfolioController {
 
  // ...
 
  @SubscribeEvent("/app/positions")
  public List<PortfolioPosition> getPortfolios(Principal principal) {
    String user = principal.getName();
    Portfolio portfolio = this.portfolioService.findPortfolio(user);
    return portfolio.getPositions();
  }
}

This is portfolio.js sending a trade request:

stompClient.send("/app/trade", {}, JSON.stringify(trade));

On the server side PortfolioController sends the trade for execution:

@Controller
public class PortfolioController {
 
  // ...
 
  @MessageMapping(value="/app/trade")
  public void executeTrade(Trade trade, Principal principal) {
    trade.setUsername(principal.getName());
    this.tradeService.executeTrade(trade);
  }
}

PortfolioController can also handle unexpected exceptions by sending a message to the user.

@Controller
public class PortfolioController {
 
  // ...
 
  @MessageExceptionHandler
  @ReplyToUser(value="/queue/errors")
  public String handleException(Throwable exception) {
    return exception.getMessage();
  }
}

What about sending messages from within the application to subscribed clients? This is how the QuoteService sends quotes:

@Service
public class QuoteService {
 
  private final MessageSendingOperations<String> messagingTemplate;
 
  @Scheduled(fixedDelay=1000)
  public void sendQuotes() {
    for (Quote quote : this.quoteGenerator.generateQuotes()) {
      String destination = "/topic/price.stock." + quote.getTicker();
      this.messagingTemplate.convertAndSend(destination, quote);
    }
  }
}

And this is how the TradeService sends position updates after a trade is executed:

@Service
public class TradeService {
 
  // ...
 
  @Scheduled(fixedDelay=1500)
  public void sendTradeNotifications() {
    for (TradeResult tr : this.tradeResults) {
      String queue = "/queue/position-updates";
      this.messagingTemplate.convertAndSendToUser(tr.user, queue, tr.position);
    }
  }
}

And just in case you're wondering… yes PortfolioController can also contain Spring MVC methods (e.g. @RequestMapping) as suggested in this ticket by a developer who previously built an online game application:

Yes, having [message] mappings and spring mvc mappings consolidated would be nice. There is no reason why they can't be unified.

And just like the QuoteService and TradeService, Spring MVC controller methods can publish messages too.

Messaging Support For Spring Applications

For a long time Spring Integration has provided first-class abstractions for the well-knownEnterprise Integration patterns as well as lightweight messagings. While working on this milestone we realized the latter was exactly what we needed to build on.

As a result I'm pleased to announce we've moved a selection of Spring Integration types to the Spring Framework into a new module predictably called spring-messaging. Besides core abstractions such as MessageMessageChannelMessageHandler, and others, the new module contains all the annotations and classes to support the new features described in this post.

With that in mind we can now look at a diagram of the internal architecture of the Stock Portfolio application:

Diagram of internal architecture with message broker

StompWebSocketHandler puts incoming client messages on the "dispatch" message channel. There are 3 subscribers to this channel. The first one delegates to annotated methods, the second relays messages to a STOMP message broker, while the third one handles messages to individual users by transforming the destination into a unique queue name to which the client is subscribed (more detail to come).

By default the application runs with a "simple" message broker provided as a getting-started option. As explained in the sample README, you can alternate between the "simple" and a full-featured message broker by activating and de-activating profiles.

Diagram of internal architecture with simple broker

Another possible configuration change is to switch from Executor to Reactor-based implementations of MessageChannel for message passing. The Reactor project that recently released a first milestone is also used to manage TCP connections between the application and the message broker.

You can see the full application configuration that also includes the new Spring Security Java configuration. You might also be interested in the improved STS support for Java configuration.

Sending Messages To a Single User

It is easy to see how messages can be broadcast to multiple subscribed clients, just publish a message to a topic. It is more difficult to see how to send a message to a specific user. For example you may catch an exception and would like to send an error message. Or you may have received a trade confirmation and would like to send it to the user.

In traditional messaging applications it is common to create a temporary queue and set a "reply-to" header on any message to which a reply is expected. This works but feels rather cumbersome in web applications. The client must remember to set the necessary header on all applicable messages and the server application may need to keep track and pass this around. Sometimes such information may simply not be readily available, e.g. while handling an HTTP POST as an alternative to passing messages.

To support this requirement, we send a unique queue suffix to every connected client. The suffix can then be appended to create unique queue names.

client.connect('guest', 'guest', function(frame) {
 
  var suffix = frame.headers['queue-suffix'];
 
  client.subscribe("/queue/error" + suffix, function(msg) {
    // handle error
  });
 
  client.subscribe("/queue/position-updates" + suffix, function(msg) {
    // handle position update
  });
 
});

Then on the server-side an @MessageExceptionHandler method (or any message-handling method) can add an @ReplyToUser annotation to send the return value as a message.

@MessageExceptionHandler
@ReplyToUser(value="/queue/errors")
public String handleException(Throwable exception) {
  // ...
}

All other classes, like the TradeService, can use a messaging template to achieve the same.

String user = "fabrice";
String queue = "/queue/position-updates";
this.messagingTemplate.convertAndSendToUser(user, queue, position);

In both cases internally we locate the user queue suffix (through the configuredUserQueueSuffixResolver) in order to reconstruct the correct queue name. At the moment there is only one simple resolver implementation. However, it would be easy to add a Redisimplementation that would support the same feature regardless of whether the user is connected to this or another application server.

Conclusion

Hopefully this has been a useful introduction of the new functionality. Rather than making the post longer, I encourage you to check the sample and consider what it means for applications you write or intend to write. It is a perfect time for feedback as we work towards a release candidate in early September.

To use Spring Framework 4.0.0.M2 add the http://repo.springsource.org/libs-milestone or thehttp://repo.springsource.org/milestone repositories to your configuration. The former includes transient dependencies as explained in our Repository FAQ.

SpringOne 2GX 2013 is around the corner

Book your place at SpringOne in Santa Clara soon. It's simply the best opportunity to find out first hand all that's going on and to provide direct feedback. Expect a number of significant new announcements this year. Check recent blog posts to see what I mean and there is more to come!

SIMILAR POSTS

Share this Post
  • Digg
  •  
  • Sphinn
  •  
  • del.icio.us
  •  
  • Facebook
  •  
  • Mixx
  •  
  • Google Bookmarks
  •  
  • DZone
  •  
  • LinkedIn
  •  
  • Slashdot
  •  
  • Technorati
  •  
  • Twitter
 





출처 - http://assets.spring.io/wp/WebSocketBlogPost.html








https://github.com/rstoyanchev/spring-websocket-portfolio








Using Spring 4 WebSocket, sockJS and Stomp support to implement two way server client communication

One exciting new feature of Spring 4 is the support for WebSocket, SockJS and STOMP messaging. This allows two way communication between the server and its clients in a Spring MVC web application using the standard point-to-point and publish-subscribe messaging protocols. In this post, I will demonstrate how to set up a basic boilerplate project to start using this new feature. It is in part based on this article.

Maven Setup

First we need to add the Spring messaging modules in the POM file:

<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-messaging</artifactId>
 <version>4.0.0.RELEASE</version>
 </dependency>
 <dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-websocket</artifactId>
 <version>4.0.0.RELEASE</version>
 </dependency>

Spring MVC Configuration

Next, we need to add the message broker config to the Spring MVC config XML file.

<beans
 ...
 xmlns:websocket="http://www.springframework.org/schema/websocket"
 xsi:schemaLocation="
 ...

http://www.springframework.org/schema/websocket

 http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
<websocket:message-broker application-destination-prefix="/app">
       <websocket:stomp-endpoint path="/hello">
            <websocket:sockjs/>
       </websocket:stomp-endpoint>
       <websocket:simple-broker prefix="/topic"/>
</websocket:message-broker>
<!-- Other MVC config omitted here-->

The main thing here is the set up of the message broker for handling the message exchange between the server and its clients. This is done via the <message-broker> and its child tags. The tag <websocket:simple-broker>indicates we are using in-memory message broker.

It is easy to understand together with the server and client codes so I will include them below first before attempting to give a bit more explanations by cross-referencing the client and server codes.

Spring MVC Controller

Below is my Spring MVC Controller

 @Controller
 public class MessageController {
      @MessageMapping("/hello")
      @SendTo("/topic/greetings")
      public Greeting greeting(HelloMessage message) throws Exception {
           return new Greeting("Hello, " + message.getName() + "!");
     }
}

The method argument HelloMessage and output Greeting are just POJOs representing the body of the messages being sent and returned.

public class Greeting {
    private String content;
    public Greeting(String content) {
           this.content = content;
    }
    public String getContent() {
      return content;
    }
}
public class HelloMessage {
    private String name;
    public String getName() {
        return name;
    }
}

Client sockJS and STOMP codes

On the client side, I use the sockJS protocol fallback option as outlined in the Spring documentation. The javascript codes are included below

// Create stomp client over sockJS protocol (see Note 1)
 var socket = new SockJS("/hello");
 var stompClient = Stomp.over(socket);

 // callback function to be called when stomp client is connected to server (see Note 2)
 var connectCallback = function() {
      alert("connected!");
      stompClient.subscribe('/topic/greetings', function(greeting){
           alert(JSON.parse(greeting.body).content);
      });
 }; 

 // callback function to be called when stomp client could not connect to server (see Note 3)
 var errorCallback = function(error) {
      // display the error's message header:
      alert(error.headers.message);
 };

 // Connect as guest (Note 4)
 stompClient.connect("guest", "guest", connectCallback, errorCallback);

Note

  1. The client starts by create a sockJS by specifying the endpoint (ie. /hello) to connect to and then a stomp client is created over the socket. The endpoint here should match that defined in the Spring MVC configuration in the lines. Note also the 2nd line referring to sockJS.

    <websocket:stomp-endpoint path=”/hello”>
    <websocket:sockjs/>
    </websocket:stomp-endpoint>

  2. Then a callback function is created and assigned to a variable connectCallback. This is called when a successful connection is made by the stomp client. This allows us to start making subscriptions to messages (as in codes, repeated below) and sending messages. Note the subscription is for the topic “/topic/greetings”

    stompClient.subscribe(‘/topic/greetings’, function(greeting){
    alert(JSON.parse(greeting.body).content);
    });

  3. A error callback function is defined if stomp client fails to connect to server.
  4. This line makes the connection registering the callback functions.

Now we are ready to send messages from the client, e.g. using the following javascript function

// function to send message
 function fnSayHi() {
       stompClient.send("/app/hello", {}, JSON.stringify({ 'name': 'Joe' }));
 }

The message will be sent to the Spring MVC message handler method greeting() as defined via the annotation @MessageMapping(“/hello”).

 <websocket:message-broker application-destination-prefix=”/app”>

Note the prefix “/app” is defined in the Spring config as  application-destination-prefix attribute of the message broker: Note also, the use of @SendTo annotation to direct the message to a given destination. I repeat the controller method below

 @MessageMapping("/hello")
 @SendTo("/topic/greetings")
 public Greeting greeting(HelloMessage message) throws Exception {
      return new Greeting("Hello, " + message.getName() + "!");
 }

That’s it for now.



출처 - http://raymondhlee.wordpress.com/2014/01/19/using-spring-4-websocket-sockjs-and-stomp-support-to-implement-two-way-server-client-communication/








'Framework & Platform > Spring' 카테고리의 다른 글

spring - STOMP interceptor  (0) 2014.03.31
spring - springx2013-websocket  (0) 2014.03.29
spring - WebSocket  (0) 2014.03.27
spring - @Scheduled  (0) 2014.02.11
spring data - mongoDB Date type  (0) 2014.01.15
Posted by linuxism
,