Flink SQL 如何实现数据流的 Join?

  • 时间:
  • 浏览:2
  • 来源:神彩UU直播_彩神UU直播官方

图2. Join-in-continuous-query-2

实时领域 Streaming SQL 中的 Join 与离线 Batch SQL 中的 Join 最大不同点在于无法缓存详细数据集,我希望要给缓存设定基于时间的清理条件以限制 Join 涉及的数据范围。根据清理策略的不同,Flink SQL 分别提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 来应对不同业务场景。

图1. Join-in-continuous-query-1

其中最为关键的大难题在于 Join 的实现依赖于缓存整个数据集,而 Streaming SQL Join 的对象却是无限的数据流,内存压力和计算速度在长期运行来说都有不可处理的大难题。下文将结合 SQL 的发展解析 Flink SQL 是怎么才能 才能 处理哪此大难题并实现五个 数据流的 Join。

Temporal Table Join 相似于 Hash Join,将输入分为 Build Table 和 Probe Table。前者一般是纬度表的 changelog,后者一般是业务数据流,典型情况报告下后者的数据量应该远大于前者。在 Temporal Table Join 中,Build Table 是五个 基于 append-only 数据流的带时间版本的视图,不多又称为 Temporal Table。Temporal Table 要求定义五个 主键和用于版本化的字段(通常我希望 Event Time 时间字段),以反映记录在不一并间的内容。

此后给 Rates 指定时间版本,Rates 则会基于 RatesHistory 来计算符合时间版本的汇率转换内容。

图6. Temporal Table Join Example]

传统的离线 Batch SQL (面向有界数据集的 SQL)有某种基础的实现土法子,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

无论在 OLAP 还是 OLTP 领域,Join 都有业务常会涉及到且优化规则比较冗杂的 SQL 的话。对于离线计算而言,经过数据库领域多年的积累,Join 语义以及实现可能性十分旺盛期图片 图片 期期是什么是什么期期期是什么是什么 ,然而对于近年来刚兴起的 Streaming SQL 来说 Join 却指在刚起步的情况报告。

图4. Time-Windowed Join 的时间下界 - Orders 表

很久 Table A 经常出现新的输入 2、3、6,3 匹配到 Table B 的元素,或者 再输出 3 到结果表。

图8. Temporal Table Content]

图7. Temporal Table Registration]

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中五个 表是平等的,任意五个 表的新记录都都能够与另一表的历史记录进行匹配,在 Temporal Table Join 中,Temoparal Table 的更新对另一表在该时间节点很久的记录是不可见的。这导致 亲戚亲戚当他们 只能够保存 Build Side 的记录直到 Watermark 超过记录的版本字段。可能性 Probe Side 的输入理论上我不多 再有早于 Watermark 的记录,哪此版本的数据都能够安全地被清理掉。

Time-Windowed Join 利用窗口给五个 输入表设定五个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并都能够被清理掉。值得注意的是,这里涉及到的五个 大难题是时间的语义,时间都能够指计算指在的系统时间(即 Processing Time),也都能够指从数据某种的时间字段提取的 Event Time。可能性是 Processing Time,Flink 根据系统时间自动划分 Join 的时间窗口并定时清理数据;可能性是 Event Time,Flink 分配 Event Time 窗口并土法子 Watermark 来清理数据。

可能性历史数据我不多 被清理,不多 Regular Join 允许对输入表进行任意种类的更新操作(insert、update、delete)。然而可能性资源大难题 Regular Join 通常是不可持续的,一般只用做有界数据流的 Join。

亲戚亲戚当他们 将 RatesHistory 注册为五个 名为 Rates 的 Temporal Table,设定主键为 currency,版本字段为 time。

作者介绍:

林小铂,网易游戏高级开发工程师,负责游戏数据中心实时平台的开发及运维工作,目前专注于 Apache Flink 的开发及应用。探究大难题曾经我希望某种乐趣。

比如典型的五个 例子是对商业订单金额进行汇率转换。假设五个 多 Orders 流记录订单金额,能够和 RatesHistory 汇率流进行 Join。RatesHistory 代表不同货币转为日元的汇率,每当汇率有变化时就会有根小更新记录。五个 表在某一时间节点内容如下:

另外,尽管在实时计算领域 Join 都能够灵活地用底层编程 API 来实现,但在 Streaming SQL 中 Join 的发展仍指在比较初级的阶段,其中关键点在于怎么才能 才能 将时间属性大概地融入 SQL 中,这点 ISO SQL 委员会制定的 SQL 标准并没人 给出详细的答案。可能性从另外五个 高度来讲,作为 Streaming SQL 最早的开拓者之一,Flink 社区很适合探索出一套合理的 SQL 语法反过来贡献给 ISO。

或者 五个 输入表都只能够缓指在时间下界以上的数据,将空间占用维持在合理的范围。

你这名 查询会为 Orders 表设置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的时间下界(图4)。

以更常用的 Event Time Windowed Join 为例,五个 将 Orders 订单表和 Shipments 运输单表土法子订单时间和运输时间 Join 的查询如下:

不过嘴笨 底层实现上没人 大难题,但怎么才能 才能 通过 SQL 语法定义时间仍是难点。尽管在实时计算领域 Event Time、Processing Time、Watermark 哪此概念可能性成为业界共识,但在 SQL 领域对时间数据类型的支持仍比较弱[4]。或者 ,定义 Watermark 和时间语义都能够通过编程 API 的土法子完成,比如从 DataStream 转换至 Table ,能够 单纯靠 SQL 完成。这方面的支持 Flink 社区计划通过拓展 SQL 方言来完成,感兴趣的读者都能够通过 FLIP-66[7] 来追踪进度。

Regular Join 是最为基础的没人 缓存剔除策略的 Join。Regular Join 中五个 表的输入和更新都有对全局可见,影响很久所有的 Join 结果。举例,在五个 如下的 Join 查询里,Orders 表的新纪录会和 Product 表所有历史纪录以及未来的纪录进行匹配。

图3. Join-in-continuous-query-3

相对于离线的 Join,实时 Streaming SQL(面向无界数据集的 SQL)无法缓存所有数据,或者 Sort-Merge Join 要求的对数据集进行排序基本是无法做到的,而 Nested-loop Join 和 Hash Join 经过一定的改良则都能够满足实时 SQL 的要求。

亲戚亲戚当他们 通过例子来看基本的 Nested Join 在实时 Streaming SQL 的基础实现(案例及图来自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。

图5. Time-Windowed Join 的时间下界 - Shipment 表

没人 有没人 可能性设置五个 缓存剔除策略,将某种要的历史数据及时清理呢?答案是肯定的,关键在于缓存剔除策略怎么才能 才能 实现,这也是 Flink SQL 提供的某种 Join 的主要区别。

原文链接

并为 Shipmenets 表设置了 s.shiptime >= o.ordertime 的时间下界(图5)。

都能够看了在 Nested-Loop Join 中亲戚亲戚当他们 能够保存五个 输入表的内容,而随着时间的增长 Table A 和 Table B 能够保存的历史数据无止境地增长,导致 很不合理的内存磁盘资源占用,或者 单个元素的匹配速度也会没人 低。相似的大难题也指在于 Hash Join 中。

接着 Table B 依次接受到五个 新的元素,分别是 7、3、1。可能性 1 匹配到 Table A 的元素,或者 结果表再输出五个 元素 1。

Table A 有 1、42 五个 元素,Table B 有 42 五个 元素,不多此时的 Join 结果会输出 42。

嘴笨 Timed-Windowed Join 处理了资源大难题,但也限制了使用场景: Join 五个 输入流都能够有时间下界,超过很久则不可访问。这对于不多 Join 维表的业务来说是不适用的,可能性不多情况报告下维表并没人 时间界限。针对你这名 大难题,Flink 提供了 Temporal Table Join 来满足用户需求。

在 Rates 的帮助下,亲戚亲戚当他们 都能够将业务逻辑用以下的查询来表达: