Introduction
January 7, 2022
A Spring Boot WebSocket component for clustered message push, built on Netty.
This is a WebSocket component built on Netty that supports cluster-wide broadcasting. It solves the problem that a WebSocket connection is established with only one machine, so the rest of the business cluster cannot otherwise communicate with that user.
Maven Dependencies
<dependency>
<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.1.1</version>
</dependency>
Configuration
plus.jdk.websocket.enabled=true
# Specify host
plus.jdk.websocket.host=0.0.0.0
# Specify the websocket port
plus.jdk.websocket.port=10001
# Specify a custom-implemented validator
plus.jdk.websocket.session-authenticator=plus.jdk.broadcast.test.session.WSSessionAuthenticator
# The number of threads in the boss thread pool, the default is 1
plus.jdk.websocket.boss-loop-group-threads=1
# The number of worker thread pool threads, if not specified, the default is the number of CPU cores * 2
plus.jdk.websocket.worker-loop-group-threads=5
# Whether to allow credentials on cross-domain (CORS) requests
plus.jdk.websocket.cors-allow-credentials=true
# Allowed cross-domain (CORS) origins
plus.jdk.websocket.cors-origins[0]=""
# Whether to use NioEventLoopGroup to handle requests
plus.jdk.websocket.use-event-executor-group=true
# Specify the number of threads in the NioEventLoopGroup thread pool
plus.jdk.websocket.event-executor-group-threads=0
# connection timeout
#plus.jdk.websocket.connect-timeout-millis=
# Specifies the maximum number of connections that the kernel queues for this socket
#plus.jdk.websocket.SO_BACKLOG=
# The spin count is used to control how many times the underlying socket.write(...) is called per Netty write operation
#plus.jdk.websocket.write-spin-count=
# log level
plus.jdk.websocket.log-level=debug
# udp broadcast listening port, if it is less than or equal to 0, it will not listen for messages
plus.jdk.websocket.broadcast-monitor-port=10300
# Whether to print the received UDP broadcast information to the log
plus.jdk.websocket.print-broadcast-message=true
Example of use
Implement session authentication related logic according to your own business
Define the implementation of the business on the session
To define a session, it is enough to implement the IWsSession interface.
This interface encapsulates a userId, whose type you can customize through the interface's type parameter, and a channel, which is the connection between the current machine and the user.
package plus.jdk.broadcast.test.session;
import io.netty.channel.Channel;
import lombok.AllArgsConstructor;
import lombok.Data;
import plus.jdk.websocket.model.IWsSession;
/**
 * Example session type: pairs a user id with the Netty channel of that user's connection.
 *
 * <p>Implements {@code IWsSession<String>}, so the user id type is {@code String}.
 * The Lombok annotations below generate the accessors and an all-arguments constructor
 * taking {@code (userId, channel)} in field declaration order.
 */
@Data
@AllArgsConstructor
public class MyWsSession implements IWsSession<String> {
// Identifier of the user this session belongs to.
private String userId;
// Netty channel connecting the current machine to the user.
private Channel channel;
}
Session validator
A custom authenticator must implement the IWSSessionAuthenticatorManager interface. A usage example follows:
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
import org.springframework.stereotype.Component;
import plus.jdk.broadcast.model.Monitor;
import plus.jdk.websocket.common.HttpWsRequest;
import plus.jdk.websocket.global.IWSSessionAuthenticatorManager;
import plus.jdk.websocket.model.IWsSession;
import plus.jdk.websocket.properties.WebsocketProperties;
/**
 * Example session authenticator: builds a {@link MyWsSession} from the {@code uid} query
 * parameter of the handshake request and reports which machines hold a user's connections.
 */
@Component
public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager<String, MyWsSession> {

    /**
     * Verifies the session information during the connection handshake stage.
     *
     * <p>If the verification fails, an exception is thrown directly to terminate the handshake
     * process. If the verification succeeds, a custom session is returned; a public storage
     * service such as redis can be used to record which machine the current user has
     * established a connection with.
     *
     * @param channel    the channel of the connection being established
     * @param req        the handshake HTTP request; must carry a non-blank {@code uid} query parameter
     * @param path       the websocket path being requested
     * @param properties the websocket configuration properties
     * @return a new session bound to the requesting user and channel
     * @throws IllegalArgumentException if the {@code uid} query parameter is missing or blank
     */
    @Override
    public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path, WebsocketProperties properties) {
        HttpWsRequest httpWsRequest = new HttpWsRequest(req);
        String uid = httpWsRequest.getQueryValue("uid");
        // Reject anonymous handshakes instead of creating a session with no user id.
        if (uid == null || uid.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required query parameter: uid");
        }
        return new MyWsSession(uid, channel);
    }

    /**
     * Callback for destroying the session when the connection is disconnected.
     * This example keeps no external state, so there is nothing to clean up.
     */
    @Override
    public void onSessionDestroy(IWsSession<?> session, String path, WebsocketProperties properties) {
    }

    /**
     * Returns which machines the given user has established a connection with; broadcast push
     * messages need to be sent to these machines.
     *
     * <p>This example always returns the local machine ({@code 127.0.0.1}) on the configured
     * broadcast monitor port.
     */
    @Override
    public Monitor[] getUserConnectedMachine(String userId, String path, WebsocketProperties properties) {
        return new Monitor[]{new Monitor("127.0.0.1", properties.getBroadcastMonitorPort())};
    }
}
Specify the validator component via configuration
plus.jdk.websocket.session-authenticator=plus.jdk.broadcast.test.session.WSSessionAuthenticator
Write relevant business logic
As above, after implementing Session authentication, you can continue to write the business logic of websocket.
package plus.jdk.broadcast.test.websocket;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;
import plus.jdk.broadcast.test.session.MyWsSession;
import plus.jdk.websocket.annotations.*;
/**
 * Demo websocket handler bound to {@code /ws/message}. Each callback answers the client with a
 * short text message naming the callback that fired.
 */
@WebsocketHandler(values = {"/ws/message"})
public class DemoHandler {

    /** Handshake callback; intentionally does nothing in this demo. */
    @OnWsHandshake
    public void doHandshake(FullHttpRequest req, MyWsSession session) {
    }

    /** Replies with "onOpen" followed by the current time in milliseconds. */
    @OnWsOpen
    public void onOpen(Channel channel, FullHttpRequest req, MyWsSession session, HttpHeaders headers, @RequestParam String uid) {
        final String reply = "onOpen" + System.currentTimeMillis();
        session.sendText(reply);
    }

    /** Replies with a timestamp, the received text, and the short id of the session's channel. */
    @OnWsMessage
    public void onWsMessage(MyWsSession session, String data) {
        final String stamped = "onWsMessage" + System.currentTimeMillis();
        session.sendText(stamped);

        final String echoed = "receive data" + data;
        session.sendText(echoed);

        final String channelId = session.getChannel().id().asShortText();
        session.sendText("onWsMessage, id:" + channelId);
    }

    /** Replies with "onWsEvent" followed by the current time in milliseconds. */
    @OnWsEvent
    public void onWsEvent(Object data, MyWsSession session) {
        final String reply = "onWsEvent" + System.currentTimeMillis();
        session.sendText(reply);
    }

    /** Replies with "OnWsBinary" followed by the current time in milliseconds; the payload is ignored. */
    @OnWsBinary
    public void OnWsBinary(MyWsSession session, byte[] data) {
        final String reply = "OnWsBinary" + System.currentTimeMillis();
        session.sendText(reply);
    }

    /** Replies with "onWsError" followed by the message of the reported throwable. */
    @OnWsError
    public void onWsError(MyWsSession session, Throwable throwable) {
        final String reply = "onWsError" + throwable.getMessage();
        session.sendText(reply);
    }

    /** Replies with "onWsClose" followed by the current time in milliseconds. */
    @OnWsClose
    public void onWsClose(MyWsSession session) {
        final String reply = "onWsClose" + System.currentTimeMillis();
        session.sendText(reply);
    }
}
Actively push messages to user clients using websocket connections
package plus.jdk.broadcast.test.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import plus.jdk.websocket.global.SessionGroupManager;
import plus.jdk.websocket.model.IWsSession;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
 * Demo REST endpoint that actively pushes a message to a user over that user's websocket
 * connections.
 */
@RestController
public class MessageController {

    /**
     * Session manager bean; the starter already provides the instance, so it only needs injecting.
     */
    @Resource
    private SessionGroupManager sessionGroupManager;

    /**
     * Pushes {@code content} to user {@code uid} on the {@code /ws/message} path, first as a text
     * message and then as a UTF-8 encoded binary message.
     *
     * @param uid     id of the user to push to
     * @param content message content
     * @return the literal string {@code "success"}
     */
    @RequestMapping(value = "/message/send", method = {RequestMethod.GET})
    public Object sendMessage(@RequestParam String uid, @RequestParam String content) {
        final String wsPath = "/ws/message";

        // getSession() returns all connections the user holds on this instance. With your own
        // session definition (created in your IWSSessionAuthenticatorManager implementation) you
        // can use them to distribute messages to different devices, or to report to a remote
        // store which machines the user is connected to.
        ConcurrentLinkedDeque<IWsSession<?>> localSessions = sessionGroupManager.getSession(uid, wsPath);

        // Text push. If getUserConnectedMachine in the IWSSessionAuthenticatorManager interface
        // is implemented as required, a broadcast is also sent so the message is pushed to the
        // machines that have established a connection with the user.
        sessionGroupManager.sendText(uid, wsPath, content);

        // Binary push of the same content.
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        sessionGroupManager.sendBinary(uid, wsPath, payload);

        return "success";
    }
}
Loading...