华为云用户手册

  • FlinkServer作业重启策略介绍 Flink支持不同的重启策略,以在发生故障时控制作业是否重启以及如何重启。如果不指定重启策略,集群会使用默认的重启策略。用户也可以在提交作业时指定一个重启策略,可参考如何创建FlinkServer作业在作业开发界面配置( MRS 3.1.0及以后版本)。 重启策略也可以通过Flink的配置文件“客户端安装目录/Flink/flink/conf/flink-conf.yaml”中的参数“restart-strategy”指定,为全局配置,还可以在应用代码中动态指定,会覆盖全局配置,重启策略包括失败率(failure-rate)和两种默认策略,默认策略为如下: 无重启(No restart):如果没有启用CheckPoint,默认使用该策略。 固定间隔(fixed-delay):如果启用了CheckPoint,但没有配置重启策略,默认使用该策略。
  • fixed-delay策略 发生故障时会尝试重启作业固定次数,如果超过了最大的尝试次数,作业最终会失败。并且在两次连续重启尝试之间,重启策略会等待固定的时间。 以配置如果重启失败了3次则认为该Job失败,重试时间间隔为10s为例,参数配置为: restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
  • failure-rate策略 在作业失败后会直接重启,但超过设置的失败率后,作业会被认定为失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 以配置10分钟内如果重启失败了3次则认为该作业失败,重试时间间隔为10s为例,参数配置为: restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 10 min restart-strategy.failure-rate.delay: 10 s
  • 如何选择重启策略 如果用户在作业失败后,不希望重试,则推荐使用No restart策略。 如果用户在作业失败后,希望对作业进行重试,推荐使用failure-rate策略。因为fixed-delay策略可能会因为网络、内存等硬件故障导致用户作业失败次数达到最大重试次数,从而导致作业失败。 为了防止在failure-rate策略下的无限重启,推荐如下参数配置: restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 10 min restart-strategy.failure-rate.delay: 10 s
  • 创建HetuEngine计算实例前提条件 已创建用于访问HetuEngine WebUI界面的用户,如hetu_user,用户创建具体操作请参见创建HetuEngine权限角色。 已在待操作集群创建所需租户。请确保修改HetuEngine计算实例配置时,对应的租户有足够的内存和CPU资源。 创建HetuEngine计算实例时必须使用“叶子租户”类型的租户,只有叶子租户的队列才能提交Yarn任务。 为了避免资源竞争带来的不确定性因素,建议为HetuEngine使用的租户创建独立资源池。
  • 计算实例状态说明 计算实例创建成功后,可在“计算实例”页签查看当前已创建的实例信息,包括实例所属租户名、对应实例数量、实例状态和资源总量等,实例状态信息如下: 图1 计算实例状态 绿色图标:实例处于运行中或亚健康状态。 红色图标:实例故障。 灰色图标:实例已停止、待启动。 蓝色图标:实例处于其他状态,包括扩容中、缩容中、滚动重启中、创建中、启动中、安全启动中、停止中、安全停机中、删除中、已删除、停止中等。
  • 限制 EXCHANGE PARTITION: 被迁移的单个或多个分区,迁移前必须都是已存在的分区,并归属于来源表,且在目标表中不包含这些分区; 该操作涉及的表需要有相同的列定义,并且有相同的分区键; 如果表中包含索引,该操作会失败; 来源表和目标表中任意一个为事务表时,不允许Exchange partition操作; 对于目标表,在一次操作中,多个分区要么同时迁移成功,要么全部失败。对于来源表,操作成功后,所有迁移的分区都会被释放; Alter table change column不支持orc格式的表。 ALTER TABLE table_name ADD | DROP col_name命令仅对于ORC/PARQUET存储格式的非分区表可用。
  • 示例 将表名从users修改为people: ALTER TABLE users RENAME TO people; 在表users中增加名为zip的列: ALTER TABLE users ADD COLUMN zip varchar; 从表users中删除名为zip的列: ALTER TABLE users DROP COLUMN zip; 将表users中列名id更改为user_id: ALTER TABLE users RENAME COLUMN id TO user_id; 修改分区操作: --创建两个分区表 CREATE TABLE IF NOT EXISTS hetu_int_table5 (eid int, name String, salary String, destination String, dept String, yoj int) COMMENT 'Employee Names' partitioned by (dt timestamp,country String, year int, bonus decimal(10,3)) STORED AS TEXTFILE; CREATE TABLE IF NOT EXISTS hetu_int_table6 (eid int, name String, salary String, destination String, dept String, yoj int) COMMENT 'Employee Names' partitioned by (dt timestamp,country String, year int, bonus decimal(10,3)) STORED AS TEXTFILE; --添加分区 ALTER TABLE hetu_int_table5 ADD IF NOT EXISTS PARTITION (dt='2008-08-08 10:20:30.0', country='IN', year=2001, bonus=500.23) PARTITION (dt='2008-08-09 10:20:30.0', country='IN', year=2001, bonus=100.50) ; --查看分区 show partitions hetu_int_table5; dt | country | year | bonus -------------------------|---------|------|--------- 2008-08-09 10:20:30.000 | IN | 2001 | 100.500 2008-08-08 10:20:30.000 | IN | 2001 | 500.230 (2 rows) --删除分区 ALTER TABLE hetu_int_table5 DROP IF EXISTS PARTITION (dt=timestamp '2008-08-08 10:20:30.0', country='IN', year=2001, bonus=500.23); --查看分区 show partitions hetu_int_table5; dt | country | year | bonus -------------------------|---------|------|--------- 2008-08-09 10:20:30.000 | IN | 2001 | 100.500 (1 row) --迁移分区示例 CREATE SCHEMA part_test; CREATE TABLE hetu_exchange_partition1 (a string, b string) PARTITIONED BY (ds string); CREATE TABLE part_test.hetu_exchange_partition2 (a string, b string) PARTITIONED BY (ds string); ALTER TABLE hetu_exchange_partition1 ADD PARTITION (ds='1'); --查看分区 show partitions hetu_exchange_partition1; ds ---- 1 (1 row) show partitions part_test.hetu_exchange_partition2; ds ---- (0 rows) --迁移分区,从 T1 到 T2 ALTER TABLE part_test.hetu_exchange_partition2 EXCHANGE PARTITION (ds='1') WITH TABLE hetu_exchange_partition1; --再次查看分区,可以看到分区迁移成功 show partitions hetu_exchange_partition1; ds ---- (0 row) show partitions part_test.hetu_exchange_partition2; ds ---- 1 (1 rows) --重命名分区 CREATE TABLE IF NOT EXISTS hetu_rename_table ( eid int, name String, salary String, destination String, dept String, yoj int) COMMENT 'Employee details' partitioned by (year int) STORED AS TEXTFILE; ALTER TABLE hetu_rename_table ADD IF NOT EXISTS PARTITION (year=2001); SHOW PARTITIONS hetu_rename_table; year ------ 2001 (1 row) ALTER TABLE hetu_rename_table PARTITION (year=2001) rename to partition (year=2020); SHOW PARTITIONS hetu_rename_table; year ------ 2020 (1 row) --修改分区表 create table altercolumn4(a integer, b string) partitioned by (c integer); --修改表的文件格式 alter table altercolumn4 SET FILEFORMAT textfile; insert into altercolumn4 values (100, 'Daya', 500); alter table altercolumn4 partition (c=500) change column b empname string comment 'changed column name to empname' first; --修改分区表的存储位置(需要先在hdfs上创建目录,执行语句后,无法查到之前插入的那条数据) alter table altercolumn4 partition (c=500) set Location '/user/hive/warehouse/c500'; --修改列 b 改名为name,同时类型从integer转为string create table altercolumn1(a integer, b integer) stored as textfile; alter table altercolumn1 change column b name string; --修改altercolumn1的存储属性 ALTER TABLE altercolumn1 CLUSTERED BY(a, name) SORTED BY(name) INTO 25 BUCKETS; --查看altercolumn1的属性 describe formatted altercolumn1; Describe Formatted Table ---------------------------------------------------------------------------------------- # col_name data_type comment a integer name varchar # Detailed Table Information Database: default Owner: admintest LastAccessTime: 0 Location: hdfs://hacluster/user/hive/warehouse/altercolumn1 Table Type: MANAGED_TABLE # Table Parameters: STATS_GENERATED_VIA_STATS_TASK workaround for potential lack of HIVE-12730 numFiles 0 numRows 0 orc.compress.size 262144 orc.compression.codec GZIP orc.row.index.stride 10000 orc.stripe.size 67108864 presto_query_id 20210325_025238_00034_f63xj@default@HetuEngine presto_version rawDataSize 0 totalSize 0 transient_lastDdlTime 1616640758 # Storage Information SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Compressed: No Num Buckets: 25 Bucket Columns: [a, name] Sort Columns: [SortingColumn{columnName=name, order=ASCENDING}] Storage Desc Params: serialization.format 1 (1 row) Query 20210325_090522_00091_f63xj@default@HetuEngine, FINISHED, 1 node Splits: 1 total, 1 done (100.00%) 0:00 [0 rows, 0B] [0 rows/s, 0B/s]
  • 示例 用指定列的查询结果创建新表orders_column_aliased: CREATE TABLE orders_column_aliased (order_date, total_price) AS SELECT orderdate, totalprice FROM orders; 用表orders的汇总结果新建一个表orders_by_data: CREATE TABLE orders_by_date COMMENT 'Summary of orders by date' WITH (format = 'ORC') AS SELECT orderdate, sum(totalprice) AS price FROM orders GROUP BY orderdate; 如果表orders_by_date不存在,则创建表orders_by_date: CREATE TABLE IF NOT EXISTS orders_by_date AS SELECT orderdate, sum(totalprice) AS price FROM orders GROUP BY orderdate; 用和表orders具有相同schema创建新表empty_orders table,但是没数据: CREATE TABLE empty_orders AS SELECT * FROM orders WITH NO DATA; 使用VALUES创建表,参考 VALUES。 分区表示例: CREATE EXTERNAL TABLE hetu_copy(corderkey, corderstatus, ctotalprice, corderdate, cds) PARTITIONED BY(cds) SORT BY (corderkey, corderstatus) COMMENT 'test' STORED AS orc LOCATION '/user/hetuserver/tmp' TBLPROPERTIES (orc_bloom_filter_fpp = 0.3, orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = 'corderstatus,ctotalprice') as select * from hetu_test; CREATE TABLE hetu_copy1(corderkey, corderstatus, ctotalprice, corderdate, cds) WITH (partitioned_by = ARRAY['cds'], bucketed_by = ARRAY['corderkey', 'corderstatus'], sorted_by = ARRAY['corderkey', 'corderstatus'], bucket_count = 16, orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = ARRAY['corderstatus', 'ctotalprice'], external = true, format = 'orc', location = '/user/hetuserver/tmp ') as select * from hetu_test;
  • 语法 CREATE [EXTERNAL]① TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name [ ( column_alias, ... ) ] [[PARTITIONED BY ①(col_name, ....)] [SORT BY① ([column [, column ...]])] ]① [COMMENT 'table_comment'] [ WITH ( property_name = expression [, ...] ) ]② [[STORED AS file_format]① [LOCATION 'hdfs_path']① [TBLPROPERTIES (orc_table_property = value [, ...] ) ] ]① AS query [ WITH [ NO ] DATA ]②
  • 修改ClickHouse默认用户密码 使用root用户登录ClickHouse安装节点,切换到omm用户,进入“$BIGDATA_HOME/ FusionInsight _ClickHouse_*/install/FusionInsight-ClickHouse-*/clickhouse/clickhouse_change_password”目录。 su - omm cd $BIGDATA_HOME/FusionInsight_ClickHouse_*/install/FusionInsight-ClickHouse-*/clickhouse/clickhouse_change_password 执行如下命令修改default或clickhouse用户密码: ./change_password.sh 如下所示:以clickhouse用户为例,按照提示输入clickhouse和密码,等待密码修改完成。 密码复杂度要求: 密码长度限制是8~64位。 至少包含一个小写字母、一个大写字母、一个数字和一个特殊字符,支持的特殊字符包含-%;[]{}@_。 查看密码修改结果: MRS 3.3.0-LTS版本:登录到ClickHouse Server节点的,查看“${BIGDATA_HOME}/FusionInsight_ClickHouse_*/*_ClickHouseServer/etc/users.xml”文件中参数“password_sha256_hex”的值,即为存储修改后的密码。 cd ${BIGDATA_HOME}/FusionInsight_ClickHouse_*/*_ClickHouseServer/etc/ vi users.xml 如下所示:用password_sha256_hex来存储修改后的密码。 MRS 3.3.1-LTS及之后版本:查看密码修改结果: 登录到ClickHouse Server节点的,查看“${BIGDATA_HOME}/FusionInsight_ClickHouse_*/*_ClickHouseServer/etc/users.xml”文件中参数“password_PBKDF2_hex”的值,即为存储修改后的密码。 cd ${BIGDATA_HOME}/FusionInsight_ClickHouse_*/*_ClickHouseServer/etc/ vi users.xml 如下所示:用password_PBKDF2_hex来存储修改后的密码。
  • 操作场景 因为MRS操作系统用户组个数限制,导致Hive不能创建超过32个角色,开启此功能后,Hive将支持创建超过32个角色。 开启本功能并对表库等授权后,对表库目录具有相同权限的角色将会用“|”合并。查询ACL权限时,将显示合并后的结果,与开启该功能前的显示会有区别。此操作不可逆,请充分考虑实际应用场景,再决定是否做出调整。 如果当前组件使用了Ranger进行权限控制,需基于Ranger配置相关策略进行权限管理,具体操作可参考添加Hive的Ranger访问权限策略。 开启此功能后,包括owner在内默认最大可支持512个角色,由MetaStore自定义参数“hive.supports.roles.max”控制,可根据实际应用场景进行修改。
  • SET SESSION 语法 SET SESSION name = expression; SET SESSION catalog.name = expression; 描述 用于设置当前会话的指定属性。 示例 SET SESSION optimize_hash_generation = true; SET SESSION hive.optimized_reader_enabled = true; 父主题: HetuEngine辅助命令语法
  • 示例 --PREPARE my_select1 FROM SELECT * FROM fruit;值标识是否为别名 DESCRIBE OUTPUT my_select1; --PREPARE my_select2 FROM SELECT count(*) as my_count, 1+2 FROM fruit; DESCRIBE OUTPUT my_select2; --PREPARE my_create FROM CREATE TABLE foo AS SELECT * FROM fruit; DESCRIBE OUTPUT my_create;
  • 权限原则与约束 HetuEngine访问同集群数据源。 HetuEngine启用Ranger鉴权,则统一使用Ranger的PBAC权限策略做鉴权。 HetuEngine停用Ranger鉴权,则统一使用MetaStore的RBAC权限策略做鉴权。 HetuEngine访问跨集群数据源。 同时受HetuEngine端权限和数据源端权限管控(Hive场景下,依赖于HDFS)。 查询视图时,仅需给目标视图授予select权限即可;使用视图联表查询时,需要同时给两者授予select权限。 不支持 GaussDB 和HetuEngine数据源列脱敏。 HetuEngine服务在切换权限控制类型时,需要重启整个HetuEngine服务,包括HSConsole页面上正在运行的HetuEngine计算实例。
  • 描述 显示一条语句的逻辑的或者分布式的执行计划,也可以用于校验一条SQL语句,或者是分析IO。 参数TYPE DISTRIBUTED用于显示分片后的计划(fragmented plan)。每一个fragment都会被一个或者多个节点执行。Fragments separation表示数据在两个节点之间进行交换。Fragment type表示一个fragment如何被执行以及数据在不同fragment之间怎样分布。 SINGLE Fragment会在单个节点上执行。 HASH Fragment会在固定数量的节点上执行,输入数据通过哈希函数进行分布。 ROUND_ROBIN Fragment会在固定数量的节点上执行,片段在固定数量的节点上执行,输入数据以轮询方式进行分布。 BROADCAST Fragment会在固定数量的节点上执行,输入数据被广播到所有的节点。 SOURCE Fragment在访问输入分段的节点上执行。
  • 示例 LOG ICAL: CREATE TABLE testTable (regionkey int, name varchar); EXPLAIN SELECT regionkey, count(*) FROM testTable GROUP BY 1; Query Plan ------------------------------------------------------------------------------------------------------------------------------------- Output[regionkey, _col1] │ Layout: [regionkey:integer, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ _col1 := count └─ RemoteExchange[GATHER] │ Layout: [regionkey:integer, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ Project[] │ Layout: [regionkey:integer, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ Aggregate(FINAL)[regionkey][$hashvalue] │ Layout: [regionkey:integer, $hashvalue:bigint, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ count := count("count_8") └─ LocalExchange[HASH][$hashvalue] ("regionkey") │ Layout: [regionkey:integer, count_8:bigint, $hashvalue:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ RemoteExchange[REPARTITION][$hashvalue_9] │ Layout: [regionkey:integer, count_8:bigint, $hashvalue_9:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ Aggregate(PARTIAL)[regionkey][$hashvalue_10] │ Layout: [regionkey:integer, $hashvalue_10:bigint, count_8:bigint] │ count_8 := count(*) └─ ScanProject[table = hive:default:testtable] Layout: [regionkey:integer, $hashvalue_10:bigint] Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} $hashvalue_10 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0)) regionkey := regionkey:int:0:REGULAR DISTRIBUTED: EXPLAIN (type DISTRIBUTED) SELECT regionkey, count(*) FROM testTable GROUP BY 1; Query Plan ----------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [regionkey, count] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION Output[regionkey, _col1] │ Layout: [regionkey:integer, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ _col1 := count └─ RemoteSource[1] Layout: [regionkey:integer, count:bigint] Fragment 1 [HASH] Output layout: [regionkey, count] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION Project[] │ Layout: [regionkey:integer, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ Aggregate(FINAL)[regionkey][$hashvalue] │ Layout: [regionkey:integer, $hashvalue:bigint, count:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} │ count := count("count_8") └─ LocalExchange[HASH][$hashvalue] ("regionkey") │ Layout: [regionkey:integer, count_8:bigint, $hashvalue:bigint] │ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?} └─ RemoteSource[2] Layout: [regionkey:integer, count_8:bigint, $hashvalue_9:bigint] Fragment 2 [SOURCE] Output layout: [regionkey, count_8, $hashvalue_10] Output partitioning: HASH [regionkey][$hashvalue_10] Stage Execution Strategy: UNGROUPED_EXECUTION Aggregate(PARTIAL)[regionkey][$hashvalue_10] │ Layout: [regionkey:integer, $hashvalue_10:bigint, count_8:bigint] │ count_8 := count(*) └─ ScanProject[table = hive:default:testtable, grouped = false] Layout: [regionkey:integer, $hashvalue_10:bigint] Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B} $hashvalue_10 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0)) regionkey := regionkey:int:0:REGULAR VALIDATE: EXPLAIN (TYPE VALIDATE) SELECT id, count(*) FROM testTable GROUP BY 1; Valid ------- true IO: EXPLAIN (TYPE IO, FORMAT JSON) SELECT regionkey , count(*) FROM testTable GROUP BY 1; Query Plan --------------------------------- { "inputTableColumnInfos" : [ { "table" : { "catalog" : "hive", "schemaTable" : { "schema" : "default", "table" : "testtable" } }, "columnConstraints" : [ ] } ] }
  • 添加Hudi数据源前提条件 创建Hudi数据源的代理用户,该代理用户为人机用户且需拥有hive组。 在HetuEngine所在集群的所有节点的“/etc/hosts”文件中,添加待对接数据源所在集群的主机名称和对应的IP映射,及其“/etc/hosts”文件中的“10.10.10.10 hadoop.系统 域名 ”(如“10.10.10.10 hadoop.hadoop.com”),否则HetuEngine无法根据主机名称连接到非本集群节点。 参考创建HetuEngine权限角色创建HetuEngine管理员用户。
  • 提取函数 描述:提取函数用于从HTTP URL(或任何符合RFC 2396标准的URL)中提取内容。 [protocol:][//host[:port]][path][?query][#fragment] 提取的内容不会包含URI的语法分隔符,比如“:”或“?”。 url_extract_fragment(url) → varchar 描述:返回url的片段标识符,即#后面的字符串。 select url_extract_fragment('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher');--teacher url_extract_host(url)→ varchar 描述:返回url中的主机域名。 select url_extract_host('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher');-- www.example.com url_extract_parameter(url, name)→ varchar 描述:返回url中参数名为name的参数。 select url_extract_parameter('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher','age');-- 25 url_extract_path(url)→ varchar 描述:提取url中的路径。 select url_extract_path('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher');-- /stu/index.html url_extract_port(url)→ bigint 描述:提取url中的端口。 select url_extract_port('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher');-- 80 url_extract_protocol(url)→ varchar 描述:提取url中的协议。 select url_extract_protocol('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher'); -- http url_extract_query(url)→ varchar 描述:提取url中的查询字符串。 select url_extract_query('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher'); -- name=xxx&age=25
  • 编码函数 url_encode(value) → varchar 描述:对value进行转义处理,以便可以安全地将其包含在URL查询参数名和值中: 字母字符不会被编码。 字符 ., -, * 和 _不会被编码。 ASCII空格字符会被编码为+ 。 所有其他字符都将转换为UTF-8,并且字节被编码为字符串%XX,其中XX是UTF-8字节的大写十六进制值。 select url_encode('http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher'); -- http%3A%2F%2Fwww.example.com%3A80%2Fstu%2Findex.html%3Fname%3Dxxx%26age%3D25%23teacher url_decode(value) → varchar 描述:对value编码后的URL进行解码操作。 select url_decode('http%3A%2F%2Fwww.example.com%3A80%2Fstu%2Findex.html%3Fname%3Dxxx%26age%3D25%23teacher'); -- http://www.example.com:80/stu/index.html?name=xxx&age=25#teacher
  • UNNEST UNNEST可以将ARRAY或MAP展开成relation。 ARRAYS展开为单独一列,MAP展开为两列(key,value)。 UNNEST还可以与多个参数一起使用,将被展开成多列,行数与最高基数参数相同(其他列用空填充)。 UNNEST可以选择使用WITH ORDINALITY子句,在这种情况下,会在末尾添加一个额外的ORDINALITY列。 UNNEST通常与JOIN一起使用,可以引用JOIN左侧关系中的列。 使用单独一列 SELECT student, score FROM tests CROSS JOIN UNNEST(scores) AS t (score); 使用多个列 SELECT numbers, animals, n, a FROM ( VALUES (ARRAY[2, 5], ARRAY['chicken', 'cat', 'bird']), (ARRAY[7, 8, 9], ARRAY['cow', 'fish']) ) AS x (numbers, animals) CROSS JOIN UNNEST(numbers, animals) AS t (n, a); 父主题: HetuEngine DQL SQL语法说明
  • 操作步骤 以Hive客户端安装用户登录安装客户端的节点。 执行以下命令,切换到客户端安装目录,配置环境变量并认证用户。 cd 客户端安装目录 source bigdata_env kinit Hive业务用户(如果集群未开启Kerberos认证,请跳过该操作) 执行以下命令登录Hive客户端。 beeline 创建表时指定inputFormat和outputFormat: CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type [COMMENT col_comment], ...)] [ROW FORMAT row_format] STORED AS inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 查询之前指定分隔符配置项: set hive.textinput.record.delimiter='!@!'; Hive会以‘!@!’为行分隔符查询数据。
  • 操作场景 通常情况下,Hive以文本文件存储的表会以回车作为其行分隔符,即在查询过程中,以回车符作为一行表数据的结束符。但某些数据文件并不是以回车分隔的规则文本格式,而是以某些特殊符号分隔其规则文本。 MRS Hive支持指定不同的字符或字符组合作为Hive文本数据的行分隔符,即在创建表的时候,指定inputformat为SpecifiedDelimiterInputFormat,然后在每次查询前,都设置如下参数来指定分隔符,就可以以指定的分隔符查询表数据。 set hive.textinput.record.delimiter=''; 当前版本的Hue组件,不支持导入文件到Hive表时设置多个分隔符。
  • 问题排查步骤 磁盘或其他存储介质问题导致merge过慢或者中止。 登录Manager页面,检查是否存在磁盘容量不足或其他磁盘告警,如果存在,请按照告警指导处理。 如果是磁盘容量不足,也可以联系客户删除部分过期数据,释放空间,快速恢复业务。 Zookeeper异常导致merge无法正常执行。 登录Manager页面,检查ZooKeeper是否存在服务不可用、ClickHouse服务在ZooKeeper的数量配额使用率超过阈值等相关告警,如果存在,请按照告警指导处理。 执行如下SQL排查是否存在副本同步队列任务积压: select FQDN() as node,type,count() from clusterAllReplicas(default_cluster, system.replication_queue) group by node,type; 如果存在积压,请查看副本队列中的任务是否报错,并根据报错信息处理。 执行如下SQL排查是否存在节点间表结构不一致。 select FQDN(), create_table_query from clusterAllReplicas(default_cluster,system.tables) where name = '${table_name}' group by FQDN(),create_table_query; 如果存在,请将不一致的表结构修改一致。 执行如下SQL排查是否存在mutation任务异常: select FQDN(), database, table, mutation_id, create_time, command from clusterAllReplicas(default_cluster, system.mutations) where is_done = '0' order by create_time asc; 如果mutation任务正常,等待mutation任务完成,如果mutation任务异常,清理异常的mutation任务。 业务写入压力过大导致merge速度小于insert速度。 可以用以下SQL语句检查报错节点最近一小时的写入条数和频次: select tables,written_rows,count() from system.query_log where type='QueryFinish' and query_start_time between (toUnixTimestamp(now()) - 3600) AND toUnixTimestamp(now()) and query_kind = 'Insert' group by tables,written_rows order by written_rows limit 10; 业务上建议一次写入一个分区,写入频率不要太快,不要小批量数据的插入,适当增大每次插入的时间间隔。 如果没有触发Merge,或者Merge较慢,需要调整参数加快Merge。 加速Merge,需要调整如下参数,请参考加速Merge操作: 配置项 参考值 max_threads CPU核数*2 background_pool_size CPU核数 merge_max_block_size 8192的整数倍,根据CPU内存资源大小调整 cleanup_delay_period 适当小于默认值 30
  • 字符串函数 这些函数假定输入字符串包含有效的UTF-8编码的Unicode代码点。不会显式检查UTF-8数据是否有效,对于无效的UTF-8数据,函数可能会返回错误的结果。可以使用from_utf8来更正无效的UTF-8数据。 此外,这些函数对Unicode代码点进行运算,而不是对用户可见的字符(或字形群集)进行运算。某些语言将多个代码点组合成单个用户感观字符(这是语言书写系统的基本单位),但是函数会将每个代码点视为单独的单位。 lower和upper函数不执行某些语言所需的区域设置相关、上下文相关或一对多映射。 chr(n) → varchar 描述:返回Unicode编码值为n的字符值。 select chr(100); --d char_length(string) → bigint 参考length(string) character_length(string) → bigint 参考length(string) codepoint(string) → integer 描述:返回单个字符对应的Unicode编码。 select codepoint('d'); --100 concat(string1, string2) → varchar 描述:字符串连接。 select concat('hello','world'); -- helloworld concat_ws(string0, string1, ..., stringN) → varchar 描述:将string1、string2、...,stringN,以string0作为分隔符串联成一个字符串。如果string0为null,则返回值为null。分隔符后的参数如果是NULL值,将会被跳过。 select concat_ws(',','hello','world'); -- hello,world select concat_ws(NULL,'def'); --NULL select concat_ws(',','hello',NULL,'world'); -- hello,world select concat_ws(',','hello','','world'); -- hello,,world concat_ws(string0, array(varchar)) → varchar 描述:将数组中的元素以string0为分隔符进行串联。如果string0为null,则返回值为null。数组中的任何null值都将被跳过。 select concat_ws(NULL,ARRAY['abc']);--NULL select concat_ws(',',ARRAY['abc',NULL,NULL,'xyz']); -- abc,xyz select concat_ws(',',ARRAY['hello','world']); -- hello,world decode(binary bin, string charset) →varchar 描述:根据给定的字符集将第一个参数编码为字符串,支持的字符集包括('UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'),当第一个参数为null,将返回null。 select decode(X'70 61 6e 64 61','UTF-8'); _col0 ------- panda (1 row) select decode(X'00 70 00 61 00 6e 00 64 00 61','UTF-16BE'); _col0 ------- panda (1 row) encode(string str, string charset) →binary 描述:字符串按照给定的字符集进行编码。 select encode('panda','UTF-8'); _col0 ---------------- 70 61 6e 64 61 (1 row) find_in_set (string str, string strList) →int 描述:返回str在逗号分隔的strList中第一次出现的位置。当有参数为null时,返回值也为null。 select find_in_set('ab', 'abc,b,ab,c,def'); -- 3 format_number(number x, int d) →string 描述:将数字x格式化为'#,###,###.##',保留d位小数,以字符串的形式返回结果。 select format_number(541211.212,2); -- 541,211.21 format(format,args...) → varchar 描述:参见Format。 locate(string substr, string str, int pos) →int 描述:返回子串在字符串的第pos位后第一次出现的位置。没有满足条件的返回0。 select locate('aaa','bbaaaaa',6);-- 0 select locate('aaa','bbaaaaa',1);-- 3 select locate('aaa','bbaaaaa',4);-- 4 length(string) → bigint 描述:返回字符串的长度。 select length('hello');-- 5 levenshtein_distance(string1, string2) → bigint 描述:计算string1和string2的Levenshtein距离,即将string转为string2所需要的单字符编辑(包括插入、删除或替换)最少次数。 select levenshtein_distance('helo word','hello,world'); -- 3 hamming_distance(string1, string2) → bigint 描述:返回字符串1和字符串2的汉明距离,即对应位置字符不同的数量。 请注意,两个字符串的长度必须相同。 select hamming_distance('abcde','edcba');-- 4 instr(string,substring) → bigint 描述:查找substring在string中首次出现的位置。 select instr('abcde', 'cd');--3 levenshtein(string1, string2) → bigint参考levenshtein_distance(string1, string2) levenshtein_distance(string1, string2) → bigint 描述:返回字符串1和字符串2的Levenshtein编辑距离,即将字符串1更改为字符串2所需的最小单字符编辑(插入,删除或替换)次数。 select levenshtein_distance('apple','epplea');-- 2 lower(string) → varchar 描述:将字符转换为小写。 select lower('HELLo!');-- hello! lcase(string A) → varchar 描述:同lower(string)。 ltrim(string) → varchar 描述:去掉字符串开头的空格。 select ltrim(' hello');-- hello lpad(string, size, padstring) → varchar 描述:左填充字符串以使用padstring调整字符大小。如果size小于字符串的长度,则结果将被截断为size个字符。大小不能为负,并且填充字符串必须为非空。 select lpad('myk',5,'dodo'); -- domyk
  • HoodieDeltaStreamer流式写入 Hudi自带HoodieDeltaStreamer工具支持流式写入,也可以使用SparkStreaming以微批的方式写入。HoodieDeltaStreamer提供以下功能: 支持Kafka,DFS多种数据源接入 。 支持管理检查点、回滚和恢复,保证exactly once语义。 支持自定义转换操作。 示例: 准备配置文件kafka-source.properties #hudi配置 hoodie.datasource.write.recordkey.field=id hoodie.datasource.write.partitionpath.field=age hoodie.upsert.shuffle.parallelism=100 #hive config hoodie.datasource.hive_sync.table=hudimor_deltastreamer_partition hoodie.datasource.hive_sync.partition_fields=age hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor hoodie.datasource.hive_sync.use_jdbc=false hoodie.datasource.hive_sync.support_timestamp=true # Kafka Source topic hoodie.deltastreamer.source.kafka.topic=hudimor_deltastreamer_partition #checkpoint hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/huditest/hudimor_deltastreamer_partition # Kafka props # The kafka cluster we want to ingest from bootstrap.servers= xx.xx.xx.xx:xx auto.offset.reset=earliest #auto.offset.reset=latest group.id=hoodie-delta-streamer offset.rang.limit=10000 指定HoodieDeltaStreamer执行参数(具体参数配置,请查看官网https://hudi.apache.org/ )执行如下命令: spark-submit --master yarn --jars /opt/hudi-java-examples-1.0.jar // 指定spark运行时需要的hudi jars路径 --driver-memory 1g --executor-memory 1g --executor-cores 1 --num-executors 2 --conf spark.kryoserializer.buffer.max=128m --driver-class-path /opt/client/Hudi/hudi/conf:/opt/client/Hudi/hudi/lib/*:/opt/client/Spark2x/spark/jars/*:/opt/hudi-examples-0.6.1-SNAPSHOT.jar:/opt/hudi-examples-0.6.1-SNAPSHOT-tests.jar // 指定spark driver需要的hudi jars路径 --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer spark-internal --props file:///opt/kafka-source.properties // 指定配置文件,注意:使用yarn-cluster模式提交任务时,请指定配置文件路径为HDFS路径。 --target-base-path /tmp/huditest/hudimor1_deltastreamer_partition // 指定hudi表路径 --table-type MERGE_ON_READ // 指定要写入的hudi表类型 --target-table hudimor_deltastreamer_partition // 指定hudi表名 --source-ordering-field name // 指定hudi表预合并列 --source-class org.apache.hudi.utilities.sources.JsonKafkaSource // 指定消费的数据源为JsonKafkaSource, 该参数根据不同数据源指定不同的source类 --schemaprovider-class com.huaweixxx.bigdata.hudi.examples.DataSchemaProviderExample // 指定hudi表所需要的schema --transformer-class com.huaweixxx.bigdata.hudi.examples.TransformerExample // 指定如何处理数据源拉取来的数据,可根据自身业务需求做定制 --enable-hive-sync // 开启hive同步,同步hudi表到hive --continuous // 指定流处理模式为连续模式
  • HetuEngine交互查询引擎概述 HetuEngine能够支持多种数据源的快速联合查询并提供可视化的数据源配置、管理页面,用户可通过HSConsole界面快速添加数据源。 当前版本HetuEngine支持对接的数据源如下表所示。 表1 HetuEngine对接数据源一览表 HetuEngine模式 数据源 数据源模式 支持对接的数据源版本 安全模式 Hive 安全模式 MRS 3.x、FusionInsight 6.5.1 HBase MRS 3.x、FusionInsight 6.5.1 HetuEngine MRS 3.1.1及以后 GaussDB GaussDB 200、GaussDB A 8.0.0及以后 Hudi MRS 3.1.2及以后 ClickHouse MRS 3.1.1及以后 IoTDB MRS 3.2.0及以后 MySQL MySQL 5.7、MySQL 8.0及以后 Oracle Oracle 12及以后版本 GBase GBase8a V950及以后版本 普通模式 Hive 普通模式 MRS 3.x、FusionInsight 6.5.1 HBase MRS 3.x、FusionInsight 6.5.1 Hudi MRS 3.1.2及以后 ClickHouse MRS 3.1.1及以后 IoTDB MRS 3.2.0及以后 GaussDB 安全模式 GaussDB 200、GaussDB A 8.0.0及以后 MySQL MySQL 5.7、MySQL 8.0及以后 Oracle Oracle 12及以后版本 GBase GBase8a V950及以后版本 HetuEngine数据源的添加、配置、删除等操作支持动态生效,无须重启集群。 目前动态生效不支持关闭,数据源动态生效时间默认为60秒。如需修改动态生效时间,请参考3.e修改“coordinator.config.properties”和“worker.config.properties”中的参数“catalog.scanner-interval”值为需要设定的动态生效时间,例如: catalog.scanner-interval =120s HetuEngine支持查询下推(pushdown),它能把查询,或者部分查询,下推到连接的数据源。这意味着特殊的谓词,聚合函数或者其他一些操作,可以被传递到底层数据库或者文件系统进行处理。查询下推能带来以下好处: 提升整体的查询性能。 减少HetuEngine和数据源之间的网络流量。 减少远端数据源的负载。 HetuEngine对查询下推的具体支持情况,依赖于具体的Connector,以及Connector相关的底层数据源或存储系统。 数据源集群域名与HetuEngine集群域名不能相同,HetuEngine也不支持同时对接两个相同域名的数据源(Hive,Hbase,Hudi数据源)。 数据源集群与HetuEngine集群节点业务平面网络互通。 父主题: 使用HetuEngine
  • 问题 在Spark的spark-shell上执行如下代码失败: val acctId = List(("49562", "Amal", "Derry"), ("00000", "Fred", "Xanadu")) val rddLeft = sc.makeRDD(acctId) val dfLeft = rddLeft.toDF("Id", "Name", "City") //dfLeft.show val acctCustId = List(("Amal", "49562", "CO"), ("Dave", "99999", "ZZ")) val rddRight = sc.makeRDD(acctCustId) val dfRight = rddRight.toDF("Name", "CustId", "State") //dfRight.show val dfJoin = dfLeft.join(dfRight, dfLeft("Id") === dfRight("CustId"), "outer") dfJoin.show dfJoin.repartition(1).write.format("com.databricks.spark.csv").option("delimiter", "\t").option("header", "true").option("treatEmptyValuesAsNulls", "true").option("nullValue", "").save("/tmp/outputDir")
  • 字符类型 名称 描述 VARCHAR(n) 变长字符串,n指字节长度。 CHAR(n) 定长字符串,不足补空格。n是指字节长度,如不带精度n,默认为1。 VARBINARY 变长二进制数据。需要带上前缀X,如:X'65683F',暂不支持指定长度的二进制字符串。 JSON 取值可以是a JSON object、a JSON array、a JSON number、a JSON string、true、false or null。 STRING 兼容impala的String,底层是varchar。 BINARY 兼容hive的Binary,底层实现为varbinary。 SQL表达式中,支持简单的字符表达式,也支持Unicode方式,一个Unicode字符串是以U&为固定前缀,以4位数值表示的Unicode前需要加转义符。 -- 字符表达式 select 'hello,winter!'; _col0 ------------------ hello,winter! (1 row) -- Unicode 表达式 select U&'Hello winter \2603 !'; _col0 ------------------ Hello winter ☃ ! (1 row) -- 自定义转义符 select U&'Hello winter #2603 !' UESCAPE '#'; _col0 ------------------ Hello winter ☃ ! (1 row) VARBINARY与BINARY。 -- 创建VARBINARY类型或BINARY类型的表 create table binary_tb(col1 BINARY); -- 插入数据 INSERT INTO binary_tb values (X'63683F'); --查询数据 select * from binary_tb ; -- 63 68 3f 在做CHAR数值比较的时候,在对两个仅尾部空格数不同的CHAR进行比较时,会认为它们是相等的。 SELECT CAST('FO' AS CHAR(4)) = CAST('FO ' AS CHAR(5)); _col0 ------- true (1 row)
  • 时间和日期类型 时间和日期类型目前精确到毫秒。 表3 时间和日期类型 名称 描述 存储空间 DATE 日期和时间。仅支持ISO 8601格式:'2020-01-01' 32位 TIME 不带时区的时间(时、分、秒、毫秒) 例如:TIME '01:02:03.456' 64位 TIME WITH TIMEZONE 带时区的时间(时、分、秒、毫秒),时区用UTC值表示 例如:TIME '01:02:03.456 -08:00' 96位 TIMESTAMP 时间戳 64位 TIMESTAMP WITH TIMEZONE 带时区的时间戳 64位 INTERVAL YEAR TO MONTH 时间间隔字面量,年,月,格式: SY-M S:可选符号(+/-) Y:年数 M:月数 128位 INTERVAL DAY TO SECOND 时间间隔字面量,日,小时,分钟, 秒,精确到毫秒,格式:SD H:M:S.nnn S:可选符号(+/-) D:天数 M:分钟数 S:秒数 nnn:毫秒数 128位 示例: -- 查询日期 SELECT DATE '2020-07-08'; _col0 ------------ 2020-07-08 (1 row) -- 查询时间 SELECT TIME '23:10:15'; _col0 -------------- 23:10:15 (1 row) SELECT TIME '01:02:03.456 -08:00'; _col0 -------------- 01:02:03.456-08:00 (1 row) -- 时间间隔用法 SELECT TIMESTAMP '2015-10-18 23:00:15' + INTERVAL '3 12:15:4.111' DAY TO SECOND; _col0 ------------------------- 2015-10-22 11:15:19.111 (1 row) SELECT TIMESTAMP '2015-10-18 23:00:15' + INTERVAL '3-1' YEAR TO MONTH; _col0 ------------------------- 2018-11-18 23:00:15 (1 row) select INTERVAL '3' YEAR + INTERVAL '2' MONTH ; _col0 ------- 3-2 (1 row) select INTERVAL '1' DAY+INTERVAL '2' HOUR +INTERVAL '3' MINUTE +INTERVAL '4' SECOND ; _col0 ---------------- 1 02:03:04.000 (1 row)
共100000条