华为云用户手册

  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dds", username = "", password = "", db_url = "", field_names = "" );
  • 示例 将流qualified_cars 的数据输出到文档数据库collectionTest。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "dds", region = "xxx", db_url = "192.168.0.8:8635,192.168.0.130:8635/dbtest/collectionTest", username = "xxxxxxxxxx", password = "xxxxxxxxxx", field_names = "car_id,car_owner,car_age,average_speed,total_miles", batch_insert_data_num = "10" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dds表示输出到文档数据库服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 DDS实例的访问地址,形如:ip1:port,ip2:port/database/collection。 field_names 是 待插入数据字段的key,具体形式如:"f1,f2,f3",并且保证与sink中数据列一一对应。 batch_insert_data_num 否 表示一次性批量写入的数据量,值必须为正整数,默认值为10。
  • 前提条件 请务必确保您的账户下已在文档数据库服务(DDS)里创建了DDS实例。 如何创建DDS实例,请参考《文档数据库服务快速入门》中“快速购买文档数据库实例”章节。 目前仅支持未开启SSL认证的集群实例,不支持副本集与单节点的类型实例。 该场景作业需要运行在 DLI 的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《 数据湖探索 用户指南》中创建队列章节。 确保DLI独享队列与DDS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“dis”表示数据源为 数据接入服务 。 region 是 数据所在的DIS区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 channel 是 数据所在的DIS通道名称。 partition_count 否 数据所在的DIS通道分区数。该参数和partition_range参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range 否 指定作业从DIS通道读取的分区范围。该参数和partition_count参数不能同时配置。当该参数没有配置的时候默认读取所有partition。 partition_range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3。 encode 是 数据编码格式,可选为“csv”、“json”、“xml”、“email”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 若编码格式为“xml”,则需配置“xml_config”属性。 若编码格式为“email”,则需配置“email_key”属性。 若编码格式为“blob”,表示不对接收的数据进行解析,流属性仅能有一个且数据格式为ARRAY[TINYINT]。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 field_delimiter 否 属性分隔符,仅当编码格式为csv时该参数需要填写,例如配置为“,”。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于 CS V格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 json_config 否 当编码格式为json时,用户需要通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1; field2=data_json.field2; field3=$”,其中field3=$表示field3的内容为整个json串。 xml_config 否 当编码格式为xml时,用户需要通过该参数来指定xml字段和流定义字段的映射关系,格式为“field1=data_xml.field1; field2=data_xml.field2”。 email_key 否 当编码格式为email时,用户需要通过该参数来指定需要提取的信息,需要列出信息的key值,需要与流定义字段一一对应,多个key值时以逗号分隔,例如“Message-ID, Date, Subject, body”,其中由于邮件正文没有关键字,DLI规定其关键字为“body”。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 offset 否 当启动作业后再获取数据,则该参数无效。 当获取数据后再启动作业,用户可以根据需求设置该参数的数值。 例如当offset= "100"时,则表示DLI从DIS服务中的第100条数据开始处理。 start_time 否 DIS数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start_time也没配置offset的时候,读取最新数据。 当没有配置start_time但配置了offset的时候,则从offset开始读取数据。 enable_checkpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 checkpoint_app_name 否 DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 checkpoint_interval 否 DIS源算子做checkpoint的时间间隔,单位秒,默认为60。
  • 功能描述 创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 语法格式 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dis", region = "", channel = "", partition_count = "", encode = "", field_delimiter = "", offset= "");
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 CREATE TABLE car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, proctime as PROCTIME() ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'gaussdb', 'connector.driver' = 'org.postgresql.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.lookup.cache.max-rows' = '10000', 'connector.lookup.cache.ttl' = '24h' ); CREATE TABLE audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示 connector.lookup.cache.max-rows 否 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 connector.lookup.cache.ttl 否 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 connector.lookup.max-retries 否 维表配置,数据拉取最大重试次数,默认为3。
  • 示例 从Redis中读取数据。 create table redisSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.table-name' = 'car_info' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于redis,需配置为'redis'。 connector.host 是 redis连接地址。 connector.port 是 redis连接端口。 connector.password 否 redis认证密码。 connector.deploy-mode 否 redis部署模式,支持standalone/cluster,默认standalone。 connector.table-name 否 table存储模式下必配,redis中存储表名。在table存储模式下,数据将以hash类型存储到redis,其中key为:${table-name}:${ext-key},field名为列名。 说明: table存储模式:将connector.table-name、connector.key-column作为redis的key。redis的hash类型,每个key对应一个hashmap,hashmap的hashkey为源表的字段名,hashvalue为源表的字段值。 connector.use-internal-schema 否 table存储模式下可配置,是否使用redis中已存在schema,默认为false。 connector.key-column 否 table存储模式下可配置,将该字段值作为redis中的ext-key,未配置时,ext-key为生成的uuid。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'redis', 'connector.host' = '', 'connector.port' = '' );
  • 示例 DataSource语法新建test_datasource_lifecycle表,生命周期为100天 1 2 3 CREATE TABLE test_datasource_lifecycle(id int) USING parquet TBLPROPERTIES( "dli.lifecycle.days"=100); Hive语法新建test_hive_lifecycle表,生命周期为100天。 1 2 3 CREATE TABLE test_hive_lifecycle(id int) stored as parquet TBLPROPERTIES( "dli.lifecycle.days"=100); DataSource语法新建test_datasource_lifecycle_obs表,生命周期为100天,过期时默认删除数据且数据备份至目录'obs://dli-test/'。 1 2 3 4 CREATE TABLE test_datasource_lifecycle_obs(name string, id int) USING parquet OPTIONS (path "obs://dli-test/xxx") TBLPROPERTIES( "dli.lifecycle.days"=100, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' ); Hive语法新建test_hive_lifecycle_obs表,生命周期为100天,过期时默认删除数据且数据备份至目录'obs://dli-test/'。 1 2 3 4 CREATE TABLE test_hive_lifecycle_obs(name string, id int) STORED AS parquet LOCATION 'obs://dli-test/xxx' TBLPROPERTIES( "dli.lifecycle.days"=100, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' );
  • 参数说明 表1 参数说明 参数名称 是否必选 参数说明 table_name 是 需要设置生命周期的表名。 dli.lifecycle.days 是 设置的生命周期时间,只能为正整数,单位为天。 external.table.purge 否 仅OBS表支持配置该参数。 是否需要在删除表或分区时,清除path路径下的数据。默认不删除。 设置'external.table.purge'='true'时: 非分区OBS表配置删除文件后,表目录也会删除。 分区OBS表自定义分区数据也会删除。 dli.lifecycle.trash.dir 否 仅OBS表支持配置该参数。 设置'external.table.purge'='true'时,清除数据的备份目录,默认七天后删除备份数据。
  • 表的回收规则 在创建表时通过TBLPROPERTIES指定表的生命周期。 非分区表 如果表是非分区表,根据每张表的最后修改时间,经过生命周期时间后判断是否要回收此表。 分区表 如果是分区表,则根据各分区的最后一次表数据被修改的时间(LAST_AC CES S_TIME)判断该分区是否该被回收。分区表的最后一个分区被回收后,该表不会被删除。 分区表不支持设置分区级的生命周期,仅支持表级别的生命周期管理。 生命周期回收为每天定时启动,扫描全量分区。 生命周期回收为每天定时启动,扫描全量分区的最后一次表数据被修改的时间(LAST_ACCESS_TIME)需要超过生命周期指定的时间才回收。 假设某个分区表生命周期为1天,该分区数据最后一次被修改的时间是2023年05月20日15时。如果在2023年05月20日15时之前扫描此表(不到一天),则不会回收表分区。如果2023年05月20日回收扫描时发现表分区最后一次表数据被修改的时间(LAST_ACCESS_TIME)超过生命周期指定的时间,则上述分区会被回收。 生命周期主要提供定期回收表或分区的功能,每天根据服务的繁忙程度,不定时回收。不能确保表或分区的生命周期到期后,立刻被回收。 删除表后,表的所有属性信息全部会删除,包括生命周期。新建同名表后,表的生命周期以新设置的属性为准。
  • 语法格式 DataSource语法创建DLI表 CREATE TABLE table_name(name string, id int) USING parquet TBLPROPERTIES( "dli.lifecycle.days"=1 ); Hive语法创建DLI表 CREATE TABLE table_name(name string, id int) stored as parquet TBLPROPERTIES( "dli.lifecycle.days"=1 ); DataSource语法创建OBS表 CREATE TABLE table_name(name string, id int) USING parquet OPTIONS (path "obs://dli-test/table_name") TBLPROPERTIES( "dli.lifecycle.days"=1, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' ); Hive语法创建OBS表 1 2 3 4 CREATE TABLE table_name(name string, id int) STORED AS parquet LOCATION 'obs://dli-test/table_name' TBLPROPERTIES( "dli.lifecycle.days"=1, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' );
  • 批作业SQL常用配置项说明 本章节为您介绍DLI 批作业SQL语法的常用配置项。 表1 常用配置项 名称 默认值 描述 spark.sql.files.maxRecordsPerFile 0 要写入单个文件的最大记录数。如果该值为零或为负,则没有限制。 spark.sql.shuffle.partitions 200 为连接或聚合过滤数据时使用的默认分区数。 spark.sql.dynamicPartitionOverwrite.enabled false 当前配置设置为“false”时,DLI在覆盖写之前,会删除所有符合条件的分区。例如,分区表中有一个“2021-01”的分区,当使用INSERT OVERWRITE语句向表中写入“2021-02”这个分区的数据时,会把“2021-01”的分区数据也覆盖掉。 当前配置设置为“true”时,DLI不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。 spark.sql.files.maxPartitionBytes 134217728 读取文件时要打包到单个分区中的最大字节数。 spark.sql.badRecordsPath - Bad Records的路径。 spark.sql.legacy.correlated.scalar.query.enabled false 该参数设置为true: 当子查询中数据不重复的情况下,执行关联子查询,不需要对子查询的结果去重。 当子查询中数据重复的情况下,执行关联子查询,会提示异常,必须对子查询的结果做去重处理,比如max(),min()。 该参数设置为false: 不管子查询中数据重复与否,执行关联子查询时,都需要对子查询的结果去重,比如max(),min(),否则提示异常。 spark.sql.keep.distinct.expandThreshold - 参数说明: 对于包含count(distinct)的多维分析(with cube)的查询场景,spark典型的执行计划是将cube使用expand算子来实现,但该操作会导致查询膨胀,为了避免出现查询膨胀,建议执行如下配置: spark.sql.keep.distinct.expandThreshold: 默认值:-1,即使用Spark默认的expand算子。 设置具体数值:即代表定义了查询膨胀的阈值(例如512),超过该阈值count(distinct) 使用distinct聚合算子来执行,不再使用expand算子。 spark.sql.distinct.aggregator.enabled:强制使用distinct聚合算子的开关。配置为true时不再根据spark.sql.keep.distinct.expandThreshold来判断。 适用场景:包含count(distinct)的多维分析(with cube)的查询场景,可能包含多个count(distinct),且包含cube/roll up 典型场景示例: SELECT a1, a2, count(distinct b), count(distinct c) FROM test_distinct group by a1, a2 with cube spark.sql.distinct.aggregator.enabled false spark.sql.optimizer.dynamicPartitionPruning.enabled true 该配置项用于启用或禁用动态分区修剪。在执行SQL查询时,动态分区修剪可以帮助减少需要扫描的数据量,提高查询性能。 配置为true时,代表启用动态分区修剪,SQL会在查询中自动检测并删除那些不满足WHERE子句条件的分区,适用于在处理具有大量分区的表时。 如果SQL查询中包含大量的嵌套left join操作,并且表有大量的动态分区时,这可能会导致在数据解析时消耗大量的内存资源,导致Driver节点的内存不足,并触发频繁的Full GC。 在这种情况下,可以配置该参数为false即禁用动态分区修剪优化,有助于减少内存使用,避免内存溢出和频繁的Full GC。 但禁用此优化可能会降低查询性能,禁用后Spark将不会自动修剪掉那些不满足条件的分区。 父主题: Spark SQL语法参考(即将下线)
  • 语法格式 1 2 3 4 5 CREATE FUNCTION [db_name.]function_name AS class_name [USING resource,...] resource: : JAR file_uri 或 1 2 3 4 5 CREATE OR REPLACE FUNCTION [db_name.]function_name AS class_name [USING resource,...] resource: : JAR file_uri
  • 示例 为了便于理解删除分区语句的使用方法,本节示例为您提供源数据,基于源数据提供删除分区的操作示例。 使用DataSource语法创建一个OBS表分区表。 创建了一个名为student的OBS分区表,表中有学生学号(id),学生姓名(name),学生院系编号(facultyNo)和学生班级编号(classNo),该表使用学生院系编号(facultyNo)和学生班级编号(classNo)进行分区。 1 2 3 4 5 6 7 8 create table if not exists student ( id int, name STRING, facultyNo int, classNo INT) using csv options (path 'obs://bucketName/filePath') partitioned by (faculytNo, classNo); 在表格中插入分区数据。 利用插入数据中的内容,可以插入以下数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 INSERT into student partition (facultyNo = 10, classNo = 101) values (1010101, "student01"), (1010102, "student02"); INSERT into student partition (facultyNo = 10, classNo = 102) values (1010203, "student03"), (1010204, "student04"); INSERT into student partition (facultyNo = 20, classNo = 101) values (2010105, "student05"), (2010106, "student06"); INSERT into student partition (facultyNo = 20, classNo = 102) values (2010207, "student07"), (2010208, "student08"); INSERT into student partition (facultyNo = 20, classNo = 103) values (2010309, "student09"), (2010310, "student10"); INSERT into student partition (facultyNo = 30, classNo = 101) values (3010111, "student11"), (3010112, "student12"); INSERT into student partition (facultyNo = 30, classNo = 102) values (3010213, "student13"), (3010214, "student14"); 查看分区。 利用查看指定表所有分区中的内容,可以查看相关的分区内容。 示例代码如下: SHOW partitions student; 表2 表数据示例 facultyNo classNo facultyNo=10 classNo=101 facultyNo=10 classNo=102 facultyNo=20 classNo=101 facultyNo=20 classNo=102 facultyNo=20 classNo=103 facultyNo=30 classNo=101 facultyNo=30 classNo=102 删除分区。 示例1:指定多个筛选条件删除分区 本示例删除facultyNo为20,classNo为103的分区; 如需按指定筛选条件删除分区请参考指定筛选条件删除分区(只支持OBS表)。 示例代码如下: ALTER TABLE student DROP IF EXISTS PARTITION (facultyNo=20, classNo=103); 重新利用第三步中的方法查看表中的分区,可以看到该分区被删除: SHOW partitions student; 示例2:指定单个筛选条件删除分区 本示例删除facultyNo为30的分区;在插入数据的过程中可以了解到,facultyNo为30的分区有两个。 如需按指定筛选条件删除分区请参考指定筛选条件删除分区(只支持OBS表)。 示例代码如下: ALTER TABLE student DROP IF EXISTS PARTITION (facultyNo = 30); 执行后结果: 表3 表数据示例 facultyNo classNo facultyNo=10 classNo=101 facultyNo=10 classNo=102 facultyNo=20 classNo=101 facultyNo=20 classNo=102 facultyNo=20 classNo=103
  • 参数说明 表1 参数描述 参数 描述 db_name Database名称,由字母、数字和下划线(_)组成。不能是纯数字,且不能以下划线开头。 table_name Database中的表名,由字母、数字和下划线(_)组成。不能是纯数字,且不能以下划线开头。匹配规则为:^(?!_)(?![0-9]+$)[A-Za-z0-9_$]*$。如果特殊字符需要使用单引号('')包围起来。 partition_specs 分区信息,key=value形式,key为分区字段,value为分区值。若分区字段为多个字段,可以不包含所有的字段,会删除匹配上的所有分区。“partition_specs”中的参数默认带有“( )”,例如:PARTITION (facultyNo=20, classNo=103);。
  • 个性化分析场景 场景简介:企业业务需求的多样化催生出对特定BI功能的需求,除了基础的数据查询和可视化分析,DataArts Insight提供了多种DataArts Insight能力,提供不同的数据视图和报表,以满足不同工作职责的用户需求实现个性化分析。 痛点:报表需求多、变化快,分析成本高,需求响应慢;BI使用门槛高,业务人员难上手。 优势:智能分析助手,自然语言交互的BI自助分析,无论是业务人员还是技术人员都能轻松获取和分析数据。
  • 审计 云审计 服务(Cloud Trace Service, CTS ),是华为 云安全 解决方案中专业的日志审计服务,提供对各种云资源操作记录的收集、存储和查询功能,可用于支撑安全分析、合规审计、资源跟踪和问题定位等常见应用场景。 CTS可记录的DataArts Insight操作列表详见云审计服务支持的DataArts Insight操作列表说明。用户开通云审计服务并创建和配置追踪器后,CTS开始记录操作事件用于审计。关于如何开通云审计服务以及如何查看追踪事件,请参考《云审计服务快速入门》中的相关章节。 CTS支持配置关键操作通知。用户可将与 IAM 相关的高危敏感操作,作为关键操作加入到CTS的实时监控列表中进行监控跟踪。当用户使用DataArts Insight服务时,如果触发了监控列表中的关键操作,那么CTS会在记录操作日志的同时,向相关订阅者实时发送通知。
  • 代码样例 Token认证机制支持API,用户可在二次开发样例的Producer()和Consumer()中对其进行配置。 Producer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker地址列表 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // 客户端ID props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); // Key序列化类 props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Value序列化类 props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 分区类名 props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SC RAM -SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; } Consumer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); // 会话超时时间 props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); // 消息Key值使用的反序列化类 props.put(KEY_DESERIALIZER, kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 消息内容使用的反序列化类 props.put(VALUE_DESERIALIZER, kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 安全协议类型 props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SCRAM-SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; }
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 准备本地应用开发环境 Kafka开发应用时,需要准备的开发和运行环境如表1所示: 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装和配置IntelliJ IDEA 开发环境的基本配置。版本要求:JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端: Oracle JDK:支持1.8版本 IBM JDK:支持1.8.5.11版本 TaiShan客户端: OpenJDK:支持1.8.0_272版本 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 父主题: 准备Kafka应用开发环境
  • 在Linux调测程序 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序。 运行Consumer样例工程的命令如下。 java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.Consumer
  • 提供分流能力 表8 提供分流能力的相关接口 API 说明 def split(selector: OutputSelector[T]): SplitStream[T] 传入OutputSelector,重写select方法确定分流的依据(即打标记),构建SplitStream流。即对每个元素做一个字符串的标记,作为选择的依据,打好标记之后就可以通过标记选出并新建某个标记的流。 def select(outputNames: String*): DataStream[T] 从一个SplitStream中选出一个或多个流。 outputNames指的是使用split方法对每个元素做的字符串标记的序列。
共100000条