Strutured Streaming(Spark 2.3)

Spark 2.3 Strutured Streaming

为了继续实现 Spark 更快,更轻松,更智能的目标,Spark 2.3 在许多模块都做了重要的更新,比如 Structured Streaming 引入了低延迟的连续处理(continuous processing);支持 stream-to-stream joins;通过改善 pandas UDFs 的性能来提升 PySpark;支持第四种调度引擎 Kubernetes clusters(其他三种分别是自带的独立模式Standalone,YARN、Mesos)。除了这些比较具有里程碑的重要功能外,Spark 2.3 还有以下几个重要的更新:
引入 DataSource v2 APIs
矢量化(Vectorized)的 ORC reader
Spark History Server v2 with K-V store
基于 Structured Streaming 的机器学习管道API模型
MLlib 增强
Spark SQL 增强
这篇文章将简单地介绍上面一些高级功能和改进,更多的特性请参见 Spark 2.3 release notes

这里主要针对Spark2.3 对Structured Streaming的新特性进行描述。

The simplest way to perform streaming analytics is not having to reason about streaming.

毫秒延迟的连续流处理

Apache Spark 2.0 的 Structured Streaming 将微批次处理(micro-batch processing)从它的高级 APIs 中解耦出去,原因有两个:首先,开发人员更容易学习这些 API,不需要考虑这些 APIs 的微批次处理情况;其次,它允许开发人员将一个流视为一个无限表,他们查询流的数据,就像他们查询静态表一样简便。
但是,为了给开发人员提供不同的流处理模式,社区引入了一种新的毫秒级低延迟(millisecond low-latency连续模式(continuous mode)。
在内部,结构化的流引擎逐步执行微批中的查询计算,执行周期由触发器间隔决定,这个延迟对大多数真实世界的流应用程序来说是可以容忍的。

微批处理(micro-batch)

Structured Streaming默认使用微量批处理执行模型。这意味着Spark流引擎会定期检查流源,并对自上次批量结束后到达的新数据运行批量查询。At a high-level, it looks like this:

在微批处理体系结构中,驱动程序通过将记录偏移量保存到预写的日志中(write-ahead-log)来检查进度,然后用它来重新启动查询。请注意,为了获得确定性的重新执行和端对端语义,在微批处理启动之前,要在下一个微批处理中处理的范围偏移保存到日志中。因此,数据源中可用的记录可能不得不等待当前的微批次在其偏移记录之前完成,并且在下一个微批中处理它。过程看起来像这样。

In this architecture, the driver checkpoints the progress by saving the records offsets to a write-ahead-log, which may be then used to restart the query. Note that the range offsets to be processed in the next micro-batch is saved to the log before the micro-batch has started in order to get deterministic re-executions and end-to-end semantics. As a result, a record that is available at the source may have to wait for the current micro-batch to be completed before its offset is logged and the next micro-batch processes it. At the record level, the timeline looks like this.

这会导致更多的延迟时间。

连续处理(Continuous Processing)

而对于连续模式,流读取器连续拉取源数据并处理数据,而不是按指定的触发时间间隔读取一批数据。通过不断地查询源数据和处理数据,新的记录在到达时立即被处理,将等待时间缩短到毫秒,满足低延迟的应用程序的需求,具体如下面图所示:

目前连续模式支持 map-like Dataset 操作,包括投影(projections)、selections以及其他 SQL 函数,但是不支持 current_timestamp(), current_date() 以及聚合函数。它还支持将 Kafka 作为数据源和数据存储目的地(sink),也支持 console 和 memory sink。
现在,开发人员可以根据延迟要求选择模式连续或微量批处理,来构建大规模实时流式传输应用程序,同时这些系统还能够享受到 Structured Streaming 提供的 fault-tolerance 和 reliability guarantees 特性。
简单来说,Spark 2.3 中的连续模式是实验性的,它提供了以下特性:
端到端的毫秒级延迟
至少一次语义保证
支持 map-like 的 Dataset 操作

流与流进行Join

Spark 2.0 版本的 Structured Streaming 支持流 DataFrame/Dataset 和静态数据集之间的 join,但是 Spark 2.3 带来了期待已久的流和流的 Join 操作。支持内连接和外连接,可用在大量的实时场景中。
广告收益是流与流进行Join的典型用例。例如,展示广告流和广告点击流共享您希望进行流式分析的公共关键字(如adId)和相关数据,根据这些数据你可以分析出哪些广告更容易被点击。

这个例子看起来很简单,但是实现流和流的Join需要解决很多技术难题,如下:

  • 缓存处理延迟数据:需要缓存延迟的数据,直到从其他流中找到匹配的事件;
  • 先置缓冲区大小:限制流式连接缓冲区大小的唯一方法是将延迟超过某个阈值的数据丢弃。此最大延迟阈值应由用户根据业务需求和系统资源限制之间的平衡进行配置。
  • 明确定义的语义:在静态连接和流式连接之间保持一致的SQL连接语义,具有或不具有上述阈值。

幸运的事Spark2.3解决了所有这些问题,因此我们使用SQL连接的清晰语义来表达计算,并控制相关事件之间的延迟容忍。
假设有两个不同的kafka流,我们要通过两个流中的共有的adID属性进行连接。让我们来看看代码是怎么实现的:

与所有的结构化数据流的查询相同,代码是完全一样的。将kafka流数据当作静态的Dataframe一样去定义。当执行此查询时,结构化流式处理引擎会根据需要将点击和展示作为流状态进行缓冲。对于特定广告,只要接收到两个相关事件(即收到第二个事件后),即会生成联合输出。随着数据到达,连接的输出将逐步生成并写入查询接收器(例如,另一个Kafka)。