快速入门Spring Boot Websocket

概念

WebSocket

一种基于单TCP连接的全双工通讯协议,在Web中可以提供与服务器双向通讯的能力

SockJS

一种类WebSocket的方案,它提供多种传输方式实现回退机制,确保在各种环境中都能实现双向通信
SockJS会提供以下传输方式:

  1. WebSocket(首选)
  2. Server-Sent Event(SSE)
  3. HTTP流
  4. HTTP轮询
  5. JSONP轮询

STOMP

STOMP(Simple Text Oriented Protocol)是一种轻量级、基于文本的协议,支持订阅(topic)或点对点(queue)通讯,STOMP通讯中通常含有以下角色:

  1. 客户端(Client):发送/接收消息,订阅topic/queue
  2. 应用服务器(Application Server):对消息进行业务处理以及路由
  3. 消息代理(Broker):接收客户端连接,管理目的地(queue/topic),对消息进行持久化

在整个STOMP中应用服务器更多充当起一个”中间件”和”代理服务器”的身份,对消息的持久化和向订阅topic/queue的设备发送数据的工作通常由Broker进行

STMOP结构示例

STOMP的最小单位为帧(frame),帧由以下部分组成:

1
2
3
4
5
6
7
COMMAND
header1:value1
header2:value2
...
[空行]
[消息体(可选)]
^@
  1. 帧类型(COMMAND)
  2. 消息头
  3. 消息体
  4. 空字节(脱出字符表示法为^@)

其中帧类型包括CONNECT(别名STOMP),CONNECTED,SEND,SUBSCRIBE,UNSUBSCRIBE,MESSAGE,ACK/NACK,BEGIN/COMMIT/ABORT,DISCONNECT,RECEIPT,ERROR

实例

添加依赖

添加spring-boot-starter-websocket

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

注意:保持spring-boot-starter-websocket与spring-boot本体的版本一致

添加配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
@EnableWebSocketMessageBroker //启用STOMP支持
public class STOMPConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-chat").setAllowedOrigins("*"); //添加端点用于WebSocket握手,设置CORS为*,若需要启用SockJS则加上.withSockJS()
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//启用简单内存Broker,也可以使用拓展Broker(如RabbitMQ)
registry.enableSimpleBroker("/topic");
//设置应用目的地前缀,当SEND帧的前缀为/app时会尝试使用MessageMapping进行匹配
registry.setApplicationDestinationPrefixes("/app");
}
}

定义消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
public class Message {
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
@JsonSerialize(using = ToStringSerializer.class)
private Long localId;
private Integer userId;
private Integer sessionId;
@JsonSerialize(using = ToStringSerializer.class)
private Long fileId;
private String content;
private Long createTime;
private Long updateTime;
private Long deleteTime;
}

配置Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Controller
public class MessageController {

private final MessageManager messageManager;

@Autowired
public MessageController(MessageManager messageManager) {
this.messageManager = messageManager;
}

@MessageMapping("/chat/send")
public void sendMessage(@Payload Message message) {
messageManager.sendMessage(message, (UserInfo) principal);
}
}

配置Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class MessageManager {
private final SimpMessagingTemplate simpMessagingTemplate;

@Autowired
public MessageManager(SimpMessagingTemplate simpMessagingTemplate) {
this.simpMessagingTemplate = simpMessagingTemplate;
}

public void sendMessage(Message message) {
//若使用SimpleBroker可以在这里进行持久化操作,但通常持久化由Broker完成
//可添加鉴权操作
simpMessagingTemplate.convertAndSend(
"/topic/session." + message.getSessionId(),
message
);
}
}

前端示例

前端使用sockjs-client或原生websocket建立双向连接并使用@stomp/stompjs进行STOMP通讯

1
2
3
4
5
6
7
8
9
10
const socket = new SockJS('http://localhost:8080/ws')
const client = new Client({
webSocketFactory: () => socket, //若使用原生WebSocket连接建议直接使用BrokerURL指定端点地址
onConnect: () => {
logger.info("message service connected")
client.subscribe("/topic/session.xxx", (msg) => {
//TODO
})
},
})

其他问题

为什么不合并Broker和应用服务器

SimpleBroker通常只在开发/简单需求环境下使用,生产环境下一般使用外置Broker(如RabbitMQ),比起SimpleBroker外置Broker拥有更多功能:

  • 更多的消息路由类型(exchange、routing key、DLQ等)
  • 持久化消息
  • 支持集群、镜像队列、故障转移
  • 多个应用服务器可连接同一Broker,共享消息状态,同理多个服务也可共享消息状态

queue和topic有什么区别

queue的消息只会被消费一次,既当有多个消费者时所有消费者会消费竞争

topic的消息则会复制成N份,每个订阅者各一份

queue/topic并不是通过协议命令区分,一般会将topic/queue放在路径前缀中,STOMP协议本身没有强制,在,决定其行为的是Broker本身

如何进行鉴权操作

STOMP有消息头,可以在连接帧中将鉴权信息附加到消息头中,在连接时由Spring Boot入站拦截器拦截并进行鉴权操作

前端:

1
2
3
4
5
6
7
8
9
10
11
12
13
const socket = new SockJS('http://localhost:8080/ws')
const client = new Client({
webSocketFactory: () => socket, //若使用原生WebSocket连接建议直接使用BrokerURL指定端点地址
connectHeaders: {
Authorization: "xxx" //此处携带鉴权信息
},
onConnect: () => {
logger.info("message service connected")
client.subscribe("/topic/session.xxx", (msg) => {
//TODO
})
},
})

后端拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
@Slf4j
public class AuthChannelInterceptor implements ChannelInterceptor {

@Autowired
public AuthChannelInterceptor(JWTManager jwtManager) {
}

@Override
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) {
var accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
//TODO:鉴权操作
accessor.getUser(User)//User为实现了Principal接口的对象的
}

return message;
}
}

注意:不要用StompHeaderAccessor.wrap()获取并修改Accessor,wrap()会创建一个新的Accessor对象,导致修改不会被消息携带

1
2
3
public static StompHeaderAccessor wrap(Message<?> message) {
return new StompHeaderAccessor(message);
}

配置类注册拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
@EnableWebSocketMessageBroker
public class STOMPConfig implements WebSocketMessageBrokerConfigurer {
private final AuthChannelInterceptor AuthChannelInterceptor;

@Autowired
public STOMPConfig(AuthChannelInterceptor authChannelInterceptor) {
this.AuthChannelInterceptor = authChannelInterceptor;
}

//...
//其他STOMP配置
//...

@Override
public void configureClientInboundChannel(org.springframework.messaging.simp.config.ChannelRegistration registration) {
registration.interceptors(AuthChannelInterceptor);
}
}

控制层:

1
2
3
4
@MessageMapping("/chat/send")
public void sendMessage(@NotNull @Payload Message message, Principal principal) {
messageManager.sendMessage(message, principal);
}

连接帧中设置的Principal会与WebSocket的sessionId绑定,并在整个连接的生命周期中保留