快捷搜索:  汽车  科技

netty启动多个websocket(基于Nettyredis实现websocket集群方案)

netty启动多个websocket(基于Nettyredis实现websocket集群方案)代码结构:netty包下,如下图所示:三 代码实现二 实现1 当用户连接上来时,需要保存用户与channel的映射关系2 当A用户需要向B用户推送一条消息的时候,需要根据发布到所有节点,看看B用户在那台实例上

一 背景

最近公司在做标注系统,有些用户之间需要进行消息的推送,比如管理员分配了一个任务给标注员时,需要给标注员推送一条消息

之前项目中用的spring-boot自带的websocket spring-boot-starter-websocket集成,代码臃肿,性能不是很好

这次打算用Netty来实现websocket,因为应用后期可能部署集群,用redis的sub pub功能来实现集群(用ZK也可以实现)

二 实现

1 当用户连接上来时,需要保存用户与channel的映射关系

2 当A用户需要向B用户推送一条消息的时候,需要根据发布到所有节点,看看B用户在那台实例上

三 代码实现

代码结构:netty包下,如下图所示:

netty启动多个websocket(基于Nettyredis实现websocket集群方案)(1)

一 、netty服务端启动类:

package com.minivision.label.management.netty; import com.minivision.label.management.util.SysThreadPool; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.Logginghandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; /** * @program: label-management-system-backend * @description: netty工具类 * @author: wangjinwei * @create: 2022-01-04 09:34 **/ @Component @Slf4j public class WebSocketServer { /** * webSocket协议名 */ private static final String WEBSOCKET_Protocol = "WebSocket"; /** * 端口号 */ @Value("${netty.server.port:8782}") private int port; private String path = "/home"; private EventLoopGroup bossGroup; private EventLoopGroup workGroup; private LoggingHandler LOGGING_HANDLER; /** * 启动 * * @throws Interruptedexception */ private void start() throws InterruptedException { // bossGroup就是parentGroup,是负责处理TCP/IP连接的 bossGroup = new NioEventLoopGroup(); // workerGroup就是childGroup 是负责处理Channel(通道)的I/O事件 workGroup = new NioEventLoopGroup(); //日志打印 LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO); ServerBootstrap sb = new ServerBootstrap(); //设置全连接队列大小 sb.option(ChannelOption.SO_BACKLOG 1024); sb.group(workGroup bossGroup) .channel(NioServerSocketChannel.class) .localAddress(this.port) .childHandler(new WebSocketInitializer(path WEBSOCKET_PROTOCOL LOGGING_HANDLER)); // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功 ChannelFuture channelFuture = sb.bind(port).sync(); log.info("【Netty服务启动成功========端口:" port "】"); log.info("Server started and listen on:{}" channelFuture.channel().localAddress()); // 成功绑定到端口之后 给channel增加一个 管道关闭的监听器并同步阻塞 直到channel关闭 线程才会往下执行 结束进程。 channelFuture.channel().closeFuture().sync(); } /** * 释放资源 * * @throws InterruptedException */ @PreDestroy public void destroy() throws InterruptedException { if (bossGroup != null) { bossGroup.shutdownGracefully().sync(); } if (workGroup != null) { workGroup.shutdownGracefully().sync(); } } @PostConstruct() public void init() { // 需要开启一个新的线程来执行netty server 服务器 SysThreadPool.getThread().execute(() -> { try { start(); } catch (InterruptedException e) { log.error("【Netty服务启动失败】" e.getMessage() e); Thread.currentThread().interrupt(); } }); } }

二、handler初始化类:

package com.minivision.label.management.netty; import com.minivision.label.management.netty.handler.AuthHandler; import com.minivision.label.management.netty.handler.LoginRequestHandler; import com.minivision.label.management.netty.handler.ServerHeartBeatHandler; import com.minivision.label.management.netty.handler.WebSocketHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; /** * @program: label-management-system-backend * @description: 初始化handler * @author: wangjinwei * @create: 2022-01-04 15:50 **/ @Slf4j public class WebSocketInitializer extends ChannelInitializer<SocketChannel> { private String path; /** * webSocket协议名 */ private String protocol; private LoggingHandler loggingHandler; public WebSocketInitializer(String path String protocol LoggingHandler loggingHandler) { this.path = path; this.protocol = protocol; this.loggingHandler = loggingHandler; } @Override protected void initChannel(SocketChannel ch) { log.info("收到新连接:" ch.localAddress()); // websocket协议本身是基于Http协议的,所以需要Http解码器 ch.pipeline().addLast("http-codec" new HttpServerCodec()); //netty中的日志handler ch.pipeline().addLast(loggingHandler); // 以块的方式来写的处理器 ch.pipeline().addLast("http-chunked" new ChunkedWriteHandler()); //HttpObjectAggregator的作用是将请求分段再聚合 参数是聚合字节的最大长度,将HTTP消息的多个部分合成一条完整的HTTP消息 说明: // 1、http数据在传输过程中是分段的,HttpObjectAggregator可以将多个段聚合 2、这就是为什么,当浏览器发送大量数据时,就会发送多次http请求 ch.pipeline().addLast("aggregator" new HttpObjectAggregator(8192)); /* * 说明: * 1、对应webSocket,它的数据是以帧(frame)的形式传递 * 2、浏览器请求时 ws://localhost:58080/xxx表示请求的uri * 3、核心功能是将http协议升级为ws协议,保持长连接 * 这个的作用主要是用来解决HTTP握手等问题。虽然可以自己实现,但是推荐采用这个默认的handler,它能够解决很多未知的问题。 */ ch.pipeline().addLast(new WebSocketServerProtocolHandler(path protocol true 65536 * 10)); // 进行设置心跳检测 0表示不监控 ch.pipeline().addLast(new IdleStateHandler(60 0 0 TimeUnit.SECONDS)); //心跳事件 ch.pipeline().addLast(new ServerHeartBeatHandler()); // 配置通道处理 来进行业务处理 ch.pipeline().addLast(new WebSocketHandler()); //登录处理 ch.pipeline().addLast(new LoginRequestHandler()); //后面互发消息可以插拔式校验 ch.pipeline().addLast(new AuthHandler()); } }

三、业务handler:

package com.minivision.label.management.netty.handler; import com.minivision.label.management.netty.protocol.Packet; import com.minivision.label.management.netty.protocol.PacketCodeC; import com.minivision.label.management.netty.util.SessionUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * @program: label-management-system-backend * @description: netty业务类 * @author: wangjinwei * @create: 2022-01-04 09:34 **/ @Component @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isWritable()) { log.info("channel is writable try to continue flush...."); ctx.flush(); } ctx.fireChannelWritabilityChanged(); } /** * 客户端与服务器建立连接时触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("与客户端建立连接,通道开启!channelActive 被调用" ctx.channel().id().asLongText()); } /** * 客户端与服务器关闭连接时触发 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String channelLongId = ctx.channel().id().asLongText(); log.info("channelInactive 被调用" channelLongId); // 删除通道 SessionUtil.unBindSession(ctx.channel()); } /** * 服务器接收客户端的数据消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx TextWebSocketFrame msg) throws Exception { String content = msg.text(); log.info("服务器收到消息:{}" content); Packet packet = PacketCodeC.INSTANCE.getPacket(content); if (packet == null) { log.error("WebSocketHandler.channelRead0 decode error content={}" content); ctx.channel().close(); } else { ctx.fireChannelRead(packet); } } /** * description token校验 * * @param isAppClient 是否是APP连接 * @param token token * @return Boolean * @author wangjinwei * @date 2019年2月18日下午7:49:32 */ private boolean checkToken(String token Boolean isAppClient) { return true; } @Override public void exceptionCaught(ChannelHandlerContext ctx Throwable cause) throws Exception { log.error("" cause); super.exceptionCaught(ctx cause); } } package com.minivision.label.management.netty.handler; import com.alibaba.fastjson.JSON; import com.minivision.admin.center.open.api.TokenVerifyServiceApi; import com.minivision.admin.center.open.api.dto.TokenVerifyReq; import com.minivision.admin.center.open.api.dto.TokenVerifyResp; import com.minivision.label.management.dto.LoginUser; import com.minivision.label.management.netty.protocol.LoginRequestPacket; import com.minivision.label.management.netty.protocol.LoginResponsePacket; import com.minivision.label.management.netty.util.Attributes; import com.minivision.label.management.netty.util.SessionUtil; import com.minivision.label.management.util.SpringUtil; import com.minivision.maiot.common.base.dto.ResultDTO; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import java.util.stream.Collectors; /** * @program: label-management-system-backend * @description:登陆请求处理 * @author: wangjinwei * @create: 2022-01-04 09:34 **/ @Slf4j public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx LoginRequestPacket loginRequestPacket) throws Exception { log.info("收到客户端请求-----------------"); LoginResponsePacket loginResponsePacket = new LoginResponsePacket(); LoginUser loginUser = getLoginUser(loginRequestPacket.getMtk()); if (loginUser != null) { ctx.channel().attr(Attributes.LOGINUSER).set(loginUser); SessionUtil.bindSession(loginUser ctx.channel()); log.info("LoginRequestHandler---userId={} userName={} : 登录成功!" loginUser.getUserId() loginUser.getUserName()); loginResponsePacket.setSuccess(true); loginResponsePacket.setUserId(loginUser.getUserId()); loginResponsePacket.setUserName(loginUser.getUserName()); //ctx.fireChannelRead(loginRequestPacket.retain()); 这样可以传递到下一个handler继续处理 } else { loginResponsePacket.setSuccess(false); loginResponsePacket.setReason("账号密码校验失败"); log.info("LoginRequestHandler--- {} 登录失败!" loginRequestPacket.getMtk()); } ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(loginResponsePacket))); if (loginUser == null) { log.info("LoginRequestHandler--- {} 登录失败!关闭channel" loginRequestPacket.getMtk()); ctx.channel().close(); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { SessionUtil.unBindSession(ctx.channel()); } private LoginUser getLoginUser(String mtk) { TokenVerifyReq tokenVerifyReq = new TokenVerifyReq(); tokenVerifyReq.setToken(mtk); TokenVerifyServiceApi tokenVerifyServiceApi = SpringUtil.getBean(TokenVerifyServiceApi.class); ResultDTO<TokenVerifyResp> tokenVerifyResp = tokenVerifyServiceApi.verifyToken(tokenVerifyReq); TokenVerifyResp resData = tokenVerifyResp.getResData(); if (tokenVerifyResp.success() && resData.getStatus() == TokenVerifyResp.ReplyStatus.OK) { // 将认证后的信息传到后端服务 LoginUser loginUser = new LoginUser(); loginUser.setUserId(resData.getUserId()); loginUser.setUserName(resData.getUserName()); loginUser.setRoleIds(resData.getRoles().stream().map(TokenVerifyResp.UserRoleInfo::getRoleId).collect(Collectors.toList())); return loginUser; } return null; } } package com.minivision.label.management.netty.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; /** * @program: label-management-system-backend * @description: 心跳 * @author: wangjinwei * @create: 2022-01-04 16:06 **/ @Slf4j public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx Object evt) throws Exception { //超时事件 if (evt instanceof IdleStateEvent) { IdleStateEvent idleEvent = (IdleStateEvent) evt; if (idleEvent.state() == IdleState.READER_IDLE) { //关闭通道连接 log.info("ServerHeartBeatHandler.reader_idle"); } else if (idleEvent.state() == IdleState.WRITER_IDLE) { //写 log.info("ServerHeartBeatHandler.writer_idle"); } } super.userEventTriggered(ctx evt); } } package com.minivision.label.management.netty.handler; import com.minivision.label.management.netty.util.SessionUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; /** * @program: label-management-system-backend * @description:权限可插拔 * @author: wangjinwei * @create: 2021-12-28 10:52 **/ @Slf4j public class AuthHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx Object msg) throws Exception { log.info("AuthHandler.channelRead start;"); if (!SessionUtil.hasLogin(ctx.channel())) { ctx.channel().close(); } else { ctx.pipeline().remove(this); super.channelRead(ctx msg); } } }

四、协议接口:

package com.minivision.label.management.netty.protocol.command; /** * @program: label-management-system-backend * @description: 消息类型 * @author: wangjinwei * @create: 2021-12-28 14:07 **/ public interface Command { /** * 登录请求 */ Byte LOGIN_REQUEST = 1; /** *登录响应 */ Byte LOGIN_RESPONSE = 2; /** * 客户端 消息请求 */ Byte MESSAGE_REQUEST = 3; /** * 服务端消息 响应 */ Byte MESSAGE_RESPONSE = 4; } package com.minivision.label.management.netty.protocol; import lombok.Data; import static com.minivision.label.management.netty.protocol.command.Command.LOGIN_REQUEST; /** * @program: label-management-system-backend * @description: 登陆请求消息 * @author: wangjinwei * @create: 2021-12-28 08:57 **/ @Data public class LoginRequestPacket extends Packet { private String mtk; @Override public Byte getCommand() { return LOGIN_REQUEST; } } package com.minivision.label.management.netty.protocol; import lombok.Data; import static com.minivision.label.management.netty.protocol.command.Command.LOGIN_RESPONSE; /** * @program: label-management-system-backend * @description: 登录响应消息体 * @author: wangjinwei * @create: 2021-12-28 14:12 **/ @Data public class LoginResponsePacket extends Packet { /** * 失败原因 */ private String reason; /** * 是否成功 */ private boolean success; /** * 用户id */ private String userId; /** * 用户名 */ private String userName; @Override public Byte getCommand() { return LOGIN_RESPONSE; } } package com.minivision.label.management.netty.protocol; import lombok.Data; import lombok.NoArgsConstructor; import static com.minivision.label.management.netty.protocol.command.Command.MESSAGE_REQUEST; /** * @program: label-management-system-backend * @description: 客户端消息请求 * @author: wangjinwei * @create: 2021-12-28 16:15 **/ @Data @NoArgsConstructor public class MessageRequestPacket extends Packet { private String toUserId; private String message; public MessageRequestPacket(String toUserId String message) { this.message = message; this.toUserId = toUserId; } @Override public Byte getCommand() { return MESSAGE_REQUEST; } } package com.minivision.label.management.netty.protocol; import lombok.Data; import lombok.experimental.Accessors; import static com.minivision.label.management.netty.protocol.command.Command.MESSAGE_RESPONSE; /** * @program: label-management-system-backend * @description: 客户端消息响应 * @author: wangjinwei * @create: 2021-12-28 17:09 **/ @Accessors(chain = true) @Data public class MessageResponsePacket extends Packet { private String fromUserId; private String fromUserName; private Object message; @Override public Byte getCommand() { return MESSAGE_RESPONSE; } } package com.minivision.label.management.netty.protocol; /** * @program: label-management-system-backend * @description: 消息体的父类 * @author: wangjinwei * @create: 2021-12-28 10:43 **/ public abstract class Packet { /** *版本 **/ private Byte version = 1; /** * 消息类型 --登录请求,登录响应、消息请求,消息响应 * * @return */ public abstract Byte getCommand(); } package com.minivision.label.management.netty.protocol; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.Map; import static com.minivision.label.management.netty.protocol.command.Command.*; /** * @program: label-management-system-backend * @description: 消息解析 * @author: wangjinwei * @create: 2021-12-28 14:07 **/ @Slf4j public class PacketCodeC { public static final PacketCodeC INSTANCE = new PacketCodeC(); /** * 协议版本类型 */ private final Map<Byte Class<? extends Packet>> packetTypeMap; private PacketCodeC() { packetTypeMap = new HashMap<>(); packetTypeMap.put(LOGIN_REQUEST LoginRequestPacket.class); packetTypeMap.put(LOGIN_RESPONSE LoginResponsePacket.class); packetTypeMap.put(MESSAGE_REQUEST MessageRequestPacket.class); packetTypeMap.put(MESSAGE_RESPONSE MessageResponsePacket.class); } private Class<? extends Packet> getRequestType(byte command) { return packetTypeMap.get(command); } public Packet getPacket(String content) { try { JSONObject jsonObject = JSON.parseObject(content); Byte command = jsonObject.getByte("command"); if (command == null) { log.error("PacketCodeC.getPacket command is null content={}" content); return null; } Class<? extends Packet> classType = getRequestType(command); if (classType == null) { return null; } Packet packet = jsonObject.toJavaObject(classType); return packet; } catch (Exception e) { log.error("PacketCodeC.getPacket" e); } return null; } }

五、用户session

package com.minivision.label.management.netty.util; import com.minivision.label.management.dto.LoginUser; import io.netty.util.AttributeKey; /** * @program: label-management-system-backend * @description: * @author: wangjinwei * @create: 2021-12-27 16:34 **/ public interface Attributes { /** * 登陆标志 */ AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); /** * 登陆用户信息 */ AttributeKey<LoginUser> LOGINUSER = AttributeKey.newInstance("LoginUser"); } package com.minivision.label.management.netty.util; import com.minivision.label.management.dto.LoginUser; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @program: label-management-system-backend * @description: 登陆常量类 * @author: wangjinwei * @create: 2021-12-28 11:09 **/ @Slf4j public class SessionUtil { /** * 用户channel对应关系 */ private static final Map<String Channel> userIdChannelMap = new ConcurrentHashMap<>(); /** * 群组消息 */ private static final Map<String ChannelGroup> groupIdChannelGroup = new ConcurrentHashMap<>(); /** * 绑定用户与通道 * @param loginUser * @param channel */ public static void bindSession(LoginUser loginUser Channel channel) { userIdChannelMap.put(loginUser.getUserId() channel); channel.attr(Attributes.LOGINUSER).set(loginUser); } /** * 获取用户 * @param channel * @return */ public static LoginUser getSession(Channel channel) { return channel.attr(Attributes.LOGINUSER).get(); } public static boolean hasLogin(Channel channel) { return getSession(channel) != null; } public static Channel getChannel(String userId) { return userIdChannelMap.get(userId); } public static void bindChannelGroup(String groupId ChannelGroup channelGroup) { groupIdChannelGroup.put(groupId channelGroup); } public static ChannelGroup getChannelGroup(String groupId) { return groupIdChannelGroup.get(groupId); } /** * 解除绑定关系 * @param channel */ public static void unBindSession(Channel channel) { if (hasLogin(channel)) { LoginUser session = getSession(channel); userIdChannelMap.remove(session.getUserId()); channel.attr(Attributes.LOGINUSER).set(null); log.info(session " 退出登录!"); } } public static Map<String Channel> getUserIdChannelMap() { return userIdChannelMap; } }

六、redis 实现集群:直接调用RedisClient#convertAndSend方法

package com.minivision.label.management.redis; import com.minivision.label.management.service.PushService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import static com.minivision.label.management.common.constant.LabelConstants.TASK_MSG_WS_TOPIC; /** * @Description Redis消息监听者容器: * @author wangjinwei * @date 2021/4/29 15:02 */ @SuppressWarnings({"all"}) @Configuration public class RedisConfig { @Autowired private PushService pushService; /** * redis消息监听器容器 * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory MessageListenerAdapter TopicAdapter MessageListenerAdapter messageListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //添加消息监听器 container.addMessageListener(TopicAdapter new PatternTopic(TASK_MSG_WS_TOPIC)); return container; } /** * 消息监听器适配器,绑定消息处理器 * * @param receiver * @return */ @Bean MessageListenerAdapter TopicAdapter() { return new MessageListenerAdapter(new TopicListener(pushService)); } } package com.minivision.label.management.redis; import com.alibaba.fastjson.JSON; import com.minivision.label.management.netty.protocol.Packet; import com.minivision.label.management.service.PushService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; /** * * @author wangjinwei<br> * @version 1.0<br> * @Description: 监听消息<br> * @CreateDate 2021/4/27 9:13 */ @Slf4j public class TopicListener implements MessageListener { private PushService pushService; public TopicListener(PushService pushService) { this.pushService = pushService; } @Override public void onMessage(Message message byte[] bytes) { String msg = message.toString(); RedisWebsocketMsg<Packet> redisWebsocketMsg = JSON.parseObject(msg RedisWebsocketMsg.class); pushService.pushMsg(redisWebsocketMsg.getReceiver() JSON.toJSONString(redisWebsocketMsg.getContent())); } } package com.minivision.label.management.redis; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; /** * Description:redis客户端操作 * * @author wangjinwei * @date 2021/4/28 */ @Slf4j @Component public class RedisClient { @Autowired private StringRedisTemplate redisTemplate; /** * @return void * @Description:发送群消息 * @param: topicName * @param: redisWebsocketMsg * @author wangjinwei * @date 2021/4/30 11:13 */ public void convertAndSend(String topicName RedisWebsocketMsg redisWebsocketMsg) { redisTemplate.convertAndSend(topicName JSON.toJSONString(redisWebsocketMsg)); } } package com.minivision.label.management.redis; import lombok.Data; import lombok.experimental.Accessors; import java.io.Serializable; import java.util.List; /** * websocket redis广播消息 * @author wangjinwei <br> * @version 1.0<br> * @CreateDate 2021/6/23 <br> */ @Accessors(chain = true) @Data public class RedisWebsocketMsg<T> implements Serializable { /** * 序列号 */ private static final long serialVersionUID = -1L; /** * 消息接收者 */ private List<String> receiver; /** * 内容 */ private T content; }

七、最终暴露给其他模块调用的接口:pushService#sendMsg接口

package com.minivision.label.management.service; import com.minivision.label.management.redis.RedisWebsocketMsg; import java.util.List; /** * @program: label-management-system-backend * @description: * @author: wangjinwei * @create: 2021-12-27 15:28 **/ public interface PushService { /** * 推送给指定通道 * @param userId channelId * @param msg msg */ void pushMsgToOne(String userId String msg); /** * 推送给指定用户 * @param userIds channelId * @param msg msg */ void pushMsg(List<String> userIds String msg); /** * @Description: 将消息推送给单个用户 * @param: redisWebsocketMsg * @return void * @author wangjinwei * @date 2021/4/30 10:40 */ void sendMsg(RedisWebsocketMsg redisWebsocketMsg); } package com.minivision.label.management.service.impl; import com.minivision.label.management.netty.util.SessionUtil; import com.minivision.label.management.redis.RedisClient; import com.minivision.label.management.redis.RedisWebsocketMsg; import com.minivision.label.management.service.PushService; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import static com.minivision.label.management.common.constant.LabelConstants.TASK_MSG_WS_TOPIC; /** * ━━━━━━佛祖保佑━━━━━━ * ; ; * ;;'( 社 * __ ;;' ' \ 会 * /' '\'~~'~' \ /'\.) 主 * ;( ) / |. 义 * ;' \ /-. ( ) \ 码 * ) / ) / )| 农 * || || \) * (_\ (_\ * ━━━━━━永无BUG━━━━━━ * @program: label-management-system-backend * @description: * @author: wangjinwei * @create: 2021-12-27 15:30 **/ @Service @Slf4j public class PushServiceImpl implements PushService { @Autowired private RedisClient redisClient; @Override public void pushMsgToOne(String userId String msg) { Channel channel = SessionUtil.getChannel(userId); if (channel == null) { log.info("未找到通道信息,数据不推送"); return; } if (!channel.isActive()) { log.info("通道已经关闭,数据不推送"); //移除通道信息 SessionUtil.unBindSession(channel); } long start = System.currentTimeMillis(); log.info("PushServiceImpl.pushMsgToOne userId={} msg={} start={}" userId msg start); channel.writeAndFlush(new TextWebSocketFrame(msg)); log.info("PushServiceImpl.pushMsgToOne userId={} msg={} end={}" userId msg System.currentTimeMillis()); } @Override public void pushMsg(List<String> userIds String msg) { if (CollectionUtils.isEmpty(userIds)) { return; } userIds.forEach(userId -> { pushMsgToOne(userId msg); }); } /** * @Description: 将消息推送给单个用户 * @param: redisWebsocketMsg * @return void * @author wangjinwei * @date 2021/4/30 10:40 */ @Override @SuppressWarnings("all") public void sendMsg(RedisWebsocketMsg redisWebsocketMsg) { long start = System.currentTimeMillis(); //给其他订阅了主题的节点发消息 因为本节点没有 redisClient.convertAndSend(TASK_MSG_WS_TOPIC redisWebsocketMsg); } }

猜您喜欢: