检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
自定义Flink Streaming作业 获取DIS Flink Connector Demo 这里获取“dis-flink-connector-X.X.X.zip”压缩包。解压“dis-flink-connector-X.X.X.zip”压缩包,解压之后获得以下目录: “hua
读取通道数据时,如何区分不同类型数据? 不同类型的消息使用不同的通道; 使用同一个通道的不同分区。上传消息时,不同类型的消息指定不同的partition_key,消费时根据partition_key来区分不同类型消息。 父主题: 转储相关问题
准备DIS Flink Connector的相关环境 准备DIS应用开发环境 参考步骤1:开通DIS通道准备相应DIS环境。 安装Maven并配置本地仓库地址。 安装scala-sdk。 配置DIS Flink Connector依赖 项目中可通过以下配置引入DIS Flink Connector依赖:
使用设备接入IoTDA写入数据至DIS 设备接入服务(IoT Device Access)是华为云的物联网平台,提供海量设备连接上云、设备和云端双向消息通信、批量设备管理、远程控制和监控、OTA升级、设备联动规则等能力,并可将设备数据灵活流转到华为云数据接入服务(DIS),帮助物
批量启动转储任务 功能介绍 此接口用于批量启动转储任务。 调用方法 请参见如何调用API。 URI POST /v2/{project_id}/streams/{stream_name}/transfer-tasks/action 表1 路径参数 参数 是否必选 参数类型 描述 project_id
使用Kafka Adapter上传与下载数据 使用DIS Spark Streaming下载数据 使用DIS Flink Connector上传与下载数据 使用设备接入IoTDA写入数据至DIS
创建数据库 在Console页面上方菜单栏中单击“产品”,单击“大数据”分类中的“数据湖探索 DLI”。 创建demo数据库,在DLI控制台总览页面,选择“SQL作业”,单击“创建作业”,进入SQL作业编辑器。 在SQL作业编辑器左侧,选择“数据库”,单击创建数据库。 “de
streamname="dis-test1"| #已存在的通道名 putRecords_sample.py文件中的putRecords_test方法中的records为需要上传的数据内容,数据上传格式如下: 1 2 3 4 records=[{"data": "abcdefd", "partition_id":
partitionId, cursor); GetRecordsRequest recordsRequest = new GetRecordsRequest(); GetRecordsResult recordResponse = null; while
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List<PutRecordsRequestEntry>
分为Input与Output插件,本节介绍插件的各个配置项具体含义。 配置DIS Logstash Input 配置模板如下:(该模板为从DIS通道下载数据写入本地文件) input { dis { streams => ["YOUR_DIS_STREAM_NAME"]
日志转储的状态。 已完成 失败 异常 读取记录数 从通道内读取的用户记录数。 写入记录数 写入目标服务(如MRS)的记录数,例如通道内的一条用户记录可能转换为多条记录写入MRS服务的opentsdb。 转储文件名 转储到目标服务的文件名称。 从通道内读取的用户记录会写入文件后,再通过文件的形式转储到目标服务(如OBS、MRS)
批量暂停转储任务 功能介绍 此接口用于批量暂停转储任务。 调用方法 请参见如何调用API。 URI POST /v2/{project_id}/streams/{stream_name}/transfer-tasks/action 表1 路径参数 参数 是否必选 参数类型 描述 project_id
批量添加资源标签 功能介绍 该接口用于批量添加资源(通道等)标签。此接口为幂等接口:创建时如果请求体中存在重复key则报错。创建时,不允许设置重复key数据,如果数据库已存在该key,就覆盖value的值。 调用方法 请参见如何调用API。 URI POST /v2/{proje
environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"]
对已有分区进行缩容操作后,缩容成功的分区不再进行计费也不参与配额控制。在步骤1:开通DIS通道中配置的“生命周期”时间内,缩容成功的分区可以读取数据不可写入数据,超过此时间则不可读取/写入数据。 执行自动扩缩容操作 使用注册账户登录DIS控制台。 单击管理控制台左上角的,选择区域和项目。 按照如下方法进行自动扩缩容。
当程序从异常停止恢复时重传部分数据。 1 sendingRecordSize 否 单次调用DIS数据发送接口时的数据集大小。 说明: “batchSize”表示一个事务的批量值(如1000),而“sendingRecordSize”表示一个Rest请求的批量值(如250表示会发起四次请求)。当“bat
据业务吞吐的需求选择通道的分区数。 stream_type:通道类型,“COMMON”表示普通分区,单分区支持最大1MB/s的写入速度和2MB/s的读取速度。 data_duration:通道生命周期,即通道分区中数据的保留时长。 请求响应成功后,返回201 Created,表示通道创建成功。
PutRecords请求之间的时间段越长,序列号越大。 应用程序:一个DIS应用程序是读取和处理来自DIS数据通道的使用者。您可以使用客户端库(SDK)构建DIS应用程序。 客户端库:SDK是一个适用于Java的客户端库,帮助用户轻松构建DIS应用程序,用以读取和处理来自DIS数据通道的数据。
memoryChannel agent.sinks = loggerSink # 定义 Source (使用dis source,从DIS读取数据) agent.sources.dissource.channels = memoryChannel agent.sources.dissource