MAPREDUCE服务 MRS-Flink SQL语法增强:FlinkSQL支持设置Source的并发
FlinkSQL支持设置Source的并发
本章节适用于 MRS 3.3.0及以后版本。
FlinkSQL支持通过使用参数“source.parallelism”设置Source算子的并发数,解决下游算子的并发数引起的一些问题,例如下游算子发送数据倾斜、背压、作业性能慢等问题。
该特性会将Source和下游算子的Forward分区改为Rebalance分区,所以当Source算子的并发数和下游算子的并发数(parallelism数)不一致时,且作业不允许数据乱序,需要在启用该特性的同时开启DISTRIBUTEBY特性,可参考Flink SQL语法增强。
如设置Source并发数为“2”并开启DISTRIBUTEBY特性:
CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统 域名 ', -- 设置Source并发数 'source.parallelism' = '2' ); CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); -- Insert into KafkaSink select user_id, user_name, age from KafkaSource;(未开启DISTRIBUTEBY特性) -- 开启DISTRIBUTEBY特性 Insert into KafkaSink select/*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age from KafkaSource;
- GaussDB数据库SQL语法_SQL语法_CREATE_ALTER
- GaussDB(DWS)常用SQL_常用SQL命令_SQL语法
- 什么是Flink OpenSource SQL_数据湖探索_Flink OpenSource SQL
- Hudi服务_什么是Hudi_如何使用Hudi
- MapReduce服务_什么是Hive_如何使用Hive
- MapReduce服务_什么是ClickHouse_如何使用ClickHouse
- MapReduce服务_什么是ZooKeeper_如何使用ZooKeeper
- MapReduce服务_什么是Hue_如何使用Hue
- GaussDB版本查询_华为云GaussDB的作用_高斯数据库版本查询_华为云
- 什么是数据湖探索服务_数据湖探索DLI用途与特点