云服务器内容精选

  • Flink作业大小表Join去重 在双流关联的业务模型中,关联算子接收到其中一个流发送的大量重复数据,则会导致下游算子需要处理大量重复数据,影响作业性能。 如A表字段(P1,A1,A2)使用如下方式关联B表字段(P1,B1,B2,B3)生成C的场景中,B表信息发生大量更新,但是B中的所需字段没有更新,在该关联中仅用到了B表的B1和B2字段,对于B表,每个记录更新只更新B3字段,B1和B2不更新,因此当B表更新,可以忽略更新后的数据。 select A.A1,B.B1,B.B2 from A join B on A.P1=B.P1 为解决如上问题可通过使用hint单独为左表(duplicate.left)或右表(duplicate.right)设置去重: 格式 为左表设置去重 /*+ OPTIONS('duplicate.left'='true')*/ 为右表设置去重 /*+ OPTIONS('duplicate.right'='true')*/ 同时为左表和右表设置去重 /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ 在SQL语句中配置 如同时为左表“user_info”和右表“user_score”设置去重。 CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t JOIN -- 为左表和右表设置去重 user_score /*+ OPTIONS('duplicate.left'='true','duplicate.right'='true')*/ as d ON t.user_id = d.user_id;
  • Flink作业大小表Join Flink作业双流Join时存在大小表数据,通过内核broadcast策略确保小表数据发送到Join的task中,通过rebalance策略将大表数据打散到Join中,提高Flink SQL易用性,增强作业稳定性。 图1 Flink作业大小表Join 在使用Flink SQL时,该特性通过hints方法指定Join的左表或右表为广播表,另一张表为rebalance表,SQL语句实例如下,分别以A\C作为小表实例: 以A表作为广播表 使用Join方式 SELECT /*+ BROADCAST(A) */ a2, b2 FROM A JOIN B ON a1 = b1 使用Where方式 SELECT /*+ BROADCAST(A) */ a2, b2 FROM A, B WHERE a1 = b1 以A和C表作为广播表 SELECT /*+ BROADCAST(A, C) */ a2, b2, c2 FROM A JOIN B ON a1 = b1 JOIN C ON a1 = c1 支持通过“/*+ BROADCAST(smallTable1, smallTable2) */”方式使用该特性,兼容开源双流Join逻辑。 不支持开源双流Join和该特性的切换,因为该特性会将数据广播到每个Join算子。 不支持LEFT JOIN时小表为左表,RIGHT JOIN时小表为右表。
  • 配置Flink SQL Client支持SQL校验功能方法 通过SQL Client进行SQL作业开发时,支持进入校验模式校验SQL语法正确性。校验模式下执行SQL命令不会启动Flink job。 校验SQL语句 执行SQL shell命令时添加“-v”参数(或“--validate”参数)直接进入校验模式。 sql-client.sh -v 执行SQL shell命令时通过SET命令进入或退出校验模式。 进入校验模式:SET table.validate = true; 退出校验模式:SET table.validate = false; 校验SQL脚本 当使用“-f”参数指定SQL脚本时,可添加“-v”参数进入校验模式。 sql-client.sh -f test.sql -v
  • 多流Join场景支持配置表级别的TTL时间 本章节适用于 MRS 3.3.0及以后版本。 在Flink双流Join场景下,如果Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。 可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒: Hint方式格式: /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ 在SQL语句中配置示例: 示例1: CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t JOIN -- 为左表和右表设置不同的TTL时间 /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ user_score as d ON t.user_id = d.user_id; 示例2 INSERT INTO print SELECT t1.user_id, t1.user_name, t3.score FROM t1 JOIN -- 为左表和右表设置不同的TTL时间 /*+ OPTIONS('state.ttl.left' = '60S', 'state.ttl.right' = '120S') */ ( select UPPER(t2.user_id) as user_id, t2.score from t2 ) as t3 ON t1.user_id = t3.user_id; 父主题: Flink企业级能力增强
  • FlinkSQL窗口函数支持迟到数据 FlinkSQL新增窗口函数支持迟到数据特性,解决迟到数据需要处理的场景。目前支持TUMBLE、HOP、OVER、CUMULATE窗口函数的迟到数据,示例如下: CREATE TABLE T1 ( `int` INT, `double` DOUBLE, `float` FLOAT, `bigdec` DECIMAL(10, 2), `string` STRING, `name` STRING, `rowtime` TIMESTAMP(3), WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND ) WITH ( 'connector' = 'values', ); -- 该Sink的字段必须和窗口的输入数据保持一致,但顺序不要求一致 CREATE TABLE LD_SINK( `float` FLOAT, `string` STRING, `name` STRING, `rowtime` TIMESTAMP(3) ) WITH ( 'connector' = 'print', ); SELECT /*+ LATE_DATA_SINK('sink.name'='LD_SINK') */ `name`, MIN(`float`), COUNT(DISTINCT `string`) FROM TABLE( TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) GROUP BY `name`, window_start, window_end 该特性还支持窗口接收到迟到数据时输出当前窗口的开始时间和结束时间,可通过添加在Hint中'window.start.field'和'window.end.field'使用,字段类型必须是timestamp,示例如下: CREATE TABLE LD_SINK( `float` FLOAT, `string` STRING, `name` STRING, `rowtime` TIMESTAMP(3), `windowStart` TIMESTAMP(3), `windowEnd` TIMESTAMP(3) ) WITH ( 'connector' = 'print', ); SELECT /*+ LATE_DATA_SINK('sink.name'='LD_SINK', 'window.start.field'='windowStart', 'window.end.field'='windowEnd') */ `name`, MIN(`float`), COUNT(DISTINCT `string`) FROM TABLE( TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) GROUP BY `name`, window_start, window_end
  • FlinkSQL DISTRIBUTEBY FlinkSQL新增DISTRIBUTEBY特性,根据指定的字段进行分区,支持单字段及多字段,解决数据仅需要分区的场景。示例如下: SELECT /*+ DISTRIBUTEBY('id') */ id, name FROM t1; SELECT /*+ DISTRIBUTEBY('id', 'name') */ id, name FROM t1; SELECT /*+ DISTRIBUTEBY('id1') */ id as id1, name FROM t1;
  • FlinkSQL支持设置Source的并发 本章节适用于MRS 3.3.0及以后版本。 FlinkSQL支持通过使用参数“source.parallelism”设置Source算子的并发数,解决下游算子的并发数引起的一些问题,例如下游算子发送数据倾斜、背压、作业性能慢等问题。 该特性会将Source和下游算子的Forward分区改为Rebalance分区,所以当Source算子的并发数和下游算子的并发数(parallelism数)不一致时,且作业不允许数据乱序,需要在启用该特性的同时开启DISTRIBUTEBY特性,可参考Flink SQL语法增强。 如设置Source并发数为“2”并开启DISTRIBUTEBY特性: CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统 域名 ', -- 设置Source并发数 'source.parallelism' = '2' ); CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); -- Insert into KafkaSink select user_id, user_name, age from KafkaSource;(未开启DISTRIBUTEBY特性) -- 开启DISTRIBUTEBY特性 Insert into KafkaSink select/*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age from KafkaSource;