跳到主要内容

2 篇博文 含有标签「SQL」

查看所有标签

· 阅读需 8 分钟
SELECT 
year,
json_agg(json_build_object('id', id, 'thumbUrl', thumb_url, 'createTime', create_time)) AS items
FROM (
SELECT
id,
thumb_url,
create_time,
DATE_TRUNC('year', create_time) AS year,
ROW_NUMBER() OVER (PARTITION BY DATE_TRUNC('year', create_time) ORDER BY create_time DESC) AS row_num
FROM
"Image"
) AS subquery
WHERE row_num <= 3
GROUP BY year
ORDER BY year desc

需求:写一个查询语句,根据 create_time 字段,按时间分组(例如年),以 json 数组形式返回,每个分组下最多返回三条数据,返回示例如下

[
{
"year": "2025-01-01T00:00:00.000Z",
"items": [
{ "id": 1, "create_time": "2025-07-30T02:56:43.739" },
{ "id": 2, "create_time": "2025-07-29T02:56:43.739" },
{ "id": 3, "create_time": "2025-07-31T02:56:43.739" }
]
},
{
"year": "2024-01-01T00:00:00.000Z",
"items": [
{ "id": 4, "create_time": "2024-07-30T02:56:43.739" },
{ "id": 5, "create_time": "2024-07-31T02:56:43.739" }
]
}
]

我们根据功能点一步一步实现这个需求

1 处理时间分类字段

Image 表中没有 year 或 month 字段,只有 Datetime 类型的 create_time

所以我们需要造出这个用于分组的字段

DATE_TRUNC 处理 Datetime 类型的数据,截取 create_time 字段对应的年份或月份作为要分类的字段

SELECT
id,
create_time,
DATE_TRUNC('year', create_time) AS year
FROM
"Image"

DATE_TRUNC 方法用于将日期时间戳截断到指定的精度。它接受两个参数:

  • 截断精度: 指定要截断的日期时间部分。例如:year、month、day、hour、minute、second。
  • 日期时间戳: 要截断的日期时间值。

DATE_TRUNC 方法返回一个截断后的日期时间戳。

-- 截断到年
SELECT DATE_TRUNC('year', '2023-10-26 12:34:56'); -- 返回 2023-01-01 00:00:00

-- 截断到月
SELECT DATE_TRUNC('month', '2023-10-26 12:34:56'); -- 返回 2023-10-01 00:00:00

-- 截断到日
SELECT DATE_TRUNC('day', '2023-10-26 12:34:56'); -- 返回 2023-10-26 00:00:00

-- 截断到小时
SELECT DATE_TRUNC('hour', '2023-10-26 12:34:56'); -- 返回 2023-10-26 12:00:00

-- 截断到分钟
SELECT DATE_TRUNC('minute', '2023-10-26 12:34:56'); -- 返回 2023-10-26 12:34:00

-- 截断到秒
SELECT DATE_TRUNC('second', '2023-10-26 12:34:56'); -- 返回 2023-10-26 12:34:56

2 分组

SELECT
id,
create_time,
DATE_TRUNC('year', create_time) AS year
FROM
"Image"
GROUP BY
year, id, create_time;

这里不做任何聚合,只是展示如何按年份分组

为啥不能直接 GROUP BY year

根据 SQL 标准,如果在 SELECT 子句中使用了一个字段,那么这个字段要么应该是聚合函数的一部分(比如 COUNT, SUM, AVG),要么应该出现在 GROUP BY 子句中。否则,SQL 语句将无法执行,因为数据库无法确定如何处理这些字段

例如仅按 year 分组,数据库无法知道在 2024 年这一组中应如何处理 id 和 create_time 字段,因为每一组内可能有多个不同的 id 和 create_time 值

3 聚合为 JSON 数组

为了将分组后的记录聚合为 JSON 数组,我们需要先构建 JSON 对象,然后将这些对象聚合成数组。

这时使用子查询是为了在聚合之前完成分组和所需的数据处理。

子查询的作用:

预处理数据:在进行聚合操作之前,可以在子查询中预处理数据,确保我们在最终聚合时有一个干净的数据集。

逻辑隔离:将复杂的逻辑隔离在子查询中,使主查询更简洁和易读。

通过子查询,我们先进行年份分组,然后在外层查询中进行聚合

SELECT 
year,
json_agg(json_build_object('id', id, 'createTime', create_time)) AS items
FROM (
SELECT
id,
create_time,
DATE_TRUNC('year', create_time) AS year
FROM
"Image"
) AS subquery
# 由于将 id、create_time 字段聚合成了 json 数组作为 items 字段,此时就可以只根据 year 来分组了
GROUP BY year;
  • json_agg 是 PostgreSQL 提供的一个聚合函数,用于将一组行聚合成一个 JSON 数组。每一行将作为 JSON 数组的一个元素。
  • json_build_object 是 PostgreSQL 提供的一个函数,用于创建一个 JSON 对象。它接受成对的键值参数,并返回一个 JSON 对象。

此时查询结果如下

yearitems
2024-01-01 00:00:00[{"id" : "6ccd24be-5797-4a69-8193-da5fd24d5291", "createTime" : "2024-07-31T01:13:33.806"}, {"id" ...
2025-01-01 00:00:00[{"id" : "51e4db24-0166-4b3d-81dc-0d11f35c7440", "createTime" : "2025-07-30T02:56:43.739"}, {"id" ...

4 限制每组返回条数

增加 ROW_NUMBER 窗口函数:为每条记录分配行号,按年份分区,并按创建时间降序排序。

使用子查询过滤行号:

  • ROW_NUMBER

是一种窗口函数(window function),用于为结果集中的每一行分配一个唯一的行号(从1开始)。它通常与 OVER 子句一起使用,以定义如何计算行号的窗口

语法:ROW_NUMBER() OVER (window_specification)

示例:SELECT id, name, ROW_NUMBER() OVER (ORDER BY id) AS row_num FROM employees;

在这个示例中,ROW_NUMBER 为每一行分配一个行号,按照 id 列排序

  • OVER

用于定义窗口函数的窗口(即行集分组的方式)。它可以包含一个 PARTITION BY 子句用于定义分区,以及一个 ORDER BY 子句用于定义行的排序顺序。

语法:window_function() OVER (window_specification)

window_specification 可以包含:PARTITION BY 子句:将数据分区;ORDER BY 子句:定义行的排序顺序;ROWSRANGE 子句:进一步定义窗口范围(可选)。

示例:

SELECT id, department, ROW_NUMBER() OVER (PARTITION BY department ORDER BY id) AS row_num
FROM employees;

在这个示例中,OVER 子句定义了一个窗口,按 department 分区,并按 id 列排序,为每个部门内的每一行分配一个行号。

  • PARTITION BY

子句用于将结果集划分为多个分区(子集),每个分区独立地应用窗口函数。分区中的每一行都分配一个行号,并且行号在每个分区内从1开始。

语法:PARTITION BY expression,其中 expression 可以是一个或多个列。

示例:

SELECT id, department, salary, ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
FROM employees;

在这个示例中,PARTITION BYdepartment 列分区,并按 salary 列降序排序,为每个分区内的每一行分配一个行号(排名)。

因此我们可以在子查询中在新增 row_num 行号列,根据 year 进行分区,使用 WHERE 筛选出不超过 3 的即可

SELECT 
year,
json_agg(json_build_object('id', id, 'thumbUrl', thumb_url, 'createTime', create_time)) AS items
FROM (
SELECT
id,
thumb_url,
create_time,
DATE_TRUNC('year', create_time) AS year,
ROW_NUMBER() OVER (PARTITION BY DATE_TRUNC('year', create_time) ORDER BY create_time DESC) AS row_num
FROM
"Image"
) AS subquery
WHERE row_num <= 3
GROUP BY year;

· 阅读需 11 分钟

需求

批量更新或新增数据,输入一个原始数据的数组,根据数据中在数据库中是否存在来判断是更新还是修改

直接想到的做法就是遍历这个数组,然后逐个查询,如果查到了,就修改,反之则新增

const db = client.db("数据库名").collection("users")
const data = [
{ code: '1', name: 'a', qty: 100 },
// ...
]

for (const d of data) {
// 先用 find 去查找数据是否存在
const result = await db.findOne({ code: d.code })
if (result) {
// 如果存在,则更新
await db.updateOne({ code: d.code }, d)
}else {
// 如果不存在,则新增
await db.insertOne(d)
}
}

虽然思路很清晰,但当数据量大的时候,频繁操作数据库可能会引起一些性能问题

相比逐条处理数据,批量更新通常会带来更高的性能

可以利用 mongoDB 提供的一些批量操作 api,一次性更新多个文档

方案1 预处理

  1. 先遍历查询每条数据
  2. 数据预处理:维护一个 insertDocsupdateDocs
  3. 最后分别调用 updateManyinsertMany 来批量更新这些数据
    • 注意 updateMany 用于同时更新多个满足特定条件的文档,但是它并不适用于每个文档有不同更新条件的情况
    • 所以这里使用 bulkWrite 来批量更新
const data = [
{ code: '1', name: 'a', qty: 100 },
// ...
]

const updateDocs = []
const insertDocs = []

for (let d of data) {
const result = await db.findOne({ code: d.code })
if (result) {
updateDocs.push({
filter: { code: d.code },
update: { $set: d }
})
} else {
insertDocs.push(d)
}
}

// 批量新增
const insertResult = await db.insertMany(insertDocs)
console.log(`Inserted ${insertResult.insertedCount} new document(s).`)

// 批量更新
const bulkUpdateOps = updateDocs.map(update => ({
updateOne: {
filter: update.filter,
update: update.update,
upsert: false
}
}));
const updateResult = await collection.bulkWrite(bulkUpdateOps)
console.log(`${updateResult.matchedCount} document(s) matched the filter`)
console.log(`${updateResult.modifiedCount} document(s) were updated`)

方案2 bulkWrite

bulkWrite API 原生支持批量新增或更新的操作,利用其 upsert 配置的特性来完成这个需求

const data = [
{ code: '1', name: 'a', qty: 100 },
// ...
]
// 创建 bulkWrite 操作数组
const bulkOperations = data.map(d => ({
updateOne: {
filter: { code: d.code },
update: { $set: d },
// upsert 表示新增或更新
upsert: true // 如果不存在,那么就新增
}
}))

// 执行 bulkWrite 操作
const result = await db.bulkWrite(bulkOperations)

console.log(`${result.matchedCount} document(s) matched the query criteria.`)
console.log(`${result.modifiedCount} document(s) was/were updated.`)
console.log(`${result.upsertedCount} document(s) was/were inserted.`)
console.log(`Upserted document _id: ${result.upsertedIds}`)

批量更新

类似于 MongoDB 中的批量更新功能,关系型数据库如 MySQL 和 PostgreSQL 也提供了处理批量更新的能力。不过,批量更新的实现方式和语法因不同的数据库而异。以下是 MySQL 和 PostgreSQL 中批量更新数据的基本示例:

MySQL 批量更新

在 MySQL 中,你可以使用 INSERT ... ON DUPLICATE KEY UPDATE 语法来达到类似批量更新的效果。这要求表中有唯一索引或主键,当插入的行与现有行在唯一索引或主键上冲突时,则执行更新操作。

INSERT INTO 表名 (1,2, ...)
VALUES
(1a,2a, ...),
(1b,2b, ...),
...
ON DUPLICATE KEY UPDATE
1 = VALUES(1),2 = VALUES(2), ...;

PostgreSQL 批量更新

在 PostgreSQL 中,可以使用 INSERT ... ON CONFLICT 语法实现类似的功能。这同样依赖于表中的唯一约束或主键。

INSERT INTO 表名 (1,2, ...)
VALUES
(1a,2a, ...),
(1b,2b, ...),
...
ON CONFLICT (列名) DO UPDATE
SET1 = EXCLUDED.1,2 = EXCLUDED.2, ...;

在这两个示例中,批量更新(或称为“批量插入或更新”)的行为取决于是否有与现有数据冲突的新数据。如果有冲突,则执行更新;如果没有冲突,则插入新数据。

这些方法非常适合于需要同步大量数据或进行批量数据处理的场景。需要注意的是,具体的语法和实现可能因数据库的版本和具体配置而有所不同,因此在实际应用中应参考相关数据库的官方文档。

关联查询

mongoDB 聚合查询

MongoDB 作为一个非关系型数据库(NoSQL),其原生的设计并没有传统意义上的“连表查询”概念。在关系型数据库(如 MySQL)中,多个表通过关联字段进行关联查询(JOIN 操作),而 MongoDB 中存储数据的基本单位是集合(Collection),它们之间不像关系型数据库中的表那样自然具有关系。

然而,MongoDB 提供了 $lookup 聚合管道操作符,它允许你在聚合查询中实现类似于 SQL 中的 JOIN 功能。使用 $lookup,你可以引用其他集合的文档并将它们与当前集合的文档汇聚在一起。

以下是一个 $lookup 的图书和作者示例,模拟关系型数据库中的连表查询:

db.books.aggregate([
{
$lookup: {
from: "authors", // 要 join 的集合名
localField: "authorId", // 本集合中用于 join 的字段
foreignField: "_id", // 外部集合中用于 join 的字段
as: "authorInfo" // 查找到的数据放在哪个字段
}
}
]);

在这个例子中,我们假设 books 集合中的每本书都有一个 authorId 字段,它引用了 authors 集合中对应作者的 _id 字段。$lookup 会将 authors 集合中匹配到的作者文档添加到 authorInfo 字段中。

在 MongoDB 3.2 以上版本中,$lookup 可以非常有效地执行这种操作,使 MongoDB 更接近于传统的关系型数据库体验。

此外,MongoDB 3.6 以上版本引入了更高级的 $lookup 语法,允许进行更复杂的连接操作,包括可以实现类似左外连接的操作。

尽管 MongoDB 有 $lookup 操作符,但它并不是为了大量连表操作而设计的。在使用 MongoDB 设计数据模型时,通常推荐的做法是尽量减少需要连接操作的场景,比如通过嵌入文档(embedding)或使用文档引用(referencing)来优化查询效率。

公司规定数据库每次关联查询不能超过三张表,怎么做

使用中间表减少连接

假设我们有三个表:users (用户信息)、orders (订单信息)、和 products (产品信息)。为了减少查询时的连接数量,我们可能会创建一个中间表 order_details,其中包含了用户信息和订单信息的部分数据。

创建中间表 order_details

CREATE TABLE order_details AS
SELECT u.user_id, u.user_name, o.order_id, o.product_id, p.product_name, o.quantity
FROM users u
JOIN orders o ON u.user_id = o.user_id
JOIN products p ON o.product_id = p.product_id;

之后的查询可以直接从 order_details 中获取数据,无需再次进行三表连接。

分解查询

假设我们需要基于用户的订单来获取产品信息。我们可以先查询用户的订单,然后使用这些订单ID来查询订单中的产品信息。

  1. 查询用户订单:
SELECT order_id
FROM orders
WHERE user_id = '123';

假设这返回了订单ID列表 [456, 789]

  1. 基于订单ID查询产品信息:
SELECT product_name, quantity
FROM order_details
WHERE order_id IN (456, 789);

这样,我们就避免了一次性连接多个表。

示例3: 使用子查询

子查询可以在单个 SQL 语句中嵌入另一个查询,从而减少同时需要连接的表的数量。

查询订单中的产品信息,其中产品价格大于某个值:

SELECT o.order_id, (SELECT product_name FROM products p WHERE p.product_id = o.product_id AND p.price > 100) as expensive_products 
FROM orders o
WHERE o.user_id = '123';

这里,我们在SELECT子句中使用了子查询来获取符合条件的产品名称。

示例4: 使用公用表表达式(CTE)

查找过去一月内活跃用户的订单:

WITH ActiveUsers AS (
SELECT user_id
FROM users
WHERE last_login_date > CURRENT_DATE - INTERVAL '1 month'
),
UserOrders AS (
SELECT user_id, order_id
FROM orders
WHERE user_id IN (SELECT user_id FROM ActiveUsers)
)
SELECT u.user_name, o.order_id
FROM UserOrders o
JOIN users u ON u.user_id = o.user_id;

这里,我们使用了两个CTE(ActiveUsers 和 UserOrders)来逐步减少所需连接的表数量。

示例5: 优化数据模型

在设计数据库时,可以通过合理的数据模型设计来避免复杂的查询,例如通过适当的反归一化来预存一些经常一起查询的数据字段,或者根据查询模式调整表结构。

例如:在 orders 表中添加 user_name 字段,避免查询时每次都要连接 users 表来获取用户名。