检测到您已登录华为云国际站账号,为了您更好的体验,建议您访问国际站服务网站 https://www.huaweicloud.com/intl/zh-cn
不再显示此消息
使用DIS Flume Plugin上传与下载数据 DIS Flume Plugin概述 安装DIS Flume Plugin前准备 安装Plugin 配置Plugin 验证Plugin 卸载Plugin(可选) 父主题: 使用DIS
配置样例工程 样例代码请查看:https://github.com/huaweicloud/huaweicloud-sdk-python-dis/tree/master/dis_sdk_python_demo。 操作步骤 huaweicloud-python-sdk-dis已发布到
准备环境 配置pom.xml文件 如果已有maven工程,在pom.xml中使用如下依赖即可。 <dependency> <groupId>com.huaweicloud.dis</groupId> <artifactId>huaweicloud-dis-kafk
估所需分区数量。 单击“使用计算值”将系统计算出的建议值应用于“分区数量”。 - 生命周期(小时) 存储在DIS中的数据保留的最长时间,超过此时长数据将被清除。 取值范围:24~72的整数。 24 源数据类型 BLOB:存储在数据库管理系统中的一组二进制数据。“源数据类型”选择“
估所需分区数量。 单击“使用计算值”将系统计算出的建议值应用于“分区数量”。 - 生命周期(小时) 存储在DIS中的数据保留的最长时间,超过此时长数据将被清除。 取值范围:24~72的整数。 24 源数据类型 BLOB:存储在数据库管理系统中的一组二进制数据。“源数据类型”选择“
读取通道数据时,如何区分不同类型数据? 不同类型的消息使用不同的通道; 使用同一个通道的不同分区。上传消息时,不同类型的消息指定不同的partition_key,消费时根据partition_key来区分不同类型消息。 父主题: 转储相关问题
并行处理:DIS可让您用多个应用程序同时处理同一个数据通道。例如,您可以让一个应用程序运行实时分析,让其他应用程序从同一个DIS数据通道中将数据发送至对象存储服务(Object Storage Service,简称OBS)。 安全可靠:DIS可将数据保留N*24小时,N的取值为1~7的整数,以防数据
查询通道列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 start_stream_name = "" #可设置为空,或是已存在的通道名 执行listStream_sample.py文件默认调用listStream_test方法,获取响应200查询成功。
查询转储列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="XXX" #已存在的通道名 执行list_dump_task_sample.py文件默认调用list_dump_task_test方法,获取响应200查询成功。 响应示例如下:
Json格式上传流式数据 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="dis-test1"| #已存在的通道名 putRecords_sample.py文件中的putRecords_test方法中的records为需要上传的数据内容,数据上传格式如下:
与原生KafkaProducer接口适配说明 DISKafkaProducer的实现与KafkaProducer的实现不同,DISKafkaProducer的客户端与服务端通过Rest API实现,而KafkaProducer是基于TCP协议实现,在接口兼容上有如下差异。 表1 适配说明 原生KafkaProducer
此过程的关键日志说明如下 Heartbeat {"state":"JOINING"} Heartbeat表示心跳请求,每10s发起一次,用于和服务端保持连接。如果超过1分钟服务端没有收到心跳,会认为消费端已离线,消费组会重新分配。若心跳结果为JOINING表示消费者需要重新加入消费组,若为STABLE表示消费组稳定。
验证Plugin 验证DIS Source 使用PuTTY工具远程登录Flume所在服务器。 确认已配置好包含dis source的配置文件 可基于Flume自带的flume-conf.properties.template修改,文件样例如下所示: agent.sources =
下载数据之消费位移 消费位移确认有自动提交与手动提交两种策略,在创建DISKafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。 自动提交策略由消费者协调器(Coordinator)每隔${auto.commit.interval
准备DIS Spark Streaming的相关环境 准备DIS应用开发环境 参考步骤1:开通DIS通道准备相应DIS环境。 安装Maven并配置本地仓库地址。 安装scala-sdk。 配置DIS Spark Streaming依赖 项目中可通过以下配置引入DIS Spark Streaming依赖:
完成数据面资源的申请与自动部署。 服务数据面 接收用户发送数据的请求,对已鉴权的数据接收并存储。 接收用户获取数据的请求,在鉴权后输出对应的用户数据。 按时老化存储在系统中的用户数据。 根据用户配置,将用户数据存储到对象存储服务(Object Storage Service,简称OBS)、MapReduce服务(MapReduce
安装DIS Flume Plugin前准备 检查依赖 确认Flume已经安装并能正常运行。 确认Flume版本为1.4.0及以上版本。进入Flume安装目录,执行如下命令查看Flume版本。 $ bin/flume-ng version | grep Flume 确认使用的Java版本为1
and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] credentials = BasicCredentials(ak
取值范围: 1h 3h 12h 可自定义查看监控信息的时间段。 单击“自定义”页签后的,分别设置开始时间和结束时间。 其中,结束时间不能晚于当前的系统时间。 开始时间与结束时间的差值不超过72h。 分区监控 分区编号 流分区编号,默认从0开始。取值方式:从下拉框选择。 该分区的总输入/输出流量(KB/秒)