云服务器内容精选

  • 常见问题 Q:writeMode参数设置什么值比较合适? A:根据业务场景分update(只更新存在的数据)和upsert(对于同一主键数据如果存在就更新,不存在就新增一条数据)两个类型,推荐直接使用auto方式即可,该方式下会根据数据量的大小自动选择,如果数据量较大会增大攒批参数autoFlushBatchSize,即可提升入库性能。 Q:autoFlushBatchSize和autoFlushMaxInterval怎么设置比较合适? A:autoFlushBatchSize参数用于设置最大攒批条数,autoFlushMaxInterval参数用于设置最大攒批间隔,两个参数分别从时间和空间维度管控攒批。 通过autoFlushMaxInterval可保证数据量较小时的时效性,如对时效性无强制要求通常不建议设置的太小,建议不低于3s走默认值即可。 通过autoFlushBatchSize可控制一批数据的最大条数,一般来说攒批量越大,对于整体入库性能会更好,对性能来说通常该参数的设置推荐越大越好,参数的设置根据业务数据的大小以及flink运行内存来设置,保证不内存溢出。 对于大多业务来说无需设置autoFlushMaxInterval,将autoFlushBatchSize设置为50000即可。 Q: 遇到数据库死锁了怎么办? A:通常出现死锁大致分为行锁死锁和分布式死锁。 行锁:该场景通常为同一主键数据的并发更新造成行锁,该情况可以通过对数据做key by解决,key by必须根据数据库主键做,保证同一个主键数据会在同一个并发中,破坏掉并发更新的条件,无法造成死锁。Flink SQL做key by需要Flink本身支持,对于 DLI / MRS 均能实现,如MRS flink通过增加参数“key-by-before-sink=true”可实现key by。具体怎么使用以实现方为准,对于无法使用的建议使用API方式入库。 分布式死锁:该场景通常为列存表的并发更新造成分布式死锁,暂无法解决,建议使用行存或者hstore。
  • UDF函数DnHashFunction参数说明 参数格式 dn_hash('dws表名',sink并行度,最大并行度,dws作为分布列的数据在源数据的字段名称{1,}) 参数说明 使用时上游并行度必须不多于sink并行度,DnHashFunction同样是通过进程内获取sink 算子初始化的dws client实例获取到的表元数据,如果当前进程无sin算子就会导致无法获取client实例。 使用后会增加一个hash算子,如果链路有多个算子处理业务,当执行hash算子后不可以再有改变数据分区的算子,否则数据会被再次分区就不能到达指定sink算子。 最大并行度默认flink自动调整的,算法中需要使用,因此自动调整的无法使用,必须通过参数设置固定并把设置额值作为UDF的参数,可以通过参数pipeline.max-parallelism设置或者jar方式通过API设置: StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();evn.setParallelism(1);evn.setMaxParallelism(1024); 如果分布列包含多个字段,分布列的字段顺序需要保持和DWS一致,分布列支持的字段类型和dws client一致参考参数WRITE_PARTITION_POLICY,使用功能同样需要额外配置,不可自行使用。
  • 使用flink SQL直连DN入库 该能力依赖flink sql DISTRIBUTEBY能力,mrs有提供此能力,具体请参见Flink SQL语法增强。 connector提供udf函数可根据分布列值计算出下游并并发结合flink sql DISTRIBUTEBY能力实现将数据按DN分区能力,示例: 需要在SQL中引入UDF。 CREATE temporary FUNCTION dn_hash AS 'com.huaweicloud.dws.connectors.flink.partition.DnHashFunction'; 正常写Source SQL。 CREATE TABLE users( id BIGINT, name STRING, age INT, text STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3)) WITH ( 'connector' = 'datagen', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '1000', 'fields.name.length' = '10', 'fields.age.min' = '18', 'fields.age.max' = '60', 'fields.text.length' = '5' ) Sink表定义SQL中需要新增一个字段并且要求int类型值用于接收UDF计算的结果,示例中叫dn_hash。 create table dws_users( dn_hash int, id BIGINT, name STRING, age INT, text STRING, created_at TIMESTAMP(3), updated_at TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'dws', 'url' = '%s', 'tableName' = 'test.users', 'username' = '%s', 'autoFlushBatchSize' = '50000', 'password' = '%s' ) Insert into sql使用 udf获取数据下游算子信息,同时使用DISTRIBUTEBY对返回结果做数据分区,数据就会按照udf返回信息到下游指定并行度。 insert into dws_users select /*+ DISTRIBUTEBY('dn_hash') */ dn_hash('test.users',10,1024, id) as dn_hash, * from users
  • 示例 该示例是从kafka数据源中读取数据,写入DWS结果表中,并指定攒批时间不超过10秒,每批数据最大30000条,其具体步骤如下: 在 GaussDB (DWS)数据库中创建表public.dws_order: 1 2 3 4 5 6 7 8 91011 create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR ); 消费Kafka中order_test topic中的数据作为数据源,public.dws_order作为结果表,Kafka数据为JSON格式,并且字段名称和数据库字段名称一一对应: 1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637383940 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json');CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DWSAddress:DWSPort/DWSdbName', 'tableName' = 'dws_order', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'autoFlushMaxInterval' = '10s', 'autoFlushBatchSize' = '30000');insert into dwsSink select * from kafkaSource; 给Kafka写入测试数据: 1 {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 等10秒后在GaussDB(DWS)表中查询结果: 1 select * from dws_order 结果如下:
  • 格式语法 SQL语法格式可能在不同Flink环境下有细微差异,具体以事件环境格式为准,with后面的参数名称及参数值以此文档为准。 1 2 3 4 5 6 7 8 9101112 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED))with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '');
  • Flink SQL配置参数 Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。 表1 数据库配置 参数 说明 默认值 connector flink框架区分connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 配置密码。 - tableName 对应dws表。 - 表2 连接配置 参数 说明 默认值 connectionSize 初始dws-client时的并发数量。 1 connectionMaxUseTimeSeconds 连接创建多少秒后强制释放(单位:秒)。 3600(一小时) connectionMaxIdleMs 连接最大空闲时间,超过后将释放(单位:毫秒)。 60000(一分钟) dws client参数全量支持在flink sql通过key方式配置,下表参数为兼容1.x版本参数,当同时配置2.x和1.x参数时生效2.x版本参数值: 表3 DWS client写入参数 参数 说明 默认值 conflictStrategy 有主键表数据写入时主键冲突策略: ignore:保持原数据,忽略更新数据。 update:用新数据中非主键列更新原数据中对应列。 replace:用新数据替换原数据。 说明: update和replace在全字段upsert时等效,在部分字段upsert时,replace相当于将数据中不包含的列设置为null。 update writeMode 入库方式: auto:系统自动选择。 copy_merge:当存在主键时使用copy方式入临时表,从临时表merge至目标表;无主键时直接copy至目标表。 copy_upsert:当存在主键时使用copy方式入临时表,从临时表upsert至目标表;无主键时直接copy至目标表。 upsert: 有主键用upsert sql入库;无主键用insert into入库。 UPDATE:使用update where语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须是唯一索引,但非唯一索引可能会影响性能。 COPY_UPDATE:数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。 UPDATE_AUTO:批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。 auto maxFlushRetryTimes 在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 3 autoFlushBatchSize 自动刷库的批大小(攒批大小)。 5000 autoFlushMaxInterval 自动刷库的最大间隔时间(攒批时长)。 5s copyWriteBatchSize 在“writeMode == auto”下,使用copy的批大小。 5000 metadataCacheSeconds 系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 180 copyMode copy入库格式: CS V:将数据拼接为CSV格式入库,该方式稳定,但性能略低。 DELIMITER:用分隔符将数据拼接,然后入库,该方式需要数据中不包含分隔符。 CSV createTempTableMode 创建临时表方式: AS LIKE AS numberAsEpochMsForDatetime 如果数据库为时间类型数据源为数字类型,是否将数据当成时间戳转换为对应时间类型。 false stringToDatetimeFormat 如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 null 表4 connector参数 参数 说明 默认值 ignoreDelete 忽略flink任务中的delete。 false (1.0.10前默认true) ignoreNullWhenUpdate 是否忽略flink中字段值为null的更新,只有在“conflictStrategy == update”时有效。 false sink.parallelism flink系统参数用于设置sink并发数量。 跟随上游算子 printDataPk 是否在connector接收到数据时打印数据主键,用于排查问题。 false ignoreUpdateBefore 忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 true