检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
设置偏移量提交时间间隔 手动提交 在有些场景可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,此时必须手动控制偏移量的提交。使用手动提交相关参数设置如下:
rRequest(); descriptor.setTransferTaskName(taskName); // 配置DLI相关信息:数据库和内表名称。可通过数据湖探索(简称DLI)控制台创建和查询,DLI表需为内表 descriptor.setDliDatabaseName("dis_dli");
数据下载的消费模式 同Kafka类似,当前dis kafka adapter支持三种消费模式。 assign模式 由用户手动指定consumer实例消费哪些具体分区,此时不会拥有group management机制,也就是当group内消费者数量变化或者通道扩缩容的时候不会有重新分配分区的行为发生。代码样例如下所示:
数据下载的消费模式 同Kafka类似,当前dis kafka adapter支持三种消费模式。 assign模式 由用户手动指定consumer实例消费哪些具体分区,此时不会拥有group management机制,也就是当group内消费者数量变化或者通道扩缩容的时候不会有重新分配分区的行为发生。代码样例如下所示:
缺省值:LATEST 枚举值: LATEST TRIM_HORIZON dli_database_name 是 String 存储该通道数据的DLI数据库名称。 dli_table_name 是 String 存储该通道数据的DLI表名称。 说明: 仅支持数据位置为DLI的表,且用户需具有该表的插入权限。
单击“购买接入通道”,进入“购买接入通道”页面。 “高级配置”页签,选择“现在配置”,展开标签页。 输入新添加标签的键和值。 系统支持添加多个标签,最多可添加10个标签,并取各个标签的交集,对目标通道进行搜索。 图1 添加标签 您也可对现有通道增加标签,详见管理标签。 搜索目标通道 在现有通道列表页,按标签键或标签值搜索目标通道。
通道创建的时间,13位时间戳。 last_modified_time Long 通道最近一次修改的时间,13位时间戳。 status String 通道的当前状态。 CREATING:创建中 RUNNING:运行中 TERMINATING:删除中 TERMINATED:已删除 枚举值: CREATING
metadata="" #用户消费程序端的元数据信息,元数据信息的最大长度为1000个字符 partitionId可通过查询通道详情获取,需要先传入当前设置的通道名称。 配置好以上参数,执行commitCheckpoint_sample.py文件调用commitCheckpoint_test方法,响应201表示成功。
符,需要先通过创建App接口创建。 checkpoint_type 是 String Checkpoint类型。 LAST_READ:在数据库中只记录序列号。 枚举值: LAST_READ stream_name 是 String 已创建的通道名称。 partition_id 是
该Checkpoint关联App名称。 最小长度:1 最大长度:50 checkpoint_type 是 String Checkpoint类型。 LAST_READ:在数据库中只记录序列号。 枚举值: LAST_READ partition_id 否 String 该Checkpoint所属的通道分区标识符。可定义为如下两种样式:-
该Checkpoint关联App名称。 checkpoint_type 是 String Checkpoint类型。 LAST_READ:在数据库中只记录序列号。 枚举值: LAST_READ 请求参数 表3 请求Header参数 参数 是否必选 参数类型 描述 X-Auth-Token
同步提交当前消费的offset void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) 接口 支持 同步提交指定的offset void commitAsync() 接口 支持 异步提交当前消费的offset
同步提交当前消费的offset void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) 接口 支持 同步提交指定的offset void commitAsync() 接口 支持 异步提交当前消费的offset
ADVANCED:高级通道,表示5MB带宽。 枚举值: COMMON ADVANCED data_type 否 String 源数据类型。 BLOB:存储在数据库管理系统中的一组二进制数据。 JSON:一种开放的文件格式,以易读的文字为基础,用来传输由属性值或者序列性的值组成的数据对象。 CSV:纯
将获取DIS Agent包中获取的“dis-agent-X.X.X.zip”压缩包保存到本地。 解压“dis-agent-X.X.X.zip”压缩包至当前目录。 父主题: 使用Agent上传数据
为保证访问密钥的安全,访问密钥仅在初次生成时自动下载,后续不可再次通过管理控制台界面获取。请在生成后妥善保管。 检查项目ID 项目ID表示租户的资源,账号ID对应当前账号,IAM用户ID对应当前用户。用户可在对应页面下查看不同Region对应的项目ID、账号ID和用户ID。 注册并登录管理控制台。 在用户名的下拉列表中单击“我的凭证”。
为保证访问密钥的安全,访问密钥仅在初次生成时自动下载,后续不可再次通过管理控制台界面获取。请在生成后妥善保管。 检查项目ID 项目ID表示租户的资源,账号ID对应当前账号,IAM用户ID对应当前用户。用户可在对应页面下查看不同Region对应的项目ID、账号ID和用户ID。 注册并登录管理控制台。 在用户名的下拉列表中单击“我的凭证”。
取值范围:24~72。 单位:小时。 空表示使用缺省值。 缺省值:24 data_type 否 String 源数据类型。 BLOB:存储在数据库管理系统中的一组二进制数据。 JSON:一种开放的文件格式,以易读的文字为基础,用来传输由属性值或者序列性的值组成的数据对象。 CSV:纯
partitionId="shardId-0000000000" #分区的唯一标识符 partitionId可通过查询通道详情获取,需要先传入当前设置的通道名称。 配置好以上参数,执行getCheckpoint_sample.py文件调用getCheckpoint_test方法,响应结果如下:
取值范围: 1h 3h 12h 可自定义查看监控信息的时间段。 单击“自定义”页签后的,分别设置开始时间和结束时间。 其中,结束时间不能晚于当前的系统时间。 开始时间与结束时间的差值不超过72h。 分区监控 分区编号 流分区编号,默认从0开始。取值方式:从下拉框选择。 该分区的总输入/输出流量(KB/秒)