我在TimescaleDB中监视、收集和存储不同组织的事件数据。
我有一个带有一个数据库的TimescaleDB。每个组织在数据库中都有自己的模式。每个模式有两个表:一个设备表和一个事件表。
设备表存储有关给定组织的设备的信息,而事件表是存储从不同设备收集的所有事件的时间序列表。事件表具有指向设备表的列(外键)。
这就是我的数据库的结构方式。
▼ Servers (1)
▼ TimescaleDB Cloud
▼ Databases (1)
▼ myTimescaleDB
▼ Schemas (12)
▼ organization_1 (12)
▼ Tables (2)
▼ device
▼ Columns (3)
device_id
device_name
device_type
▼ event
▼ Columns (5)
event_id
time
device_id (fk)
event_source
event_type
▼ organization_2 (12)
▼ Tables (2)
▼ device
▼ Columns (3)
device_id
device_name
device_type
▼ event
▼ Columns (5)
event_id
time
device_id (fk)
event_source
event_type
我想要创建一个预定的用户定义的操作,基本上确保每个设备最多只有10.000个事件。因此,假设每5分钟,用户定义的操作应该计算每个模式中每个设备的事件数。任何有超过10.000个事件的设备都应该被裁剪为只有10.000。不过,我们总想把那些旧的事情整理一下。我该怎么做?
发布于 2022-07-07 12:12:23
你的设备每X秒发送一次数据吗?因为如果有这样的情况,也可以创建保留策略。假设每个设备每5秒接收一次新事件,所以每分钟有12次,(12 * 60 * 24) =每天17280次。所以,17280中的10k大约是一天中的58%。
您也可以做一些数学,以获得一些小时,并使用保留策略删除其余的。
这将是采用保留政策,这最终是背景工作,做的正是删除。唯一的区别是,它们的效率要高得多,因为它们完全放弃了块。如果不是这样的话,您需要在这里请求帮助时使用自定义后台作业。
要构建适当的场景,您可以尝试构建一个查询来验证您想要的内容。下面是一个使用窗口函数获取适当记录的片段:
WITH summary AS (
SELECT time,
device,
ROW_NUMBER() OVER(PARTITION BY device
ORDER BY time DESC ) AS rank
FROM conditions )
SELECT *
FROM summary
WHERE rank = 10000;
它会返回如下内容:
┌────────────────────────┬────────┬───────┐
│ time │ device │ rank │
├────────────────────────┼────────┼───────┤
│ 2000-01-07 21:17:05+00 │ 0 │ 10000 │
│ 2000-01-04 10:56:19+00 │ 1 │ 10000 │
│ 2000-01-04 11:37:45+00 │ 2 │ 10000 │
│ 2000-01-04 11:53:32+00 │ 3 │ 10000 │
│ 2000-01-04 11:42:57+00 │ 4 │ 10000 │
│ 2000-01-04 10:13:28+00 │ 5 │ 10000 │
│ 2000-01-04 11:30:52+00 │ 6 │ 10000 │
│ 2000-01-04 11:38:55+00 │ 7 │ 10000 │
│ 2000-01-04 11:46:30+00 │ 8 │ 10000 │
现在只需要将查询与delete子句结合起来:
WITH summary AS (
SELECT time,
device,
ROW_NUMBER() OVER(PARTITION BY device
ORDER BY time DESC ) AS rank
FROM conditions )
DELETE FROM conditions USING summary
WHERE summary.rank = 10000 and conditions.time < summary.time and summary.device = conditions.device;
确认它如你所期望的那样有效:
select device, count(1) from conditions group by 1;
┌────────┬───────┐
│ device │ count │
├────────┼───────┤
│ 0 │ 10000 │
│ 1 │ 10000 │
│ 2 │ 10000 │
│ 3 │ 10000 │
│ 4 │ 10000 │
│ 5 │ 10000 │
│ 6 │ 10000 │
您可以将它封装到后台函数中:
CREATE OR REPLACE PROCEDURE limit_devices_data(job_id int, config jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
RAISE NOTICE 'DELETING in the job % with config %', job_id, config;
WITH summary AS (
SELECT time,
device,
ROW_NUMBER() OVER(PARTITION BY device
ORDER BY time DESC ) AS rank
FROM conditions )
DELETE FROM conditions USING summary
WHERE summary.rank = 10000 and conditions.time < summary.time and summary.device = conditions.device;
COMMIT;
END
$$;
并将作业添加到每5分钟运行一次,或者根据您的需要:
SELECT add_job('limit_devices_data','5 minutes', initial_start => now() + INTERVAL '5 seconds');
如果您有太多的数据,可能需要增加max_runtime:
SELECT alter_job(job_id, max_runtime => INTERVAL '1 minute');
由于在不同的模式下有几个表,所以我建议您查看一下执行结合格式来迭代几个表。可以使用information.hypertables视图查询超表名称。
https://stackoverflow.com/questions/72896482
复制相似问题