跳到主要内容

2 篇博文 含有标签「websocket」

查看所有标签

· 阅读需 9 分钟

参考文章:WebSockets vs Server-Sent-Events vs Long-Polling vs WebRTC vs WebTransport

0 轮询

数据流向:client <= server

基于传统的 HTTP 请求-响应方案,定时频繁地向服务器请求最新数据,实现伪实时通信

优点:

  • 实现简单,不需要特殊的服务器端支持。

  • 兼容性好,几乎所有的环境都支持。

缺点:

  • 高频请求会增加服务器和网络的负载。

  • 存在明显的延迟,无法真正实现实时通信。

适用场景:数据更新频率低且实时性要求不高的场景,如定期数据同步。

1 长轮询(Long-Polling)

数据流向:client <= server

客户端建立与服务器的连接,该连接在新数据可用之前保持打开状态。一旦服务器获得新信息,它就会将响应发送到客户端,并关闭连接。在收到服务器的响应后,客户端会立即启动一个新请求,并重复该过程

优点:

  • 比传统轮询更高效,减少不必要的网络流量和服务器负载。

  • 实时性更高,能够更及时地获取数据更新。

缺点:

  • 仍然会导致一定的通信延迟。效率低于 WebSockets 等其他实时技术

  • 在高并发情况下,服务器需要处理大量的打开和关闭连接的操作。

示例:

// js 客户端长轮询
function longPoll() {
fetch('http://example.com/poll')
.then(response => response.json())
.then(data => {
console.log("Received data:", data);
longPoll(); // 马上建立一个新的长轮询
})
.catch(error => {
/**
* 当连接超时或客户端离线,请求会抛出异常,此时可以尝试一段时间后再次发起长轮询
*/
setTimeout(longPoll, 10000);
});
}
longPoll(); // 执行长轮询,建立初始连接

适用场景:中等实时性要求的场景,如通知系统、消息推送等。

2 WebSocket

数据流向:client <=> server

WebSocket 为客户端和服务器之间的单一长期连接提供全双工通信通道,允许双方在建立连接后独立互相发送数据

示例:

// js 客户端 websocket 连接
const socket = new WebSocket('ws://example.com');

socket.onopen = function(event) {
console.log('Connection established');
// 向服务器发送一条消息
socket.send('Hello Server!');
};

// 接收服务器响应的消息
socket.onmessage = function(event) {
console.log('Message from server:', event.data);
};

优点:

  • 实时性高,低延迟。
  • 全双工通信,支持双向数据传输。
  • WebSocket API 简单易用。

缺点:

  • 连接不稳定时需要处理重新连接和断线重连。
  • 需要额外的心跳检测(ping-pong)来保持连接的活跃性。

适用场景:

  • 需要低延迟和高频数据更新的场景,如实时聊天、游戏、金融交易平台等。

推荐使用 Socket.IO 这样的 WebSockets 之上的库,不仅可以处理以上这些复杂情况,甚至在需要时提供对长轮询的回退。

3 Server-Sent-Events

数据流向:client <= server

服务器发送事件(SSE)提供了一种通过 HTTP 将服务器更新推送到客户端的标准方法,保持连接并持续发送数据。可以将 SSE 视为单个 HTTP 请求,其中服务端不会一次发送整个正文,而是保持连接,然后像水流一样一点一点返回响应数据

优点:

  • 简单易用,浏览器原生支持。
  • 自动处理重新连接。
  • 适用于单向数据流的场景。

缺点:

  • 仅支持服务器到客户端的单向通信。
  • 不如 WebSocket 灵活,不能进行双向通信。

适用场景:

  • 需要实时更新客户端而无需向服务器发送数据的场景,如实时新闻提要、体育比分等。
  • 与 WebSocket 不同,SSE 专为服务器到客户端的单向通信而设计,非常适合实时新闻提要、体育比分、AI 聊天等需要实时更新客户端而无需向服务器发送数据的场景。

示例:

  1. EventSource

在浏览器上可以使用 URL 初始化 EventSource 实例

// 连接服务端 sse 接口
const evtSource = new EventSource("https://example.com/events");

// 处理数据
evtSource.onmessage = event => {
console.log('got message: ' + event.data);
};

与 WebSocket 不同,EventSource 将在连接丢失时自动重新连接。

  1. fetch
async function getAiResponse() {
const res = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
content: '这是我的问题'
})
})

const reader = res.body.getReader()
const textDecoder = new TextDecoder()
while(1) {
const {done, value} = await reader.read()
if(done) {
break
}
const str = textDecoder.decode(value)
console.log(str)
}
}
  1. 服务端

在服务器端,您的脚本必须将 Content-Type 标头设置为 text/event-stream,并根据 SSE 规范设置每条消息的格式。这包括指定事件类型、数据有效负载和可选字段,如事件 ID 和重试时间。

下面以 Express 为例,介绍如何设置简单的 SSE 接口:

import express from 'express';
const app = express();
const PORT = process.env.PORT || 3000;

app.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});

const sendEvent = (data) => {
// 全部 message 必须加上 'data:'
const formattedData = `data: ${JSON.stringify(data)}\n\n`;
res.write(formattedData);
};

// 没两秒发送一次事件,模拟流式数据
const intervalId = setInterval(() => {
const message = {
time: new Date().toTimeString(),
message: 'Hello from the server!',
};
sendEvent(message);
}, 2000);

// 当连接关闭后,做些清理工作
req.on('close', () => {
clearInterval(intervalId);
res.end();
});
});
app.listen(PORT, () => console.log(`Server running on http://localhost:${PORT}`));

4 WebTransport API

数据流向:client <=> server

WebTransport 利用 HTTP/3 QUIC 协议实现 Web 客户端和服务器之间的高效、低延迟通信,例如以可靠和不可靠的方式通过多个流发送数据,甚至允许数据无序发送。

优点:

  • 高效的低延迟通信。
  • 支持多流和无序数据传输。

缺点:

  • API 较复杂,开发难度较大。可以等第三方库
  • 浏览器支持不广泛,目前 Safari 和 Node.js 不支持。

适用场景:

  • 实时游戏、直播、协作平台等需要高效低延迟通信的场景。

5 WebRTC

数据流向:client <=> client

WebRTC(Web Real-Time Communication)是一个开源项目和 API 标准,可在 Web 浏览器和移动应用程序中启用实时通信(RTC)功能,支持点对点连接

优点:

  • 支持音频、视频和数据流的点对点通信。
  • 实现低延迟的实时通信。

缺点:

  • 需要信令服务器来建立连接。
  • 复杂度高,需要处理 NAT 穿透和防火墙问题。

适用场景:

  • 视频会议、实时音视频聊天、P2P 文件传输等需要实时点对点通信的场景。

· 阅读需 10 分钟

使用 socket.io 在 nextjs 中集成 WebSocket 服务,实现实时通信

Socket.IO 是一个库,可以在客户端和服务器之间实现 低延迟, 双向基于事件的 通信。

服务端使用 emit 触发 socket 事件,客户端使用 on 监听 socket 事件

准备

  • 背景

App 模式下的 routes.ts 文件中只支持定义请求方法的同名方法,并以具名方式导出,这样才可以成功映射为 GET /api/xxx 的后端路由。

但 socket io 服务是需要重写整个 req/res handler 的, 所以我们只能使用老版本(pages 模式)的写法来定义 socket 路由

还有一种方案是创建 server 文件,重写 nextjs 底层 http 服务逻辑,作为整个服务的入口。在这里注册 socket io 的连接事件 https://socket.io/how-to/use-with-nextjs

  • 类型定义

扩展 NextApiResponse,支持 SocketIoServer 类型

export type NextApiResponseServerIo = NextApiResponse & {
socket: Socket & {
server: NetServer & {
io: SocketIoServer
}
}
}
  • 初始化 socket 实例

定义 pages/api/socket/io.ts,用于初始化 socket 服务实例,并注入 res 对象中。

当客户端初始化 socket 连接时,会访问这个路由,此时就会将服务端的 socket 实例注入到 next 响应对象中

该文件默认导出了 ioHandler 方法,在方法中实例化 socket server 实例,并把 res.socket.server.io 指向了这个实例。

该接口不作任何返回行为,仅作为初始化服务端 socket 实例方法。

// pages/api/socket/io.ts
import type { Server as NetServer } from 'node:http'
import type { NextApiRequest } from 'next'
import { Server as ServerIo } from 'socket.io'
import type { NextApiResponseServerIo } from '@/types'

export const config = {
api: {
bodyParser: false,
},
}

function ioHandler(req: NextApiRequest, res: NextApiResponseServerIo) {
if (!res.socket.server.io) {
const path = '/api/socket/io'
const httpServer: NetServer = res.socket.server as any
const io = new ServerIo(httpServer, {
path,
addTrailingSlash: false,
})
res.socket.server.io = io
}
res.end()
}

export default ioHandler

服务端

继续在 pages 模式下定义消息路由 api/socket/messages,以频道消息通信为例

  • 接口逻辑

数据库中创建 message 数据,同时 emit 触发 socket 事件,最后接口正常返回 message 即可

// pages/api/socket/messages/index.ts
export default async function handler(
req: NextApiRequest,
res: NextApiResponseServerIo,
) {
// ...
const message = await db.message.create({
data: {
content,
fileUrl,
channelId: channelId as string,
memberId: member.id,
},
include: {
member: {
include: {
profile: true,
},
},
},
})

const channelKey = `chat:${channelId}:messages`

res?.socket?.server?.io?.emit(channelKey, message)

return res.status(200).json(message)
}

服务端还需要在 app/api/messages 提供 GET 方法,用于 http 分页查询 messages 数据

关于查询逻辑详见下文

客户端

  • socket Provider

客户端通过 Provider 组件向全局提供 socket 客户端实例与连接状态

// app/layout.tsx
export default function RootLayout({
children,
}: {
children: React.ReactNode
}) {
return (
<html lang="en" suppressHydrationWarning>
<body className="bg-white dark:bg-[#313338]">
<SocketProvider>
{children}
</SocketProvider>
</body>
</html>
)
}

SocketProvider 定义如下,页面挂载后访问 /api/socket/io 初始化服务端 socket 实例,并生成客户端实例,同时注册 connect 与 disconnect 事件,用于更新连接状态

// components/providers/socket-provider.tsx
'use client'

interface SocketContextType {
socket: any | null
isConnected: boolean
}

const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
})

export function useSocket() {
return useContext(SocketContext)
}

export function SocketProvider({ children }: { children: React.ReactNode }) {
const [socket, setSocket] = useState(null)
const [isConnected, setIsConnected] = useState(false)

useEffect(() => {
const socketInstance = new (ClientIO as any)(process.env.NEXT_PUBLIC_SITE_URL!, {
path: '/api/socket/io',
addTrailingSlash: false
})

socketInstance.on('connect', () => {
setIsConnected(true)
})

socketInstance.on('disconnect', () => {
setIsConnected(false)
})

setSocket(socketInstance)

return () => {
socketInstance.disconnect()
}
}, [])

return (
<SocketContext.Provider value={{ socket, isConnected }}>
{children}
</SocketContext.Provider>
)
}
  • useChatSocket hook

具体的信息发送逻辑封装到 useChatSocket 中,接收 addKey

addKey 就是使用 channelId 或者 conversationId 拼接出来的唯一字符串,作为 socket 事件的标识

接着拿到全局的 socket 客户端实例,在 useEffect 中注册事件,监听服务端的相同的 key 的事件,使用传递过来的新 message 数据更新页面数据即可

// hooks/use-chat-socket.ts
interface Props {
addKey: string
}

export function useChatSocket({
addKey,
}: Props) {
const { socket } = useSocket()

useEffect(() => {
if (!socket)
return

socket.on(addKey, (message: MessageWithMemberWithProfile) => {
// 根据 queryKey(此处省略) 同步修改 react-query 查询到的数据即可
})

return () => {
socket.off(addKey)
}
}, [addKey, socket])
}

// 使用
useChatSocket({ addKey })

cursor 分页查询方案

服务端使用某条数据的 id 作为 cursor 标记进行分页查询,返回查询数据及下一次的 cursor;客户端使用 react-query 提供的 useInfiniteQuery hook 进行分页轮询数据

服务端

服务端采用 cursor 的分页方案,cursor 即某条数据的 id,作为查询的参数以及查询数据的起点。

逻辑:从 searchParams 中拿到 cursor 和 channelId,默认每次查 10 条数据,如果有 cursor 标记,则从 cursor 标记的 id 开始查(skip 掉自己);然后通过 messages.length 判断并计算 nextCursor,将 messages 和 nextCursor 做为响应体返回即可

// app/api/messages/route.ts
// 默认每次查 10 条数据
const MESSAGES_BATCH = 10

export async function GET(req: Request) {
try {
// 拿到 cursor 和 channelId 参数
const { searchParams } = new URL(req.url)

const cursor = searchParams.get('cursor')
const channelId = searchParams.get('channelId')

let messges: Message[] = []

// 如果存在 cursor,则从 cursor 标记的 id 开始查询(skip 掉自己)
if (cursor) {
messges = await db.message.findMany({
take: MESSAGES_BATCH,
skip: 1,
cursor: {
id: cursor,
},
where: {
channelId,
},
include: {
member: {
include: {
profile: true,
},
},
},
// 按创建时间倒序排列
orderBy: {
createdAt: 'desc',
},
})
}
else {
messges = await db.message.findMany({
take: MESSAGES_BATCH,
where: {
channelId,
},
include: {
member: {
include: {
profile: true,
},
},
},
orderBy: {
createdAt: 'desc',
},
})
}

// 然后通过 messages.length 判断并计算 nextCursor,
// 如果查询到的数据条数为 10,则说明还可能有下页数据,更新 nextCursor 为 最后一条的 id 即可
// 反之说明没有下页数据了,将 nextCursor 置为 null 返回即可
let nextCursor = null
if (messges.length === MESSAGES_BATCH)
nextCursor = messges[MESSAGES_BATCH - 1].id

// 将 messages 和 nextCursor 做为响应体返回即可
return NextResponse.json({
items: messges,
nextCursor,
})
}
}

客户端

使用 react-query 提供的 useInfiniteQuery hook 实现消息的查询功能

之所以使用 useInfiniteQuery 是想利用其轮询的特性,作为 socket 服务失效的备用方案

useInfiniteQuery 自带分页查询功能,其入参的 getNextPageParam 方法,将本次响应体数据作为参数,返回下次调用 queryFn 方法的入参,起到承上启下的作用,为本方案的核心方法

该 hook 返回了 data(响应数据)、fetchNextPage(发起下次请求的方法)、hasNextPage(下一页是否存在)、isFetchingNextPage(是否正在请求下页数据)、status(请求状态):

  • 其中,queryFn 就是查询数据的方法,内部可以使用 fetch 也可以使用 axios,接收从 getNextPageParam 返回的参数(cursor id)进行查询

  • 在上面的服务端逻辑中,处理并返回了 nextCursor id,所以在 getNextPageParam 中,将响应体的 nextCursor 数据设置为下次 queryFn 的参数,把这个参数作为 url params 调用接口,形成闭环

  • 其中,hasNextPage 同样也是取决于 getNextPageParam 方法是否返回了有效的 nextCursor

封装 useChatQuery hook:

interface Props {
queryKey: string // 用于标记此次查询的 key,用于 socket 同步更改数据
apiUrl: string
paramKey: 'channelId' | 'conversationId'
paramValue: string
}

export function useChatQuery({
queryKey,
apiUrl,
paramKey,
paramValue,
}: Props) {
const { isConnected } = useSocket()

const fetchMessages = async ({ pageParam = undefined }) => {
const url = qs.stringifyUrl({
url: apiUrl,
query: {
cursor: pageParam,
// 动态 key,方便查询私信或者频道消息
[paramKey]: paramValue,
},
}, { skipNull: true })

const res = await fetch(url)
return res.json()
}

const {
data,
fetchNextPage,
hasNextPage,
isFetchingNextPage,
status,
} = useInfiniteQuery({
initialPageParam: undefined,
queryKey: [queryKey],
queryFn: fetchMessages,
getNextPageParam: lastPage => lastPage?.nextCursor,
// 根据 socket 连接状态判断是否轮询
refetchInterval: isConnected ? false : 1000,
})

// 返回数据、
return {
data,
fetchNextPage,
hasNextPage,
isFetchingNextPage,
status,
}
}

使用 queryClient.setQueryData 根据 queryKey 同步修改消息数据

const queryClient = useQueryClient()

queryClient.setQueryData([queryKey], (oldData: any) => {
if (!oldData || !oldData.pages || oldData.pages.length === 0) {
return {
pages: [{
items: [message],
}],
}
}
const newData = [...oldData.pages]

newData[0] = { ...newData[0], items: [message, ...newData[0].items] }

return { ...oldData, pages: newData }
})