IntermediateDataset
IntermediateDataset
是在 JobGraph
中对中间结果的抽象。我们知道,JobGraph
是对 StreamGraph
进一步进行优化后得到的逻辑图,它尽量把可以 chain 到一起 operator 合并为一个 JobVertex
,而 IntermediateDataset
就表示一个 JobVertex
的输出结果。JobVertex
的输入是 JobEdge
,而 JobEdge
可以看作是 IntermediateDataset
的消费者。一个 JobVertex
也可能产生多个 IntermediateDataset
。需要说明的一点是,目前一个 IntermediateDataset
实际上只会有一个 JobEdge
作为消费者,也就是说,一个 JobVertex
的下游有多少 JobVertex
需要依赖当前节点的数据,那么当前节点就有对应数量的 IntermediateDataset
。
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 JobGraph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向集群进行提交,进入运行阶段。
Jobvertex->Intermedia Dateset->JobEage
然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通过这样一个InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPartition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
ExecutionGraph -> Intermedia ATA
task ->ResultPartion->InputGate
ResultSubParition-->InputChannel 一一对应
问题拆解:反压传播两个阶段
假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种情况:
跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition
TaskManager 内,反压如何从 ResultPartition 传播到 InputGate
TaskManager 只有一个 Network BufferPool 被所有的 Task 共享,初始化时会从 Off-heap Memory 中申请内存,申请后内存管理通过 Network BufferPool 来进行,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool
反压过程:
- 首先InputChannel 的 Buffer 被用尽于
- 会向 Local BufferPool 申请新的 Buffer
- 如果 Local BufferPool没有可用的,或是 Local BufferPool 的最大可用 Buffer 到了上限 无法向 Network BufferPool 申请,没有办法去读取新的数据
- 这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer 中读取数据了。
- 导致消费端的Socket的buffer也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的 TCP 滑动窗口的机制)。
- 这时发送端的 Socket 就会停止发送。导致 Socket 的 Buffer 也被用尽,发送端的Netty 检测到 Socket 无法写了之后就会停止向 Socket 写数据。
- Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向 Netty 写数据。
8.这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool 和 Network BufferPool 申请内存直至用尽buffer
9.record write写阻塞,Operator 就会停止写数据,达到跨 TaskManager 的反压。
-
跨 TaskManager 数据传输
-
TaskManager 内反压过程
下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。具体流程可以参考下图,这就是 TaskManager 内的反压过程。
TCP-based 反压的弊端
在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。
Credit-based 反压(SINCE FLINK1.5)
这个机制简单的理解起来就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。
反压过程
Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信)
过了一段时间后,由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压, 不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。 同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题
总结
- 网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载
- 网络流控有静态限速和动态反压两种手段
- Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压
- Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制
有了动态反压,静态限速是不是完全没有作用了?
我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端*(极端情况导致job failover,es端socket发生 time out),这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。
网络上传输的数据会写到 Task 的 InputGate(IG) 中,经过 Task 的处理后,再由 Task 写到 ResultPartition(RS) 中。每个 Task 都包括了输入和输入,输入和输出的数据存在 Buffer
中(都是字节数据)。Buffer
是 MemorySegment 的包装
类。
根据配置,Flink 会在 NetworkBufferPool
中生成一定数量(默认2048)的内存块 MemorySegment
,内存块的总数量就代表了网络传输中所有可用的内存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之间共享的,每个 TM 只会实例化一个
Task 线程启动时,会为 Task 的 InputGate(IG)和 ResultPartition(RP) 分别创建一个 LocalBufferPool(缓冲池)并设置可申请的 MemorySegment(内存块)数量。IG 对应的缓冲池初始的内存块数量与 IG 中 InputChannel 数量一致,RP 对应的缓冲池初始的内存块数量与 RP 中的 ResultSubpartition 数量一致。不过,每当创建或销毁缓冲池时,NetworkBufferPool 会计算剩余空闲的内存块数量,并平均分配给已创建的缓冲池。
在 Task 线程执行过程中,当 Netty 接收端收到数据时,为了将 Netty 中的数据拷贝到 Task 中,InputChannel(实际是 RemoteInputChannel)会向其对应的缓冲池申请内存块(上图中的①)。如果缓冲池中也没有可用的内存块且已申请的数量还没到池子上限,则会向 NetworkBufferPool 申请内存块(上图中的②)并交给 InputChannel 填上数据(上图中的③和④)。
- 当一个内存块被消费完成之后(在输入端是指内存块中的字节被反序列化成对象了,在输出端是指内存块中的字节写入到 Netty Channel 了),会调用
Buffer.recycle()
方法,会将内存块还给 LocalBufferPool (上图中的⑤)。如果LocalBufferPool中当前申请的数量超过了池子容量(由于上文提到的动态容量,由于新注册的 Task 导致该池子容量变小),则LocalBufferPool会将该内存块回收给 NetworkBufferPool(上图中的⑥)。如果没超过池子容量,则会继续留在池子中,减少反复申请的开销
反压的过程
这里我们需要注意两个场景:
- 本地传输:如果 Task 1 和 Task 2 运行在同一个 worker 节点(TaskManager),该 buffer 可以直接交给下一个 Task。一旦 Task 2 消费了该 buffer,则该 buffer 会被缓冲池1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就会赶不上 Task 1 取 buffer 的速度,导致缓冲池1无可用的 buffer,Task 1 等待在可用的 buffer 上。最终形成 Task 1 的降速。
- 远程传输:在输出端,通过 Netty 的水位值机制来保证不往网络中写入太多数据。如果网络中的数据(Netty输出缓冲中的字节数)超过了高水位值,我们会等到其降到低水位值以下才继续写入数据。
Netty 水位值机制
初始化 NettyServer 时配置的水位值参数。
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
当输出缓冲中的字节数超过了高水位值, 则 Channel.isWritable() 会返回false。当输出缓存中的字节数又掉到了低水位值以下, 则 Channel.isWritable() 会重新返回true。Flink 中发送数据的核心代码在 PartitionRequestQueue 中
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError) {
return;
}
Buffer buffer = null;
try {
// channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK
// 和 WRITE_BUFFER_HIGH_WATER_MARK 实现发送端的流量控制
if (channel.isWritable()) {
// 注意: 一个while循环也就最多只发送一个BufferResponse, 连续发送BufferResponse是通过writeListener回调实现的
while (true) {
if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
return;
}
buffer = currentPartitionQueue.getNextBuffer();
if (buffer == null) {
// 跳过这部分代码
...
}
else {
// 构造一个response返回给客户端
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());
if (!buffer.isBuffer() &&
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
// 跳过这部分代码。batch 模式中 subpartition 的数据准备就绪,通知下游消费者。
...
}
// 将该response发到netty channel, 当写成功后,
// 通过注册的writeListener又会回调进来, 从而不断地消费 queue 中的请求
channel.writeAndFlush(resp).addListener(writeListener);
return;
}
}
}
}
catch (Throwable t) {
if (buffer != null) {
buffer.recycle();
}
throw new IOException(t.getMessage(), t);
}
}
// 当水位值降下来后(channel 再次可写),会重新触发发送函数
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
反压监控
Flink 在使用了一个 trick 来实现对反压的监控。如果一个 Task 因为反压而降速了,那么它会卡在向 LocalBufferPool 申请内存块上。那么这时候,该 Task 的 stack trace 就会长下面这样:
java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
[...]
那么事情就简单了。通过不断地采样每个 task 的 stack trace(线程堆栈) 就可以实现反压监控。
Flink 的实现中,只有当 Web 页面切换到某个 Job 的 Backpressure 页面,才会对这个 Job 触发反压检测,因为反压检测还是挺昂贵的。 JobManager的StackTraceSampleCoordinator
( A coordinator for triggering and collecting stack traces of running tasks)
会通过 Akka 给每个 TaskManager 发送
TriggerStackTraceSample消息。默认情况下,
TaskManager 会触发100次 stack trace 采样,每次
间隔 50ms(也就是说一次反压检测至少要等待5秒钟)。并将这
100 次采样的结果返回给 JobManager,由 JobManager 来计算
反压比率
(
反压出现的次数/采样的次数),最终展现在 UI 上。UI 刷新的默认周期是一分钟,目的是不对 TaskManager 造成太大的负担。
Flink 如何在吞吐量和延迟之间做权衡?
我们分析了上述的网络传输后,知道每个 SubTask 输出的数据并不是直接输出到下游,而是在 ResultSubPartition 中有一个 Buffer 用来缓存一批数据后,再 Flush 到 Netty 发送到下游 SubTask。那到底哪些情况会触发 Buffer Flush 到 Netty 呢?
Buffer 变满时
Buffer timeout 时
特殊事件来临时,例如:
CheckPoint 的 barrier
来临时
Flink 在数据传输时,会把数据序列化成二进制然后写到 Buffer 中,当 Buffer 满了,需要 Flush(默认为32KiB
,通过taskmanager.memory.segment-size设置)。但是当流量低峰或者测试环节,可能1分钟都没有 32 KB
的数据,就会导致1分钟内的数据都积攒在 Buffer 中不会发送到下游 Task 去处理,从而导致数据出现延迟,这并不是我们想看到的。所以 Flink 有一个 Buffer timeout
的策略,意思是当数据量比较少,Buffer 一直没有变满时,后台的 Output flusher
线程会强制地将 Buffer 中的数据 Flush 到下游。Flink 中默认 timeout 时间是 100ms
,即:Buffer 中的数据要么变满时 Flush,要么最多等 100ms 也会 Flush 来保证数据不会出现很大的延迟。当然这个可以通过 env.setBufferTimeout
(timeoutMillis) 来控制超时时间。
- timeoutMillis > 0 表示最长等待 timeoutMillis 时间,就会flush
- timeoutMillis = 0 表示每条数据都会触发 flush,直接将数据发送到下游,相当于没有Buffer了(避免设置为0,可能导致性能下降)
- timeoutMillis = -1 表示只有等到 buffer满了或 CheckPoint的时候,才会flush。相当于取消了 timeout 策略
严格来讲,Output flusher 不提供任何保证——它只向 Netty 发送通知,而 Netty 线程会按照能力与意愿进行处理。这也意味着如果存在反压,则 Output flusher 是无效的。言外之意,如果反压很严重,下游 Buffer 都满了,当然不能强制一直往下游发数据。
一些特殊的消息如果通过 RecordWriter 发送,也会触发立即 Flush 缓存的数据。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,这些事件应该尽快被发送,而不应该等待 Buffer 被填满或者 Output flusher 的下一次 Flush。当然如果出现反压,CheckPoint barrier 也会等待,不能发送到下游。
引入 Network buffers
以获得更高的资源利用率
和更高的吞吐量
,代价是让一些记录在 Buffer 中等待一段时间。虽然可以通过缓冲区超时给出此等待时间的上限,但你可能知道有关这两个维度(延迟和吞吐量)之间权衡的更多信息:显然,无法同时获得这两者
参考:
https://ververica.cn/developers/advanced-tutorial-2-analysis-of-network-flow-control-and-back-pressure/
http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/
[https://www.cnblogs.com/pengblog2020/p/12161716.html]