华为云用户手册

  • 弹性资源池计费模式 弹性资源池为 DLI 作业运行提供计算资源。弹性资源池支持三种计费模式: 包年包月:是预付费模式,按订单的购买周期计费。拥有专属的计算资源,空闲(无作业运行)时不会释放,使用体验更佳,价格比按需计费模式更优惠。 适用于可预估资源使用周期的场景,例如已完成开发进入生产阶段的项目,推荐使用包年包月计费模式预留业务所需资源。 包年包月的弹性资源池在使用过程中规格内按包周期计费,超出规格部分按需计费。 按需计费:按需计费是一种后付费模式,默认勾选专属资源模式,空闲时资源不被释放。 弹性资源池的按需模式默认勾选专属资源模式,自创建起根据购买的实际CU按自然小时收费,秒级计费,按小时结算。计算费用=单价*实际CU数*小时数。 按需计费模式适用于测试项目,资源消耗不高,按需计费成本更低。 套餐包:DLI支持购买弹性资源池CU时套餐包,购买后在弹性资源池中提交作业按CU时计费。建议购买弹性资源池CU时套餐包和按需计费模式结合使用, 购买了弹性资源池CU时套餐包时,按需使用过程中优先抵扣弹性资源池CU时套餐包的规格额度,超过额度的使用量按照按需计费。弹性资源池CU时套餐包额度每个月都会重置,例如1月5日订购套餐包,之后每月5日同一时间重置免费资源。 更多套餐包计费信息请参考套餐包计费。 本文将介绍按需计费的DLI弹性资源池的计费规则。
  • 适用场景 包年/包月:该计费模式需要用户预先支付一定时长的费用,适用于长期、稳定的业务需求。 按需计费:按需计费模式的弹性资源池适用于测试项目场景,作业随机性大,数据量小,资源消耗不高,按需计费成本更低,无需任何预付款。 包年/包月+按需计费:假设弹性资源池的规格为64CU,实际使用过程中大部分时间CU数在128CU以上,没有规格变更的场景下64CU部分采用包年包月计费,超出的64CU按弹性资源池CU时计费方式计费。 为了满足该场景下更优惠的计费,则可以通过规格变更的方式,将弹性资源池的规格扩大到128CU,则规格变更成功后128CU范围内都使用包年包月计费,整体相比原来更优惠。详细操作指导请参考弹性资源池规格变更。 按需计费+套餐包:适用于可预估队列使用量的场景、或测试项目等资源消耗不高的场景。使用过程中优先抵扣弹性资源池CU时套餐包的规格额度,超过额度的使用量按照按需计费。
  • 适用场景 通常情况下,建议您针对不同的业务创建项目: 开发项目:在此项目下,大多是工程师开发调试时使用,作业随机性大,数据量小。针对这种情况,建议您使用按CU时计费模式,能够帮您有效控制成本,将资源消耗控制在一定范围内。如果您有短时的专属资源需求,也可以在购买按需队列时勾选专属资源模式,享受资源专属。 生产项目:在此项目下,您的作业相对稳定(经过开发调试再上线),建议您使用包年/包月计费模式,可以更优惠。同时,按需计费的队列,在空闲1小时后(队列空闲是指该队列上没有正在运行的作业),系统会自动释放计算资源,再次使用时,需要重新分配计算资源,可能会耗费5~10min时间。使用包年/包月计费模式则可避免这种情况,节省等待时间。
  • 计费项 DLI的计费项包括计算计费、存储计费、扫描量计费。DLI的计费详情请参见DLI产品价格详情。您可以通过DLI提供的价格计算器,快速计算出购买资源的参考价格。 表1 DLI计费项 计费项 说明 计算计费 支持三种计费模式: 按照包年/包月的订购周期计费 根据计算资源使用量(CU时)按需计费。 按预购的套餐包计费。 支持包年包月+按需组合使用(超出包周期规格按需计费);支持套餐包+按需计费组合使用(超出套餐包使用量按需计费)。 存储计费 按照存储在DLI服务中的表数据存储量(单位为“GB”)收取存储费用。 在估算存储费用时,请特别注意,DLI采用压缩存储,通常能压缩到原文件大小的 1/5 。DLI存储按照压缩后的大小计费。 如果数据存储在OBS服务中,则DLI服务不收取存储费用,对应的费用由OBS服务收取。 扫描量计费 按照用户每个作业的数据扫描量(单位为“GB”)收取计算费用。 当前仅适用于在默认default队列中提交作业按扫描数据量计费。 CU是弹性资源池的计价单位。 1CU= 1Core 4GMem。不同规格对应的计算能力不一样,规格越高计算能力越好。 default队列中提交作业按扫描数据量计费。
  • 计费模式 DLI在不同的计费项下有不同的计费模式,具体详见表2说明。 表2 DLI计费模式 计费项 计费模式 计费模式说明 计算计费 包年/包月 按照弹性资源池包年/包月的固定费用计费。 按需计费 按需弹性资源池资源使用CU时进行计费。 计算费用=单价*CU数*小时数 CU时套餐包 DLI提供了弹性资源池CU时套餐包。 CU时套餐包的额度会按订购周期重置。 DLI表的数据存储 按需计费 按照存储在DLI服务中的数据存储量(单位为“GB”)收取存储费用。 存储费用=单价*存储数据量(GB)*小时数 存储套餐包 购买了存储量套餐包,按需使用过程中优先抵扣存储套餐包的规格额度,超过套餐包额度的按照按需计费。存储套餐的额度每个小时会重置。 数据扫描量 按需计费 扫描量计费=单价*扫描量数据(GB) 扫描数据量套餐包 购买了扫描数据量套餐包,按需使用过程中优先抵扣扫描数据量套餐包的规格额度,超过扫描数据量套餐包额度的按照按需计费。扫描数据量套餐的额度会按订购周期重置。 按订购周期重置:如重置周期为月,且按订购周期重置,即如果用户1月5日订购的套餐包,之后每个月5日同一时间重置免费资源。 按自然周期重置:结合重置周期使用,如重置周期为月,且按自然周期重置,即每月1日0点重置免费资源。 DLI的套餐包中仅存储套餐包按自然周期重置,即每小时重置,其他套餐包按订购周期重置。
  • 示例 该示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表连接以扩充流。 使用spark sql 创建 hive obs 外表,并插入数据。 CREATE TABLE if not exists dimension_hive_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING ) STORED AS PARQUET LOCATION 'obs://demo/spark.db/dimension_hive_table' PARTITIONED BY ( create_time STRING ); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。 如下脚本中的加粗参数请根据实际环境修改。 CREATE CATA LOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; CREATE TABLE if not exists ordersSource ( product_id STRING, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'TOPIC', 'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table if not exists print ( product_id STRING, user_name string, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING, create_time STRING ) with ( 'connector' = 'print' ); insert into print select orders.product_id, orders.user_name, dim.product_name, dim.unit_price, dim.pv_count, dim.like_count, dim.comment_count, dim.update_time, dim.update_user, dim.create_time from ordersSource orders left join dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 m') */ for system_time as of orders.proctime as dim on orders.product_id = dim.product_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"} 查看print结果表数据。 +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
  • 注意事项 每个连接子任务都需要保留自己的Hive表缓存。请确保Hive表可以放入TM任务槽的内存中。 建议为streaming-source.monitor-interval(最新分区作为临时表)或 lookup.join.cache.ttl(所有分区作为临时表)设置一个相对较大的值。否则,作业容易出现性能问题,避免表更新和重新加载过于频繁。 缓存刷新需加载整个Hive表。无法区分新数据和旧数据。
  • 参数说明 在执行与最新的Hive表的时间关联时,Hive表将被缓存到Slot内存中,然后通过键将流中的每条记录与表进行关联,以确定是否找到匹配项。将最新的Hive表用作时间表不需要任何额外的配置。使用以下属性配置Hive表缓存的TTL。在缓存过期后,将重新扫描Hive表以加载最新的数据。 参数 默认值 类型 说明 lookup.join.cache.ttl 60 min Duration 查找连接中构建表的缓存 TTL(例如 10 分钟)。默认情况下,TTL 为 60 分钟。 该选项仅在查找有界的 hive 表源时有效,如果您使用流式 hive 源作为时态表,请使用 streaming-source.monitor-interval 配置数据更新间隔。
  • 功能描述 对于随时间变化的分区表,我们可以将其读取为无界流,如果每个分区包含某个版本的完整数据,则该分区可以被视为时间表的一个版本,时间表的版本保留了分区的数据。Flink支持在处理时间关联中自动跟踪时间表的最新分区(版本)。 最新分区(版本)由 'streaming-source.partition-order' 选项定义。 这是在Flink 流应用作业中将 Hive 表用作维度表的最常见用例。
  • 示例 下面的示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表连接以扩充流。 使用spark sql 创建 hive obs 外表,并插入数据。 CREATE TABLE if not exists dimension_hive_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING ) STORED AS PARQUET LOCATION 'obs://demo/spark.db/dimension_hive_table' PARTITIONED BY ( create_time STRING ); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。 如下脚本中的加粗参数请根据实际环境修改。 CREATE CATALOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; CREATE TABLE if not exists ordersSource ( product_id STRING, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'TOPIC', 'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table if not exists print ( product_id STRING, user_name string, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING, create_time STRING ) with ( 'connector' = 'print' ); insert into print select orders.product_id, orders.user_name, dim.product_name, dim.unit_price, dim.pv_count, dim.like_count, dim.comment_count, dim.update_time, dim.update_user, dim.create_time from ordersSource orders left join dimension_hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '10 m') */ for system_time as of orders.proctime as dim on orders.product_id = dim.product_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"} 查看print结果表数据。 +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1] 模拟向hive 维表,插入新的分区数据 INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_21', 'product_name_21', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_22', 'product_name_22', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_23', 'product_name_23', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_24', 'product_name_24', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_25', 'product_name_25', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_26', 'product_name_26', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_27', 'product_name_27', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_28', 'product_name_28', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_29', 'product_name_29', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_20', 'product_name_20', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 连接Kafka集群,向Kafka的source topic中插入如下测试数据。关联上一个分区create_time='create_time_1'数据: {"product_id": "product_id_13", "user_name": "name13"} 查看print结果表数据。可观察到hive维表中的前一个分区create_time='create_time_1'数据已经被清除 +I[product_id_13, name13, null, null, null, null, null, null, null, null] 连接Kafka集群,向Kafka的source topic中插入如下测试数据。关联最新分区create_time='create_time_2'数据: {"product_id": "product_id_21", "user_name": "name21"} 查看print结果表数据。可观察到hive维表中保存了最新分区create_time='create_time_2'的数据 +I[product_id_21, name21, product_name_21, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_2]
  • 功能描述 您可以将Hive表用作时态表,通过时态连接来关联Hive表。有关时态连接的详细信息,请参阅 temporal join。 Flink支持processing-time temporal join Hive Table,processing-time temporal join始终会加入最新版本的时态表。Flink支持分区表和 Hive非分区表的临时连接,对于分区表,Flink 支持自动跟踪Hive表的最新分区。详情可参考:Apache Flink Hive Read & Write
  • 注意事项 Flink目前不支持与Hive表进行基于事件时间event-time的时间关联。 Temporal Join The Latest Partition 特性,仅在 Flink STREAMING 模式下支持。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。 使用Hive语法创建OBS表 defalut方言: with 属性中需要设置hive.is-external为true。 使用hive 方言:建表语句需要使用EXTERNAL关键字。 使用hive语法的DLI Lakehouse表 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。 创建Flink OpenSource SQL作业时,在作业编辑界面配置开启checkpoint功能。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE EXTERNAL TABLE [IF NOT EXISTS] table_name [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [ [ROW FORMAT row_format] [STORED AS file_format] ] [LOCATION obs_path] [TBLPROPERTIES (property_name=property_value, ...)] row_format: : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)] file_format: : SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname column_constraint: : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] table_constraint: : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]
  • 功能描述 本节介绍利用Flink写Hive的表。Hive结果表的定义,以及创建结果表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write Flink 支持在 BATCH 和 STREAMING 模式下从Hive写入数据。 当作为BATCH应用程序运行时,Flink将写 Hive表,仅在作业完成时使这些记录可见。BATCH 写入支持追加和覆盖现有表。 STREAMING 不断写入,向Hive添加新数据,以增量方式提交记录使其可见。用户控制何时/如何触发具有多个属性的提交。流式写入不支持插入覆盖。有关可用配置的完整列表,请参阅流式处理接收器。Streaming sink
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。 使用Hive语法创建OBS表 defalut方言: with 属性中需要设置hive.is-external为true。 使用hive 方言:建表语句需要使用EXTERNAL关键字。 使用hive语法的DLI Lakehouse表 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。 创建Flink OpenSource SQL作业时,在作业编辑界面配置开启checkpoint功能。
  • 参数说明 请参考使用Hive语法创建OBS表,和Hive 文档了解每个DDL语句的语义。 表1 TBLPROPERTIES 参数说明 参数 是否必选 默认参数 数据类型 说明 streaming-source.enable 否 false Boolean 是否启用流源。 注意: 请确保每个分区/文件都应该以原子方式写入,否则读取器可能会得到不完整的数据。 streaming-source.partition.include 否 all String 设置分区读取的选项,支持的选项是 'all' 和 'latest'。默认情况下,该选项为 'all' 。 'all' 表示读取所有分区; 'latest'仅在流式处理 Hive 源表用作temporal table时才有效。'latest' 表示按'streaming-source.partition.order'的顺序读取最新的分区。 Flink 支持对最新的 hive 分区进行临时连接,通过启用 'streaming-source.enable',并将 'streaming-source.partition.include' 设置为 'latest'。同时,用户可以通过配置以下分区相关选项来分配分区比较顺序和数据更新间隔。 streaming-source.monitor-interval 否 None Duration 连续监视分区/文件的时间间隔。注意:Hive 流式处理读取的默认间隔为'1 min',Hive 流式处理temporal join的默认间隔为 '60 min',这是因为在当前 Hive 流式处理临时连接实现中,每个 TM 都会访问 Hive metaStore,这可能会对 metaStore 产生压力,这将在未来得到改善。 streaming-source.partition-order 否 partition-name String 流源的分区顺序,支持 create-time、partition-time 和 partition-name。 create-time 比较分区/文件创建时间,这不是 Hive metaStore 中的分区创建时间,而是文件系统中的文件夹/文件修改时间,如果分区文件夹以某种方式更新,例如将新文件添加到文件夹中,可能会影响数据的使用方式。 partition-time 比较从分区名称中提取的时间。 partition-name 比较分区名称的字母顺序。 对于非分区表,此值应始终为“create-time”。 默认情况下,该值为 partition-name。该选项与已弃用的选项“streaming-source.consume-order”相等。 streaming-source.consume-start-offset 否 None String 流式处理消费的起始偏移量。如何解析和比较偏移量取决于您的订单。对于 create-time 和 partition-time,应为时间戳字符串 (yyyy-[m]m-[d]d [hh:mm:ss])。 对于partition-time,将使用分区时间提取器从分区中提取时间。对于 partition-name,是分区名称字符串(例如 pt_year=2020/pt_mon=10/pt_day=01)。 is_lakehouse 否 无 Boolean 如果使用hive语法的DLI Lakehouse表,则需要设置is_lakehouse为true。 Source Parallelism Inference 默认情况下,Flink 会根据文件数量和每个文件中的块数来推断其 Hive 读取器的最佳并行度。 Flink 支持灵活配置并行推理策略。您可以在 TableConfig 中配置以下参数(请注意,这些参数会影响作业的所有源): Key Default Type Description table.exec.hive.infer-source-parallelism true Boolean 如果为 true,则根据拆分数推断源并行度。如果为 false,则源的并行度由 config 设置。 table.exec.hive.infer-source-parallelism.max 1000 Integer 设置源运算符的最大推断并行度。 Load Partition Splits 多线程用于拆分 hive 的分区。您可以使用 table.exec.hive.load-partition-splits.thread-num 来配置线程号。默认值为 3,配置的值应大于 0。 Key Default Type Description table.exec.hive.load-partition-splits.thread-num 3 Integer 配置的值应大于0。 SQL 提示可用于将配置应用于 Hive 表,而无需更改其在 Hive 元存储中的定义。Hints | Apache Flink Vectorized Optimization upon Read 当满足以下条件时,Flink 会自动对 Hive 表进行矢量化读取: 格式:ORC 或 Parquet。 没有复杂数据类型的列,如配置单元类型:List、Map、Struct、Union。 默认情况下,此功能处于启用状态。可以使用以下配置禁用它。 table.exec.hive.fallback-mapred-reader=true
  • 示例 使用Spark SQL创建Hive语法OBS表,并插入10条数据。模拟数据源。 CREATE TABLE IF NOT EXISTS demo.student( name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS PARQUET LOCATION 'obs://demo/spark.db/student'; INSERT INTO demo.student PARTITION(classNo=1) VALUES ('Alice', 90.0), ('Bob', 80.0), ('Charlie', 70.0), ('David', 60.0), ('Eve', 50.0), ('Frank', 40.0), ('Grace', 30.0), ('Hank', 20.0), ('Ivy', 10.0), ('Jack', 0.0); 使用Flink SQL展示使用批的方式,从Hive语法OBS表demo.student中读取数据,并打印。需要开启checkpoint。 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table if not exists print ( name STRING, score DOUBLE, classNo INT) with ('connector' = 'print'); insert into print select * from student; 结果(taskmanager的out日志): +I[Alice, 90.0, 1] +I[Bob, 80.0, 1] +I[Charlie, 70.0, 1] +I[David, 60.0, 1] +I[Eve, 50.0, 1] +I[Frank, 40.0, 1] +I[Grace, 30.0, 1] +I[Hank, 20.0, 1] +I[Ivy, 10.0, 1] +I[Jack, 0.0, 1] 使用Flink SQL展示使用流的方式,从Hive语法OBS表demo.student中读取数据,并打印。 CREATE CATALOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table if not exists print ( name STRING, score DOUBLE, classNo INT) with ('connector' = 'print'); insert into print select * from student /*+ OPTIONS('streaming-source.enable' = 'true', 'streaming-source.monitor-interval' = '3 m') */;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE EXTERNAL TABLE [IF NOT EXISTS] table_name [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [ [ROW FORMAT row_format] [STORED AS file_format] ] [LOCATION obs_path] [TBLPROPERTIES (property_name=property_value, ...)] row_format: : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)] file_format: : SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname column_constraint: : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] table_constraint: : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]
  • 简介 Apache Hive 已经成为了 数据仓库 生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。 Flink与Hive的集成包含两个层面,一是利用了Hive的MetaStore作为持久化的Catalog,二是利用Flink来读写Hive的表。Overview | Apache Flink 从Flink 1.11.0开始,在使用 Hive方言时,Flink允许用户用Hive语法来编写SQL语句。通过提供与Hive语法的兼容性,改善与Hive的互操作性,并减少用户需要在Flink和Hive之间切换来执行不同语句的情况。详情可参考:Apache Flink Hive 方言 使用HiveCatalog,Apache Flink可以用于统一处理Apache Hive表的BATCH和STREAM。Flink可以作为Hive批处理引擎的更高效的替代方案,或者用于连续读写Hive表,以支持实时数据仓库应用程序。Apache Flink Hive Read & Write
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。 使用Hive语法创建OBS表 defalut方言: with 属性中需要设置hive.is-external为true。 使用hive 方言:建表语句需要使用EXTERNAL关键字。 使用hive语法的DLI Lakehouse表 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。 开启checkpoint功能。 建议切换到Hive方言来创建Hive兼容表。如果您想用默认的方言创建Hive兼容表,确保在您的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。注意,如果使用Hive方言,就不需要connector属性。 监视策略是扫描当前位置路径中的所有目录/文件。许多分区可能会导致性能下降。 对未分区表进行流式读取时,要求将每个文件以原子方式写入目标目录。 分区表的流式读取要求在 hive 元存储的视图中以原子方式添加每个分区。否则,将使用添加到现有分区的新数据。 流式读取不支持 Flink DDL 中的水印语法。这些表不能用于窗口运算符。
  • 功能描述 本节介绍利用Flink来读写Hive的表。Hive源表的定义,以及创建源表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write Flink支持在BATCH 和 STREAMING模式下从Hive读取数据。当作为BATCH应用程序运行时,Flink将在执行查询的时间点对表的状态执行查询。STREAMING读取将持续监控表,并在新数据可用时以增量方式获取新数据。默认情况下,Flink会读取有界的表。 STREAMING读取支持同时使用分区表和非分区表。对于分区表,Flink将监控新分区的生成,并在可用时增量读取它们。对于未分区的表,Flink 会监控文件夹中新文件的生成情况,并增量读取新文件。
  • 注意事项 Hive方言只能用于操作Hive对象,并要求当前Catalog是一个HiveCatalog 。 Hive方言只支持db.table这种两级的标识符,不支持带有Catalog名字的标识符。更多信息请参考Apache Flink Hive Read & Write。 虽然所有Hive版本支持相同的语法,但是一些特定的功能对Hive版本有依赖,请参考Hive 版本。 例如,更新数据库位置 只在 Hive-2.4.0 或更高版本支持。 执行DML和DQL时应该使用HiveModule 。 从Flink 1.15版本开始,在使用Hive方言抛出以下异常时,请尝试用opt目录下的 flink-table-planner_2.12 jar包来替换lib目录下的flink-table-planner-loader jar包。具体原因请参考 FLINK-25128。
  • 功能描述 HiveCatalog有两个用途:作为原生Flink元数据的持久化存储,以及作为读写现有Hive元数据的接口。 Flink 的Hive 文档提供了有关设置 HiveCatalog以及访问现有 Hive 元数据的详细信息。详情参考:Apache Flink Hive Catalog HiveCatalog可以用来处理两种类型的表:Hive兼容表和通用表。 Hive兼容表是以Hive兼容的方式存储的,他们的元数据和实际的数据都在分层存储中。因此,通过flink创建的与hive兼容的表,可以通过hive查询。 Hive通用表是特定于Flink的。当使用HiveCatalog创建通用表时,只是使用HMS来持久化元数据。虽然这些表对Hive来说是可见的,但Hive不太可能理解元数据。因此,在Hive中使用这样的表会导致未定义的行为。 建议切换到Hive方言来创建Hive兼容表。如果您想用默认的方言创建Hive兼容表,确保在您的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。如果使用Hive方言,就不需要connector属性。了解Hive方言。
  • 注意事项 警告Hive Metastore以小写形式存储所有元数据对象名称。 如果使用相同名称的目录已经存在,那么将会抛出一个异常。 Hudi表需要使用hudi catalog。并不适用于hive catalog。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 参数说明 表1 参数说明 参数 必选 默认值 类型 描述 type 是 无 String Catalog的类型。 创建HiveCatalog时,该参数必须设置为'hive'。 hive-conf-dir 是 无 String 指向包含 hive-site.xml目录的URI。 该值固定为'hive-conf-dir' = '/opt/flink/conf' default-database 否 default String 当一个catalog被设为当前catalog时,所使用的默认当前database。
  • 示例 在Flink OpenSource SQL作业中,创建名为myhive的catalog,并使用它用于管理元数据。 CREATE CATALOG myhive WITH ( 'type' = 'hive' ,'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table dataGenSource( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', --每秒生成一条数据 'fields.user_id.kind' = 'random', --为字段user_id指定random生成器 'fields.user_id.length' = '3' --限制user_id长度为3 ); create table printSink( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink select * from dataGenSource; 查看default数据库中,是否含有dataGenSource、printSink 表。 Hive Metastore 以小写形式存储所有元数据对象名称。 图1 查看default数据库 使用名为myhive的catalog中的元数据,新建Flink OpenSource SQL作业。 CREATE CATALOG myhive WITH ( 'type' = 'hive' ,'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; insert into printSink select * from dataGenSource;
  • 简介 Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。 数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF。 元数据也可以是持久化的,例如Hive Metastore中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。详情参考Apache Flink Catalogs
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 如果使用 MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《 数据湖 探索用户指南》中的“修改主机信息”章节描述。
  • 常见问题 Q:Flink作业日志中有如下报错信息应该怎么解决? org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 90069ms for connection id 0x0 A:可能是跨源连接未绑定或跨源绑定失败。参考增强型跨源连接重新配置跨源,Kafka集群安全组放通DLI队列的网段地址。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 所有 HBase 表的列簇必须定义为ROW类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中声明查询中使用的的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全