Netty启动多个WebSocket节点(基于Netty+Redis实现WebSocket集群方案)
本文介绍基于Netty+Redis实现WebSocket集群的方案,内容包括:一 背景;二 实现(1 当用户连接上来时,需要保存用户与channel的映射关系;2 当A用户需要向B用户推送一条消息的时候,需要把消息发布到所有节点,看看B用户在哪台实例上);三 代码实现(代码结构:netty包下)。
一 背景
最近公司在做标注系统,有些用户之间需要进行消息的推送,比如管理员分配了一个任务给标注员时,需要给标注员推送一条消息
之前项目中用的是spring-boot自带的WebSocket集成(spring-boot-starter-websocket),代码臃肿,性能不是很好
这次打算用Netty来实现WebSocket;因为应用后期可能部署集群,所以用Redis的发布/订阅(pub/sub)功能在各节点之间分发消息(用ZK也可以实现)
二 实现
1 当用户连接上来时,需要保存用户与channel的映射关系
2 当A用户需要向B用户推送一条消息的时候,需要把消息发布到所有节点,由各节点检查B用户连接在哪台实例上
三 代码实现
代码结构:netty包下,如下图所示:
一 、netty服务端启动类:
package com.minivision.label.management.netty;
import com.minivision.label.management.util.SysThreadPool;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @program: label-management-system-backend
* @description: netty工具类
* @author: wangjinwei
* @create: 2022-01-04 09:34
**/
@Component
@Slf4j
public class WebSocketServer {
/**
* webSocket协议名
*/
private static final String WEBSOCKET_Protocol = "WebSocket";
/**
* 端口号
*/
@Value("${netty.server.port:8782}")
private int port;
private String path = "/home";
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
private LoggingHandler LOGGING_HANDLER;
/**
* 启动
*
* @throws Interruptedexception
*/
private void start() throws InterruptedException {
// bossGroup就是parentGroup,是负责处理TCP/IP连接的
bossGroup = new NioEventLoopGroup();
// workerGroup就是childGroup 是负责处理Channel(通道)的I/O事件
workGroup = new NioEventLoopGroup();
//日志打印
LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
ServerBootstrap sb = new ServerBootstrap();
//设置全连接队列大小
sb.option(ChannelOption.SO_BACKLOG 1024);
sb.group(workGroup bossGroup)
.channel(NioServerSocketChannel.class)
.localAddress(this.port)
.childHandler(new WebSocketInitializer(path WEBSOCKET_PROTOCOL LOGGING_HANDLER));
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = sb.bind(port).sync();
log.info("【Netty服务启动成功========端口:" port "】");
log.info("Server started and listen on:{}" channelFuture.channel().localAddress());
// 成功绑定到端口之后 给channel增加一个 管道关闭的监听器并同步阻塞 直到channel关闭 线程才会往下执行 结束进程。
channelFuture.channel().closeFuture().sync();
}
/**
* 释放资源
*
* @throws InterruptedException
*/
@PreDestroy
public void destroy() throws InterruptedException {
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
}
@PostConstruct()
public void init() {
// 需要开启一个新的线程来执行netty server 服务器
SysThreadPool.getThread().execute(() -> {
try {
start();
} catch (InterruptedException e) {
log.error("【Netty服务启动失败】" e.getMessage() e);
Thread.currentThread().interrupt();
}
});
}
}
二、handler初始化类:
package com.minivision.label.management.netty;
import com.minivision.label.management.netty.handler.AuthHandler;
import com.minivision.label.management.netty.handler.LoginRequestHandler;
import com.minivision.label.management.netty.handler.ServerHeartBeatHandler;
import com.minivision.label.management.netty.handler.WebSocketHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @program: label-management-system-backend
* @description: 初始化handler
* @author: wangjinwei
* @create: 2022-01-04 15:50
**/
@Slf4j
public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
private String path;
/**
* webSocket协议名
*/
private String protocol;
private LoggingHandler loggingHandler;
public WebSocketInitializer(String path String protocol LoggingHandler loggingHandler) {
this.path = path;
this.protocol = protocol;
this.loggingHandler = loggingHandler;
}
@Override
protected void initChannel(SocketChannel ch) {
log.info("收到新连接:" ch.localAddress());
// websocket协议本身是基于Http协议的,所以需要Http解码器
ch.pipeline().addLast("http-codec" new HttpServerCodec());
//netty中的日志handler
ch.pipeline().addLast(loggingHandler);
// 以块的方式来写的处理器
ch.pipeline().addLast("http-chunked" new ChunkedWriteHandler());
//HttpObjectAggregator的作用是将请求分段再聚合 参数是聚合字节的最大长度,将HTTP消息的多个部分合成一条完整的HTTP消息 说明:
// 1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合 2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求
ch.pipeline().addLast("aggregator" new HttpObjectAggregator(8192));
/*
* 说明:
* 1、对应webSocket,它的数据是以帧(frame)的形式传递
* 2、浏览器请求时 ws://localhost:58080/xxx表示请求的uri
* 3、核心功能是将http协议升级为ws协议,保持长连接
* 这个的作用主要是用来解决HTTP握手等问题。虽然可以自己实现,但是推荐采用这个默认的handler,它能够解决很多未知的问题。
*/
ch.pipeline().addLast(new WebSocketServerProtocolHandler(path protocol true 65536 * 10));
// 进行设置心跳检测 0表示不监控
ch.pipeline().addLast(new IdleStateHandler(60 0 0 TimeUnit.SECONDS));
//心跳事件
ch.pipeline().addLast(new ServerHeartBeatHandler());
// 配置通道处理 来进行业务处理
ch.pipeline().addLast(new WebSocketHandler());
//登录处理
ch.pipeline().addLast(new LoginRequestHandler());
//后面互发消息可以插拔式校验
ch.pipeline().addLast(new AuthHandler());
}
}
三、业务handler:
package com.minivision.label.management.netty.handler;
import com.minivision.label.management.netty.protocol.Packet;
import com.minivision.label.management.netty.protocol.PacketCodeC;
import com.minivision.label.management.netty.util.SessionUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @program: label-management-system-backend
* @description: netty业务类
* @author: wangjinwei
* @create: 2022-01-04 09:34
**/
@Component
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
log.info("channel is writable try to continue flush....");
ctx.flush();
}
ctx.fireChannelWritabilityChanged();
}
/**
* 客户端与服务器建立连接时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("与客户端建立连接,通道开启!channelActive 被调用" ctx.channel().id().asLongText());
}
/**
* 客户端与服务器关闭连接时触发
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String channelLongId = ctx.channel().id().asLongText();
log.info("channelInactive 被调用" channelLongId);
// 删除通道
SessionUtil.unBindSession(ctx.channel());
}
/**
* 服务器接收客户端的数据消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx TextWebSocketFrame msg) throws Exception {
String content = msg.text();
log.info("服务器收到消息:{}" content);
Packet packet = PacketCodeC.INSTANCE.getPacket(content);
if (packet == null) {
log.error("WebSocketHandler.channelRead0 decode error content={}" content);
ctx.channel().close();
} else {
ctx.fireChannelRead(packet);
}
}
/**
* description token校验
*
* @param isAppClient 是否是APP连接
* @param token token
* @return Boolean
* @author wangjinwei
* @date 2019年2月18日下午7:49:32
*/
private boolean checkToken(String token Boolean isAppClient) {
return true;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx Throwable cause) throws Exception {
log.error("" cause);
super.exceptionCaught(ctx cause);
}
}
package com.minivision.label.management.netty.handler;
import com.alibaba.fastjson.JSON;
import com.minivision.admin.center.open.api.TokenVerifyServiceApi;
import com.minivision.admin.center.open.api.dto.TokenVerifyReq;
import com.minivision.admin.center.open.api.dto.TokenVerifyResp;
import com.minivision.label.management.dto.LoginUser;
import com.minivision.label.management.netty.protocol.LoginRequestPacket;
import com.minivision.label.management.netty.protocol.LoginResponsePacket;
import com.minivision.label.management.netty.util.Attributes;
import com.minivision.label.management.netty.util.SessionUtil;
import com.minivision.label.management.util.SpringUtil;
import com.minivision.maiot.common.base.dto.ResultDTO;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.util.stream.Collectors;
/**
* @program: label-management-system-backend
* @description:登陆请求处理
* @author: wangjinwei
* @create: 2022-01-04 09:34
**/
@Slf4j
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx LoginRequestPacket loginRequestPacket) throws Exception {
log.info("收到客户端请求-----------------");
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
LoginUser loginUser = getLoginUser(loginRequestPacket.getMtk());
if (loginUser != null) {
ctx.channel().attr(Attributes.LOGINUSER).set(loginUser);
SessionUtil.bindSession(loginUser ctx.channel());
log.info("LoginRequestHandler---userId={} userName={} : 登录成功!" loginUser.getUserId() loginUser.getUserName());
loginResponsePacket.setSuccess(true);
loginResponsePacket.setUserId(loginUser.getUserId());
loginResponsePacket.setUserName(loginUser.getUserName());
//ctx.fireChannelRead(loginRequestPacket.retain()); 这样可以传递到下一个handler继续处理
} else {
loginResponsePacket.setSuccess(false);
loginResponsePacket.setReason("账号密码校验失败");
log.info("LoginRequestHandler--- {} 登录失败!" loginRequestPacket.getMtk());
}
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(loginResponsePacket)));
if (loginUser == null) {
log.info("LoginRequestHandler--- {} 登录失败!关闭channel" loginRequestPacket.getMtk());
ctx.channel().close();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionUtil.unBindSession(ctx.channel());
}
private LoginUser getLoginUser(String mtk) {
TokenVerifyReq tokenVerifyReq = new TokenVerifyReq();
tokenVerifyReq.setToken(mtk);
TokenVerifyServiceApi tokenVerifyServiceApi = SpringUtil.getBean(TokenVerifyServiceApi.class);
ResultDTO<TokenVerifyResp> tokenVerifyResp = tokenVerifyServiceApi.verifyToken(tokenVerifyReq);
TokenVerifyResp resData = tokenVerifyResp.getResData();
if (tokenVerifyResp.success() && resData.getStatus() == TokenVerifyResp.ReplyStatus.OK) {
// 将认证后的信息传到后端服务
LoginUser loginUser = new LoginUser();
loginUser.setUserId(resData.getUserId());
loginUser.setUserName(resData.getUserName());
loginUser.setRoleIds(resData.getRoles().stream().map(TokenVerifyResp.UserRoleInfo::getRoleId).collect(Collectors.toList()));
return loginUser;
}
return null;
}
}
package com.minivision.label.management.netty.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* @program: label-management-system-backend
* @description: 心跳
* @author: wangjinwei
* @create: 2022-01-04 16:06
**/
@Slf4j
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx Object evt) throws Exception {
//超时事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) evt;
if (idleEvent.state() == IdleState.READER_IDLE) {
//关闭通道连接
log.info("ServerHeartBeatHandler.reader_idle");
} else if (idleEvent.state() == IdleState.WRITER_IDLE) {
//写
log.info("ServerHeartBeatHandler.writer_idle");
}
}
super.userEventTriggered(ctx evt);
}
}
package com.minivision.label.management.netty.handler;
import com.minivision.label.management.netty.util.SessionUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* @program: label-management-system-backend
* @description:权限可插拔
* @author: wangjinwei
* @create: 2021-12-28 10:52
**/
@Slf4j
public class AuthHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx Object msg) throws Exception {
log.info("AuthHandler.channelRead start;");
if (!SessionUtil.hasLogin(ctx.channel())) {
ctx.channel().close();
} else {
ctx.pipeline().remove(this);
super.channelRead(ctx msg);
}
}
}
四、协议接口:
package com.minivision.label.management.netty.protocol.command;
/**
 * Command codes that identify the type of a protocol packet.
 *
 * @author wangjinwei
 */
public interface Command {
    /**
     * Login request.
     */
    Byte LOGIN_REQUEST = (byte) 1;

    /**
     * Login response.
     */
    Byte LOGIN_RESPONSE = (byte) 2;

    /**
     * Message request sent by a client.
     */
    Byte MESSAGE_REQUEST = (byte) 3;

    /**
     * Message response sent by the server.
     */
    Byte MESSAGE_RESPONSE = (byte) 4;
}
package com.minivision.label.management.netty.protocol;
import lombok.Data;
import static com.minivision.label.management.netty.protocol.command.Command.LOGIN_REQUEST;
/**
 * Login request packet.
 *
 * @author wangjinwei
 */
@Data
public class LoginRequestPacket extends Packet {
    /**
     * Token supplied by the client; the login handler passes it to token verification.
     */
    private String mtk;

    /**
     * @return the login-request command code
     */
    @Override
    public Byte getCommand() {
        return LOGIN_REQUEST;
    }
}
package com.minivision.label.management.netty.protocol;
import lombok.Data;
import static com.minivision.label.management.netty.protocol.command.Command.LOGIN_RESPONSE;
/**
 * Login response packet.
 *
 * @author wangjinwei
 */
@Data
public class LoginResponsePacket extends Packet {
    /**
     * Reason for a failed login.
     */
    private String reason;

    /**
     * Whether the login succeeded.
     */
    private boolean success;

    /**
     * User id.
     */
    private String userId;

    /**
     * User name.
     */
    private String userName;

    /**
     * @return the login-response command code
     */
    @Override
    public Byte getCommand() {
        return LOGIN_RESPONSE;
    }
}
package com.minivision.label.management.netty.protocol;
import lombok.Data;
import lombok.NoArgsConstructor;
import static com.minivision.label.management.netty.protocol.command.Command.MESSAGE_REQUEST;
/**
* @program: label-management-system-backend
* @description: 客户端消息请求
* @author: wangjinwei
* @create: 2021-12-28 16:15
**/
@Data
@NoArgsConstructor
public class MessageRequestPacket extends Packet {
private String toUserId;
private String message;
public MessageRequestPacket(String toUserId String message) {
this.message = message;
this.toUserId = toUserId;
}
@Override
public Byte getCommand() {
return MESSAGE_REQUEST;
}
}
package com.minivision.label.management.netty.protocol;
import lombok.Data;
import lombok.experimental.Accessors;
import static com.minivision.label.management.netty.protocol.command.Command.MESSAGE_RESPONSE;
/**
 * Message response packet; setters are chainable via {@code @Accessors(chain = true)}.
 *
 * @author wangjinwei
 */
@Accessors(chain = true)
@Data
public class MessageResponsePacket extends Packet {
    /**
     * Id of the sending user.
     */
    private String fromUserId;

    /**
     * Name of the sending user.
     */
    private String fromUserName;

    /**
     * Message payload.
     */
    private Object message;

    /**
     * @return the message-response command code
     */
    @Override
    public Byte getCommand() {
        return MESSAGE_RESPONSE;
    }
}
package com.minivision.label.management.netty.protocol;
/**
 * Base class of all protocol packets.
 *
 * @author wangjinwei
 */
public abstract class Packet {
    /**
     * Protocol version. No accessor is declared for it in this class.
     */
    private Byte version = (byte) 1;

    /**
     * Command code of the packet: login request, login response, message request or
     * message response.
     *
     * @return the command code
     */
    public abstract Byte getCommand();
}
package com.minivision.label.management.netty.protocol;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import static com.minivision.label.management.netty.protocol.command.Command.*;
/**
* @program: label-management-system-backend
* @description: 消息解析
* @author: wangjinwei
* @create: 2021-12-28 14:07
**/
@Slf4j
public class PacketCodeC {
public static final PacketCodeC INSTANCE = new PacketCodeC();
/**
* 协议版本类型
*/
private final Map<Byte Class<? extends Packet>> packetTypeMap;
private PacketCodeC() {
packetTypeMap = new HashMap<>();
packetTypeMap.put(LOGIN_REQUEST LoginRequestPacket.class);
packetTypeMap.put(LOGIN_RESPONSE LoginResponsePacket.class);
packetTypeMap.put(MESSAGE_REQUEST MessageRequestPacket.class);
packetTypeMap.put(MESSAGE_RESPONSE MessageResponsePacket.class);
}
private Class<? extends Packet> getRequestType(byte command) {
return packetTypeMap.get(command);
}
public Packet getPacket(String content) {
try {
JSONObject jsonObject = JSON.parseObject(content);
Byte command = jsonObject.getByte("command");
if (command == null) {
log.error("PacketCodeC.getPacket command is null content={}" content);
return null;
}
Class<? extends Packet> classType = getRequestType(command);
if (classType == null) {
return null;
}
Packet packet = jsonObject.toJavaObject(classType);
return packet;
} catch (Exception e) {
log.error("PacketCodeC.getPacket" e);
}
return null;
}
}
五、用户session
package com.minivision.label.management.netty.util;
import com.minivision.label.management.dto.LoginUser;
import io.netty.util.AttributeKey;
/**
 * Channel attribute keys used by the session utilities.
 *
 * @author wangjinwei
 */
public interface Attributes {
    /**
     * Login flag.
     */
    AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");

    /**
     * Information about the logged-in user.
     */
    AttributeKey<LoginUser> LOGINUSER = AttributeKey.newInstance("LoginUser");
}
package com.minivision.label.management.netty.util;
import com.minivision.label.management.dto.LoginUser;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @program: label-management-system-backend
* @description: 登陆常量类
* @author: wangjinwei
* @create: 2021-12-28 11:09
**/
@Slf4j
public class SessionUtil {
/**
* 用户channel对应关系
*/
private static final Map<String Channel> userIdChannelMap = new ConcurrentHashMap<>();
/**
* 群组消息
*/
private static final Map<String ChannelGroup> groupIdChannelGroup = new ConcurrentHashMap<>();
/**
* 绑定用户与通道
* @param loginUser
* @param channel
*/
public static void bindSession(LoginUser loginUser Channel channel) {
userIdChannelMap.put(loginUser.getUserId() channel);
channel.attr(Attributes.LOGINUSER).set(loginUser);
}
/**
* 获取用户
* @param channel
* @return
*/
public static LoginUser getSession(Channel channel) {
return channel.attr(Attributes.LOGINUSER).get();
}
public static boolean hasLogin(Channel channel) {
return getSession(channel) != null;
}
public static Channel getChannel(String userId) {
return userIdChannelMap.get(userId);
}
public static void bindChannelGroup(String groupId ChannelGroup channelGroup) {
groupIdChannelGroup.put(groupId channelGroup);
}
public static ChannelGroup getChannelGroup(String groupId) {
return groupIdChannelGroup.get(groupId);
}
/**
* 解除绑定关系
* @param channel
*/
public static void unBindSession(Channel channel) {
if (hasLogin(channel)) {
LoginUser session = getSession(channel);
userIdChannelMap.remove(session.getUserId());
channel.attr(Attributes.LOGINUSER).set(null);
log.info(session " 退出登录!");
}
}
public static Map<String Channel> getUserIdChannelMap() {
return userIdChannelMap;
}
}
六、redis 实现集群:直接调用RedisClient#convertAndSend方法
package com.minivision.label.management.redis;
import com.minivision.label.management.service.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import static com.minivision.label.management.common.constant.LabelConstants.TASK_MSG_WS_TOPIC;
/**
* @Description Redis消息监听者容器:
* @author wangjinwei
* @date 2021/4/29 15:02
*/
@SuppressWarnings({"all"})
@Configuration
public class RedisConfig {
@Autowired
private PushService pushService;
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
MessageListenerAdapter TopicAdapter MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//添加消息监听器
container.addMessageListener(TopicAdapter new PatternTopic(TASK_MSG_WS_TOPIC));
return container;
}
/**
* 消息监听器适配器,绑定消息处理器
*
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter TopicAdapter() {
return new MessageListenerAdapter(new TopicListener(pushService));
}
}
package com.minivision.label.management.redis;
import com.alibaba.fastjson.JSON;
import com.minivision.label.management.netty.protocol.Packet;
import com.minivision.label.management.service.PushService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
/**
*
* @author wangjinwei<br>
* @version 1.0<br>
* @Description: 监听消息<br>
* @CreateDate 2021/4/27 9:13
*/
@Slf4j
public class TopicListener implements MessageListener {
private PushService pushService;
public TopicListener(PushService pushService) {
this.pushService = pushService;
}
@Override
public void onMessage(Message message byte[] bytes) {
String msg = message.toString();
RedisWebsocketMsg<Packet> redisWebsocketMsg = JSON.parseObject(msg RedisWebsocketMsg.class);
pushService.pushMsg(redisWebsocketMsg.getReceiver() JSON.toJSONString(redisWebsocketMsg.getContent()));
}
}
package com.minivision.label.management.redis;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
* Description:redis客户端操作
*
* @author wangjinwei
* @date 2021/4/28
*/
@Slf4j
@Component
public class RedisClient {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* @return void
* @Description:发送群消息
* @param: topicName
* @param: redisWebsocketMsg
* @author wangjinwei
* @date 2021/4/30 11:13
*/
public void convertAndSend(String topicName RedisWebsocketMsg redisWebsocketMsg) {
redisTemplate.convertAndSend(topicName JSON.toJSONString(redisWebsocketMsg));
}
}
package com.minivision.label.management.redis;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.List;
/**
 * WebSocket message broadcast through Redis; setters are chainable via
 * {@code @Accessors(chain = true)}.
 *
 * @param <T> type of the message content
 * @author wangjinwei
 * @version 1.0
 */
@Accessors(chain = true)
@Data
public class RedisWebsocketMsg<T> implements Serializable {
    /**
     * Serialization version id.
     */
    private static final long serialVersionUID = -1L;

    /**
     * Receivers of the message.
     */
    private List<String> receiver;

    /**
     * Message content.
     */
    private T content;
}
七、最终暴露给其他模块调用的接口:pushService#sendMsg接口
package com.minivision.label.management.service;
import com.minivision.label.management.redis.RedisWebsocketMsg;
import java.util.List;
/**
* @program: label-management-system-backend
* @description:
* @author: wangjinwei
* @create: 2021-12-27 15:28
**/
public interface PushService {
/**
* 推送给指定通道
* @param userId channelId
* @param msg msg
*/
void pushMsgToOne(String userId String msg);
/**
* 推送给指定用户
* @param userIds channelId
* @param msg msg
*/
void pushMsg(List<String> userIds String msg);
/**
* @Description: 将消息推送给单个用户
* @param: redisWebsocketMsg
* @return void
* @author wangjinwei
* @date 2021/4/30 10:40
*/
void sendMsg(RedisWebsocketMsg redisWebsocketMsg);
}
package com.minivision.label.management.service.impl;
import com.minivision.label.management.netty.util.SessionUtil;
import com.minivision.label.management.redis.RedisClient;
import com.minivision.label.management.redis.RedisWebsocketMsg;
import com.minivision.label.management.service.PushService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import static com.minivision.label.management.common.constant.LabelConstants.TASK_MSG_WS_TOPIC;
/**
* ━━━━━━佛祖保佑━━━━━━
* ; ;
* ;;'( 社
* __ ;;' ' \ 会
* /' '\'~~'~' \ /'\.) 主
* ;( ) / |. 义
* ;' \ /-. ( ) \ 码
* ) / ) / )| 农
* || || \)
* (_\ (_\
* ━━━━━━永无BUG━━━━━━
* @program: label-management-system-backend
* @description:
* @author: wangjinwei
* @create: 2021-12-27 15:30
**/
@Service
@Slf4j
public class PushServiceImpl implements PushService {
@Autowired
private RedisClient redisClient;
@Override
public void pushMsgToOne(String userId String msg) {
Channel channel = SessionUtil.getChannel(userId);
if (channel == null) {
log.info("未找到通道信息,数据不推送");
return;
}
if (!channel.isActive()) {
log.info("通道已经关闭,数据不推送");
//移除通道信息
SessionUtil.unBindSession(channel);
}
long start = System.currentTimeMillis();
log.info("PushServiceImpl.pushMsgToOne userId={} msg={} start={}" userId msg start);
channel.writeAndFlush(new TextWebSocketFrame(msg));
log.info("PushServiceImpl.pushMsgToOne userId={} msg={} end={}" userId msg System.currentTimeMillis());
}
@Override
public void pushMsg(List<String> userIds String msg) {
if (CollectionUtils.isEmpty(userIds)) {
return;
}
userIds.forEach(userId -> {
pushMsgToOne(userId msg);
});
}
/**
* @Description: 将消息推送给单个用户
* @param: redisWebsocketMsg
* @return void
* @author wangjinwei
* @date 2021/4/30 10:40
*/
@Override
@SuppressWarnings("all")
public void sendMsg(RedisWebsocketMsg redisWebsocketMsg) {
long start = System.currentTimeMillis();
//给其他订阅了主题的节点发消息 因为本节点没有
redisClient.convertAndSend(TASK_MSG_WS_TOPIC redisWebsocketMsg);
}
}