概念
WebSocket
一种基于单TCP连接的全双工通讯协议,在Web中可以提供与服务器双向通讯的能力
SockJS
一种类WebSocket的方案,它提供多种传输方式实现回退机制,确保在各种环境中都能实现双向通信
SockJS会提供以下传输方式:
- WebSocket(首选)
- Server-Sent Event(SSE)
- HTTP流
- HTTP轮询
- JSONP轮询
STOMP
STOMP(Simple Text Oriented Protocol)是一种轻量级、基于文本的协议,支持订阅(topic)或点对点(queue)通讯,STOMP通讯中通常含有以下角色:
- 客户端(Client):发送/接收消息,订阅topic/queue
- 应用服务器(Application Server):对消息进行业务处理以及路由
- 消息代理(Broker):接收客户端连接,管理目的地(queue/topic),对消息进行持久化
在整个STOMP中应用服务器更多充当起一个”中间件”和”代理服务器”的身份,对消息的持久化和向订阅topic/queue的设备发送数据的工作通常由Broker进行

STOMP的最小单位为帧(frame),帧由以下部分组成:
1 2 3 4 5 6 7
| COMMAND header1:value1 header2:value2 ... [空行] [消息体(可选)] ^@
|
- 帧类型(COMMAND)
- 消息头
- 消息体
- 空字节(脱出字符表示法为^@)
其中帧类型包括CONNECT(别名STOMP),CONNECTED,SEND,SUBSCRIBE,UNSUBSCRIBE,MESSAGE,ACK/NACK,BEGIN/COMMIT/ABORT,DISCONNECT,RECEIPT,ERROR
实例
添加依赖
添加spring-boot-starter-websocket
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
注意:保持spring-boot-starter-websocket与spring-boot本体的版本一致
添加配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration @EnableWebSocketMessageBroker public class STOMPConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws-chat").setAllowedOrigins("*"); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic"); registry.setApplicationDestinationPrefixes("/app"); } }
|
定义消息类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Data public class Message { @JsonSerialize(using = ToStringSerializer.class) private Long id; @JsonSerialize(using = ToStringSerializer.class) private Long localId; private Integer userId; private Integer sessionId; @JsonSerialize(using = ToStringSerializer.class) private Long fileId; private String content; private Long createTime; private Long updateTime; private Long deleteTime; }
|
配置Controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Controller public class MessageController {
private final MessageManager messageManager;
@Autowired public MessageController(MessageManager messageManager) { this.messageManager = messageManager; }
@MessageMapping("/chat/send") public void sendMessage(@Payload Message message) { messageManager.sendMessage(message, (UserInfo) principal); } }
|
配置Service
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Service public class MessageManager { private final SimpMessagingTemplate simpMessagingTemplate;
@Autowired public MessageManager(SimpMessagingTemplate simpMessagingTemplate) { this.simpMessagingTemplate = simpMessagingTemplate; }
public void sendMessage(Message message) { simpMessagingTemplate.convertAndSend( "/topic/session." + message.getSessionId(), message ); } }
|
前端示例
前端使用sockjs-client或原生websocket建立双向连接并使用@stomp/stompjs进行STOMP通讯
1 2 3 4 5 6 7 8 9 10
| const socket = new SockJS('http://localhost:8080/ws') const client = new Client({ webSocketFactory: () => socket, onConnect: () => { logger.info("message service connected") client.subscribe("/topic/session.xxx", (msg) => { }) }, })
|
其他问题
为什么不合并Broker和应用服务器
SimpleBroker通常只在开发/简单需求环境下使用,生产环境下一般使用外置Broker(如RabbitMQ),比起SimpleBroker外置Broker拥有更多功能:
- 更多的消息路由类型(exchange、routing key、DLQ等)
- 持久化消息
- 支持集群、镜像队列、故障转移
- 多个应用服务器可连接同一Broker,共享消息状态,同理多个服务也可共享消息状态
queue和topic有什么区别
queue的消息只会被消费一次,既当有多个消费者时所有消费者会消费竞争
topic的消息则会复制成N份,每个订阅者各一份
queue/topic并不是通过协议命令区分,一般会将topic/queue放在路径前缀中,STOMP协议本身没有强制,在,决定其行为的是Broker本身
如何进行鉴权操作
STOMP有消息头,可以在连接帧中将鉴权信息附加到消息头中,在连接时由Spring Boot入站拦截器拦截并进行鉴权操作
前端:
1 2 3 4 5 6 7 8 9 10 11 12 13
| const socket = new SockJS('http://localhost:8080/ws') const client = new Client({ webSocketFactory: () => socket, connectHeaders: { Authorization: "xxx" }, onConnect: () => { logger.info("message service connected") client.subscribe("/topic/session.xxx", (msg) => { }) }, })
|
后端拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Component @Slf4j public class AuthChannelInterceptor implements ChannelInterceptor {
@Autowired public AuthChannelInterceptor(JWTManager jwtManager) { }
@Override public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) { var accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) { accessor.getUser(User) }
return message; } }
|
注意:不要用StompHeaderAccessor.wrap()获取并修改Accessor,wrap()会创建一个新的Accessor对象,导致修改不会被消息携带
1 2 3
| public static StompHeaderAccessor wrap(Message<?> message) { return new StompHeaderAccessor(message); }
|
配置类注册拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration @EnableWebSocketMessageBroker public class STOMPConfig implements WebSocketMessageBrokerConfigurer { private final AuthChannelInterceptor AuthChannelInterceptor;
@Autowired public STOMPConfig(AuthChannelInterceptor authChannelInterceptor) { this.AuthChannelInterceptor = authChannelInterceptor; }
@Override public void configureClientInboundChannel(org.springframework.messaging.simp.config.ChannelRegistration registration) { registration.interceptors(AuthChannelInterceptor); } }
|
控制层:
1 2 3 4
| @MessageMapping("/chat/send") public void sendMessage(@NotNull @Payload Message message, Principal principal) { messageManager.sendMessage(message, principal); }
|
连接帧中设置的Principal会与WebSocket的sessionId绑定,并在整个连接的生命周期中保留