华为云用户手册

  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“opentsdb”表示输出到 表格存储服务 CloudTable(OpenTSDB)。 region 是 表格存储 服务所在区域。 cluster_id 否 待插入数据所属集群的id,该参数与tsdb_link_address必须指定其中一个。 tsdb_metrics 是 数据点的metric,支持参数化。 tsdb_timestamps 是 数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。 tsdb_values 是 数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。 tsdb_tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。 batch_insert_data_num 否 表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。 tsdb_link_address 否 待插入数据所属集群的OpenTsdb链接地址,使用该参数时,作业需要运行在独享 DLI 队列,且DLI队列需要与CloudTable集群建立增强型跨源,该参数与cluster_id必须指定其中一个。 说明: 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。
  • 功能描述 DLI将Flink作业的输出数据输出到CloudTable的OpenTSDB中。OpenTSDB是基于HBase的分布式的,可伸缩的时间序列数据库。它存储的是时间序列数据,时间序列数据是指在不同时间点上收集到的数据,这类数据反映了一个对象随时间的变化状态或程度。支持秒级别数据的采集监控,进行永久存储,索引和查询,可用于系统监控和测量、物联网数据、金融数据和科学实验结果数据的收集监控。 表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "opentsdb", region = "", cluster_id = "", tsdb_metrics = "", tsdb_timestamps = "", tsdb_values = "", tsdb_tags = "", batch_insert_data_num = "" )
  • 示例 将流weather_out的数据输出到表格存储服务CloudTable的OpenTSDB中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */ ) WITH ( type = "opentsdb", region = "xxx", cluster_id = "e05649d6-00e2-44b4-b0ff-7194adaeab3f", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10" );
  • 前提条件 确保已创建Kafka集群。 该场景作业需要运行在DLI的独享队列上,因此要与kafka集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'properties.group.id' = '', 'scan.startup.mode' = '', 'format' = '' );
  • 常见问题 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 跨源未绑定或未绑定成功,或是Kafka集群安全组未配置放通DLI队列的网段地址。参考增强型跨源连接重新配置跨源,或者Kafka集群安全组放通DLI队列的网段地址。 Flink作业运行失败,作业运行日志中如下报错信息,应该怎么解决? Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword. sink表中定义了metadata类型,但是Print connector并不支持把sink表中的matadata去掉即可。
  • 示例 完整的SQL作业提交流程您可以参考《快速入门》中的《提交SQL作业》等章节描述。 队列是使用DLI服务的基础,执行SQL前需要先创建队列。具体可以参考《用户指南》中的“创建队列”章节。 在DLI管理控制台,单击左侧导航栏中的“SQL编辑器”,可进入SQL作业“SQL编辑器”页面。 在“SQL编辑器”页面右侧的编辑窗口中,输入如下创建数据库的SQL语句,单击“执行”。阅读并同意隐私协议,单击“确定”。 若testdb数据库不存在,则创建数据库testdb。 1 CREATE DATABASE IF NOT EXISTS testdb;
  • 示例7:创建表并设置多字符的分割符 示例说明:创建了一个名为table5的Hive表。表指定序列化和反序列化类ROW FORMAT SERDE,字段之间的分隔符被设置为/#,并且数据以文本文件格式存储。 只有指定ROW FORMAT SERDE为org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe时,字段分隔符才支持设置为多字符。 只有Hive OBS表支持在建表时指定多字符的分隔符,Hive DLI表不支持在建表时指定多字符的分隔符。 指定了多字符分隔的表不支持INSERT、IMPORT等写数语句。如需添加数据,请将数据文件直接放到表对应的OBS路径下即可,本例中,将数据文件放到obs://bucketName/filePath下。 本例指定字段分割符 field.delim'为“/#”。 ROW FORMAT功能只支持textfile类型的表。 1 2 3 4 5 6 7 8 9 10 CREATE TABLE IF NOT EXISTS table5 ( col_1 STRING, col_2 INT ) ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe' WITH SERDEPROPERTIES ( 'field.delim' = '/#' ) STORED AS textfile LOCATION 'obs://bucketName/filePath';
  • 关键字 EXTERNAL:指创建OBS表。 IF NOT EXISTS:指定该关键字以避免表已经存在时报错。 COMMENT:字段或表描述。 PARTITIONED BY:指定分区字段。 ROW FORMAT:行数据格式。 STORED AS:指定所存储的文件格式,当前该关键字只支持指定TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET格式。 LOCATION:指定OBS的路径。创建OBS表时必须指定此关键字。 TBLPROPERTIES:TBLPROPERTIES子句允许用户给表添加key/value的属性。 开启数据多版本功能,用于表数据的备份与恢复。开启多版本功能后,在进行删除或修改表数据时(insert overwrite或者truncate操作),系统会自动备份历史表数据并保留一定时间,后续您可以对保留周期内的数据进行快速恢复,避免因误操作而丢失数据。多版本功能SQL语法请参考开启或关闭数据多版本和多版本备份恢复数据。 创建OBS表时,通过指定TBLPROPERTIES ("dli.multi.version.enable"="true")开启DLI数据多版本功能,具体可以参考示例说明。 表1 TBLPROPERTIES主要参数说明 key值 value说明 dli.multi.version.enable true:开启DLI数据多版本功能。 false:关闭DLI数据多版本功能。 comment 表描述信息。 orc.compress orc存储格式表的一个属性,用来指定orc存储的压缩方式。支持取值为: ZLIB SNAPPY NONE PARQUET auto.purge 当设置为true时,删除或者覆盖的数据会不经过回收站,直接被删除。 AS:使用CTAS创建表。 ROW FORMAT SERDE为org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe时,字段分隔符才支持设置为多字符。使用方法参考示例7:创建表并设置多字符的分割符。
  • 参数说明 表2 参数说明 参数 是否必选 描述 db_name 否 Database名称。 由字母、数字和下划线(_)组成。不能是纯数字,且不能以数字和下划线开头。 table_name 是 Database中的表名。 由字母、数字和下划线(_)组成。不能是纯数字,且不能以数字和下划线开头。匹配规则为:^(?!_)(?![0-9]+$)[A-Za-z0-9_$]*$。 特殊字符需要使用单引号('')包围起来。 表名对大小写不敏感,即不区分大小写。 col_name 是 列字段名称。 列字段由字母、数字和下划线(_)组成。不能是纯数字,且至少包含一个字母。 列名为大小写不敏感,即不区分大小写。 col_type 是 列字段的数据类型。数据类型为原生类型。 请参考原生数据类型。 col_comment 否 列字段描述。仅支持字符串常量。 row_format 是 行数据格式。row_format功能只支持textfile类型的表。 file_format 是 OBS表存储格式,支持TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET table_comment 否 表描述。仅支持字符串常量。 obs_path 是 数据文件所在的OBS存储路径,推荐使用OBS并行文件系统存储。 格式:obs://bucketName/tblPath bucketName即桶名称。 tblPath是目录名称。目录后不需要指定文件名。 当OBS的目录下文件夹与文件同名时,创建OBS表指向的路径会优先指向文件而非文件夹。 key = value 否 设置TBLPROPERTIES具体属性和值。 例如开启DLI数据多版本时,可以设置"dli.multi.version.enable"="true"来开启该功能。 select_statement 否 用于CTAS命令,将源表的select查询结果或某条数据插入到新创建的OBS表中。
  • 示例5:创建OBS分区表,自定义表的TBLPROPERTIES参数 示例说明:创建名为table3,并以col_3为分区依据的OBS分区表。在TBLPROPERTIES中配置dli.multi.version.enable、comment、orc.compress和auto.purge。 dli.multi.version.enable:本例配置为true,即代表开启DLI数据多版本功能,用于表数据的备份与恢复。 comment:表描述信息,comment描述信息支持后续修改。 orc.compress:指定orc存储的压缩方式,本例定义为ZLIB。 auto.purge:本例配置为true,即删除或者覆盖的数据会不经过回收站,直接被删除。 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE IF NOT EXISTs table3 ( col_1 STRING, col_2 STRING ) PARTITIONED BY (col_3 DATE) STORED AS rcfile LOCATION 'obs://bucketName/filePath' TBLPROPERTIES ( dli.multi.version.enable = true, comment = 'Created by dli', orc.compress = 'ZLIB', auto.purge = true );
  • 示例6:创建textfile格式的非分区表,并设置ROW FORMAT 示例说明:创建名为table4的textfile类型的非分区表,并设置ROW FORMAT(ROW FORMAT功能只支持textfile类型的表)。 FIELDS:字段表格中的列,每个字段有一个名称和数据类型,表中字段之间以'/'分隔。 COLLECTION ITEMS:集合项指的是一组数据中的元素,可以是数组、列表或集合等,表中集合项以'$'分隔。 MAP KEYS:映射键是一种键值对的数据结构,用于存储一组相关联的数据,表中Map键以'#'分隔。 LINES:表格中的行,每一行包含一组字段值,表中行以'\n'结束(注意,只支持用'\n'作为行分隔符)。 NULL:表示缺少值或未知值的特殊值。在表格中,NULL表示该字段没有值或该值未知。如果数据中存在null值,则用字符串“null”表示。 1 2 3 4 5 6 7 8 9 10 11 12 CREATE TABLE IF NOT EXISTS table4 ( col_1 STRING, col_2 INT ) STORED AS textfile LOCATION 'obs://bucketName/filePath' ROW FORMAT DELIMITED FIELDS TERMINATED BY '/' COLLECTION ITEMS TERMINATED BY '$' MAP KEYS TERMINATED BY '#' LINES TERMINATED BY '\n' NULL DEFINED AS 'null';
  • 示例2:创建OBS分区表 示例说明:创建一个名为student的分区表,该分区表使用院系编号(facultyNo)和班级编号(classNo)进行分区,该student表会同时按照不同的院系编号(facultyNo)和不同的班级编号(classNo)分区。 在实际的使用过程中,您可以选择合适的分区字段并将其添加到PARTITIONED BY关键字后。 1 2 3 4 5 6 7 8 9 10 CREATE TABLE IF NOT EXISTS student( id INT, name STRING ) STORED AS avro LOCATION 'obs://bucketName/filePath' PARTITIONED BY ( facultyNo INT, classNo INT );
  • 示例1:创建OBS非分区表 示例说明:创建名为table1的OBS非分区表,并用STORED AS关键字指定该表的存储格式为orc格式。 在您的实际使用中,可以将OBS表存储为textfile, avro, orc, sequencefile, rcfile, parquet等类型。 1 2 3 4 5 6 CREATE TABLE IF NOT EXISTS table1 ( col_1 STRING, col_2 INT ) STORED AS orc LOCATION 'obs://bucketName/filePath';
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1 col_type1 [COMMENT col_comment1], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name2 col_type2, [COMMENT col_comment2], ...)] [ROW FORMAT row_format] [STORED AS file_format] LOCATION 'obs_path' [TBLPROPERTIES (key = value)] [AS select_statement] row_format: : SERDE serde_cls [WITH SERDEPROPERTIES (key1=val1, key2=val2, ...)] | DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char]
  • 注意事项 创建表时会统计大小。 添加数据时不会修改大小。 如需查看表大小可以通过OBS查看。 CTAS建表语句不能指定表的属性。 关于分区表的使用说明: 创建分区表时,PARTITONED BY中指定分区列必须是不在表中的列,且需要指定数据类型。分区列支持string, boolean, tinyint, smallint, short, int, bigint, long, decimal, float, double, date, timestamp等hive开源支持的类型。 支持指定多个分区字段,分区字段只需在PARTITIONED BY关键字后指定,不能像普通字段一样在表名后指定,否则将出错。 单表分区数最多允许200000个。 Spark 3.3及以上版本支持使用Hive语法的CTAS语句创建分区表。 关于创建表时设置多字符的分隔符: 只有指定ROW FORMAT SERDE为org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe时,字段分隔符才支持设置为多字符。 只有Hive OBS表支持在建表时指定多字符的分隔符,Hive DLI表不支持在建表时指定多字符的分隔符。 指定了多字符分隔的表不支持INSERT、IMPORT等写数语句。如需添加数据,请将数据文件直接放到表对应的OBS路径下即可,例如示例7:创建表并设置多字符的分割符中,将数据文件放到obs://bucketName/filePath下。
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 功能描述 创建OBS分区表成功后,OBS表实际还没有生成分区信息。生成分区信息主要有以下两种场景: 给OBS分区表插入对应的分区数据,数据插入成功后OBS表才会生成分区元数据信息,后续则可以根据对应分区列进行查询等操作。 手工拷贝分区目录和数据到OBS分区表路径下,执行本章节介绍的分区添加命令生成分区元数据信息,后续即可根据对应分区列进行查询等操作。 本章节重点介绍使用ALTER TABLE命令添加分区的基本操作和使用说明。
  • 示例 建OBS表时仅有一个分区列,建表成功后添加分区数据。 先使用DataSource语法创建一个OBS分区表,分区列为external_data,数据存储在obs://bucketName/datapath路径下。 create table testobstable(id varchar(128), external_data varchar(16)) using JSON OPTIONS (path 'obs://bucketName/datapath') PARTITIONED by (external_data); 拷贝分区数据目录到obs://bucketName/datapath路径下。例如当前拷贝external_data=22的分区目录下所有文件到obs://bucketName/datapath路径下。 执行添加分区命令,将分区的元数据信息生效。 ALTER TABLE testobstable ADD PARTITION (external_data='22') LOCATION 'obs://bucketName/datapath/external_data=22'; 添加分区成功后,即可根据分区列进行数据查询等操作。 select * from testobstable where external_data='22'; 建OBS表时有多个分区列,建表成功后添加分区数据。 先使用DataSource语法创建一个OBS分区表,分区列为external_data和dt,数据存储在obs://bucketName/datapath路径下。 1 2 3 4 5 create table testobstable( id varchar(128), external_data varchar(16), dt varchar(16) ) using JSON OPTIONS (path 'obs://bucketName/datapath') PARTITIONED by (external_data, dt); 拷贝分区数据目录到obs://bucketName/datapath路径下。例如拷贝external_data=22及其子目录dt=2021-07-27和目录下文件到obs://bucketName/datapath路径下。 执行添加分区命令,将分区的元数据信息生效。 1 2 3 4 ALTER TABLE testobstable ADD PARTITION (external_data = '22', dt = '2021-07-27') LOCATION 'obs://bucketName/datapath/external_data=22/dt=2021-07-27'; 添加分区成功后,即可根据分区列进行数据查询等操作。 1 2 select * from testobstable where external_data = '22'; select * from testobstable where external_data = '22' and dt='2021-07-27';
  • 注意事项 向表中添加分区时,此表和分区列(建表时PARTITIONED BY指定的列)必须已存在,而所要添加的分区不能重复添加,否则将出错。已添加的分区可通过IF NOT EXISTS避免报错。 若分区表是按照多个字段进行分区的,添加分区时需要指定所有的分区字段,指定字段的顺序可任意。 “partition_specs”中的参数默认带有“( )”。例如:PARTITION (dt='2009-09-09',city='xxx')。 在添加分区时若指定OBS路径,则该OBS路径必须是已经存在的,否则会出错。 若添加多个分区,每组PARTITION partition_specs LOCATION 'obs_path'之间用空格隔开。例如: PARTITION partition_specs LOCATION 'obs_path' PARTITION partition_specs LOCATION 'obs_path'。 若新增分区指定的路径包含子目录(或嵌套子目录),则子目录下面的所有文件类型及内容也将作为该分区的记录。 您需要保证该分区目录下所有文件类型和文件内容与表的字段一致,否则查询将报错。 您可以在建表语句OPTIONS中设置“multiLevelDirEnable”为true以查询子目录下的内容,此参数默认值为false(注意,此配置项为表属性,请谨慎配置。Hive表不支持此配置项)。
  • 示例 将数据输出到Kafka中。 示例一 1 2 3 4 5 6 7 CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" ); 示例二 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SINK STREAM kafka_sink ( a1 string, a2 string, a3 string, a4 INT ) // 输出字段 WITH ( type="kafka", kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093", kafka_topic = "testflink", // 写入的topic encode = "csv", // 编码格式,支持json/csv kafka_certificate_name = "Flink", kafka_properties_delimiter = ",", kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"kafka"表示输出到Kafka中。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_topic 是 写入的topic。 encode 是 编码格式,当前支持“json”和“user_defined”。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的 MRS 集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP 域名 映射,请参见《数据湖探索用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" )
  • 注意事项 privilege必须为赋权对象在resource中的已授权限,否则会回收失败。Privilege支持的权限类型可参见数据权限列表。 resource可以是queue、database、table、view、column,格式分别为: queue的格式为:queues.queue_name database的格式为:databases.db_name table的格式为:databases.db_name.tables.table_name view的格式为:databases.db_name.tables.view_name column的格式为:databases.db_name.tables.table_name.columns.column_name
共100000条