快捷搜索:  汽车  科技

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)当我们要实现分布式的时候,我们则需要在各个机器上共享这些信息,所以我们需要一个Publish/Subscribe的中间件。我们现在使用Redis作为我们的解决方案。如何改造为分布式集群WebSocket集群/分布式改造:实现多人在线聊天室为何要改造为分布式集群总结一下:实现了分布式WebSocket后,我们可以将流量负载均衡到不同的服务器上并提供一种通信机制让各个服务器能进行消息同步(不然用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都没法收到)。

文章来源:https://dwz.cn/jPGInM6l作者:蛮三刀把刀

书接上文,我们开始对我们的小小聊天室进行集群化改造。

本文内容摘要:

  • 为何要改造为分布式集群
  • 如何改造为分布式集群
  • 用户在聊天室集群如何发消息
  • 用户在聊天室集群如何接收消息
  • 补充知识点:STOMP 简介
  • 功能一:向聊天室集群中的全体用户发消息——redis的订阅/发布
  • 功能二:集群集群用户上下线通知——Redis订阅发布
  • 功能三:集群用户信息维护——Redis集合
  • WebSocket集群还有哪些可能性

本文源码:(妈妈再也不用担心我无法复现文章代码啦)

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集群版

正文

WebSocket集群/分布式改造:实现多人在线聊天室

为何要改造为分布式集群

分布式就是为了解决单点故障问题,想象一下,如果一个服务器承载了1000个大佬同时聊天,服务器突然挂了,1000个大佬瞬间全部掉线,大概明天你就被大佬们吊起来打了。当聊天室改为集群后,就算服务器A挂了,服务器B上聊天的大佬们还可以愉快的聊天,并且在前端还能通过代码,让连接A的大佬们快速重连至存活的服务器B,继续和大家愉快的聊天,岂不美哉!

总结一下:实现了分布式WebSocket后,我们可以将流量负载均衡到不同的服务器上并提供一种通信机制让各个服务器能进行消息同步(不然用户A连上服务器A,用户B脸上服务器B,它们发消息的时候对方都没法收到)。

如何改造为分布式集群

当我们要实现分布式的时候,我们则需要在各个机器上共享这些信息,所以我们需要一个Publish/Subscribe的中间件。我们现在使用Redis作为我们的解决方案。

1. 用户在聊天室集群如何发消息

假设我们的聊天室集群有服务器A和B,用户Alice连接在A上,Bob连接在B上、

Alice向聊天室的服务器A发送消息,A服务器必须要将收到的消息转发到Redis,才能保证聊天室集群的所有服务器(也就是A和B)能够拿到消息。否则,只有Alice在的服务器A能够读到消息,用户Bob在的服务器B并不能收到消息,A和B也就无法聊天了。

可以看到,我们在代码里监听了redis频道msgToAll,这个是在application.properties定义的,当然如果你懒得定义,这里可以写死。

5. 聊天室集群:发消息改造

我们单机聊天室的发送消息Controller是这样的:

@MessageMapping("/chat.sendMessage") @SendTo("/topic/public") public ChatMessage sendMessage(@Payload ChatMessage chatMessage) { return chatMessage;

我们前端发给我们消息后,直接给/topic/public转发这个消息,让其他用户收到。

在集群中,我们需要把消息转发给Redis,并且不转发给前端,而是让服务端监听Redis消息,在进行消息发送。

将Controller改为:

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(1)

你会发现我们在代码中使用了JsonUtil将实体类ChatMessage转为了Json发送给了Redis,这个Json工具类需要使用到FaskJson依赖:

  1. pom添加FastJson依赖

<!-- json --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency>

2.添加Json解析工具类JsonUtil,提供对象转Json,Json转对象的能力

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(2)

这样,我们接收到用户发送消息的请求时,就将消息转发给了redis的频道websocket.msgToAll

6. 聊天室集群:接收消息改造

单机的聊天室,我们接收消息是通过Controller直接把消息转发到所有人的频道上,这样就能在所有人的聊天框显示。

在集群中,我们需要服务器把消息从Redis中拿出来,并且推送到自己管的用户那边,我们在Service层实现消息的推送。

  • 在处理消息之后发送消息:
  • 正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。
  • 如果 @MessageMapping注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可以使用@SendTo 重写消息目的地;
  • 如果 @SubscribeMapping注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。
  • 在应用的任意地方发送消息:
  • spring-websocket 定义了一个 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),可以实现自由的向任意目的地发送消息,并且订阅此目的地的所有用户都能收到消息。

我们在service实现发送,需要使用上述第二种方法。

新建类service/ChatService:

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(3)

我们在哪里调用这个service呢,我们需要在监听到消息后调用,所以我们就要有下面的Redis监听消息处理专用类

新建类redis/RedisListenerHandle:

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(4)

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(5)

7. 看看效果

这样,我们的改造就基本完成了!我们看一下效果

我们将服务器运行在8080上,然后打开localhost:8080,起名Alice进入聊天室

随后,我们在application.properties中将端口server.port=8081

再次运行程序(别忘了开启IDEA的“允许启动多个并行服务”设置,不然会覆盖掉你的8080服务,如下图),在8081启动一个聊天室,起名Bob进入聊天室。

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(6)

如下两图,我们已经可以在不同端口的两个聊天室,互相聊天了!(注意看url)

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(7)

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(8)

在互相发送消息是,我们还可以使用命令行监听下Redis的频道websocket.msgToAll,可以看到双方传送的消息。如下图:

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(9)

我们还可以打开Chrome的F12控制台,查看前端的控制台发送消息的log,如下图:

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(10)

大功告成了吗?

功能实现了,但是并不完美!你会发现,Bob的加入并没有提醒Bob进入了聊天室(在单机版是有的),这是因为我们在“加入聊天室”的代码还没有修改,在加入时,只有Bob的服务器B里的其他用户知道Bob加入了聊天室。我们还能再进一步!

功能二/功能三:集群用户上下线通知,集群用户信息存储

我们需要弥补上面的不足,将用户上线下线的广播发送到所有服务器上。

此外,我还希望以后能够查询集群中所有的在线用户,我们在redis中添加一个set,来保存用户名,这样就可以随时得到在线用户的数量和名称。

1. 在application.properties添加频道名定义

# Redis定义 redis.channel.userStatus = websocket.userStatus redis.set.onlineUsers = websocket.onlineUsers

我们增加两个定义

  • 第一个是新增redis频道websocket.userStatus用来广播用户上下线消息
  • 第二个是redis的set,用来保存在线用户信息

2. 在RedisListenerBean添加新频道监听

container.addMessageListener(listenerAdapter new PatternTopic(userStatus));

3. 在ChatService中添加

public void alertUserStatus(@Payload ChatMessage chatMessage) { LOGGER.info("Alert user online by simpMessageSendingOperations:" chatMessage.toString()); simpMessageSendingOperations.convertAndSend("/topic/public" chatMessage); }

在service中我们向本服务器的用户广播消息,用户上线或者下线的消息都通过这里传达。

4. 修改ChatController中的addUser方法

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(11)

我们修改了addUser方法,在这里往redis中广播用户上线的消息,并把用户名username写入redis的set中(websocket.onlineUsers)

5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(12)

在用户关闭网页时,websocket会调用该方法,我们在这里需要把用户从redis的在线用户set里删除,并且向集群发送广播,说明该用户退出聊天室。

6. 修改Redis监听类RedisListenerHandle

else if (userStatus.equals(topic)) { ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg ChatMessage.class); if (chatMessage != null) { chatService.alertUserStatus(chatMessage); }

在监听类中我们接受了来自userStatus频道的消息,并调用service

7. 看看效果

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(13)

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(14)

此外,我们还可以在Reids中查询到用户信息:

聊天室websocket百万级别推送方案(多人聊天室WebSocket集群)(15)

WebSocket集群还有哪些可能性

有了这两篇文章的基础, 我们当然还能实现以下的功能:

  • 某用户A单独私信给某用户B,或者私信给某用户群(用户B和C)
  • 系统提供外部调用接口,给指定用户/用户群发送消息,实现消息推送
  • 系统提供外部接口,实时获取用户数据(人数/用户信息)

感兴趣的同学可以自己试试看。

总结

我们在本文中把单机版的聊天室改为了分布式聊天室,大大提高了聊天室可用性。

本文工程源代码:

单机版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/单机版

集群版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集群版

猜您喜欢: