云服务器内容精选

  • 概述 dws-connector-flink是在dws-client的基础上对接Flink的一个工具,工具为对dws-client的包装,整体入库能力跟dws-client一致。dws-connector-flink为 GaussDB (DWS)团队自研工具,后续将根据GaussDB(DWS)数据库持续优化。 dws-flink-connector的DWS-Connector只支持单并发查询存量数据,暂不支持并行读取。
  • DWS-Connector版本说明 表1 版本变更记录 版本 变更描述 备注 1.0 初始化版本。 dws-connector-flink仅仅发布Scala2.11 Flink 1.12版本 1.0.2 dwsclient优化异常重试逻辑,由所有异常重试修改为只重试:连接异常、数据库只读、超时、连接数过多、加锁异常五类异常。 dws-connector-flink支持版本: Scala2.11: Flink 1.12、1.13 Scala2.12:Flink 1.12、1.13、1.15 1.0.3 已知问题修复与性能优化。 支持update写入场景。 支持使用唯一索引。 由于支持update模式,为避免原dwsClient中upsert接口歧义,故后续统一使用write接口承载写入操作,两接口能力一致推荐使用write。 - 1.0.4 增加SQL执行超时时间,避免长时间阻塞。 - 1.0.5 修复无主键表相同数据写入丢失问题。 - 1.0.6 优化攒缓存逻辑代码性能,提升在CPU不足场景的吞吐。 增加临时表复用,避免在copy merge/upsert场景频繁创建临时表。 copy增加 CS V格式,避免在数据复杂时受特殊字符干扰无法正常入库。 - 1.0.7 数据库重启中写入数据失败后支持重试。 增加as方式创建临时表,解决在有主键表无法使用copy merge/upsert的问题。 数据库字段适配为默认大小写不敏感。 Flink SQL增加打印主键参数,用于分析数据缺失时排查问题。 - 1.0.8 修复Flink SQL主键配置必须大小写和数据库一致问题。 增加设置sink并发的参数。 - 1.0.9 优化时间类型入库。 - 1.0.10 修复client并发执行delete和insert,导致有概率先执行insert后执行delete,当同主键数据在同一批缓存中先删除后写入会导致丢数据问题。 修复Kafka写dws链路中偶现写DWS异常后丢数据问题。 新增connector参数ignoreUpdateBefore,部分主要参数兼容flink-connector-jdbc。 - 父主题: DWS-Connector
  • 简介 dws-connector-flink是在dws-client的基础上对接flink的一个工具,工具为对dwsClient的包装,整体入库能力跟dwsClient一致。目前内部只实现了DynamicTableSourceFactory、DynamicTableSinkFactory两个接口,并未实现CatalogFactory,所以不支持使用Catalog的场景。 dws-flink-connector的dws connector只支持单并发查询存量数据,暂不支持并行读取。
  • Flink SQL配置参数 Flink SQL中设置的PRIMARY KEY将自动映射到dws-client中的uniqueKeys。参数跟随client版本发布,参数功能与client一致,以下参数说明表示为最新参数。 表1 数据库配置 参数 说明 默认值 connector flink框架区分connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 配置密码。 - tableName 对应dws表。 - 表2 连接配置 参数 说明 默认值 connectionSize 初始dwsClient时的并发数量。 1 connectionMaxUseTimeSeconds 连接创建多少秒后强制释放(单位秒)。 3600(一小时) connectionMaxIdleMs 连接最大空闲时间,超过后将释放,(单位毫秒)。 60000 (一分钟) 表3 写入参数 参数 说明 默认值 conflictStrategy 有主键表数据写入时主键冲突策略: ignore:保持原数据,忽略更新数据。 update:用新数据中非主键列更新原数据中对应列。 replace:用新数据替换原数据。 说明: update和replace在全字段upsert时等效,在部分字段upsert时,replace相当于将数据中不包含的列设置为nul。 update writeMode 入库方式: auto:系统自动选择。 copy_merge:当存在主键时使用copy方式入临时表,从临时表merge至目标表;无主键时直接copy至目标表。 copy_upsert:当存在主键时使用copy方式入临时表,从临时表upsert至目标表;无主键时直接copy至目标表。 upsert: 有主键用upsert sql入库;无主键用insert into 入库。 UPDATE:使用update where 语法更新数据,若原表无主键可选择指定uniqueKeys,指定字段不要求必须时唯一索引,但非唯一索引可能会影响性能。 COPY_UPDATE:数据先通过copy方式入库到临时表,通过临时表加速使用update from where方式更新目标数据。 UPDATE_AUTO:批量小于copyWriteBatchSize使用UPDATE,否则使用COPY_UPDATE。 auto maxFlushRetryTimes 在入库时最大尝试次数,次数内执行成功则不抛出异常,每次重试间隔为 1秒 * 次数。 3 autoFlushBatchSize 自动刷库的批大小(攒批大小)。 5000 autoFlushMaxInterval 自动刷库的最大间隔时间(攒批时长)。 5s copyWriteBatchSize 在writeMode == auto下,使用copy的批大小。 5000 ignoreDelete 忽略flink任务中的delete。 false (1.0.10前默认true) ignoreNullWhenUpdate 是否忽略flink中字段值为null的更新, 只有在conflictStrategy == update时有效。 false metadataCacheSeconds 系统中对元数据的最大缓存时间,例如表定义信息(单位秒)。 180 copyMode copy入库格式: CSV:将数据拼接为CSV格式入库,该方式稳定,但性能略低。 DELIMITER:用分隔符将数据拼接,然后入库,该方式需要数据中不包含分隔符。 CSV createTempTableMode 创建临时表方式: AS、LIKE AS numberAsEpochMsForDatetime 如果数据库为时间类型数据源为数字类型是否将数据当成时间戳转换为对应时间类型。 false stringToDatetimeFormat 如果数据库为时间类型数据源为字符串类型,按该格式转换为时间类型,该参数配置即开启。 null sink.parallelism flink系统参数用于设置sink并发数量。 跟随上游算子 printDataPk 是否在connector接收到数据时打印数据主键,用于排查问题。 false ignoreUpdateBefore 忽略flink任务中的update_before,在大表局部更新时该参数一定打开,否则有update时会导致数据的其它列被设置为null,因为会先删除再写入数据。 true 表4 查询参数 参数 是否必填 说明 默认值 fetchSize 否 jdbc statement中fetchSize参数,用于控制查询数据库返回条数。 1000