Skip to main content

TimescaleDB 基础

· 13 min read

TimescaleDB 是一个 PostgreSQL 扩展, 为基于时间序列的数据/事件数据提供高性能的实时分析能力.

目前使用 TimescaleDB 主要用于大规模的事件分析, 它本身打包为 postgresql 扩展, 安装和使用都非常简单.

由于 TimescaleDB 为 postgresql 扩展(extension), 下面先简单介绍此概念.

什么是 postgresql 扩展

postgresql 的扩展指的是一种 "插件模块", 可在当前数据库加载, 用于提供 postgresql 本身未提供的额外功能/数据类型/处理函数/操作符等.

使用 psql 命令时, 可通过 \dx 显示当前已安装的扩展, 例如安装 TimescaleDB 后:

psql (17.2 (Ubuntu 17.2-1.pgdg22.04+1))
Type "help" for help.

postgres=# \dx
List of installed extensions
Name | Version | Schema | Description
---------------------+---------+------------+---------------------------------------------------------------------------------------
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
timescaledb | 2.17.2 | public | Enables scalable inserts and complex queries for time-series data (Community Edition)
timescaledb_toolkit | 1.19.0 | public | Library of analytical hyperfunctions, time-series pipelining, and other SQL utilities
(3 rows)

其中有三行, 可以看到它们的名字/版本/可作用于的目标 schema/介绍.

当然也可以通过 SELECT * FROM pg_extension; 获取插件列表, 结果和上述一致.

若要查看所有可用插件, 可使用 SELECT * FROM pg_available_extensions;.

TimescaleDB 安装

这里只介绍使用 Docker 安装, 此方法可以快速启动一个实例.

TimescaleDB 的 Docker 镜像分两个版本:

  • 基于 Ubuntu 的完整版本: 带 -ha 后缀, 提供最为完整的 TimescaleDB 体验, 其中包括完整的 TimescaleDB Toolkit, 以及 PostGISPatroni 插件.
    • PostGIS: 用于地理数据处理.
    • Patroni 用于管理 TimescaleDB 集群.
    • TimescaleDB Toolkit: 提供许多额外的 hyperfunctions.
      • hyperfunction: TimescaleDB 专用的数据库操作过程, 用于分析处理时间序列数据(IoT 设备数据, 事件数据, 市场分析, 用户行为分析, 金融数据, 加密货币数据的等.)
  • 基于 Alpine 的版本: 不带 -ha 后缀.

如下为选用基于 Ubuntu 的 HA 版本启动容器的脚本:

docker run -d --name timescaledb \
-p 5435:5432 \
-e POSTGRES_PASSWORD=mypass \
-v /home/ray/workspace/data_db/timescale:/home/postgres/pgdata/data \
timescale/timescaledb-ha:pg17

创建容器后, 可以验证连接:

  • 使用 docker exec: docker exec -it timescaledb psql -d "postgres://postgres:password@localhost/postgres"
  • 使用可视化 sql 连接工具: 如 dbeaver.

基本使用

根据官方 github 仓库 README, 进行一些基本操作验证. TimescaleDB 核心是 Hypertable.

什么是 Hypertable

Hypertable 是在 PostgreSQL 表上增加了针对时间序列数据特殊处理能力的表. 它拥有全部普通表的功能.

具体看, Hypertable 可自动将数据按时间进行分块(提高性能), 将时间序列数据的管理进行极大简化.

在 TimescaleDB 中, Hypertable 和普通表可以共存, 这样可以选择普通表存放一般的关系型数据, 而使用 Hypertable 存放时间序列数据.

Hypertable 可极大提升插入和查询性能(通过按时间参数对表进行分块). 实现上, 是通过数据库去管理和维护分块的, 使用时和普通 postgresql 表无差异.

由于分块大小直接影响插入和查询性能, TimescaleDB 默认按 7 day 对时间数据进行分块, 分块太大则无法完整装入内存, 有太多小分块则会影响前期查询时间和压缩. 因此分块最佳实践是:

  1. 在存放数据前, 预先设置 chunk_time_interval 为合适的分块时长.
  2. 单块数据及其对应缓存总大小匹配约 25% 主存大小(若 64GB 内存, 则对应 16 GB).

比如一个 64GB 的机器, 每天写入数据(逻辑总字节)约 6 GB, 则设置 2 天是合适的(2*6=12GB). 又如一个 8GB 机器, 对应的表每天写入约 150 MB, 则该表分块设置 2 周比较合适.

创建 hypertable

-- 0. 若未加载 timescaledb 扩展, 则加载: https://www.postgresql.org/docs/current/sql-createextension.html
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- 1. 创建一个普通的 postgresql 表: 表名为 conditions
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
location TEXT NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);

-- 2. 将该表转化为 hypertable: 指定表名, 以及分块的列(这里是 time 列)
SELECT create_hypertable('conditions', by_range('time'));

用作分块的列类型可以是: timestamptz, date, integer. 最佳实践是使用 timestamptz.

而后修改表的分块大小(时长).

设置/修改 HyperTable 的分块大小

可使用如下查询当前分块默认全局设置:

SELECT * 
FROM timescaledb_information.dimensions
WHERE hypertable_name = 'metrics';

建表时设置分块大小:

SELECT create_hypertable(
'conditions',
by_range('time', INTERVAL '1 day')
);

修改表的分块大小:

SELECT set_chunk_time_interval('conditions', INTERVAL '24 hours');

需要注意: 修改后的设置只会在新的块上应用, 不会影响老的块. 这也意味着若老的块未被填满前, 该设置是不会立即生效的. 这种情况的解决方案是去创建一个新的 hypertable 应用新的设置.

查询某个表对应的分块:

-- 查询所有:
SELECT show_chunks('conditions');

-- 查询 3 天前的:
SELECT show_chunks('conditions', older_than => INTERVAL '3 days');

Hypertable 的数据压缩

TimescaleDB 的 hypercore 是一种混合行列(hybrid row-columnstore)存储技术, 可在将数据存储压缩 90% 的基础上, 提供高性能时间序列查询能力.

在 HyperTable 配置 columnstore:

ALTER TABLE conditions SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'location'
);

然后, 设置历史数据的压缩(历史数据大概率不会再插入了), 比如 3 天前的数据自动放入到 columnstore:

SELECT add_compression_policy('conditions', INTERVAL '3 days');

查看表大小:

SELECT pg_size_pretty( hypertable_size('conditions'));

查询表的压缩状态:

SELECT * FROM chunk_compression_stats('conditions');

手动压缩某个分块:

SELECT compress_chunk('<chunk_name>');

-- 如
SELECT compress_chunk('_timescaledb_internal._hyper_11_7_chunk');

数据插入和查询

建表 -> 转 Hypertable -> 设置 Hypertable 分块 -> 设置压缩 几个步骤后, 我们来看看数据的插入和查询.

简单的数据插入和查询如下(和 postgresql 的普通插入/查询类似):

-- 插入:
INSERT INTO conditions
VALUES
(NOW(), 'office', 70.0, 50.0),
(NOW(), 'basement', 66.5, 60.0),
(NOW(), 'garage', 77.0, 65.2);

-- 查询:
SELECT
COUNT(*)
FROM
conditions
WHERE
time > NOW() - INTERVAL '12 hours';

查询功能1: Time Bucket

Time Bucket 用于将数据通过时间长度进行聚合, 在一些场景下非常有用(比如统计一天用户的某个行为数量).

接着上面的 conditions 表, 统计计算日平均温度:

SELECT
time_bucket('1 day', time) AS bucket,
AVG(temperature) AS avg_temp
FROM
conditions
GROUP BY
bucket
ORDER BY
bucket ASC;

查询功能2: 连续的聚合(基于 time bucket)

在大型数据集上, 每次创建 bucket 肯定是开销较大的, 因此可以进行自动创建, 让查询更快速.(会在后台自动刷新对应 query)

比如还是统计日平均/最高/最低温度:

-- 1. 创建 日平均/最高/最低温度 视图, 名为 conditions_summary_daily
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 day', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM
conditions
GROUP BY
location,
bucket;

-- 2. 对 conditions_summary_daily 视图进行每小时更新:
SELECT
add_continuous_aggregate_policy(
'conditions_summary_daily',
start_offset => INTERVAL '1 month',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour'
);

比如实际工作中, 统计日访问某个页面的不同渠道来源数量, 此方法非常高效.

HyperTable 修改

常用操作举例:

添加列:

ALTER TABLE conditions
ADD COLUMN humidity DOUBLE PRECISION NULL;

修改表名:

ALTER TABLE conditions
RENAME TO weather;

创建 Unique 约束: 在 Hypertable 上创建 Unique 约束时, 必须包含 Patition 列.

比如一个使用 time 列进行分块的表, 创建 Unique Index:

CREATE UNIQUE INDEX idx_deviceid_time
ON hypertable_example(device_id, time);

因此, 如果将一个带 Unique Index 的普通表转化为 Hypertable 时, 若分块列不在 Unique Index 中, 则无法进行转化.

删除表:

DROP TABLE <TABLE_NAME>;

使用 chunk skipping 加速查询

可通过其加速在查询非分块列名作为 WHERE 条件下的性能.

事件数据分析用例: 一个 APP 的 GA4 数据分析

我们探索 TimescaleDB 的主要目的是进行事件数据高效存储和读取. 因此结合此用例记录使用过程.

建立 HyperTable

建表 -> 转 Hypertable -> 设置 Hypertable 分块 -> 设置压缩

根据原始数据情况, 创建一个表, 这里直接使用 GA4 导入 BigQuery 时形成的通用 schema:

-- 略

从 GA4 拉取数据

使用 GA4 API 或 BigQuery API 即可, 不再赘述.

GA4 数据存入 HyperTable

拉取的数据存入 Hypertable 时, 在代码中执行 Insert 即可.

在 for 中单条数据插入会非常耗时, 如下介绍 batch 插入方法:

package main

import (
"context"
"fmt"
"os"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
// 已准备好 results: 待插入数据数组
// type result struct {
// Time time.Time
// SensorId int
// Temperature float64
// CPU float64
// }

// 1. 插入语句
queryInsertTimeseriesData := `
INSERT INTO sensor_data (time, sensor_id, temperature, cpu) VALUES ($1, $2, $3, $4);
`

// 2. 创建 batch
batch := &pgx.Batch{}
// 3. 将每条插入语句加入到 batch 队列(queue)
for i := range results {
var r result
r = results[i]
batch.Queue(queryInsertTimeseriesData, r.Time, r.SensorId, r.Temperature, r.CPU)
}

// 这条语句用于在最后检查插入的条数
batch.Queue("select count(*) from sensor_data")

// 4. 将 batch 发送到连接池(或连接)
br := dbpool.SendBatch(ctx, batch)
// 5. 执行所有在 batch queue 中的语句
_, err = br.Exec()
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to execute statement in batch queue %v\n", err)
return
}
fmt.Println("Successfully batch inserted data")

// 打印待插入条数
fmt.Printf("size of results: %d\n", len(results))

// 获取实际插入条数
var rowsInserted int
err = br.QueryRow().Scan(&rowsInserted)
fmt.Printf("size of table: %d\n", rowsInserted)

// 关闭
err = br.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to closer batch %v\n", err)
return
}
}

使用 batch insert 后, 10 万条数据插入时间对比:

  • for 循环每条数据调用 exec: 5min
  • batch insert: 10s

能够满足正常生产环境需求.

调查 timescaledb 的压缩: 压缩后物理存储数据大小 / 压缩前物理存储数据大小 = 30%, 即可以减肥 70% 左右.

原始数据拉取保存到本地时, 可以使用 feather 或 avro 存放, 这样数据大小可以进一步压缩, 便于后续单独使用.

读取数据并进行分析

数据分析, 由分析业务需求决定, 这里暂时不展开, 后续补充.