跳到主要内容

操作数据库

本章内容将介绍如何在单应用项目中使用数据库查询。

你需要掌握 MySQL 基本知识,至少需要一个已经安装好的 MySQL 服务,并可以正常使用。

安装 MySQL 模块

npm install @zenweb/mysql

配置 MySQL 模块

修改 src/index.ts 配置 MySQL 模块

src/index.ts
import { create } from "zenweb";
import modMySQL from "@zenweb/mysql";

create()

.setup(modMySQL({
// 配置数据库连接池
pools: {
// MySQL 模块支持一个应用中同时使用多个实例
MASTER: { // 实例名称
host: '127.0.0.1', // 数据库地址
port: 3306, // 数据库端口
user: 'root', // 数据库用户名
password: '', // 数据库密码
database: 'test', // 数据库名称
charset: 'utf8mb4', // 数据库默认字符集
timezone: '+08:00', // 数据库时区
connectionLimit: 100, // 单应用数据库连接数上限
},
}
}))

.start();
关于实例名称

在上述配置项中 MASTER 为数据库实例名称,他只是作为使用多个实例时区分不同实例而定义,如果只是单纯使用 SQL 查询本身并没有什么特殊含义。 但是如果需要 ORM 的自动读写分离支持则需要指定名称为主库 MASTER 从库为 SLAVE* * 星号代表自编号或者其他名称,但是必须为 SLAVE 开头。

连接 MySQL 创建表

CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL COMMENT '姓名',
`birthday` date DEFAULT NULL COMMENT '生日',
`unreads` int(11) NOT NULL DEFAULT 0 COMMENT '未读消息数',
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `message` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`from_user_id` int(11) NOT NULL,
`to_user_id` int(11) NOT NULL,
`content` varchar(255) DEFAULT NULL,
`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

使用 SQL 查询

创建文件 src/controller/database.ts

src/controller/database.ts
import { Get, Post, Context, QueryHelper } from "zenweb";

export class DatabaseController {
/**
* 读区数据
*/
@Get('/db/get')
async get(ctx: Context) {
const result = await ctx.mysql.query('SELECT * FROM `user`');
return { result };
}

/**
* 插入数据
*/
@Get('/db/create')
async create(ctx: Context, qh: QueryHelper) {
const data = qh.get({
name: "!trim1",
birthday: "!date",
});
const result = await ctx.mysql.query('INSERT INTO `user` (`name`, `birthday`) VALUES (?, ?)', [
data.name, data.birthday
]);
return { result };
}
}

访问 http://127.0.0.1:7001/create?name=Bob&birthday=1987-9-20 插入数据后再访问 http://127.0.0.1:7001/get 查看结果

使用事务

对于简单的增删改查使用 query() 方法即可完成,借助 nodejs 的异步特性,你可以简单的去除 await 来进行后台查询。

然而我们在业务中会经常用到数据库的事务功能,比如给用户发消息的同时需要更新未读消息数。

自动事务

src/controller/database.ts
export class DatabaseController {
/**
* 自动事务
*/
@Get('/db/tx1')
async autoTransaction(ctx: Context, qh: QueryHelper) {
const data = qh.get({
from_user_id: "!int",
to_user_id: "!int",
content: "!trim1",
});
// 开启数据库事务
await ctx.mysql.transaction(async tx => {
// 插入消息
await tx.query('INSERT INTO `message` (`from_user_id`, `to_user_id`, `content`) VALUES (?, ?, ?)', [
data.from_user_id, data.to_user_id, data.content
]);
// 更新未读消息数
await tx.query('UPDATE `user` SET `unreads` = `unreads` + 1 WHERE `id` = ?', [
data.to_user_id
]);
});
return 'ok';
}
}

使用 transaction() 回调函数即可开启一个自动事务,事物开启成功后会回调业务代码并传入参数 tx 作为本次事务的数据库线程。 在回调函数中使用 tx 数据库线程处理需要被事务化的查询,在没有问题的情况下事务会自动提交,如果需要回滚直接在回调函数中抛出异常即可。

手动事务

对于一些特殊情况也可以使用手动事务来控制事务的提交与回滚。如下代码:

src/controller/database.ts
export class DatabaseController {
/**
* 手动事务
*/
@Get('/db/tx2')
async manualTransaction(ctx: Context, qh: QueryHelper) {
const data = qh.get({
from_user_id: "!int",
to_user_id: "!int",
content: "!trim1",
});
// 使用自动连接池取得一个数据库连接线程
await ctx.mysql.auto(async conn => {
// 开启当前线程事务
await conn.beginTransaction();
try {
// 插入消息
await conn.query('INSERT INTO `message` (`from_user_id`, `to_user_id`, `content`) VALUES (?, ?, ?)', [
data.from_user_id, data.to_user_id, data.content
]);
// 更新未读消息数
await conn.query('UPDATE `user` SET `unreads` = `unreads` + 1 WHERE `id` = ?', [
data.to_user_id
]);
// 提交事务
await conn.commit();
} catch {
// 发生异常 - 回滚事务
await conn.rollback();
}
});
return 'ok';
}
}

$mysql 全局快捷方式 [v4.0+]

从 ZenWeb v4.0 开始,@zenweb/mysql 模块提供了 $mysql 全局快捷方式,让你可以在 Service 层或任意位置直接使用数据库连接池,无需通过 ctx 参数传递。

安装与导出

$mysql$getPool@zenweb/mysql 模块导出,ZenWeb 主入口也做了统一导出:

// 从 @zenweb/mysql 直接导入
import { $mysql, $getPool } from '@zenweb/mysql';

// 或从 zenweb 统一导入
import { $mysql } from 'zenweb';

使用方式

$mysql 是基于 callProxy 实现的代理对象,在请求上下文中使用时自动使用请求级连接(支持事务),在请求外使用时使用全局连接池:

src/service/user.ts
import { $mysql } from 'zenweb';

export class UserService {
async getUserById(id: number) {
// 直接使用 $mysql 执行查询,无需注入 ctx
const users = await $mysql.sql`SELECT * FROM user WHERE id = ${id}`;
return users[0];
}

async createUser(name: string, birthday: Date) {
const result = await $mysql.sql`
INSERT INTO user (name, birthday) VALUES (${name}, ${birthday})
`;
return result;
}

async transferMessage(fromUserId: number, toUserId: number, content: string) {
// $mysql 在请求上下文中使用时,自动参与当前事务
await $mysql.transaction(async tx => {
await tx.sql`
INSERT INTO message (from_user_id, to_user_id, content)
VALUES (${fromUserId}, ${toUserId}, ${content})
`;
await tx.sql`
UPDATE user SET unreads = unreads + 1 WHERE id = ${toUserId}
`;
});
}
}

$getPool()

如果需要直接获取连接池实例(例如在启动脚本或定时任务中),可以使用 $getPool()

src/scripts/sync-data.ts
import { $getPool } from '@zenweb/mysql';

async function syncData() {
const pool = $getPool();
const results = await pool.query('SELECT * FROM user WHERE active = 1');
// 处理数据...
}
提示

$mysql$getPool 在请求上下文内外均可使用。在请求上下文内会返回与 ctx.mysql 相同的连接池实例;在请求上下文外(如定时任务、启动脚本)则返回全局连接池。

集群读写分离

当业务量增长到一定规模时,通常会采用 MySQL 主从复制架构来分担读写压力。@zenweb/mysql 通过连接池集群(PoolCluster)支持自动的读写分离。

连接池命名约定

ZenWeb 使用特定的命名约定来识别主库和从库:

命名规则角色说明
MASTER主库负责所有写操作
SLAVE*从库负责读操作,* 可以是任意后缀

配置多个数据库实例时,只需按照 MASTERSLAVE 前缀命名,框架会自动识别并进行读写分离。

配置读写分离

src/index.ts
import { create } from 'zenweb';
import modMySQL from '@zenweb/mysql';

create()
.setup(modMySQL({
// 配置集群连接池
pools: {
// 主库 - 处理所有写操作(INSERT, UPDATE, DELETE)
MASTER: {
host: '10.0.0.1',
port: 3306,
user: 'root',
password: 'master-password',
database: 'myapp',
charset: 'utf8mb4',
timezone: '+08:00',
connectionLimit: 50,
},
// 从库 1 - 处理读操作(SELECT)
SLAVE1: {
host: '10.0.0.2',
port: 3306,
user: 'reader',
password: 'slave-password',
database: 'myapp',
charset: 'utf8mb4',
timezone: '+08:00',
connectionLimit: 50,
},
// 从库 2 - 可以配置多个从库分担读压力
SLAVE2: {
host: '10.0.0.3',
port: 3306,
user: 'reader',
password: 'slave-password',
database: 'myapp',
charset: 'utf8mb4',
timezone: '+08:00',
connectionLimit: 50,
},
}
}))
.start();

ORM 自动读写分离

如果使用了 @zenweb/orm(zenorm)模块,框架会根据操作类型自动选择主库或从库:

  • 写操作(INSERT、UPDATE、DELETE):自动路由到 MASTER
  • 读操作(SELECT):自动路由到 SLAVE* 节点,采用轮询策略分发

手动指定读写目标

在使用 SQL 查询时,你也可以手动指定连接池节点:

// 默认使用 MASTER(写操作)
await ctx.mysql.query('INSERT INTO user ...');

// 使用 .of() 方法指定从库读取
const slavePool = ctx.mysql.of('SLAVE*');
const users = await slavePool.query('SELECT * FROM user');
注意

事务中的所有操作(包括 SELECT)都会使用主库连接,以确保数据一致性。

连接池健康监控

@zenweb/mysql 内置了与 @zenweb/metric 模块的集成,可以自动收集数据库连接池的健康指标。

启用监控

只需要同时安装 @zenweb/metric 模块,MySQL 连接池监控会自动启用(无需额外配置):

npm install @zenweb/metric
src/index.ts
import { create } from 'zenweb';
import modMySQL from '@zenweb/mysql';
import modMetric from '@zenweb/metric';

create()
.setup(modMetric())
.setup(modMySQL({
pools: {
MASTER: {
host: '127.0.0.1',
port: 3306,
user: 'root',
password: '',
database: 'test',
},
},
// 如果需要禁用监控,可以设置 metricDisable: true
// metricDisable: true,
}))
.start();

监控指标

启用后会自动收集以下指标:

指标名称说明
mysql_actives当前活跃连接数(正在执行查询的连接)
mysql_idles当前空闲连接数(连接池中可用的连接)
mysql_queues当前等待队列长度(等待获取连接的请求数)

这些指标会通过 @zenweb/metric 的注册机制周期性上报,可以对接 Prometheus 等监控系统进行可视化展示和告警。

健康判断参考

状态判断依据
健康mysql_actives 远小于连接池上限,mysql_queues 接近 0
需关注mysql_actives 接近 connectionLimit 的 80%
危险mysql_queues 持续增长,说明连接池已满,请求在排队等待

当发现连接池压力过大时,可以采取以下措施:

  • 增大 connectionLimit 参数
  • 优化慢查询,减少单个连接占用时间
  • 增加 MySQL 从库分担读压力
  • 引入缓存层减少数据库访问