华为云用户手册

  • 关键字 IF NOT EXISTS:指定该关键字以避免表已经存在时报错。 COMMENT:字段或表描述。 PARTITIONED BY:指定分区字段。 ROW FORMAT:行数据格式。 STORED AS:指定所存储的文件格式,当前该关键字只支持指定TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET几种格式。创建 DLI 表时必须指定此关键字。 TBLPROPERTIES:用于为表添加key/value的属性。 在表存储格式为PARQUET时,可以通过指定TBLPROPERTIES(parquet.compression = 'zstd')来指定表压缩格式为zstd。 AS:使用CTAS创建表。
  • 参数说明 表1 参数描述 参数 是否必选 描述 db_name 否 Database名称。 由字母、数字和下划线(_)组成。不能是纯数字,且不能以数字和下划线开头。 table_name 是 Database中的表名。 由字母、数字和下划线(_)组成。不能是纯数字,且不能以数字和下划线开头。匹配规则为:^(?!_)(?![0-9]+$)[A-Za-z0-9_$]*$。如果特殊字符需要使用单引号('')包围起来。 col_name 是 列字段名称。 列字段由字母、数字和下划线(_)组成。不能是纯数字,且至少包含一个字母。 列名为大小写不敏感,即不区分大小写。 col_type 是 列字段的数据类型。数据类型为原生类型。 请参考原生数据类型。 col_comment 否 列字段描述。仅支持字符串常量。 row_format 是 行数据格式。row format功能只支持textfile类型的表。 file_format 是 DLI表数据存储格式:支持textfile, avro, orc, sequencefile, rcfile, parquet。 table_comment 否 表描述。仅支持字符串常量。 key = value 否 设置TBLPROPERTIES具体属性和值。 在表存储格式为PARQUET时,可以通过指定TBLPROPERTIES(parquet.compression = 'zstd')来指定表压缩格式为zstd。 select_statement 否 用于CTAS命令,将源表的select查询结果或某条数据插入到新创建的DLI表中。
  • 示例2:创建DLI分区表 示例说明:创建一个名为student的分区表,该分区表使用院系编号(facultyNo)和班级编号(classNo)进行分区,该student表会同时按照不同的院系编号(facultyNo)和不同的班级编号(classNo)分区。 在实际的使用过程中,您可以选择合适的分区字段并将其添加到PARTITIONED BY关键字后。 1 2 3 4 5 6 7 8 9 CREATE TABLE IF NOT EXISTS student( id int, name STRING ) STORED AS avro PARTITIONED BY ( facultyNo INT, classNo INT );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1 col_type1 [COMMENT col_comment1], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name2 col_type2, [COMMENT col_comment2], ...)] [ROW FORMAT row_format] STORED AS file_format [TBLPROPERTIES (key = value)] [AS select_statement]; row_format: : SERDE serde_cls [WITH SERDEPROPERTIES (key1=val1, key2=val2, ...)] | DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char]
  • 注意事项 CTAS建表语句不能指定表的属性。 Hive DLI表不支持在建表时指定多字符的分隔符。 关于分区表的使用说明: 创建分区表时,PARTITONED BY中指定分区列必须是不在表中的列,且需要指定数据类型。分区列支持string, boolean, tinyint, smallint, short, int, bigint, long, decimal, float, double, date, timestamp等hive开源支持的类型。 支持指定多个分区字段,分区字段只需在PARTITIONED BY关键字后指定,不能像普通字段一样在表名后指定,否则将出错。 单表分区数最多允许200000个。 Spark 3.3及以上版本支持使用Hive语法的CTAS语句创建分区表。
  • 示例 将流qualified_cars 的数据输出到文档数据库collectionTest。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "dds", region = "xxx", db_url = "192.168.0.8:8635,192.168.0.130:8635/dbtest/collectionTest", username = "xxxxxxxxxx", password = "xxxxxxxxxx", field_names = "car_id,car_owner,car_age,average_speed,total_miles", batch_insert_data_num = "10" );
  • 前提条件 请务必确保您的账户下已在文档数据库服务(DDS)里创建了DDS实例。 如何创建DDS实例,请参考《文档数据库服务快速入门》中“快速购买文档数据库实例”章节。 目前仅支持未开启SSL认证的集群实例,不支持副本集与单节点的类型实例。 该场景作业需要运行在DLI的独享队列上,请确保已创建DLI独享队列。 关于如何创建DLI独享队列,在购买队列时,选择“按需计费”,勾选“专属资源模式”即可。具体操作请参见《 数据湖探索 用户指南》中创建队列章节。 确保DLI独享队列与DDS集群建立跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "dds", username = "", password = "", db_url = "", field_names = "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dds表示输出到文档数据库服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 DDS实例的访问地址,形如:ip1:port,ip2:port/database/collection。 field_names 是 待插入数据字段的key,具体形式如:"f1,f2,f3",并且保证与sink中数据列一一对应。 batch_insert_data_num 否 表示一次性批量写入的数据量,值必须为正整数,默认值为10。
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 CREATE TABLE car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, proctime as PROCTIME() ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'gaussdb', 'connector.driver' = 'org.postgresql.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.lookup.cache.max-rows' = '10000', 'connector.lookup.cache.ttl' = '24h' ); CREATE TABLE audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示 connector.lookup.cache.max-rows 否 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 connector.lookup.cache.ttl 否 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 connector.lookup.max-retries 否 维表配置,数据拉取最大重试次数,默认为3。
  • 示例 从Redis中读取数据。 create table redisSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.table-name' = 'car_info' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于redis,需配置为'redis'。 connector.host 是 redis连接地址。 connector.port 是 redis连接端口。 connector.password 否 redis认证密码。 connector.deploy-mode 否 redis部署模式,支持standalone/cluster,默认standalone。 connector.table-name 否 table存储模式下必配,redis中存储表名。在table存储模式下,数据将以hash类型存储到redis,其中key为:${table-name}:${ext-key},field名为列名。 说明: table存储模式:将connector.table-name、connector.key-column作为redis的key。redis的hash类型,每个key对应一个hashmap,hashmap的hashkey为源表的字段名,hashvalue为源表的字段值。 connector.use-internal-schema 否 table存储模式下可配置,是否使用redis中已存在schema,默认为false。 connector.key-column 否 table存储模式下可配置,将该字段值作为redis中的ext-key,未配置时,ext-key为生成的uuid。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'redis', 'connector.host' = '', 'connector.port' = '' );
  • 示例 DataSource语法新建test_datasource_lifecycle表,生命周期为100天 1 2 3 CREATE TABLE test_datasource_lifecycle(id int) USING parquet TBLPROPERTIES( "dli.lifecycle.days"=100); Hive语法新建test_hive_lifecycle表,生命周期为100天。 1 2 3 CREATE TABLE test_hive_lifecycle(id int) stored as parquet TBLPROPERTIES( "dli.lifecycle.days"=100); DataSource语法新建test_datasource_lifecycle_obs表,生命周期为100天,过期时默认删除数据且数据备份至目录'obs://dli-test/'。 1 2 3 4 CREATE TABLE test_datasource_lifecycle_obs(name string, id int) USING parquet OPTIONS (path "obs://dli-test/xxx") TBLPROPERTIES( "dli.lifecycle.days"=100, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' ); Hive语法新建test_hive_lifecycle_obs表,生命周期为100天,过期时默认删除数据且数据备份至目录'obs://dli-test/'。 1 2 3 4 CREATE TABLE test_hive_lifecycle_obs(name string, id int) STORED AS parquet LOCATION 'obs://dli-test/xxx' TBLPROPERTIES( "dli.lifecycle.days"=100, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' );
  • 参数说明 表1 参数说明 参数名称 是否必选 参数说明 table_name 是 需要设置生命周期的表名。 dli.lifecycle.days 是 设置的生命周期时间,只能为正整数,单位为天。 external.table.purge 否 仅OBS表支持配置该参数。 是否需要在删除表或分区时,清除path路径下的数据。默认不删除。 设置'external.table.purge'='true'时: 非分区OBS表配置删除文件后,表目录也会删除。 分区OBS表自定义分区数据也会删除。 dli.lifecycle.trash.dir 否 仅OBS表支持配置该参数。 设置'external.table.purge'='true'时,清除数据的备份目录,默认七天后删除备份数据。
  • 语法格式 DataSource语法创建DLI表 CREATE TABLE table_name(name string, id int) USING parquet TBLPROPERTIES( "dli.lifecycle.days"=1 ); Hive语法创建DLI表 CREATE TABLE table_name(name string, id int) stored as parquet TBLPROPERTIES( "dli.lifecycle.days"=1 ); DataSource语法创建OBS表 CREATE TABLE table_name(name string, id int) USING parquet OPTIONS (path "obs://dli-test/table_name") TBLPROPERTIES( "dli.lifecycle.days"=1, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' ); Hive语法创建OBS表 1 2 3 4 CREATE TABLE table_name(name string, id int) STORED AS parquet LOCATION 'obs://dli-test/table_name' TBLPROPERTIES( "dli.lifecycle.days"=1, "external.table.purge"='true', "dli.lifecycle.trash.dir"='obs://dli-test/Lifecycle-Trash' );
  • 表的回收规则 在创建表时通过TBLPROPERTIES指定表的生命周期。 非分区表 如果表是非分区表,根据每张表的最后修改时间,经过生命周期时间后判断是否要回收此表。 分区表 如果是分区表,则根据各分区的最后一次表数据被修改的时间(LAST_AC CES S_TIME)判断该分区是否该被回收。分区表的最后一个分区被回收后,该表不会被删除。 分区表不支持设置分区级的生命周期,仅支持表级别的生命周期管理。 生命周期回收为每天定时启动,扫描全量分区。 生命周期回收为每天定时启动,扫描全量分区的最后一次表数据被修改的时间(LAST_ACCESS_TIME)需要超过生命周期指定的时间才回收。 假设某个分区表生命周期为1天,该分区数据最后一次被修改的时间是2023年05月20日15时。如果在2023年05月20日15时之前扫描此表(不到一天),则不会回收表分区。如果2023年05月20日回收扫描时发现表分区最后一次表数据被修改的时间(LAST_ACCESS_TIME)超过生命周期指定的时间,则上述分区会被回收。 生命周期主要提供定期回收表或分区的功能,每天根据服务的繁忙程度,不定时回收。不能确保表或分区的生命周期到期后,立刻被回收。 删除表后,表的所有属性信息全部会删除,包括生命周期。新建同名表后,表的生命周期以新设置的属性为准。
  • 批作业SQL常用配置项说明 本章节为您介绍DLI 批作业SQL语法的常用配置项。 表1 常用配置项 名称 默认值 描述 spark.sql.files.maxRecordsPerFile 0 要写入单个文件的最大记录数。如果该值为零或为负,则没有限制。 spark.sql.shuffle.partitions 200 为连接或聚合过滤数据时使用的默认分区数。 spark.sql.dynamicPartitionOverwrite.enabled false 当前配置设置为“false”时,DLI在覆盖写之前,会删除所有符合条件的分区。例如,分区表中有一个“2021-01”的分区,当使用INSERT OVERWRITE语句向表中写入“2021-02”这个分区的数据时,会把“2021-01”的分区数据也覆盖掉。 当前配置设置为“true”时,DLI不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。 spark.sql.files.maxPartitionBytes 134217728 读取文件时要打包到单个分区中的最大字节数。 spark.sql.badRecordsPath - Bad Records的路径。 spark.sql.legacy.correlated.scalar.query.enabled false 该参数设置为true: 当子查询中数据不重复的情况下,执行关联子查询,不需要对子查询的结果去重。 当子查询中数据重复的情况下,执行关联子查询,会提示异常,必须对子查询的结果做去重处理,比如max(),min()。 该参数设置为false: 不管子查询中数据重复与否,执行关联子查询时,都需要对子查询的结果去重,比如max(),min(),否则提示异常。 spark.sql.keep.distinct.expandThreshold - 参数说明: 对于包含count(distinct)的多维分析(with cube)的查询场景,spark典型的执行计划是将cube使用expand算子来实现,但该操作会导致查询膨胀,为了避免出现查询膨胀,建议执行如下配置: spark.sql.keep.distinct.expandThreshold: 默认值:-1,即使用Spark默认的expand算子。 设置具体数值:即代表定义了查询膨胀的阈值(例如512),超过该阈值count(distinct) 使用distinct聚合算子来执行,不再使用expand算子。 spark.sql.distinct.aggregator.enabled:强制使用distinct聚合算子的开关。配置为true时不再根据spark.sql.keep.distinct.expandThreshold来判断。 适用场景:包含count(distinct)的多维分析(with cube)的查询场景,可能包含多个count(distinct),且包含cube/roll up 典型场景示例: SELECT a1, a2, count(distinct b), count(distinct c) FROM test_distinct group by a1, a2 with cube spark.sql.distinct.aggregator.enabled false spark.sql.optimizer.dynamicPartitionPruning.enabled true 该配置项用于启用或禁用动态分区修剪。在执行SQL查询时,动态分区修剪可以帮助减少需要扫描的数据量,提高查询性能。 配置为true时,代表启用动态分区修剪,SQL会在查询中自动检测并删除那些不满足WHERE子句条件的分区,适用于在处理具有大量分区的表时。 如果SQL查询中包含大量的嵌套left join操作,并且表有大量的动态分区时,这可能会导致在数据解析时消耗大量的内存资源,导致Driver节点的内存不足,并触发频繁的Full GC。 在这种情况下,可以配置该参数为false即禁用动态分区修剪优化,有助于减少内存使用,避免内存溢出和频繁的Full GC。 但禁用此优化可能会降低查询性能,禁用后Spark将不会自动修剪掉那些不满足条件的分区。 父主题: Spark SQL语法参考(即将下线)
  • 语法格式 1 2 3 4 5 CREATE FUNCTION [db_name.]function_name AS class_name [USING resource,...] resource: : JAR file_uri 或 1 2 3 4 5 CREATE OR REPLACE FUNCTION [db_name.]function_name AS class_name [USING resource,...] resource: : JAR file_uri
  • 示例 为了便于理解删除分区语句的使用方法,本节示例为您提供源数据,基于源数据提供删除分区的操作示例。 使用DataSource语法创建一个OBS表分区表。 创建了一个名为student的OBS分区表,表中有学生学号(id),学生姓名(name),学生院系编号(facultyNo)和学生班级编号(classNo),该表使用学生院系编号(facultyNo)和学生班级编号(classNo)进行分区。 1 2 3 4 5 6 7 8 create table if not exists student ( id int, name STRING, facultyNo int, classNo INT) using csv options (path 'obs://bucketName/filePath') partitioned by (faculytNo, classNo); 在表格中插入分区数据。 利用插入数据中的内容,可以插入以下数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 INSERT into student partition (facultyNo = 10, classNo = 101) values (1010101, "student01"), (1010102, "student02"); INSERT into student partition (facultyNo = 10, classNo = 102) values (1010203, "student03"), (1010204, "student04"); INSERT into student partition (facultyNo = 20, classNo = 101) values (2010105, "student05"), (2010106, "student06"); INSERT into student partition (facultyNo = 20, classNo = 102) values (2010207, "student07"), (2010208, "student08"); INSERT into student partition (facultyNo = 20, classNo = 103) values (2010309, "student09"), (2010310, "student10"); INSERT into student partition (facultyNo = 30, classNo = 101) values (3010111, "student11"), (3010112, "student12"); INSERT into student partition (facultyNo = 30, classNo = 102) values (3010213, "student13"), (3010214, "student14"); 查看分区。 利用查看指定表所有分区中的内容,可以查看相关的分区内容。 示例代码如下: SHOW partitions student; 表2 表数据示例 facultyNo classNo facultyNo=10 classNo=101 facultyNo=10 classNo=102 facultyNo=20 classNo=101 facultyNo=20 classNo=102 facultyNo=20 classNo=103 facultyNo=30 classNo=101 facultyNo=30 classNo=102 删除分区。 示例1:指定多个筛选条件删除分区 本示例删除facultyNo为20,classNo为103的分区; 如需按指定筛选条件删除分区请参考指定筛选条件删除分区(只支持OBS表)。 示例代码如下: ALTER TABLE student DROP IF EXISTS PARTITION (facultyNo=20, classNo=103); 重新利用第三步中的方法查看表中的分区,可以看到该分区被删除: SHOW partitions student; 示例2:指定单个筛选条件删除分区 本示例删除facultyNo为30的分区;在插入数据的过程中可以了解到,facultyNo为30的分区有两个。 如需按指定筛选条件删除分区请参考指定筛选条件删除分区(只支持OBS表)。 示例代码如下: ALTER TABLE student DROP IF EXISTS PARTITION (facultyNo = 30); 执行后结果: 表3 表数据示例 facultyNo classNo facultyNo=10 classNo=101 facultyNo=10 classNo=102 facultyNo=20 classNo=101 facultyNo=20 classNo=102 facultyNo=20 classNo=103
  • 参数说明 表1 参数描述 参数 描述 db_name Database名称,由字母、数字和下划线(_)组成。不能是纯数字,且不能以下划线开头。 table_name Database中的表名,由字母、数字和下划线(_)组成。不能是纯数字,且不能以下划线开头。匹配规则为:^(?!_)(?![0-9]+$)[A-Za-z0-9_$]*$。如果特殊字符需要使用单引号('')包围起来。 partition_specs 分区信息,key=value形式,key为分区字段,value为分区值。若分区字段为多个字段,可以不包含所有的字段,会删除匹配上的所有分区。“partition_specs”中的参数默认带有“( )”,例如:PARTITION (facultyNo=20, classNo=103);。
  • 个性化分析场景 场景简介:企业业务需求的多样化催生出对特定BI功能的需求,除了基础的数据查询和可视化分析,DataArts Insight提供了多种DataArts Insight能力,提供不同的数据视图和报表,以满足不同工作职责的用户需求实现个性化分析。 痛点:报表需求多、变化快,分析成本高,需求响应慢;BI使用门槛高,业务人员难上手。 优势:智能分析助手,自然语言交互的BI自助分析,无论是业务人员还是技术人员都能轻松获取和分析数据。
  • 代码样例 Token认证机制支持API,用户可在二次开发样例的Producer()和Consumer()中对其进行配置。 Producer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker地址列表 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // 客户端ID props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); // Key序列化类 props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Value序列化类 props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 分区类名 props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SC RAM -SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; } Consumer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); // 会话超时时间 props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); // 消息Key值使用的反序列化类 props.put(KEY_DESERIALIZER, kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 消息内容使用的反序列化类 props.put(VALUE_DESERIALIZER, kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 安全协议类型 props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SCRAM-SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; }
  • Flink常用接口 Flink主要使用到如下这几个类: StreamExecutionEnvironment:是Flink流处理的基础,提供了程序的执行环境。 DataStream:Flink用类DataStream来表示程序中的流式数据。用户可以认为它们是含有重复数据的不可修改的集合(collection),DataStream中元素的数量是无限的。 KeyedStream:DataStream通过keyBy分组操作生成流,通过设置的key值对数据进行分组。 WindowedStream:KeyedStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 AllWindowedStream:DataStream通过window窗口函数生成的流,设置窗口类型并且定义窗口触发条件,然后在窗口数据上进行一些操作。 ConnectedStreams:将两条DataStream流连接起来并且保持原有流数据的类型,然后进行map或者flatMap操作。 JoinedStreams:在窗口上对数据进行等值join操作(等值就是判断两个值相同的join,比如a.id = b.id),join操作是coGroup操作的一种特殊场景。 CoGroupedStreams:在窗口上对数据进行coGroup操作,可以实现流的各种join类型。 图1 Flink Stream的各种流类型转换
  • 准备本地应用开发环境 Kafka开发应用时,需要准备的开发和运行环境如表1所示: 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装和配置IntelliJ IDEA 开发环境的基本配置。版本要求:JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端: Oracle JDK:支持1.8版本 IBM JDK:支持1.8.5.11版本 TaiShan客户端: OpenJDK:支持1.8.0_272版本 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 父主题: 准备Kafka应用开发环境
  • 在Linux调测程序 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序。 运行Consumer样例工程的命令如下。 java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.Consumer
共100000条