云服务器内容精选

  • 响应参数 状态码: 200 表3 响应Body参数 参数名称 参数类型 说明 is_success Boolean 执行请求是否成功。“true”表示请求执行成功。 message String 系统提示信息,执行成功时,信息可能为空。 状态码: 400 表4 响应Body参数 参数 参数类型 描述 error_code String 错误码。 error_msg String 错误描述信息。 状态码: 500 表5 响应Body参数 参数 参数类型 描述 error_code String 错误码。 error_msg String 错误描述信息。
  • 概述 dws-connector-flink是在dws-client的基础上对接Flink的一个工具,工具为对dws-client的包装,整体入库能力跟dws-client一致。dws-connector-flink为 GaussDB (DWS)团队自研工具,后续将根据GaussDB(DWS)数据库持续优化。 dws-flink-connector的DWS-Connector只支持单并发查询存量数据,暂不支持并行读取。
  • 常见问题 Q:writeMode参数设置什么值比较合适? A:根据业务场景分update(只更新存在的数据)和upsert(对于同一主键数据如果存在就更新,不存在就新增一条数据)两个类型,推荐直接使用auto方式即可,该方式下会根据数据量的大小自动选择,如果数据量较大会增大攒批参数autoFlushBatchSize,即可提升入库性能。 Q:autoFlushBatchSize和autoFlushMaxInterval怎么设置比较合适? A:autoFlushBatchSize参数用于设置最大攒批条数,autoFlushMaxInterval参数用于设置最大攒批间隔,两个参数分别从时间和空间维度管控攒批。 通过autoFlushMaxInterval可保证数据量较小时的时效性,如对时效性无强制要求通常不建议设置的太小,建议不低于3s走默认值即可。 通过autoFlushBatchSize可控制一批数据的最大条数,一般来说攒批量越大,对于整体入库性能会更好,对性能来说通常该参数的设置推荐越大越好,参数的设置根据业务数据的大小以及flink运行内存来设置,保证不内存溢出。 对于大多业务来说无需设置autoFlushMaxInterval,将autoFlushBatchSize设置为50000即可。 Q: 遇到数据库死锁了怎么办? A:通常出现死锁大致分为行锁死锁和分布式死锁。 行锁:该场景通常为同一主键数据的并发更新造成行锁,该情况可以通过对数据做key by解决,key by必须根据数据库主键做,保证同一个主键数据会在同一个并发中,破坏掉并发更新的条件,无法造成死锁。Flink SQL做key by需要Flink本身支持,对于 DLI / MRS 均能实现,如MRS flink通过增加参数“key-by-before-sink=true”可实现key by。具体怎么使用以实现方为准,对于无法使用的建议使用API方式入库。 分布式死锁:该场景通常为列存表的并发更新造成分布式死锁,暂无法解决,建议使用行存或者hstore。
  • Flink SQL配置参数 Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。 表1 数据库配置 参数 说明 默认值 connector flink框架区分connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 配置密码。 - tableName 对应dws表。 - 表2 连接配置 参数 说明 默认值 connectionSize 初始dws-client时的并发数量。 1 connectionMaxUseTimeSeconds 连接创建多少秒后强制释放(单位:秒)。 3600(一小时) connectionMaxIdleMs 连接最大空闲时间,超过后将释放(单位:毫秒)。 60000(一分钟) 表3 写入参数 参数 说明 默认值 conflictStrategy 有主键表数据写入时主键冲突策略: ignore:保持原数据,忽略更新数据。 update:用新数据中非主键列更新原数据中对应列。 replace:用新数据替换原数据。 说明: update和replace在全字段upsert时等效,在部分字段upsert时,replace相当于将数据中不包含的列设置为null。 update writeMode 入库方式: auto:系统自动选择。 copy_merge:当存在主键时使用copy方式入临时表,从临时表merge至目标表;无主键时直接copy至目标表。 copy_upsert:当存在主键时使用copy方式入临时表,从临时表upsert至目标表;无主键时直接copy至目标表。 upsert: 有主键用upsert sql入库;无主键用insert into入库。 UPDATE:使用update where语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须时唯一索引,但非唯一索引可能会影响性能。 COPY_UPDATE:数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。 UPDATE_AUTO:批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。 auto maxFlushRetryTimes 在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 3 autoFlushBatchSize 自动刷库的批大小(攒批大小)。 5000 autoFlushMaxInterval 自动刷库的最大间隔时间(攒批时长)。 5s copyWriteBatchSize 在“writeMode == auto”下,使用copy的批大小。 5000 ignoreDelete 忽略flink任务中的delete。 false (1.0.10前默认true) ignoreNullWhenUpdate 是否忽略flink中字段值为null的更新,只有在“conflictStrategy == update”时有效。 false metadataCacheSeconds 系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 180 copyMode copy入库格式: CS V:将数据拼接为CSV格式入库,该方式稳定,但性能略低。 DELIMITER:用分隔符将数据拼接,然后入库,该方式需要数据中不包含分隔符。 CSV createTempTableMode 创建临时表方式: AS LIKE AS numberAsEpochMsForDatetime 如果数据库为时间类型数据源为数字类型,是否将数据当成时间戳转换为对应时间类型。 false stringToDatetimeFormat 如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 null sink.parallelism flink系统参数用于设置sink并发数量。 跟随上游算子 printDataPk 是否在connector接收到数据时打印数据主键,用于排查问题。 false ignoreUpdateBefore 忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 true
  • 示例 该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下: 在GaussDB(DWS)数据库中创建表public.dws_order: 1 2 3 4 5 6 7 8 9 10 11 create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR ); 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'autoFlushMaxInterval' = '10s', 'autoFlushBatchSize' = '30000' ); insert into dwsSink select * from kafkaSource; 给Kafka写入测试数据: 1 {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 等10秒后在GaussDB(DWS)表中查询结果: 1 select * from dws_order 结果如下:
  • 格式语法 SQL语法格式可能在不同Flink环境下有细微差异,具体以事件环境格式为准,with后面的参数名称及参数值以此文档为准。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' );
  • Flink关键特性 流式处理 高吞吐、高性能、低时延的实时流处理引擎,能够提供毫秒级时延处理能力。 丰富的状态管理 流处理应用需要在一定时间内存储所接收到的事件或中间结果,以供后续某个时间点访问并进行后续处理。Flink提供了丰富的状态管理相关的特性,包括: 多种基础状态类型:Flink提供了多种不同数据结构的状态支持,如ValueState、ListState、MapState等。用户可以基于业务模型选择最高效、合适状态类型。 丰富的State Backend:State Backend负责管理应用程序的状态,并根据需要进行Checkpoint。Flink提供了不同State Backend,State可以存储在内存上或RocksDB等上,并支持异步以及增量的Checkpoint机制。 精确一次语义:Flink的Checkpoint和故障恢复能力保证了任务在故障发生前后的应用状态一致性,为某些特定的存储支持了事务型输出的功能,即使在发生故障的情况下,也能够保证精确一次的输出。 丰富的时间语义 时间是流处理应用的重要组成部分,对于实时流处理应用来说,基于时间语义的窗口聚合、检测、匹配等运算是很常见的。Flink提供了丰富的时间语义。 Event-time:使用事件本身自带的时间戳进行计算,使乱序到达或延迟到达的事件处理变得更加简单。 Watermark:Flink引入Watermark概念,用以衡量事件时间的发展。Watermark也为平衡处理时延和数据完整性提供了灵活的保障。当处理带有Watermark的事件流时,在计算完成之后仍然有相关数据到达时,Flink提供了多种处理选项,如将数据重定向(side output)或更新之前完成的计算结果。 Processing-time和Ingestion-time。 高度灵活的流式窗口:Flink能够支持时间窗口、计数窗口、会话窗口,以及数据驱动的自定义窗口,可以通过灵活的触发条件定制,实现复杂的流式计算模式。 容错机制 分布式系统,单个Task或节点的崩溃或故障,往往会导致整个任务的失败。Flink提供了任务级别的容错机制,保证任务在异常发生时不会丢失用户数据,并且能够自动恢复。 Checkpoint:Flink基于Checkpoint实现容错,用户可以自定义对整个任务的Checkpoint策略,当任务出现失败时,可以将任务恢复到最近一次Checkpoint的状态,从数据源重发快照之后的数据。 Savepoint:一个Savepoint就是应用状态的一致性快照,Savepoint与Checkpoint机制相似,但Savepoint需要手动触发,Savepoint保证了任务在升级或迁移时,不丢失当前流应用的状态信息,便于任何时间点的任务暂停和恢复。 Flink SQL Table API和SQL借助了Apache Calcite来进行查询的解析,校验以及优化,可以与DataStream和DataSet API无缝集成,并支持用户自定义的标量函数,聚合函数以及表值函数。简化数据分析、ETL等应用的定义。下面代码示例展示了如何使用Flink SQL语句定义一个会话点击量的计数应用。 SELECT userId, COUNT(*) FROM clicks GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId 有关Flink SQL的更多信息,请参见:https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html。 CEP in SQL Flink允许用户在SQL中表示CEP(Complex Event Processing)查询结果以用于模式匹配,并在Flink上对事件流进行评估。 CEP SQL通过MATCH_RECOGNIZE的SQL语法实现。MATCH_RECOGNIZE子句自Oracle Database 12c起由Oracle SQL支持,用于在SQL中表示事件模式匹配。CEP SQL使用举例如下: SELECT T.aid, T.bid, T.cid FROM MyTable MATCH_RECOGNIZE ( PARTITION BY userid ORDER BY proctime MEASURES A.id AS aid, B.id AS bid, C.id AS cid PATTERN (A B C) DEFINE A AS name = 'a', B AS name = 'b', C AS name = 'c' ) AS T
  • Flink结构 Flink服务包含了两个重要的角色:FlinkResource和FlinkServer。 FlinkResource:提供客户端配置管理,是必须安装的角色。 FlinkServer:基于Web的作业管理二次开发平台,可直接在界面开发与管理FlinkSQL作业。具有运维管理界面化、作业开发SQL标准化等特点。 Flink结构如图2所示。 图2 Flink结构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些TaskManager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 如果您想了解更多关于Flink架构的信息,请参考链接:https://ci.apache.org/projects/flink/flink-docs-master/docs/concepts/flink-architecture/。
  • Flink简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性: DataStream Checkpoint 窗口 Job Pipeline 配置表
  • Flink原理 Stream & Transformation & Operator 用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。 Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。 当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。 图3为一个由Flink程序映射为Streaming Dataflow的示意图。 图3 Flink DataStream示例 图3中“FlinkKafkaConsumer”是一个Source Operator,Map、KeyBy、TimeWindow、Apply是Transformation Operator,RollingSink是一个Sink Operator。 Pipeline Dataflow 在Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask。 Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。 紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度,如图4所示。 图4 Operator 紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行,如图5所示。 图5 Operator chain 图5中上半部分表示的是将Source和Map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,KeyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。 图5中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。
  • 对有更新操作的数据流进行聚合计算时要注意数据准确性问题 在针对更新数据进行聚合需要选择合适的解决方案,否则聚合结果会是错误的。 例如: Create table t1( id int, partid int, value int ); select partid,sum(value) from t1 group by partid; 第一批数据:[1,1,10],[2,1,11],[3,2,8] 聚合结果:[1,21],[2,8] 第二批数据:[2,1,12] //对ID=2的记录进行更新。 错误结果:[1,33],[2,8] //若是无法识别是对ID=2的数据进行了更新。 聚合结果:[1,22],[2,8] //识别为更新操作可以得到正确结果。 对于如何识别是更新数据有三种方式: 通过状态后端解决 通过状态后端存储所有原始数据,新来的数据根据状态来判断是否是更新操作,进而通过Flink聚合回撤机制实现聚合结果数据的更新。 优点:可以解决聚合准确性问题,而且对用户友好,对数据没有要求。 缺点:大数据量情况下状态后端存储的数据比较多。 通过CDC格式数据解决 CDC格式数据是指更新操作记录中会同时包含更新前数据和更新后数据。通过更新前的内容来回撤掉之前的聚合结果,通过更新后的数据更新最新的计算结果。 优点:不需要有大的状态后端存储,整体计算资源压力要小于基于状态后端的方案。 缺点:需要依赖于数据格式,常见的方式通过CDC采集工具,将数据采集到Kafka,然后Flink读Kafka数据进行计算。 通过changelog数据解决 changelog与CDC格式的数据类似,只不过存储的方式不同,CDC格式数据会将更新前和更新后的数据在一行记录,而changelog数据会将更新数据拆分成两行,一行是对更新前数据的删除操作,一行是更新后的数据插入操作记录。Flink在计算的时候会将基于更新数据的聚合结果删除,在将基于更新后数据的计算结果插入。changelog可以基于Hudi表实现,基于CDC格式的数据可以转为changelog数据存储到Hudi的MOR表的log文件中,也可以基于状态后端生成Hudi的changelog数据。 优点:可以基于湖存储实现更新数据聚合一致性保证。 缺点: Hudi的MOR表中仅在log文件中存在changelog数据,如果Flink作业计算延迟导致上游数据积压,而Hudi又清理了log文件,就会导致changelog丢失。针对这种情况需要保留版本数多一点,且给Flink作业合理的资源配置避免数据积压周期超过了清理周期。 基于状态后端生成changelog也是依赖于状态后端的,状态后端通常是会配置TTL时间的,不会永久保留。这种场景下更新操作是任意更新,没有一定时间周期限制。例如更新近一个月的数据,TTL设置大于一个月即可;若更新全部数据,就需要设置TTL为永久,不适用于大表。 目前changelog的MOR表,仅支持Flink引擎进行compaction处理,不支持Spark引擎。
  • 多流Join场景事实流表个数不超过三个 当Join表过多时,状态后端压力太大会导致端到端时延增加。 【示例】实时Join维表数3个: CREATE TABLE table1(id int, param1 string) with(...); CREATE TABLE table2(id int, param2 string) with(...); CREATE TABLE table3(id int, param3 string) with(...); CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); select o.*, t1.param1, t2.param2, t3.param3 from orders AS o JOIN table1 AS t1 ON o.order_id = t1.id JOIN table2 AS t2 ON o.order_id = t2.id JOIN table3 AS t3 ON o.order_id = t3.id;
  • 关联嵌套层级不超过三层 嵌套层级越多,回撤流的的数据量越大。 【示例】关联嵌套3层: SELECT * FROM table1 WHERE column1 IN ( SELECT column1 FROM table2 WHERE column2 IN ( SELECT column2 FROM table3 WHERE column3 = 'value' ) )
  • 维表lookup join场景维度表个数不超过五个 Hudi维度表都在TM heap中,当维表过多时heap中保存的维表数据过多,TM会不断GC,导致作业性能下降。 【示例】lookup join维表数5个: CREATE TABLE table1(id int, param1 string) with(...); CREATE TABLE table2(id int, param2 string) with(...); CREATE TABLE table3(id int, param3 string) with(...); CREATE TABLE table4(id int, param4 string) with(...); CREATE TABLE table5(id int, param5 string) with(...); CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); select o.*, t1.param1, t2.param2, t3.param3, t4.param4, t5.param5 from orders AS o JOIN table1 FOR SYSTEM_TIME AS OF o.proc_time AS t1 ON o.order_id = t1.id JOIN table2 FOR SYSTEM_TIME AS OF o.proc_time AS t2 ON o.order_id = t2.id JOIN table3 FOR SYSTEM_TIME AS OF o.proc_time AS t3 ON o.order_id = t3.id JOIN table4 FOR SYSTEM_TIME AS OF o.proc_time AS t4 ON o.order_id = t4.id JOIN table5 FOR SYSTEM_TIME AS OF o.proc_time AS t5 ON o.order_id = t5.id;
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。 当前Flink系统支持认证和加密传输,要使用认证和加密传输,用户需要安装Flink客户端并配置安全认证,本章节以“/opt/hadoopclient”为客户端安装目录为例,介绍安装客户端及配置安全认证。客户端安装目录请根据实际修改。