Web实时消息推送的几种方案

消息推送常见方案、轮询、websocket、sse

消息推送一般分为 Web 端消息推送和移动端消息推送,分为推和拉两种形式。

短轮询

拉的方式,最简单的实现方式就是前端写个定时器,每隔一段时间向后台请求未读消息。然而如果推送数据不会频繁变更,无论后端此时是否有新的消息产生,客户端都会进行请求,势必会对服务端造成很大压力,浪费带宽和服务器资源。而如果数据频繁变更,比如:每 10s 请求一次配置,如果在第 11s 时配置更新了,那么推送将会延迟 9s,等待下一次请求,又造成数据延迟。

长轮询

长轮询是对上边短轮询的一种改进版本,在尽可能减少对服务器资源浪费的同时,保证消息的相对实时性。长轮询在中间件中应用的很广泛,比如 Nacos 和 Apollo 配置中心,消息队列 Kafka、RocketMQ 中都有用到长轮询。

原理:客户端发起请求后,服务端不会立即返回请求结果,而是将请求挂起等待一段时间,如果此段时间内服务端数据变更,立即响应客户端请求,若是一直无变化则等到指定的超时时间后响应请求,客户端重新发起长链接。

iframe 流

在页面中插入一个隐藏的<iframe>标签,通过在src中请求消息数量 API 接口,由此在服务端和客户端之间创建一条长连接,服务端持续向iframe传输数据。
缺点:rame 流的服务器开销很大,还不如短轮询,而且 IE、Chrome 等浏览器一直会处于 loading 状态,很不推荐!!!

这种方式实现简单,前端只要一个<iframe>标签搞定了

1
2
3
4
5
<iframe
	src="/iframe/message"
	style="display:none"
>
</iframe>

后端直接组装 HTML、JS 返回即可

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Controller
@RequestMapping("/iframe")
public class IframeController {
    @GetMapping(path = "message")
    public void message(HttpServletResponse response) throws IOException, InterruptedException {
        while (true) {
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().print(" <script type=\"text/javascript\">\n" +
                    "parent.document.getElementById('clock').innerHTML = \"" + count.get() + "\";" +
                    "parent.document.getElementById('count').innerHTML = \"" + count.get() + "\";" +
                    "</script>");
        }
    }
}

SSE

  • 服务端向客户端推送消息,其实除了可以用WebSocket(长连接、双向)这种耳熟能详的机制外,还有一种服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端(浏览器)的单向消息推送。
  • ChatGPT 就是采用的 SSE。对于需要长时间等待响应的对话场景,ChatGPT 采用了一种巧妙的策略:它会将已经计算出的数据"推送"给用户,并利用 SSE 技术在计算过程中持续返回数据。这样做的好处是可以避免用户因等待时间过长而选择关闭页面。

**SSE 基于 HTTP 协议,**在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端,有点类似于在线视频播放,视频流会连续不断的推送到浏览器

前端实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<script>
	let source = null;
	let userId = 7777;
	if (window.EventSource) {
		// 建立连接
		source = new EventSource('http://localhost:7777/sse/sub/' + userId);
		setMessageInnerHTML('连接用户=' + userId);
		/**
		 * 连接一旦建立,就会触发open事件
		 * 另一种写法:source.onopen = function (event) {}
		 */
		source.addEventListener(
			'open',
			function (e) {
				setMessageInnerHTML('建立连接。。。');
			},
			false
		);
		/**
		 * 客户端收到服务器发来的数据
		 * 另一种写法:source.onmessage = function (event) {}
		 */
		source.addEventListener('message', function (e) {
			setMessageInnerHTML(e.data);
		});
	} else {
		setMessageInnerHTML('你的浏览器不支持SSE');
	}
</script>

后端实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

/**
 * 创建连接
 */
public static SseEmitter connect(String userId) {
    try {
        // 设置超时时间,0表示不过期。默认30秒
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        count.getAndIncrement();
        return sseEmitter;
    } catch (Exception e) {
        log.info("创建新的sse连接异常,当前用户:{}", userId);
    }
    return null;
}

/**
 * 给指定用户发送消息
 */
public static void sendMessage(String userId, String message) {

    if (sseEmitterMap.containsKey(userId)) {
        try {
            sseEmitterMap.get(userId).send(message);
        } catch (IOException e) {
            log.error("用户[{}]推送异常:{}", userId, e.getMessage());
            removeUser(userId);
        }
    }
}

WebSocket

基于 TCP 连接的双全工协议,工作流程如下:

  1. 客户端向服务器发送一个 HTTP 请求,请求头中包含 Upgrade: websocketSec-WebSocket-Key 等字段,表示要求升级协议为 WebSocket;
  2. 服务器收到这个请求后,会进行升级协议的操作,如果支持 WebSocket,它将回复一个 HTTP 101 状态码,响应头中包含 ,Connection: UpgradeSec-WebSocket-Accept: xxx 等字段、表示成功升级到 WebSocket 协议。
  3. 客户端和服务器之间建立了一个 WebSocket 连接 ,可以进行双向的数据传输,通过心跳机制 来保持 WebSocket 连接的稳定性和活跃性。数据以帧(frames)的形式进行传送,而不是传统的 HTTP 请求和响应。WebSocket 的每条消息可能会被切分成多个数据帧(最小单位)。发送端会将消息切割成多个帧发送给接收端,接收端接收消息帧,并将关联的帧重新组装成完整的消息。
  4. 客户端或服务器可以主动发送一个关闭帧,表示要断开连接。另一方收到后,也会回复一个关闭帧,然后双方关闭 TCP 连接。

MQTT

  • 基于发布/订阅模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网中的一个标准传输协议。
  • TCP 协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于 TCP/IP 协议上,也就是说只要支持 TCP/IP 协议栈的地方,都可以使用 MQTT 协议。
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计