华为云用户手册

  • 删除转储任务 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK删除指定的转储任务。 1 2 3 4 5 6 7 DeleteTransferTaskRequest request = new DeleteTransferTaskRequest(); // 配置转储任务所属的通道名称 request.setStreamName(streamName); // 配置待删除的转储任务名称 request.setTransferTaskName(taskName); 配置“DeleteTransferTaskRequest”对象之后,通过调用deleteTransferTask的方法删除转储任务。 1 dic.deleteTransferTask(request); 父主题: 使用SDK(Java)
  • 添加转储到 MapReduce服务 MRS )的转储任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 CreateTransferTaskRequest request = new CreateTransferTaskRequest(); //配置通道名称:用户在 数据接入服务 (简称DIS)控制台创建通道 request.setStreamName(streamName); //添加MRS转储任务,并设置任务名称 MRSDestinationDescriptorRequest descriptor = new MRSDestinationDescriptorRequest(); descriptor.setTransferTaskName(taskName); // 配置MRS集群信息:集群名称和集群ID。可通过弹性大数据服务(简称MRS)控制台创建和查询,集群需为非安全模式 descriptor.setMrsClusterName("mrs_dis"); descriptor.setMrsClusterId("fe69a732-c7d3-4b0f-8cda-ec9eca0cf141"); // 转储MRS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹 descriptor.setObsBucketPath("obs-dis"); descriptor.setFilePrefix("transfertask"); // 转储周期,单位s descriptor.setDeliverTimeInterval(900); // 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过 IAM 委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。 descriptor.setAgencyName("dis_admin_agency"); // 转储OBS的目标文件格式:默认text,可配置parquet、carbon descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType()); // 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取 descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name()); request.setMrsDestinationDescriptor(descriptor);
  • 添加转储到 数据湖探索 服务( DLI )的转储任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 CreateTransferTaskRequest request = new CreateTransferTaskRequest(); // 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道 request.setStreamName(streamName); // 添加DLI转储任务,并设置任务名称 UqueryDestinationDescriptorRequest descriptor = new UqueryDestinationDescriptorRequest(); descriptor.setTransferTaskName(taskName); // 配置DLI相关信息:数据库和内表名称。可通过 数据湖 探索(简称DLI)控制台创建和查询,DLI表需为内表 descriptor.setDliDatabaseName("dis_dli"); descriptor.setDliTableName("dis_test"); // 转储DLI通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹 descriptor.setObsBucketPath("obs-dis"); descriptor.setFilePrefix("transfertask"); // 转储周期,单位s descriptor.setDeliverTimeInterval(900); // 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。 descriptor.setAgencyName("dis_admin_agency"); // 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取 descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name()); request.setDliDestinationDescriptor(descriptor);
  • 添加转储到 对象存储服务 (OBS)的转储任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 CreateTransferTaskRequest request = new CreateTransferTaskRequest(); // 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道 request.setStreamName(streamName); // 添加OBS转储任务,并设置任务名称 OBSDestinationDescriptorRequest descriptor = new OBSDestinationDescriptorRequest(); descriptor.setTransferTaskName(taskName); // 转储至对象存储服务(简称OBS):OBS桶名和子文件夹名,通过OBS控制台或客户端创建桶和文件夹 descriptor.setObsBucketPath("obs-dis"); descriptor.setFilePrefix("transfertask"); // 转储周期,单位s descriptor.setDeliverTimeInterval(900); // 可选:在DIS管理页面自动创建名称为“dis_admin_agency”的IAM委托,默认采用此委托,用于授权访问。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。 descriptor.setAgencyName("dis_admin_agency"); // 可选,转储OBS的目标文件格式:默认text,可配置parquet、carbon descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType()); // 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取 descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name()); request.setObsDestinationDescriptor(descriptor);
  • 添加转储到 数据仓库 服务(DWS)的转储任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 CreateTransferTaskRequest request = new CreateTransferTaskRequest(); //配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道 request.setStreamName(streamName); //添加DWS转储任务,并设置任务名称 DwsDestinationDescriptorRequest descriptor = new DwsDestinationDescriptorRequest(); descriptor.setTransferTaskName(taskName); // 配置DWS集群信息:集群名称、集群ID、数据库等信息。可通过数据仓库服务(简称DWS)控制台创建和查询集群,并通过客户端或其他方式创建数据表 descriptor.setDwsClusterName("dis_test"); descriptor.setDwsClusterId("92f90f6a-de4d-4689-82f6-320c328b0062"); descriptor.setDwsDatabaseName("postgres"); descriptor.setDwsSchema("dbadmin"); descriptor.setDwsTableName("distable01"); descriptor.setDwsDelimiter("|"); descriptor.setUserName(System.getenv("DB_ADMIN")); descriptor.setUserPassword(System.getenv("DB_PASSWORD")); //DIS调用KMS服务加密存储DWS的密码,保证用户数据安全:用户可通过 数据加密 服务(简称KMS)控制台的"密钥管理"创建和查询KMS密钥信息 descriptor.setKmsUserKeyName("qiyinshan"); descriptor.setKmsUserKeyId("9521c600-64a8-4971-ad36-7bbfa6d00c41"); // 转储DWS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹 descriptor.setObsBucketPath("obs-dis"); descriptor.setFilePrefix("transfertask"); // 转储周期,单位s descriptor.setDeliverTimeInterval(900); // 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。 descriptor.setAgencyName("dis_admin_agency"); // 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取 descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name()); request.setDwsDestinationDescriptor(descriptor);
  • 查询APP列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 listApp_test 方法中的(limit可设置单次请求返回APP列表的最大数量取值范围:1~100)。 配置参数如下: 1 startAppName="app1" #APP名称(从该通道开始返回app列表,返回的app列表不包括此app名称。) 配置好以上参数,执行listApp_sample.py文件调用Applist_test方法。 响应结果如下: 1 2 200 {'has_more_app': False, 'apps': [{'app_id': 'kpvGNrFYfKjpqTSdPIX', 'create_time': 1543301301992, 'app_name': 'sadfghjkl'}, {'app_id': 'MtPG1lD1E7IesDuOcNt', 'create_time': 1542765418080, 'app_name': 'testAppName2'}]} 父主题: 使用SDK(Python)
  • 查询APP详情 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 appname=”app1” #查询的APP名称 配置好以上参数,执行describeApp_sample.py文件调用describeApp_test方法。 响应结果如下: 1 2 200 {'app_name': 'app1', 'app_id': 'OPKQuggQVtfqhyvK0cs', 'create_time': 1532425956631} 父主题: 使用SDK(Python)
  • Json格式上传流式数据 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="dis-test1"| #已存在的通道名 putRecords_sample.py文件中的putRecords_test方法中的records为需要上传的数据内容,数据上传格式如下: 1 2 3 4 records=[{"data": "abcdefd", "partition_id": “shardId-0000000001”}] #"data":"xxx"为上传的数据值,请自定义;“partition_id”:“shardId-0000000001”为数据写入的分区id值,请自定义。 record1 = {"data": "xxx","partition_id": partition_id} #可写入多条数据,数据格式如record1所示,每写一条数据使用下面的append方法传入records中。 配置好以上参数后,执行putRecords_sample.py文件调用putRecords_test方法,响应结果如下: 1 2 200 {'failed_record_count': 0, 'records': [{'partition_id': 'shardId-0000000001', 'sequence_number': '15'}]} 父主题: 使用SDK(Python)
  • 查询转储详情 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 2 streamname="dis-test1"#已存在的通道名 task_name="test_1" #查询该通道下的xx转储任务 配置好以上参数后,执行describe_dump_task_sample.py文件默认调用describe_dump_task_test方法。 返回信息如下: 1 2 200 {'state': 'RUNNING', 'stream_name': 'dis-test1', 'create_time': 1537949648144, 'last_transfer_timestamp': 1538018072564, 'destination_type': 'OBS', 'obs_destination_description': {'obs_bucket_path': '002', 'deliver_time_interval': 30, 'retry_duration': 0, 'agency_name': 'all', 'partition_format': 'yyyy/MM/dd/HH/mm', 'destination_file_type': 'text', 'record_delimiter': '|', 'consumer_strategy': 'LATEST', 'file_prefix': ''}, 'task_name': 'test_1', 'partitions': [{'state': 'RUNNING', 'discard': 0, 'last_transfer_offset': 500, 'partitionId': 'shardId-0000000000', 'last_transfer_timestamp': 1538018072564}, {'state': 'RUNNING', 'discard': 0, 'last_transfer_offset': 500, 'partitionId': 'shardId-0000000001', 'last_transfer_timestamp': 1538018072564}]} 父主题: 使用SDK(Python)
  • 查询转储列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="XXX" #已存在的通道名 执行list_dump_task_sample.py文件默认调用list_dump_task_test方法,获取响应200查询成功。 响应示例如下: 1 2 200 {'total_number': 1, 'tasks': [{'last_transfer_timestamp': 1538018769241, 'state': 'RUNNING', 'create_time': 1537949648144, 'destination_type': 'OBS', 'task_name': 'task_test1'}]} 父主题: 使用SDK(Python)
  • 删除转储任务 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 2 streamname = "" #已存在的通道名称 task_name="xx" task_name配置为特定的转储任务名称,则删除通道下的该转储任务。 配置好以上参数后,执行delete_dump_task_sample.py文件默认调用delete_dump_task_test方法,获取响应204删除成功。 父主题: 使用SDK(Python)
  • 添加转储任务 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 参照添加转储任务配置方法中的参数值。 配置如下参数: streamname='dis—test1' #已存在的通道名 task_name='113' 以添加OBS转储服务为例:value参数值设定与key对应 basic_Schema=DumpTask.setSchema(key=['consumer_strategy','deliver_time_int erval','agency_name','retry_duration'], value=['LATEST', 30, 'dis_admin_agency',1800]) obs_dump_task =['destination_file_type','obs_bucket_path','file_prefix', 'partition_format','record_delimiter'] obs_Schema = DumpTask.setSchema(basic_Schema=basic_Schema, key=obs_dump_task,value=['text','obs-1253', '','yyyy', '|']) # 添加OBS转储服务,配置obs_Schema值 cli.add_dump_task(streamname, task_name,'OBS',obs_Schema) 配置好以上参数后,执行add_dump_task_sample.py文件默认调用add_dump_task_test方法,获取响应201创建成功。 父主题: 使用SDK(Python)
  • 获取数据游标 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: partitionId="shardId-0000000000" streamname=”dis-test1“ #已存在的通道名 5种游标设置使用参考如下: # startSeq与AT_SEQUENCE_NUMBER/AFTER_SEQUENCE_NUMBER搭配使用 r = cli.getCursor(streamname, partitionId, cursorType='AT_SEQUENCE_NUMBER', startSeq="0") # r = cli.getCursor(streamname, partitionId, cursorType='AFTER_SEQUENCE_NUMBER', startSeq="0") # timestamp与AT_TIMESTAMP搭配使用 # r = cli.getCursor(streamname, partitionId, cursorType='AT_TIMESTAMP',timestamp=1554694135190) # r = cli.getCursor(streamname, partitionId, cursorType='TRIM_HORIZON') # r = cli.getCursor(streamname, partitionId, cursorType='LATEST') 配置好以上参数,执行getCursor_sample.py文件调用getCursor_test方法,响应结果示例如下: 1 2 200 {"partition_cursor": "eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiSCIsInBhcnRpdGlvbi1pZCI6InNoYXJkSWQtMDAwMDAwMDAwMCIsImN1cnNvci10eXBlIjoiQVRfU0VRVUVOQ0VfTlVNQkVSIiwic3RhcnRpbmctc2VxdWVuY2UtbnVtYmVyIjoiMCJ9LCJnZW5lcmF0ZVRpbWVzdGFtcCI6MTUzMjQyNDg4NzE1NH0"} 父主题: 使用SDK(Python)
  • 变更分区数量 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 2 streamname = "" #已存在的running状态通道名 target_partition_count =”3” #变更后的数量值 配置好以上参数,执行changepartitionQuantity_sample.py文件调用changepartitionQuantity_test方法,响应结果如下: 1 2 3 4 5 { "stream_name":"stream_name_test", "current_partition_count":2 "target_partition_count":5 } 父主题: 使用SDK(Python)
  • 查询Checkpoint 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 2 3 streamname = "" #通道名称 appName="xx" # APP名称(APP是已存在状态) partitionId="shardId-0000000000" #分区的唯一标识符 partitionId可通过查询通道详情获取,需要先传入当前设置的通道名称。 配置好以上参数,执行getCheckpoint_sample.py文件调用getCheckpoint_test方法,响应结果如下: 1 2 3 4 { "sequence_number": "10", "metadata": "metadata" } 父主题: 使用SDK(Python)
  • 新增Checkpoint 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 2 3 4 5 streamname = "" #通道名称 appName="xx" # APP名称(APP是已存在状态) partitionId="shardId-0000000000" #分区的唯一标识符。 seqNumber="0" #序列号 metadata="" #用户消费程序端的元数据信息,元数据信息的最大长度为1000个字符 partitionId可通过查询通道详情获取,需要先传入当前设置的通道名称。 配置好以上参数,执行commitCheckpoint_sample.py文件调用commitCheckpoint_test方法,响应201表示成功。 父主题: 使用SDK(Python)
  • 查询通道详情 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="dis-test1" #已存在的通道名 配置好以上参数后,执行describeStream_sample.py文件默认调用describeStream_test方法。 返回信息如下: 1 2 200 {"status": "RUNNING", "stream_name": "dis-test1", "data_type": "BLOB", "has_more_partitions": false, "stream_type": "COMMON", "stream_id": "L84hxfES223eVrFyxiE", "retention_period": 168, "create_time": 1532423353637, "last_modified_time": 1532423354625, "partitions": [{"status": "ACTIVE", "hash_range": "[0 : 9223372036854775807]", "sequence_number_range": "[0 : 10]", "partition_id": "shardId-0000000000"}]} 父主题: 使用SDK(Python)
  • 查询通道列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 start_stream_name = "" #可设置为空,或是已存在的通道名 执行listStream_sample.py文件默认调用listStream_test方法,获取响应200查询成功。 通道列表的返回信息示例如下: 1 2 200 {'stream_names': ['dis-jLGp', 'dis-w_p', 'dis_test1', 'dis_test2'], 'has_more_streams': False, 'total_number': 4} 父主题: 使用SDK(Python)
  • 创建通道 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 根据stream_type选取方法,参照创建通道配置方法中的参数值。 stream_type=“” #无转储通道 配置createstream_sample.py中Dump_switch方法参数值。 stream_type=“FILE” #文件类型通道 配置createstream_sample.py中Dump_switch_FILE方法参数值。 配置好参数后,执行createstream_sample.py文件默认调用createStream_test方法,获取响应201创建成功。 父主题: 使用SDK(Python)
  • 背景信息 下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置后,再循环获取数据。 获取游标有如下五种方式: AT_SEQUENCE_NUMBER AFTER_SEQUENCE_NUMBER TRIM_HORIZON LATEST AT_TIMESTAMP 为更好理解游标类型,您需要了解如下几个基本概念。 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用) 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其sequenceNumberRange代表数据有效范围,第一个值为最老数据的sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据的sequenceNumber为此值-1) 例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为200。
  • 参数说明 表2 参数说明 参数名 参数类型 说明 partitionId String 分区ID。 说明: 请根据上传流式数据的执行结果,控制台的返回信息字段,例如 “partitionId [shardId-0000000000]”进行定义。 startingSequenceNumber String 序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 说明: 请根据上传流式数据的执行结果,控制台的返回信息字段,例如“sequenceNumber [1]”进行定义。 cursorType String 游标类型。 AT_SEQUENCE_NUMBER:从特定序列号(即startingSequenceNumber定义的序列号)所在的记录开始读取数据。此类型为默认游标类型。 AFTER_SEQUENCE_NUMBER:从特定序列号(即startingSequenceNumber定义的序列号)后的记录开始读取数据。 TRIM_HORIZON:从最早被存储至分区的有效记录开始读取。 例如,某租户使用DIS的通道,分别上传了三条数据A1,A2,A3。N天后(设定A1已过期,A2和A3仍在有效期范围内),该租户需要下载数据,并选择了TRIM_HORIZON这种下载方式。那么用户可下载的数据将从A2开始读取。 LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。 AT_TIMESTAMP:从特定时间戳(即timestamp定义的时间戳)开始读取。
  • 内容导航 SDK开发指南指导您如何安装和配置开发环境、如何通过调用DIS SDK提供的接口函数进行二次开发。 章节 内容 DIS SDK能做什么 内容导航 简要介绍DIS的概念和DIS SDK的概念。 SDK下载 兼容性 如何校验软件包完整性? 介绍使用DIS SDK进行二次开发过程中涉及到的资源信息。 开通DIS服务 介绍DIS服务和DIS通道的开通方式。 获取认证信息 介绍使用DIS SDK进行二次开发前需要进行的初始化工作。 Python:准备环境~~获取数据游标 介绍使用DIS SDK进行的常用操作(匹配python)。 Java:准备环境~~变更分区数量 介绍使用DIS SDK进行的常用操作(匹配java)。 DIS服务端错误码 介绍使用DIS SDK过程中遇到异常时的响应信息。 父主题: 简介
  • DIS概述 数据接入服务(Data Ingestion Service)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。 云服务实现了在多地域部署基础设施,具备高度的可扩展性和可靠性,用户可根据自身需要指定地域使用DIS服务,由此获得更快的访问速度和实惠的服务价格。 DIS对数据传输所需要的基础设置、存储、网络和配置进行管理。您无需为数据通道担心配置、部署、持续的硬件维护等。此外,DIS还可在云区域同步复制数据,为您提供数据高可用性和数据持久性。
  • 修订记录 发布日期 修订说明 2019-12-11 第二十四次正式发布: 增加dis-kafka-adapter,增加使用Kafka Adapter上传与下载数据。 2019-10-08 第二十三次正式发布: 优化Java和Python SDK。 2019-07-08 第二十次正式发布: 小文件功能下线,删除“创建源数据类型是FILE的通道”。 2019-07-03 第十九次正式发布: Java SDK不兼容原生Kafka客户端,删除“连接Kafka consumer”。 2019-05-14 第十八次正式发布: 支持使用SDK实现数据的加密上传下载,修改初始化DIS客户端。 2019-05-07 第十七次正式发布: 查询通道列表SDK增加分页功能说明,修改查询通道列表。 2019-04-16 第十六次正式发布: 查询通道列表SDK增加响应参数说明,修改查询通道列表。 2019-03-18 第十五次正式发布: 新增如下内容: 添加转储任务~~查询转储详情 初始化DIS客户端 2019-02-23 第十四次正式发布: 修改如下内容: 获取认证信息 2019-01-17 第十三次正式发布: 内容优化。 2019-01-07 第十二次正式发布: 修改如下内容: 下载流式数据 2018-11-28 第十一次正式发布: 修改如下内容: 初始化DIS客户端 创建通道 下载流式数据 2018-11-07 第十次正式发布。 修改如下内容: 如何校验软件包完整性? 2018-09-25 第九次正式发布。 新增如下内容: 准备环境~~获取数据游标 2018-08-19 第八次正式发布。 修改如下内容: 配置样例工程 2018-07-23 第七次正式发布。 修改文档结构和名称。 2018-07-10 第六次正式发布。 新增了如下内容: 创建APP 删除APP 查询Checkpoint 变更分区数量 2018-06-12 第五次正式发布。 修改了如下内容: 开通DIS服务 DIS服务端错误码 2018-05-11 第四次正式发布。 修改了如下内容: 开通DIS通道 DIS服务端错误码 Uquery更名为数据湖探索(DLI,Data Lake Insight)。 2018-02-08 第三次正式发布。 新增了如下内容: 创建通道 删除通道 查询通道列表 查询通道详情 获取数据游标 修改了如下内容: 上传流式数据 下载流式数据 2017-11-18 第二次正式发布。 修改了如下内容: 开通DIS通道 2017-10-28 第一次正式发布。
  • SQL20 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 select i_item_id ,i_item_desc ,i_category ,i_class ,i_current_price ,sum(cs_ext_sales_price) as itemrevenue ,sum(cs_ext_sales_price)*100/sum(sum(cs_ext_sales_price)) over (partition by i_class) as revenueratio from catalog_sales ,item ,date_dim where cs_item_sk = i_item_sk and i_category in ('Sports', 'Shoes', 'Women') and cs_sold_date_sk = d_date_sk and d_date between cast('2001-03-21' as date) and (cast('2001-03-21' as date) + 30) group by i_item_id ,i_item_desc ,i_category ,i_class ,i_current_price order by i_category ,i_class ,i_item_id ,i_item_desc ,revenueratio limit 100;
  • SQL17 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 select i_item_id ,i_item_desc ,s_state ,count(ss_quantity) as store_sales_quantitycount ,avg(ss_quantity) as store_sales_quantityave ,stddev_samp(ss_quantity) as store_sales_quantitystdev ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov ,count(sr_return_quantity) as store_returns_quantitycount ,avg(sr_return_quantity) as store_returns_quantityave ,stddev_samp(sr_return_quantity) as store_returns_quantitystdev ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov ,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) as catalog_sales_quantityave ,stddev_samp(cs_quantity) as catalog_sales_quantitystdev ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov from store_sales ,store_returns ,catalog_sales ,date_dim d1 ,date_dim d2 ,date_dim d3 ,store ,item where d1.d_quarter_name = '2000Q1' and d1.d_date_sk = ss_sold_date_sk and i_item_sk = ss_item_sk and s_store_sk = ss_store_sk and ss_customer_sk = sr_customer_sk and ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number and sr_returned_date_sk = d2.d_date_sk and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3') and sr_customer_sk = cs_bill_customer_sk and sr_item_sk = cs_item_sk and cs_sold_date_sk = d3.d_date_sk and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3') group by i_item_id ,i_item_desc ,s_state order by i_item_id ,i_item_desc ,s_state limit 100;
  • SQL18 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 select i_item_id, ca_country, ca_state, ca_county, avg( cast(cs_quantity as decimal(12,2))) agg1, avg( cast(cs_list_price as decimal(12,2))) agg2, avg( cast(cs_coupon_amt as decimal(12,2))) agg3, avg( cast(cs_sales_price as decimal(12,2))) agg4, avg( cast(cs_net_profit as decimal(12,2))) agg5, avg( cast(c_birth_year as decimal(12,2))) agg6, avg( cast(cd1.cd_dep_count as decimal(12,2))) agg7 from catalog_sales, customer_demographics cd1, customer_demographics cd2, customer, customer_address, date_dim, item where cs_sold_date_sk = d_date_sk and cs_item_sk = i_item_sk and cs_bill_cdemo_sk = cd1.cd_demo_sk and cs_bill_customer_sk = c_customer_sk and cd1.cd_gender = 'M' and cd1.cd_education_status = 'Primary' and c_current_cdemo_sk = cd2.cd_demo_sk and c_current_addr_sk = ca_address_sk and c_birth_month in (10,1,8,7,3,5) and d_year = 1998 and ca_state in ('NE','OK','NC' ,'CO','ID','AR','MO') group by rollup (i_item_id, ca_country, ca_state, ca_county) order by ca_country, ca_state, ca_county, i_item_id limit 100;
  • SQL12 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 select i_item_id ,i_item_desc ,i_category ,i_class ,i_current_price ,sum(ws_ext_sales_price) as itemrevenue ,sum(ws_ext_sales_price)*100/sum(sum(ws_ext_sales_price)) over (partition by i_class) as revenueratio from web_sales ,item ,date_dim where ws_item_sk = i_item_sk and i_category in ('Music', 'Shoes', 'Children') and ws_sold_date_sk = d_date_sk and d_date between cast('2000-05-14' as date) and (cast('2000-05-14' as date) + 30 ) group by i_item_id ,i_item_desc ,i_category ,i_class ,i_current_price order by i_category ,i_class ,i_item_id ,i_item_desc ,revenueratio limit 100;
  • SQL13 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 select avg(ss_quantity) ,avg(ss_ext_sales_price) ,avg(ss_ext_wholesale_cost) ,sum(ss_ext_wholesale_cost) from store_sales ,store ,customer_demographics ,household_demographics ,customer_address ,date_dim where s_store_sk = ss_store_sk and ss_sold_date_sk = d_date_sk and d_year = 2001 and((ss_hdemo_sk=hd_demo_sk and cd_demo_sk = ss_cdemo_sk and cd_marital_status = 'U' and cd_education_status = '4 yr Degree' and ss_sales_price between 100.00 and 150.00 and hd_dep_count = 3 )or (ss_hdemo_sk=hd_demo_sk and cd_demo_sk = ss_cdemo_sk and cd_marital_status = 'D' and cd_education_status = '2 yr Degree' and ss_sales_price between 50.00 and 100.00 and hd_dep_count = 1 ) or (ss_hdemo_sk=hd_demo_sk and cd_demo_sk = ss_cdemo_sk and cd_marital_status = 'S' and cd_education_status = 'Advanced Degree' and ss_sales_price between 150.00 and 200.00 and hd_dep_count = 1 )) and((ss_addr_sk = ca_address_sk and ca_country = 'United States' and ca_state in ('IL', 'WI', 'TN') and ss_net_profit between 100 and 200 ) or (ss_addr_sk = ca_address_sk and ca_country = 'United States' and ca_state in ('MO', 'OK', 'WA') and ss_net_profit between 150 and 300 ) or (ss_addr_sk = ca_address_sk and ca_country = 'United States' and ca_state in ('NE', 'VA', 'GA') and ss_net_profit between 50 and 250 )) ;
  • SQL10 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 select cd_gender, cd_marital_status, cd_education_status, count(*) cnt1, cd_purchase_estimate, count(*) cnt2, cd_credit_rating, count(*) cnt3, cd_dep_count, count(*) cnt4, cd_dep_employed_count, count(*) cnt5, cd_dep_college_count, count(*) cnt6 from customer c,customer_address ca,customer_demographics where c.c_current_addr_sk = ca.ca_address_sk and ca_county in ('Clark County','Richardson County','Tom Green County','Sullivan County','Cass County') and cd_demo_sk = c.c_current_cdemo_sk and exists (select * from store_sales,date_dim where c.c_customer_sk = ss_customer_sk and ss_sold_date_sk = d_date_sk and d_year = 2000 and d_moy between 1 and 1+3) and (exists (select * from web_sales,date_dim where c.c_customer_sk = ws_bill_customer_sk and ws_sold_date_sk = d_date_sk and d_year = 2000 and d_moy between 1 ANd 1+3) or exists (select * from catalog_sales,date_dim where c.c_customer_sk = cs_ship_customer_sk and cs_sold_date_sk = d_date_sk and d_year = 2000 and d_moy between 1 and 1+3)) group by cd_gender, cd_marital_status, cd_education_status, cd_purchase_estimate, cd_credit_rating, cd_dep_count, cd_dep_employed_count, cd_dep_college_count order by cd_gender, cd_marital_status, cd_education_status, cd_purchase_estimate, cd_credit_rating, cd_dep_count, cd_dep_employed_count, cd_dep_college_count limit 100;
共100000条