Flink算子状态为何只能用ListState?

news/2024/10/16 16:13:52 标签: flink, 大数据

前言

Flink 将状态是否要按照 key 进行分类,将状态分为键值状态(Keyed State)和算子状态(Operator State)两种,两者除了状态本身的作用域不同外,其中算子状态的状态类型更是被 Flink 限制为 ListState,这是为什么呢?

使用算子状态

算子状态的作用域为当前 subTask,使用算子状态,Flink 算子的每个subTask只能访问当前subTask的数据,不能夸subTask访问。典型的应用场景就是 FlinkKafkaConsumer 使用算子状态保存 Kafka Topic 中的每个分区的消费偏移量。

在Flink中,要想使用算子状态,可以选择实现 CheckpointedFunction 接口

public interface CheckpointedFunction {
    void snapshotState(FunctionSnapshotContext var1) throws Exception;

    void initializeState(FunctionInitializationContext var1) throws Exception;
}
  • snapshotState Flink作业执行快照时调用该方法,开发者可以控制往ListState写入哪些数据
  • initializeState Flink作业启动或者异常容错从快照恢复时调用这个方法

Flink作业启动或异常恢复时会调用 CheckpointedFunction#initializeState,通过入参 FunctionInitializationContext 来获取算子状态。

要想获取算子状态,首先得先定义状态描述符,因为算子状态被强制限定为列表状态,所以只能用 ListStateDescriptor。然后通过入参 FunctionInitializationContext#getOperatorStateStore 对象来获取 ListState。

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    this.elementsState = functionInitializationContext.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("elements", Integer.class)
    );
}

算子状态实战

算子状态在业务场景中并不常用,除了 FlinkKafkaConsumer 使用算子状态保存 Kafka Topic 中分区的消费偏移量外,Sink 算子使用算子状态作为写出数据的缓冲区也是一个较为常用的场景。

MySQL 是常用的关系型数据库,在流计算场景中,它也是一种常用的数据汇存储引擎,用来保存流计算的结果。但是MySQL的写入TPS通常不高,一般在几百甚至几千,上万已经是很夸张了。但是Flink作为一款高性能的流计算引擎,动辄十万百万的TPS数据流入,如果计算结果每次都写入MySQL,势必会压垮MySQL。此时可以在 Sink 算子上使用算子状态作为缓冲区,先缓存一部分数据,最后再一次性批量写MySQL,以此来减轻MySQL的压力。

举个例子,现在有一个数据源,会源源不断的产生一批数字,现在要开发一个 Flink 作业,计算这些数字的和,然后把结果写入到 MySQL,为了减轻MySQL的写入压力,要求 Sink 算子可以缓冲一部分数据再批量写。

如下代码所示,SumResultBufferingSink 实现了CheckpointedFunction 接口,元素到达时会先写入缓冲区 elements,缓冲区满才会累计求和后写入MySQL。同时,在执行快照时,也会把elements缓冲区的数据写入到elementsState,异常恢复时,再将elementsState数据恢复到缓冲区。

public class OperatorStateFuature {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.fromElements(1, 2, 3, 4, 5, 6)
                .keyBy(i -> "all")
                .sum(0)
                .addSink(new SumResultBufferingSink(3));
        environment.execute();
    }

    public static class SumResultBufferingSink implements SinkFunction<Integer>, CheckpointedFunction {

        private final int bufferSize;
        private final List<Integer> elements;
        private ListState<Integer> elementsState;

        public SumResultBufferingSink(int bufferSize) {
            this.bufferSize = bufferSize;
            this.elements = new ArrayList<>(bufferSize);
        }

        @Override
        public void invoke(Integer value, Context context) throws Exception {
            elements.add(value);
            if (elements.size() >= bufferSize) {
                int sum = elements.stream().mapToInt(Integer::intValue).sum();
                System.err.println("write to db : sum=" + sum);
                elements.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            System.err.println("---snapshotState start---");
            elementsState.clear();
            elementsState.addAll(elements);
            System.err.println("---snapshotState end---");
        }

        @Override
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            System.err.println("---initializeState start---");
            this.elementsState = functionInitializationContext.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("elements", Integer.class)
            );
            // 是否从故障中恢复
            if (functionInitializationContext.isRestored()) {
                Iterator<Integer> iterator = elementsState.get().iterator();
                while (iterator.hasNext()) {
                    elements.add(iterator.next());
                }
            }
            System.err.println("---initializeState end---");
        }
    }
}

使用 SumResultBufferingSink 后,缓冲区大小为3,六个元素只会写两次DB。

ListState和UnionListState

OperatorStateStore 提供了两个方法获取 ListState

public interface OperatorStateStore {
  <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
  <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
}

ListState和UnionListState 有什么区别呢?

两者的区别在于,快照恢复或者算子并行度发生改变时,算子状态值的分配方式是不同的。

  • ListState 采用平均分割分配,状态重新分配时,所有subTask的ListState会先合并到一起,再采用 Round-Robin 策略将列表中的状态分配到各个subTask
  • UnionListState 采用合并分配,状态重新分配时,所有subTask的ListState合并到一起得到一个完整的列表,再将这个完整的列表发给每个subTask。

Tips:UnionListState要慎用,当列表中的元素非常多时,有内存溢出的风险。

算子状态为什么限制ListState

回到开篇提出的问题,为什么Flink要限制算子状态只能使用 ListState 类型?

本质上,是Flink异常恢复,或者算子并行度发生变化时,算子状态数据如何分配的问题。最简单公平的分配算法就是平均分配,那么除了 ListState 这种列表类型,其它如 ValueState,MapState 等数据结构实在是不方便数据划分啊,所以Flink才限制算子状态必须是 ListState 类型。当然,Flink 也给了开发者两种选择,一是 ListState 的平均分配,二是 UnionListState 给你全量的状态,程序自己来分配,更加灵活。


http://www.niftyadmin.cn/n/5708199.html

相关文章

python车牌号OCR识别(centos版)

在实际应用中&#xff0c;车牌号的识别(OCR)是一个非常重要的需求&#xff0c;尤其是在停车场管理、道路监控等场景中。本文将介绍如何在CentOS环境下&#xff0c;通过Docker容器&#xff0c;基于PaddleOCR来实现车牌号的识别。具体内容包括构建Docker镜像的步骤、相关依赖安装…

浅谈自查木马远控,预防电信诈骗

Hello&#xff0c;今天给大家带来一起关于自查木马远控&#xff0c;预防电信诈骗的文章&#xff0c;本文所介绍的方法可以有效查杀市面90%的后门木马&#xff0c;有效避免木马类电信诈骗。 问一&#xff1a;木马远控为什么会和电信诈骗有关&#xff1f;&#xff08;骗术公开&a…

OpenCV-人脸检测

文章目录 一、人脸检测流程二、关键方法三、代码示例四、注意事项 OpenCV是一个开源的计算机视觉和机器学习软件库&#xff0c;它提供了多种人脸检测方法&#xff0c;以下是对OpenCV人脸检测的详细介绍&#xff1a; 一、人脸检测流程 人脸检测是识别图像中人脸位置的过程&…

开源 AI 智能名片链动 2+1 模式 S2B2C 商城小程序助力个人品牌发展

摘要&#xff1a;本文探讨了开源 AI 智能名片链动 21 模式 S2B2C 商城小程序在个人品牌打造中的作用。通过分析该小程序如何扫清认知障碍、提供发展路径及帮助不同类型的人士获取铁杆粉丝&#xff0c;阐述了其在个人品牌建设方面的独特价值和使命。同时&#xff0c;强调了有一技…

【pyspark学习从入门到精通7】DataFrames_2

目录 创建 DataFrames 生成我们自己的 JSON 数据 创建 DataFrame 创建临时表 简单的 DataFrame 查询 DataFrame API 查询 SQL 查询 创建 DataFrames 通常&#xff0c;您会通过使用 SparkSession&#xff08;或在 PySpark shell 中调用 spark&#xff09;导入数据来创建 …

几何完备的3D分子生成/优化扩散模型 GCDM-SBDD - 评测

GCDM 是一个新的 3D 分子生成扩散模型&#xff0c;与之前的 EDM 相比&#xff0c;GCDM 优化了其中的图神神经网络部分&#xff0c;使用手性敏感的 SE3 等变神经网络 GCPNET 代替了 EDM 中的 EGNN&#xff0c;让节点间消息传递、聚合根据手性不同而进行。本文对 GCDM-SBDD&#…

Stable Diffusion【应用篇】【插画转绘】:建筑风景图片的插画转绘制作教程

学好 AI绘画 不论是就业还是做副业赚钱都不错&#xff0c;但要学会 AI绘画 还是要有一个学习规划。最后大家分享一份全套的 AI绘画 学习资料&#xff0c;给那些想学习 AI绘画 的小伙伴们一点帮助&#xff01; 图片的插画转绘有很多种不同的风格&#xff0c;今天我们分享另一种…

【Unity - 屏幕截图】技术要点

在Unity中想要实现全屏截图或者截取某个对象区域的图片都是可以通过下面的函数进行截取 Texture2D/// <summary>/// <para>Reads the pixels from the current render target (the screen, or a RenderTexture), and writes them to the texture.</para>/…