TimescaleDB 基础
TimescaleDB 是一个 PostgreSQL 扩展, 为基于时间序列的数据/事件数据提供高性能的实时分析能力.
目前使用 TimescaleDB 主要用于大规模的事件分析, 它本身打包为 postgresql 扩展, 安装和使用都非常简单.
由于 TimescaleDB 为 postgresql 扩展(extension), 下面先简单介绍此概念.
什么是 postgresql 扩展
postgresql 的扩展指的是一种 "插件模块", 可在当前数据库加载, 用于提供 postgresql 本身未提供的额外功能/数据类型/处理函数/操作符等.
使用 psql
命令时, 可通过 \dx
显示当前已安装的扩展, 例如安装 TimescaleDB 后:
psql (17.2 (Ubuntu 17.2-1.pgdg22.04+1))
Type "help" for help.
postgres=# \dx
List of installed extensions
Name | Version | Schema | Description
---------------------+---------+------------+---------------------------------------------------------------------------------------
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
timescaledb | 2.17.2 | public | Enables scalable inserts and complex queries for time-series data (Community Edition)
timescaledb_toolkit | 1.19.0 | public | Library of analytical hyperfunctions, time-series pipelining, and other SQL utilities
(3 rows)
其中有三行, 可以看到它们的名字/版本/可作用于的目标 schema/介绍.
当然也可以通过 SELECT * FROM pg_extension;
获取插件列表, 结果和上述一致.
若要查看所有可用插件, 可使用 SELECT * FROM pg_available_extensions;
.
TimescaleDB 安装
这里只介绍使用 Docker 安装, 此方法可以快速启动一个实例.
TimescaleDB 的 Docker 镜像分两个版本:
- 基于 Ubuntu 的完整版本: 带
-ha
后缀, 提供最为完整的 TimescaleDB 体验, 其中包括完整的 TimescaleDB Toolkit, 以及 PostGIS 和 Patroni 插件.- PostGIS: 用于地理数据处理.
- Patroni 用于管理 TimescaleDB 集群.
- TimescaleDB Toolkit: 提供许多额外的 hyperfunctions.
- hyperfunction: TimescaleDB 专用的数据库操作过程, 用于分析处理时间序列数据(IoT 设备数据, 事件数据, 市场分析, 用户行为分析, 金融数据, 加密货币数据的等.)
- 基于 Alpine 的版本: 不带
-ha
后缀.
如下为选用基于 Ubuntu 的 HA 版本启动容器的脚本:
docker run -d --name timescaledb \
-p 5435:5432 \
-e POSTGRES_PASSWORD=mypass \
-v /home/ray/workspace/data_db/timescale:/home/postgres/pgdata/data \
timescale/timescaledb-ha:pg17
创建容器后, 可以验证连接:
- 使用
docker exec
:docker exec -it timescaledb psql -d "postgres://postgres:password@localhost/postgres"
- 使用可视化 sql 连接工具: 如 dbeaver.
基本使用
根据官方 github 仓库 README, 进行一些基本操作验证. TimescaleDB 核心是 Hypertable.
什么是 Hypertable
Hypertable 是在 PostgreSQL 表上增加了针对时间序列数据特殊处理能力的表. 它拥有全部普通表的功能.
具体看, Hypertable 可自动将数据按时间进行分块(提高性能), 将时间序列数据的管理进行极大简化.
在 TimescaleDB 中, Hypertable 和普通表可以共存, 这样可以选择普通表存放一般的关系型数据, 而使用 Hypertable 存放时间序列数据.
Hypertable 可极 大提升插入和查询性能(通过按时间参数对表进行分块). 实现上, 是通过数据库去管理和维护分块的, 使用时和普通 postgresql 表无差异.
由于分块大小直接影响插入和查询性能, TimescaleDB 默认按 7 day
对时间数据进行分块, 分块太大则无法完整装入内存, 有太多小分块则会影响前期查询时间和压缩. 因此分块最佳实践是:
- 在存放数据前, 预先设置
chunk_time_interval
为合适的分块时长. - 单块数据及其对应缓存总大小匹配约 25% 主存大小(若 64GB 内存, 则对应 16 GB).
比如一个 64GB 的机器, 每天写入数据(逻辑总字节
)约 6 GB, 则设置 2 天是合适的(2*6=12GB). 又如一个 8GB 机器, 对应的表每天写入约 150 MB, 则该表分块设置 2 周比较合适.
创建 hypertable
-- 0. 若未加载 timescaledb 扩展, 则加载: https://www.postgresql.org/docs/current/sql-createextension.html
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- 1. 创建一个普通的 postgresql 表: 表名为 conditions
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
location TEXT NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
-- 2. 将该表转化为 hypertable: 指定表名, 以及分块的列(这里是 time 列)
SELECT create_hypertable('conditions', by_range('time'));
用作分块的列类型可以是: timestamptz
, date
, integer
. 最佳实践是使用 timestamptz
.
而后修改表的分块大小(时长).
设置/修改 HyperTable 的分块大小
可使用如下查询当前分块默认全局设置:
SELECT *
FROM timescaledb_information.dimensions
WHERE hypertable_name = 'metrics';
建表时设置分块大小:
SELECT create_hypertable(
'conditions',
by_range('time', INTERVAL '1 day')
);
修改表的分块大小:
SELECT set_chunk_time_interval('conditions', INTERVAL '24 hours');
需要注意: 修改后的设置只会在新的块上应用, 不会影响老的块. 这也意味着若老的块未被填 满前, 该设置是不会立即生效的. 这种情况的解决方案是去创建一个新的 hypertable 应用新的设置.
查询某个表对应的分块:
-- 查询所有:
SELECT show_chunks('conditions');
-- 查询 3 天前的:
SELECT show_chunks('conditions', older_than => INTERVAL '3 days');
Hypertable 的数据压缩
TimescaleDB 的 hypercore 是一种混合行列(hybrid row-columnstore)存储技术, 可在将数据存储压缩 90% 的基础上, 提供高性能时间序列查询能力.
在 HyperTable 配置 columnstore:
ALTER TABLE conditions SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'location'
);
然后, 设置历史数据的压缩(历史数据大概率不会再插入了), 比如 3 天前的数据自动放入到 columnstore:
SELECT add_compression_policy('conditions', INTERVAL '3 days');
查看表大小:
SELECT pg_size_pretty( hypertable_size('conditions'));
查询表的压缩状态:
SELECT * FROM chunk_compression_stats('conditions');
SELECT compress_chunk('<chunk_name>');
-- 如
SELECT compress_chunk('_timescaledb_internal._hyper_11_7_chunk');
数据插入和查询
建表 -> 转 Hypertable -> 设置 Hypertable 分块 -> 设置压缩 几个步骤后, 我们来看看数据 的插入和查询.
简单的数据插入和查询如下(和 postgresql 的普通插入/查询类似):
-- 插入:
INSERT INTO conditions
VALUES
(NOW(), 'office', 70.0, 50.0),
(NOW(), 'basement', 66.5, 60.0),
(NOW(), 'garage', 77.0, 65.2);
-- 查询:
SELECT
COUNT(*)
FROM
conditions
WHERE
time > NOW() - INTERVAL '12 hours';
查询功能1: Time Bucket
Time Bucket 用于将数据通过时间长度进行聚合, 在一些场景下非常有用(比如统计一天用户的某个行为数量).
接着上面的 conditions
表, 统计计算日平均温度:
SELECT
time_bucket('1 day', time) AS bucket,
AVG(temperature) AS avg_temp
FROM
conditions
GROUP BY
bucket
ORDER BY
bucket ASC;
查询功能2: 连续的聚合(基于 time bucket)
在大型数据集上, 每次创建 bucket 肯定是开销较大的, 因此可以进行自动创建, 让查询更快速.(会在后台自动刷新对应 query)
比如还是统计日平均/最高/最低温度:
-- 1. 创建 日平均/最高/最低温度 视图, 名为 conditions_summary_daily
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 day', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM
conditions
GROUP BY
location,
bucket;
-- 2. 对 conditions_summary_daily 视图进行每小时更新:
SELECT
add_continuous_aggregate_policy(
'conditions_summary_daily',
start_offset => INTERVAL '1 month',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour'
);
比如实际工作中, 统计日访问某个页面的不同渠道来源数量, 此方法非常高效.
HyperTable 修改
常用操作举例:
添加列:
ALTER TABLE conditions
ADD COLUMN humidity DOUBLE PRECISION NULL;
修改表名:
ALTER TABLE conditions
RENAME TO weather;
创建 Unique 约束: 在 Hypertable 上创建 Unique 约束时, 必须包含 Patition 列.
比如一个使用 time
列进行分块的表, 创建 Unique Index:
CREATE UNIQUE INDEX idx_deviceid_time
ON hypertable_example(device_id, time);
因此, 如果将一个带 Unique Index 的普通表转化为 Hypertable 时, 若分块列不在 Unique Index 中, 则无法进行转化.
删除表:
DROP TABLE <TABLE_NAME>;
使用 chunk skipping 加速查询
可通过其加速在查询非分块列名作为 WHERE 条件下的性能.
事件数据分析用例: 一个 APP 的 GA4 数据分析
我们探索 TimescaleDB 的主要目的是进行事件数据高效存储和读取. 因此结合此用例记录使用过程.
建立 HyperTable
建表 -> 转 Hypertable -> 设置 Hypertable 分块 -> 设置压缩
根据原始数据情况, 创建一个表, 这里直接使用 GA4 导入 BigQuery 时形成的通用 schema:
-- 略
从 GA4 拉取数据
使用 GA4 API 或 BigQuery API 即可, 不再赘述.
GA4 数据存入 HyperTable
拉取的数据存入 Hypertable 时, 在代码中执行 Insert 即可.
在 for 中单条数据插入会非常耗时, 如下介绍 batch 插入方法:
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
// 已准备好 results: 待插入数据数组
// type result struct {
// Time time.Time
// SensorId int
// Temperature float64
// CPU float64
// }
// 1. 插入语句
queryInsertTimeseriesData := `
INSERT INTO sensor_data (time, sensor_id, temperature, cpu) VALUES ($1, $2, $3, $4);
`
// 2. 创建 batch
batch := &pgx.Batch{}
// 3. 将每条插入语句加入到 batch 队列(queue)
for i := range results {
var r result
r = results[i]
batch.Queue(queryInsertTimeseriesData, r.Time, r.SensorId, r.Temperature, r.CPU)
}
// 这条语句用于在最后检查插入的条数
batch.Queue("select count(*) from sensor_data")
// 4. 将 batch 发送到连接池(或连接)
br := dbpool.SendBatch(ctx, batch)
// 5. 执行所有在 batch queue 中的语句
_, err = br.Exec()
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to execute statement in batch queue %v\n", err)
return
}
fmt.Println("Successfully batch inserted data")
// 打印待插入条数
fmt.Printf("size of results: %d\n", len(results))
// 获取实际插入条数
var rowsInserted int
err = br.QueryRow().Scan(&rowsInserted)
fmt.Printf("size of table: %d\n", rowsInserted)
// 关闭
err = br.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to closer batch %v\n", err)
return
}
}
使用 batch insert 后, 10 万条数据插入时间对比:
- for 循环每条数据调用 exec:
5min
- batch insert:
10s
能够满足正常生产环境需求.
调查 timescaledb 的压缩: 压缩后物理存储数据大小
/ 压缩前物理存储数据大小
= 30%
, 即可以减肥 70% 左右.
原始数据拉取保存到本地时, 可以使用 feather 或 avro 存放, 这样数据大小可以进一步压缩, 便于后续单独使用.
读取数据并进行分析
数据分析, 由分析业务需求决定, 这里暂时不展开, 后续补充.