跳到主要内容

@zenweb/websocket

本模块为 ZenWeb 提供 WebSocket 支持,基于 ws 库实现。

主要功能:

  • 无缝集成 ZenWeb 控制器,通过路由映射处理 WebSocket 连接
  • 支持注入 WebSocket 实例和 SocketSession 会话管理器
  • 内置 LocalSocketSession(单进程)和 RedisSocketSession(多进程集群)两种会话管理方案
  • 集成 @zenweb/metric 模块,自动收集活跃 WebSocket 连接数
  • 兼容 ZenWeb 中间件体系,支持在中间件中处理鉴权、日志等逻辑

安装

npm install @zenweb/websocket ws
前置依赖

本模块依赖 @zenweb/controller@zenweb/log,请确保已安装并配置。

配置

WebSocketSetupOption 选项:

参数类型默认值说明
pathstring \| string[] \| RegExp \| RegExp[]-支持 WebSocket 连接的路径。不设置则所有路由路径都支持处理 WS 连接
failCodenumber4000业务逻辑失败时返回的 WS 关闭代码(范围 4000-4999)
errorCodenumber4500业务逻辑发生意外错误时返回的 WS 关闭代码(范围 4000-4999)
session.redisRedisOptions-Redis 服务配置,用于 RedisSocketSession 集群会话管理
src/index.ts
import { create } from 'zenweb';
import websocket from '@zenweb/websocket';

create()
.setup(websocket({
path: '/ws', // 仅允许 /ws 路径建立 WebSocket 连接
failCode: 4000, // 业务失败关闭代码
errorCode: 4500, // 服务异常关闭代码
}))
.start();

核心概念

注入对象

本模块注册了以下可注入对象:

  • WebSocket — 当前 WebSocket 连接实例,等同于 ws 库的 WebSocket 对象
  • SocketSession — 会话管理器(根据是否配置 Redis 自动选择 LocalSocketSessionRedisSocketSession

与 Metric 模块集成

如果项目已安装 @zenweb/metric 模块,WebSocket 模块会自动注册 websocket_actives 指标,用于监控当前活跃的 WebSocket 连接数。

使用

基本连接处理

在控制器中通过路由装饰器(如 @Get)声明 WebSocket 端点,并在参数中注入 WebSocket 实例:

import { Context, Get } from 'zenweb';
import { WebSocket } from '@zenweb/websocket';

export class WsController {
@Get()
async connect(ws: WebSocket, ctx: Context) {
// 连接建立成功
ws.send(JSON.stringify({ type: 'connected', message: '欢迎连接' }));

// 监听客户端消息
ws.on('message', (data) => {
const msg = JSON.parse(data.toString());
console.log('收到消息:', msg);
});

// 监听连接关闭
ws.on('close', () => {
console.log('连接已关闭');
});
}
}

消息处理模式

推荐使用 switch/case 结构处理不同类型的消息:

import { Context, Get } from 'zenweb';
import { WebSocket } from '@zenweb/websocket';

interface WSMessage {
type: 'ping' | 'subscribe' | 'unsubscribe';
channel?: string;
}

export class WsController {
@Get()
async connect(ws: WebSocket, ctx: Context) {
ws.on('message', (data) => {
try {
const msg: WSMessage = JSON.parse(data.toString());
switch (msg.type) {
case 'ping':
ws.send(JSON.stringify({ type: 'pong' }));
break;
case 'subscribe':
// 订阅逻辑
break;
case 'unsubscribe':
// 取消订阅逻辑
break;
default:
ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' }));
}
} catch (err: any) {
ws.send(JSON.stringify({ type: 'error', message: err.message }));
}
});
}
}

会话管理

会话管理器(SocketSession)用于在多个 WebSocket 连接之间实现消息互通,支持同一用户多端登录以及多用户之间的消息推送。

SocketSession 接口

方法参数说明
add(key, ws)key: 会话标识,ws: WebSocket 实例将 WebSocket 实例添加到指定会话,自动处理连接关闭时的清理
send(key, data)key: 会话标识,data: string \| Buffer向指定会话标识下的所有连接发送消息
has(key)key: 会话标识检查指定会话标识下是否有活跃连接

LocalSocketSession

基于本地内存的会话管理,适用于单进程环境。

  • 同一进程内的多个 WebSocket 连接可以消息互通
  • 不适用于集群环境,可作为本地开发和测试使用
import { Context, Get } from 'zenweb';
import { WebSocket, LocalSocketSession } from '@zenweb/websocket';

export class WsController {
@Get()
async connect(ws: WebSocket, ss: LocalSocketSession, ctx: Context) {
// 将当前连接加入到 "notifications" 会话组
await ss.add('notifications', ws);

// 向 "notifications" 组的所有连接发送消息
ss.send('notifications', JSON.stringify({
type: 'info',
message: '新通知',
}));

// 检查是否有连接
const hasClients = ss.has('notifications');
}
}

RedisSocketSession

基于 Redis Pub/Sub 的会话管理,适用于多进程、多实例集群环境。

  • 通过 Redis 的发布/订阅机制实现跨进程消息互通
  • 所有进程中的 WebSocket 连接都能收到同一会话标识的消息
  • 需要在配置中指定 Redis 连接信息
src/index.ts
import { create } from 'zenweb';
import websocket from '@zenweb/websocket';

create()
.setup(websocket({
path: '/ws',
session: {
redis: {
host: '127.0.0.1',
port: 6379,
password: '',
db: 0,
},
},
}))
.start();

在控制器中注入 RedisSocketSession

import { Context, Get } from 'zenweb';
import { WebSocket, RedisSocketSession } from '@zenweb/websocket';

export class WsController {
@Get()
async connect(ws: WebSocket, ss: RedisSocketSession, ctx: Context) {
// 使用方式与 LocalSocketSession 完全一致
await ss.add('channel:123', ws);

// 消息会通过 Redis Pub/Sub 广播到所有进程
ss.send('channel:123', JSON.stringify({ type: 'broadcast', data: 'Hello' }));
}
}

完整示例:聊天室

以下是一个完整的聊天室示例,包含加入房间、离开房间、发送消息、撤回消息等功能。

src/controller/chat-ws.ts
import { Context, Get } from 'zenweb';
import { WebSocket, RedisSocketSession } from '@zenweb/websocket';

interface WSMessage {
type: 'join' | 'leave' | 'message' | 'recall';
roomId?: number;
messageId?: number;
content?: string;
}

export class ChatWs {
@Get()
async ws(ws: WebSocket, ss: RedisSocketSession, ctx: Context) {
// 从 query 参数获取 token 进行身份验证
const token = ctx.query.token as string;
if (!token) {
ws.send(JSON.stringify({ type: 'error', message: '未登录' }));
ws.close();
return;
}

// 验证 token 并获取用户信息(根据实际项目替换)
const user = await verifyUser(token);
if (!user) {
ws.send(JSON.stringify({ type: 'error', message: '登录已过期' }));
ws.close();
return;
}

let currentRoomId: number | null = null;

// 处理客户端消息
ws.on('message', async (data) => {
try {
const msg: WSMessage = JSON.parse(data.toString());
switch (msg.type) {
case 'join':
if (msg.roomId) {
await handleJoin(ws, ss, user, msg.roomId);
currentRoomId = msg.roomId;
}
break;

case 'leave':
if (currentRoomId) {
await handleLeave(ss, user, currentRoomId);
currentRoomId = null;
}
break;

case 'message':
if (currentRoomId && msg.content) {
await handleMessage(ss, user, currentRoomId, msg.content);
}
break;

case 'recall':
if (currentRoomId && msg.messageId) {
await handleRecall(ss, user, currentRoomId, msg.messageId);
}
break;

default:
ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' }));
}
} catch (err: any) {
ws.send(JSON.stringify({ type: 'error', message: err.message }));
}
});

// 连接关闭时自动离开房间
ws.once('close', async () => {
if (currentRoomId) {
await handleLeave(ss, user, currentRoomId);
}
});
}
}

async function handleJoin(
ws: WebSocket,
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
) {
const roomKey = `room:${roomId}`;
// 将 WebSocket 连接加入房间会话
await ss.add(roomKey, ws);

// 通知房间内所有用户
ss.send(roomKey, JSON.stringify({
type: 'user_joined',
userId: user.id,
userName: user.name,
}));

ws.send(JSON.stringify({ type: 'joined', roomId }));
}

async function handleLeave(
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
) {
const roomKey = `room:${roomId}`;
ss.send(roomKey, JSON.stringify({
type: 'user_left',
userId: user.id,
userName: user.name,
}));
}

async function handleMessage(
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
content: string,
) {
// 保存消息到数据库(根据实际项目替换)
const message = await saveMessage(roomId, user.id, content);

const roomKey = `room:${roomId}`;
ss.send(roomKey, JSON.stringify({
type: 'message',
id: message.id,
userId: user.id,
userName: user.name,
content,
createdAt: message.createdAt,
}));
}

async function handleRecall(
ss: RedisSocketSession,
user: { id: number; name: string },
roomId: number,
messageId: number,
) {
// 执行撤回逻辑(根据实际项目替换)
await recallMessage(messageId);

const roomKey = `room:${roomId}`;
ss.send(roomKey, JSON.stringify({
type: 'recalled',
messageId,
}));
}

集群部署说明

在生产环境中,通常会有多个 Node.js 进程或容器实例。此时应使用 RedisSocketSession 进行会话管理:

  1. 配置 Redis 连接:在 websocket()session.redis 选项中指定 Redis 服务器信息
  2. 注入 RedisSocketSession:在控制器中注入 RedisSocketSession 而非 LocalSocketSession
  3. 消息广播send() 方法通过 Redis Pub/Sub 将消息广播到所有进程,确保每个进程中的 WebSocket 连接都能收到消息
提示
  • LocalSocketSession 无需额外配置,适合开发环境或单进程部署
  • RedisSocketSession 需要安装 ioredis 并配置 Redis 连接信息
  • 会话中的 WebSocket 客户端是自动维护的,无需手动关闭