检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置) linger.ms 参数 支持 含义与kafka设置相同,但默认值为50(kafka是0),目的是提高Rest接口的上传效率 batch.size 参数 支持 含义与kafka设置相同,但默认值为
含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置) linger.ms 参数 支持 含义与kafka设置相同,但默认值为50(kafka是0),目的是提高Rest接口的上传效率 batch.size 参数 支持 含义与kafka设置相同,但默认值为
on_key。 sequence_number String 该条数据的序列号。 data String 下载的数据。 下载的数据为序列化之后的二进制数据(Base64编码后的字符串)。 比如下载数据接口返回的数据是“ZGF0YQ==”,“ZGF0YQ==”经过Base64解码之后是“data”。
sequence_number String 数据上传到的序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。
Json格式上传流式数据 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="dis-test1"| #已存在的通道名 putRecords_sample.py文件中的putRecords_test方法中的records为需要上传的数据内容,数据上传格式如下:
变更源数据类型 源数据Schema作为通道下特定转储任务进行数据转换的依据,如果没有正确配置将引起数据转换失败从而导致转储任务异常。您可以当前就为通道配置源数据Schema,也可后期创建转储任务时再配置。您还可以在通道详情页面对已配置的源数据Schema进行修改。 使用注册账户登录DIS控制台。
管理源数据Schema 源数据Schema,即用户的JSON或CSV数据样例,用于描述JSON或CSV数据格式。DIS可以根据此JSON或CSV数据样例生成Avro schema, 将通道内上传的JSON或CSV数据转换为Parquet或CarbonData格式。 创建源数据Schema有如下三个入口:
从分区最老的数据开始消费,即读取分区内所有有效数据。 例如分区数据有效范围为[100, 200], 则会从100开始消费。 适用于不知道消费位置,则直接消费分区内所有有效数据的场景。 无 LATEST 从分区最新的数据之后开始消费,即不读取分区内的已有数据,而是从下一条上传的数据开始。
// 配置上传的数据 String message = "hello world."; PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest
另外还可以精细的控制对具体分区具体offset数据的确认,确认的offset为已接受数据最大offset+1。例如消费一批数据,最后一条的offset为100,则此时需要commit 101,这样下次消费就会从101开始,不会重复。代码样例如下: ConsumerRecords<String, String>
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); if (!records.isEmpty())
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); if (!records.isEmpty())
另外还可以精细的控制对具体分区具体offset数据的确认,确认的offset为已接受数据最大offset+1。例如消费一批数据,最后一条的offset为100,则此时需要commit 101,这样下次消费就会从101开始,不会重复。代码样例如下: ConsumerRecords<String, String>
String 序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。序列号与游标类型AT_SEQUE
方法,下载数据getRecords_test采用test方法;test方法较test_0方法,增加参数bodySerializeType="protobuf"。 配置好以上参数,执行protobuf_getrecords_sample.py文件调用getRecords_test方法,响应结果如下。
什么是数据接入服务DIS 数据接入服务(Data Ingestion Service)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体
完成服务的开通、删除、配置操作,并将用户信息同步到数据面。 完成数据面资源的申请与自动部署。 服务数据面 接收用户发送数据的请求,对已鉴权的数据接收并存储。 接收用户获取数据的请求,在鉴权后输出对应的用户数据。 按时老化存储在系统中的用户数据。 根据用户配置,将用户数据存储到对象存储服务(Object
数据接入服务-成长地图 | 华为云 数据接入服务 数据接入服务(Data Ingestion Service,简称DIS)面向IoT、互联网等实时数据,提供高效采集、传输、分发能力,支持多种IoT协议,提供丰富的接口,帮助您快速构建实时数据应用。 产品介绍 图说DIS 图说ECS
件。 创建数据库 在Console页面上方菜单栏中单击“产品”,单击“大数据”分类中的“数据湖探索 DLI”。 创建demo数据库,在DLI控制台总览页面,选择“SQL作业”,单击“创建作业”,进入SQL作业编辑器。 在SQL作业编辑器左侧,选择“数据库”,单击创建数据库。 “d
DIS对于从数据生产者快速移出数据,然后进行持续处理非常有用。以下是使用DIS的典型场景: 加速日志和数据传送获取:您无需等待批量处理数据,而是让数据生产者在生成数据后立即输入DIS数据通道,防止因数据生产者出现故障导致的数据损失。例如,系统和应用程序日志可以持续添加到数据通道并可在数秒内进行处理。