@zenweb/websocket
本模块为 ZenWeb 提供 WebSocket 支持,基于 ws 库实现。
主要功能:
- 无缝集成 ZenWeb 控制器,通过路由映射处理 WebSocket 连接
- 支持注入
WebSocket实例和SocketSession会话管理器 - 内置
LocalSocketSession(单进程)和RedisSocketSession(多进程集群)两种会话管理方案 - 集成
@zenweb/metric模块,自动收集活跃 WebSocket 连接数 - 兼容 ZenWeb 中间件体系,支持在中间件中处理鉴权、日志等逻辑
安装
npm install @zenweb/websocket ws
前置依赖
本模块依赖 @zenweb/controller 和 @zenweb/log,请确保已安装并配置。
配置
WebSocketSetupOption 选项:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
path | string \| string[] \| RegExp \| RegExp[] | - | 支持 WebSocket 连接的路径。不设置则所有路由路径都支持处理 WS 连接 |
failCode | number | 4000 | 业务逻辑失败时返回的 WS 关闭代码(范围 4000-4999) |
errorCode | number | 4500 | 业务逻辑发生意外错误时返回的 WS 关闭代码(范围 4000-4999) |
session.redis | RedisOptions | - | Redis 服务配置,用于 RedisSocketSession 集群会话管理 |
src/index.ts
import { create } from 'zenweb';
import websocket from '@zenweb/websocket';
create()
.setup(websocket({
path: '/ws', // 仅允许 /ws 路径建立 WebSocket 连接
failCode: 4000, // 业务失败关闭代码
errorCode: 4500, // 服务异常关闭代码
}))
.start();
核心概念
注入对象
本模块注册了以下可注入对象:
WebSocket— 当前 WebSocket 连接实例,等同于ws库的WebSocket对象SocketSession— 会话管理器(根据是否配置 Redis 自动选择LocalSocketSession或RedisSocketSession)
与 Metric 模块集成
如果项目已安装 @zenweb/metric 模块,WebSocket 模块会自动注册 websocket_actives 指标,用于监控当前活跃的 WebSocket 连接数。
使用
基本连接处理
在控制器中通过路由装饰器(如 @Get)声明 WebSocket 端点,并在参数中注入 WebSocket 实例:
import { Context, Get } from 'zenweb';
import { WebSocket } from '@zenweb/websocket';
export class WsController {
@Get()
async connect(ws: WebSocket, ctx: Context) {
// 连接建立成功
ws.send(JSON.stringify({ type: 'connected', message: '欢迎连接' }));
// 监听客户端消息
ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
console.log('收到消息:', msg);
});
// 监听连接关闭
ws.on('close', () => {
console.log('连接已关闭');
});
}
}
消息处理模式
推荐使用 switch/case 结构处理不同类型的消息:
import { Context, Get } from 'zenweb';
import { WebSocket } from '@zenweb/websocket';
interface WSMessage {
type: 'ping' | 'subscribe' | 'unsubscribe';
channel?: string;
}
export class WsController {
@Get()
async connect(ws: WebSocket, ctx: Context) {
ws.on('message', (data) => {
try {
const msg: WSMessage = JSON.parse(data.toString());
switch (msg.type) {
case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;
case 'subscribe':
// 订阅逻辑
break;
case 'unsubscribe':
// 取消订阅逻辑
break;
default:
ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' }));
}
} catch (err: any) {
ws.send(JSON.stringify({ type: 'error', message: err.message }));
}
});
}
}
会话管理
会话管理器(SocketSession)用于在多个 WebSocket 连接之间实现消息互通,支持同一用户多端登录以及多用户之间的消息推送。
SocketSession 接口
| 方法 | 参数 | 说明 |
|---|---|---|
add(key, ws) | key: 会话标识,ws: WebSocket 实例 | 将 WebSocket 实例添加到指定会话,自动处理连接关闭时的清理 |
send(key, data) | key: 会话标识,data: string \| Buffer | 向指定会话标识下的所有连接发送消息 |
has(key) | key: 会话标识 | 检查指定会话标识下是否有活跃连接 |
LocalSocketSession
基于本地内存的会话管理,适用于单进程环境。
- 同一进程内的多个 WebSocket 连接可以消息互通
- 不适用于集群环境,可作为本地开发和测试使用
import { Context, Get } from 'zenweb';
import { WebSocket, LocalSocketSession } from '@zenweb/websocket';
export class WsController {
@Get()
async connect(ws: WebSocket, ss: LocalSocketSession, ctx: Context) {
// 将当前连接加入到 "notifications" 会话组
await ss.add('notifications', ws);
// 向 "notifications" 组的所有连接发送消息
ss.send('notifications', JSON.stringify({
type: 'info',
message: '新通知',
}));
// 检查是否有连接
const hasClients = ss.has('notifications');
}
}
RedisSocketSession
基于 Redis Pub/Sub 的会话管理,适用于多进程、多实例集群环境。
- 通过 Redis 的发布/订阅机制实现跨进程消息互通
- 所有进程中的 WebSocket 连接都能收到同一会话标识的消息
- 需要在配置中指定 Redis 连接信息
src/index.ts
import { create } from 'zenweb';
import websocket from '@zenweb/websocket';
create()
.setup(websocket({
path: '/ws',
session: {
redis: {
host: '127.0.0.1',
port: 6379,
password: '',
db: 0,
},
},
}))
.start();
在控制器中注入 RedisSocketSession:
import { Context, Get } from 'zenweb';
import { WebSocket, RedisSocketSession } from '@zenweb/websocket';
export class WsController {
@Get()
async connect(ws: WebSocket, ss: RedisSocketSession, ctx: Context) {
// 使用方式与 LocalSocketSession 完全一致
await ss.add('channel:123', ws);
// 消息会通过 Redis Pub/Sub 广播到所有进程
ss.send('channel:123', JSON.stringify({ type: 'broadcast', data: 'Hello' }));
}
}
完整示例:聊天室
以下是一个完整的聊天室示例,包含加入房间、离开房间、发送消息、撤回消息等功能。
src/controller/chat-ws.ts
import { Context, Get } from 'zenweb';
import { WebSocket, RedisSocketSession } from '@zenweb/websocket';
interface WSMessage {
type: 'join' | 'leave' | 'message' | 'recall';
roomId?: number;
messageId?: number;
content?: string;
}
export class ChatWs {
@Get()
async ws(ws: WebSocket, ss: RedisSocketSession, ctx: Context) {
// 从 query 参数获取 token 进行身份验证
const token = ctx.query.token as string;
if (!token) {
ws.send(JSON.stringify({ type: 'error', message: '未登录' }));
ws.close();
return;
}
// 验证 token 并获取用户信息(根据实际项目替换)
const user = await verifyUser(token);
if (!user) {
ws.send(JSON.stringify({ type: 'error', message: '登录已过期' }));
ws.close();
return;
}
let currentRoomId: number | null = null;
// 处理客户端消息
ws.on('message', async (data) => {
try {
const msg: WSMessage = JSON.parse(data.toString());
switch (msg.type) {
case 'join':
if (msg.roomId) {
await handleJoin(ws, ss, user, msg.roomId);
currentRoomId = msg.roomId;
}
break;
case 'leave':
if (currentRoomId) {
await handleLeave(ss, user, currentRoomId);
currentRoomId = null;
}
break;
case 'message':
if (currentRoomId && msg.content) {
await handleMessage(ss, user, currentRoomId, msg.content);
}
break;
case 'recall':
if (currentRoomId && msg.messageId) {
await handleRecall(ss, user, currentRoomId, msg.messageId);
}
break;
default:
ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' }));
}
} catch (err: any) {
ws.send(JSON.stringify({ type: 'error', message: err.message }));
}
});
// 连接关闭时自动离开房间
ws.once('close', async () => {
if (currentRoomId) {
await handleLeave(ss, user, currentRoomId);
}
});
}
}
async function handleJoin(
ws: WebSocket,
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
) {
const roomKey = `room:${roomId}`;
// 将 WebSocket 连接加入房间会话
await ss.add(roomKey, ws);
// 通知房间内所有用户
ss.send(roomKey, JSON.stringify({
type: 'user_joined',
userId: user.id,
userName: user.name,
}));
ws.send(JSON.stringify({ type: 'joined', roomId }));
}
async function handleLeave(
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
) {
const roomKey = `room:${roomId}`;
ss.send(roomKey, JSON.stringify({
type: 'user_left',
userId: user.id,
userName: user.name,
}));
}
async function handleMessage(
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
content: string,
) {
// 保存消息到数据库(根据实际项目替换)
const message = await saveMessage(roomId, user.id, content);
const roomKey = `room:${roomId}`;
ss.send(roomKey, JSON.stringify({
type: 'message',
id: message.id,
userId: user.id,
userName: user.name,
content,
createdAt: message.createdAt,
}));
}
async function handleRecall(
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
messageId: number,
) {
// 执行撤回逻辑(根据实际项目替换)
await recallMessage(messageId);
const roomKey = `room:${roomId}`;
ss.send(roomKey, JSON.stringify({
type: 'recalled',
messageId,
}));
}
集群部署说明
在生产环境中,通常会有多个 Node.js 进程或容器实例。此时应使用 RedisSocketSession 进行会话管理:
- 配置 Redis 连接:在
websocket()的session.redis选项中指定 Redis 服务器信息 - 注入
RedisSocketSession:在控制器中注入RedisSocketSession而非LocalSocketSession - 消息广播:
send()方法通过 Redis Pub/Sub 将消息广播到所有进程,确保每个进程中的 WebSocket 连接都能收到消息
提示
LocalSocketSession无需额外配置,适合开发环境或单进程部署RedisSocketSession需要安装ioredis并配置 Redis 连接信息- 会话中的 WebSocket 客户端是自动维护的,无需手动关闭