华为云用户手册

  • 导入数据 GeoHash默认实现类扩展自定义索引抽象类。如果没有配置handler属性为自定义的实现类,则使用默认的实现类。用户可以通过扩展默认实现类来挂载geohash的自定义实现类。自定义索引抽象类方法包括: Init方法,用来提取、验证和存储handler属性。在失败时发生异常,并显示错误信息。 Generate方法,用来生成索引。它为每行数据生成一个索引数据。 Query方法,用来对给定输入生成索引值范围列表。 导入命令同普通Carbon表: LOAD DATA inpath '/tmp/geosotdata.csv' INTO TABLE geosot OPTIONS ('DELIMITER'= ','); LOAD DATA inpath '/tmp/geosotdata2.csv' INTO TABLE geosot OPTIONS ('DELIMITER'= ','); geosotdata.csv和geosotdata2.csv表请参考准备数据。
  • 快速示例 create table IF NOT EXISTS carbonTable ( COLUMN1 BIGINT, LONGITUDE BIGINT, LATITUDE BIGINT, COLUMN2 BIGINT, COLUMN3 BIGINT ) STORED AS carbondata TBLPROPERTIES ('SPATIAL_INDEX.mygeohash.type'='geohash','SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude','SPATIAL_INDEX.mygeohash.originLatitude'='39.850713','SPATIAL_INDEX.mygeohash.gridSize'='50','SPATIAL_INDEX.mygeohash.minLongitude'='115.828503','SPATIAL_INDEX.mygeohash.maxLongitude'='720.000000','SPATIAL_INDEX.mygeohash.minLatitude'='39.850713','SPATIAL_INDEX.mygeohash.maxLatitude'='720.000000','SPATIAL_INDEX'='mygeohash','SPATIAL_INDEX.mygeohash.conversionRatio'='1000000','SORT_COLUMNS'='column1,column2,column3,latitude,longitude');
  • 配置场景 当前Spark SQL执行一个查询时需要使用大量的内存,尤其是在做聚合(Aggregate)和关联(Join)操作时,此时如果内存有限的情况下就很容易出现OutOfMemoryError。有限内存下的稳定性就是确保在有限内存下依然能够正确执行相关的查询,而不出现OutOfMemoryError。 有限内存并不意味着内存无限小,它只是在内存不足于放下大于内存可用总量几倍的数据时,通过利用磁盘来做辅助从而确保查询依然稳定执行,但依然有一些数据是必须留在内存的,如在做涉及到Join的查询时,对于当前用于Join的相同key的数据还是需要放在内存中,如果该数据量较大而内存较小依然会出现OutOfMemoryError。 有限内存下的稳定性涉及到3个子功能: ExternalSort 外部排序功能,当执行排序时如果内存不足会将一部分数据溢出到磁盘中。 TungstenAggregate 新Hash聚合功能,默认对数据调用外部排序进行排序,然后再进行聚合,因此内存不足时在排序阶段会将数据溢出到磁盘,在聚合阶段因数据有序,在内存中只保留当前key的聚合结果,使用的内存较小。 SortMergeJoin、SortMergeOuterJoin 基于有序数据的等值连接。该功能默认对数据调用外部排序进行排序,然后再进行等值连接,因此内存不足时在排序阶段会将数据溢出到磁盘,在连接阶段因数据有序,在内存中只保留当前相同key的数据,使用的内存较小。
  • 配置描述 参数入口: 在应用提交时通过“--conf”设置这些参数,或者在客户端的“spark-defaults.conf”配置文件中调整如下参数。 表1 参数说明 参数 场景 描述 默认值 spark.sql.tungsten.enabled / 类型为Boolean。 当设置的值等于true时,表示开启tungsten功能,即逻辑计划等同于开启codegeneration,同时物理计划使用对应的tungsten执行计划。 当设置的值等于false时,表示关闭tungsten功能。 true spark.sql.codegen.wholeStage 类型为Boolean。 当设置的值等于true时,表示开启codegeneration功能,即运行时对于某些特定的查询将动态生成各逻辑计划代码。 当设置的值等于false时,表示关闭codegeneration功能,运行时使用当前已有静态代码。 true 开启ExternalSort除配置spark.sql.planner.externalSort=true外,还需配置spark.sql.unsafe.enabled=false或者spark.sql.codegen.wholeStage =false。 如果您需要开启TungstenAggregate,有如下几种方式: 将spark.sql.codegen.wholeStage 和spark.sql.unsafe.enabled的值都设置为true(通过配置文件或命令行方式设置)。 如果spark.sql.codegen.wholeStage 和spark.sql.unsafe.enabled都不为true或者其中一个不为true,只要spark.sql.tungsten.enabled的值设置为true时,TungstenAggregate会开启。
  • Hive SQL扩展语法说明 Hive SQL支持Hive-3.1.0版本中的所有特性,详情请参见https://cwiki.apache.org/confluence/display/hive/languagemanual。 系统提供的扩展Hive语句如表1所示。 表1 扩展Hive语句 扩展语法 语法说明 语法示例 示例说明 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ...... [TBLPROPERTIES ("groupId"=" group1 ","locatorId"="locator1")] ...; 创建一个hive表,并指定表数据文件分布的locator信息。详细说明请参见使用HDFS Colocation存储Hive表。 CREATE TABLE tab1 (id INT, name STRING) row format delimited fields terminated by '\t' stored as RCFILE TBLPROPERTIES("groupId"=" group1 ","locatorId"="locator1"); 创建表tab1,并指定tab1的表数据分布在locator1节点上。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] [STORED AS file_format] | STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...) ] ... [TBLPROPERTIES ('column.encode.columns'='col_name1,col_name2'| 'column.encode.indices'='col_id1,col_id2','column.encode.classname'='encode_classname')]...; 创建一个hive表,并指定表的加密列和加密算法。详细说明请参见使用Hive列加密功能。 create table encode_test(id INT, name STRING, phone STRING, address STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' WITH SERDEPROPERTIES ('column.encode.indices'='2,3', 'column.encode.classname'='org.apache.hadoop.hive.serde2. SMS 4Rewriter') STORED AS TEXTFILE; 创建表encode_test,并指定插入数据时对第2、3列加密,加密算法类为org.apache.hadoop.hive.serde2.SMS4Rewriter。 REMOVE TABLE hbase_tablename [WHERE where_condition]; 删除hive on hbase表中符合条件的数据。详细说明请参见删除Hive on HBase表中的单行记录。 remove table hbase_table1 where id = 1; 删除表中符合条件“id =1”的数据。 CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name (col_name data_type [COMMENT col_comment], ...) [ROW FORMAT row_format] STORED AS inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建hive表,并设定表可以指定自定义行分隔符。详细说明请参见自定义行分隔符。 create table blu(time string, num string, msg string) row format delimited fields terminated by ',' stored as inputformat 'org.apache.hadoop.hive.contrib.fileformat.SpecifiedDelimiterInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 创建表blu,指定inputformat为SpecifiedDelimiterInputFormat,以便查询时可以指定表的查询行分隔符。 父主题: Hive常见SQL语法参考
  • 基于索引查询数据 在具有索引的用户表中,可以使用Filter来查询数据。对于创建单索引和组合索引的用户表,使用过滤器查询的结果与没有使用索引的表相同,但数据查询性能高于没有使用索引的表。 索引的使用规则如下: 对于为一个或多个列创建单个索引的情况: 当将此列用于AND或OR查询筛选时,使用索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2)。 当在查询中使用“索引列和非索引列”进行过滤时,此索引可以提高查询性能。 例如,Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1)。 当在查询中使用“索引列或非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如,Filter_Condition(IndexCol1)AND / OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1)。 对于为多个列创建组合索引的情况: 当用于查询的列是组合索引的全部或部分列并且与组合索引具有相同的顺序时,使用索引会提高查询性能。 例如,为C1,C2和C3创建组合索引。 该索引在以下情况下生效: Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2) FILTER_CONDITION(IndexCol1) 该索引在下列情况下不生效: Filter_Condition(IndexCol2)AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol3) FILTER_CONDITION(IndexCol2) FILTER_CONDITION(IndexCol3) 当在查询中使用“索引列和非索引列”进行过滤时,使用索引可提高查询性能。 例如: Filter_Condition(IndexCol1)AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2)AND Filter_Condition(NonIndexCol1) 当在查询中使用“索引列或非索引列”进行筛选时,但不使用索引,查询性能不会提高。 例如: Filter_Condition(IndexCol1)OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1)AND Filter_Condition(IndexCol2))OR(Filter_Condition(NonIndexCol1)) 当多个列用于查询时,只能为组合索引中的最后一列指定值范围,而其他列只能设置为指定值。 例如,为C1,C2和C3创建组合索引。在范围查询中,只能为C3设置数值范围,过滤条件为“C1 = XXX,C2 = XXX,C3 = 数值范围”。
  • 相关接口 使用HIndex的API都在类org.apache.hadoop.hbase.hindex.client.HIndexAdmin中,相关接口介绍如下: 操作 接口 描述 注意事项 添加索引 addIndices() 将索引添加到没有数据的表中。调用此接口会将用户指定的索引添加到表中,但会跳过生成索引数据。因此,在此操作之后,索引不能用于scan/filter操作。它的使用场景为用户想要在具有大量预先存在用户数据的表上批量添加索引,其具体操作为使用诸如TableIndexer工具之类的外部工具来构建索引数据。 索引一旦添加则不能修改。若要修改,则需先删除旧的索引然后重新创建。 用户应注意不要在具有不同索引名称的相同列上创建两个索引。如果这样做,将会导致存储和处理的浪费。 索引不能添加到系统表中。 向索引列put数据时不支持append和increment操作。 如果客户端出现任何故障,除非发生DoNotRetryIOException,否则用户应该重试。 索引列族根据可用性按顺序从以下条件中选择: 默认索引列族“d”或者如果设置了属性“hindex.default.family.name”的值,则以该值为准。 符号#,@,$或% #0,@ 0,$ 0,%0,#1,@ 1 ...上至#255,@ 255,$ 255,%255 throw Exception 可以通过HIndex TableIndexer工具添加索引而无需建立索引数据。 addIndicesWithData() 将索引添加到有数据的表中。此方法将用户指定的索引添加到表中,并会对已经存在的用户数据创建对应的索引数据,也可先调用该方法生成索引再在存入用户数据的同时生成索引数据。在此操作之后,这些索引立即可用于scan/filter操作。 删除索引 dropIndices() 仅删除索引。该API从表中删除用户指定的索引,但跳过相应的索引数据。在此操作之后,索引不能用于scan/filter操作。集群在major compaction期间会自动删除旧的索引数据。 此API使用场景为表中包含大量索引数据且dropIndicesWithData()不可行。另外,用户也可以通过TableIndexer工具删除索引以及索引数据。 在索引的状态为ACTIVE,INACTIVE和DROPPING时,允许禁用索引的操作。 对于使用dropIndices()删除索引的操作,用户必须确保在将索引添加到具有相同索引名的表之前,相应的索引数据已被删除(即major compaction已完成)。 用户删除相应的索引会删除: 一个带有索引的列族。 组合索引所有列族中的任一个列族。 索引可以通过HIndex TableIndexer工具与索引数据一起删除。 dropIndicesWithData() 删除索引数据。此API删除用户指定的索引,并删除用户表中与这些索引对应的所有索引数据。在此操作之后,删除的索引完全从表中删除,不再可用于scan/filter操作。 启用/禁用索引 disableIndices() 该API禁用所有用户指定的索引,使其不再可用于scan/filter操作。 在索引的状态为ACTIVE,INACTIVE和BUILDING时允许启用索引的操作。 在索引的状态为ACTIVE和INACTIVE时允许禁用索引操作。 在禁用索引之前,用户必须确保索引数据与用户数据一致。如果在索引处于禁用状态期间没有在表中添加新的数据,索引数据与用户数据将保持一致。 启用索引时,可以通过使用TableIndexer工具构建索引来保证数据一致性。 enableIndices() 该API启用所有用户指定的索引,使其可用于scan/filter操作。 查看已创建的索引 listIndices() 该API可用于列出给定表中的所有索引。 无
  • 前提条件 ClickHouse服务运行正常,Zookeeper服务运行正常,迁入、迁出节点的ClickHouseServer实例状态正常。 请确保迁入节点已有待迁移数据表,且确保该表是MergeTree系列引擎的分区表。 创建迁移任务前请确保所有对待迁移数据表的写入任务已停止,且任务启动后,只允许对待迁移数据表进行查询操作,禁止对该表进行写入、删除等操作,否则可能会造成迁移前后数据不一致。 迁入节点的ClickHouse数据目录有足够的空间。
  • 访问作业浏览器 访问Hue WebUI,请参考访问Hue WebUI界面。 单击作业。 默认显示当前集群的所有作业。 作业浏览器显示的数字表示集群中所有作业的总数。 “作业浏览器”将显示作业以下信息: 表1 MRS 作业属性介绍 属性名 描述 名称 表示作业的名称。 用户 表示启动该作业的用户。 类型 表示作业的类型。 状态 表示作业的状态,包含“成功”、“正在运行”、“失败”。 进度 表示作业运行进度。 组 表示作业所属组。 开始 表示作业开始时间。 持续时间 表示作业运行使用的时间。 Id 表示作业的编号,由系统自动生成。 如果MRS集群安装了Spark组件,则默认会启动一个作业“Spark-JD BCS erver”,用于执行任务。
  • 操作步骤 并行度可以通过如下三种方式来设置,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。 在会产生shuffle的操作函数内设置并行度参数,优先级最高。 testRDD.groupByKey(24) 在代码中配置“spark.default.parallelism”设置并行度,优先级次之。 val conf = new SparkConf() conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”文件中配置“spark.default.parallelism”的值,优先级最低。 spark.default.parallelism 24
  • 配置自定义rowkey实现 使用BulkLoad工具批量导入HBase数据时,支持用户自定义的组合rowkey实现。用户可编写rowkey实现代码,导入时根据该代码逻辑进行组合rowkey导入。 配置自定义rowkey实现步骤如下: 用户编写自定义rowkey的实现类,需要继承接口,该接口所在的Jar包路径为“客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar”: [com.huawei.hadoop.hbase.tools.bulkload.RowkeyHandlerInterface], 实现接口中方法: byte[] getRowkeyBytes(String[] colsValues, RegulationDomain regulation) 其中: 传入参数“colsValues”为原始数据中的一行数据集合,每个元素为一列。 传入参数“regulation”为配置导入文件信息(一般情况下并不需要使用)。 将该实现类与其依赖包同时打包成Jar文件,保存到HBase客户端所在节点的任意位置并确保执行命令的用户具有读取和执行该Jar包的权限。 在执行导入命令时,增加两个参数配置项: -Dimport.rowkey.jar="第二步中Jar包的全路径" -Dimport.rowkey.class="用户实现类的全类名"
  • 配置场景 当Spark2x Web UI中有一些不允许其他用户看到的数据时,用户可能想对UI进行安全防护。用户一旦登录,Spark2x可以比较与这个用户相对应的视图ACLs来确认是否授权用户访问 UI。 Spark2x存在两种类型的Web UI,一种为运行中任务的Web UI,可以通过Yarn原生页面的应用链接或者REST接口访问。一种为已结束任务的Web UI,可以通过Spark2x JobHistory服务或者REST接口访问。 本章节仅支持安全模式(开启了Kerberos认证)集群。 运行中任务Web UI ACL配置。 运行中的任务,可通过服务端对如下参数进行配置。 “spark.admin.acls”:指定Web UI的管理员列表。 “spark.admin.acls.groups”:指定管理员组列表。 “spark.ui.view.acls”:指定yarn界面的访问者列表。 “spark.modify.acls.groups”:指定yarn界面的访问者组列表。 “spark.modify.acls”:指定Web UI的修改者列表。 “spark.ui.view.acls.groups”:指定Web UI的修改者组列表。 运行结束后Web UI ACL配置。 运行结束的任务通过客户端的参数“spark.history.ui.acls.enable”控制是否开启ACL访问权限。 如果开启了ACL控制,由客户端的“spark.admin.acls”和“spark.admin.acls.groups”配置指定Web UI的管理员列表和管理员组列表,由客户端的“spark.ui.view.acls”和“spark.modify.acls.groups”配置指定查看Web UI任务明细的访问者列表和组列表,由客户端的“spark.modify.acls”和“spark.ui.view.acls.groups”配置指定修改Web UI任务明细的访问者列表和组列表。
  • 操作场景 在启用Kerberos认证的集群中,用户使用Kafka前需要拥有对应的权限。MRS集群支持将Kafka的使用权限,授予不同用户。 Kafka默认用户组如表1所示。 在MRS 3.x及之后版本中,Kafka支持两种鉴权插件:“Kafka开源自带鉴权插件”和“Ranger鉴权插件”。 本章节描述的是基于“Kafka开源自带鉴权插件”的用户权限管理。若想使用 “Ranger鉴权插件”,请参考添加Kafka的Ranger访问权限策略。 表1 Kafka默认用户组 用户组名称 描述 kafkaadmin Kafka管理员用户组。添加入本组的用户,拥有所有主题的创建,删除,授权及读写权限。 kafkasuperuser Kafka高级用户组。添加入本组的用户,拥有所有主题的读写权限。 kafka Kafka普通用户组。添加入本组的用户,需要被kafkaadmin组用户授予特定主题的读写权限,才能访问对应主题。
  • 参考实例 Flume配置参考示例(SpoolDir--Mem--Hive): server.sources = spool_source server.channels = mem_channel server.sinks = Hive_Sink #config the source server.sources.spool_source.type = spooldir server.sources.spool_source.spoolDir = /tmp/testflume server.sources.spool_source.montime = server.sources.spool_source.fileSuffix =.COMPLETED server.sources.spool_source.deletePolicy = never server.sources.spool_source.trackerDir =.flumespool server.sources.spool_source.ignorePattern = ^$ server.sources.spool_source.batchSize = 20 server.sources.spool_source.inputCharset =UTF-8 server.sources.spool_source.selector.type = replicating server.sources.spool_source.fileHeader = false server.sources.spool_source.fileHeaderKey = file server.sources.spool_source.basenameHeaderKey= basename server.sources.spool_source.deserializer = LINE server.sources.spool_source.deserializer.maxBatchLine= 1 server.sources.spool_source.deserializer.maxLineLength= 2048 server.sources.spool_source.channels = mem_channel #config the channel server.channels.mem_channel.type = memory server.channels.mem_channel.capacity =10000 server.channels.mem_channel.transactionCapacity= 2000 server.channels.mem_channel.channelfullcount= 10 server.channels.mem_channel.keep-alive = 3 server.channels.mem_channel.byteCapacity = server.channels.mem_channel.byteCapacityBufferPercentage= 20 #config the sink server.sinks.Hive_Sink.type = hive server.sinks.Hive_Sink.channel = mem_channel server.sinks.Hive_Sink.hive.metastore = thrift://${任意metastore业务IP}:21088 server.sinks.Hive_Sink.hive.hiveSite = /opt/hivesink-conf/hive-site.xml server.sinks.Hive_Sink.hive.coreSite = /opt/hivesink-conf/core-site.xml server.sinks.Hive_Sink.hive.metastoreSite = /opt/hivesink-conf/hivemeatastore-site.xml server.sinks.Hive_Sink.hive.database = default server.sinks.Hive_Sink.hive.table = flume_multi_type_part server.sinks.Hive_Sink.hive.partition = Tag,%Y-%m,%d server.sinks.Hive_Sink.hive.txnsPerBatchAsk= 100 server.sinks.Hive_Sink.hive.autoCreatePartitions= true server.sinks.Hive_Sink.useLocalTimeStamp = true server.sinks.Hive_Sink.batchSize = 1000 server.sinks.Hive_Sink.hive.kerberosPrincipal= super1 server.sinks.Hive_Sink.hive.kerberosKeytab= /opt/mykeytab/user.keytab server.sinks.Hive_Sink.round = true server.sinks.Hive_Sink.roundValue = 10 server.sinks.Hive_Sink.roundUnit = minute server.sinks.Hive_Sink.serializer = DELIMITED server.sinks.Hive_Sink.serializer.delimiter= ";" server.sinks.Hive_Sink.serializer.serdeSeparator= ';' server.sinks.Hive_Sink.serializer.fieldnames= id,msg
  • 配置描述 参考修改集群服务配置参数进入Yarn服务参数“全部配置”界面,在搜索框中输入参数名称。 表1 参数说明 参数 描述 默认值 yarn.nodemanager.vmem-check-enabled 是否进行虚拟内存检测的开关。如果任务使用的内存量超出分配值,则直接将任务强制终止。 设置为true时,进行虚拟内存检测; 设置为false时,不进行虚拟内存检测。 MRS 3.x之前的版本集群:false MRS 3.x及后续版本集群:true yarn.nodemanager.pmem-check-enabled 是否进行物理内存检测的开关。如果任务使用的内存量超出分配值,则直接将任务强制终止。 设置为true时,进行物理内存检测; 设置为false时,不进行物理内存检测。 true
  • 操作场景 用户可以通过KafkaUI查看Topic详情、修改Topic Configs、增加Topic分区个数、删除Topic,并可实时查看不同时段的生产数据条数。 安全模式下,KafkaUI对查看Topic详情操作不作鉴权处理,即任何用户都可以查询Topic信息;对于修改Topic Configs、增加Topic分区个数、删除Topic场景,需保证KafkaUI登录用户属于“kafkaadmin”用户组或者单独给用户授予对应操作权限,否则将会鉴权失败。 非安全模式下,KafkaUI对所有操作不作鉴权处理。 本章节内容仅适用于MRS 3.1.2及之后版本。
  • 使用示例 --在default数据库和default_cluster集群下创建名为test表 CREATE TABLE default.test ON CLUSTER default_cluster ( `EventDate` DateTime, `id` UInt64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/test', '{replica}') PARTITION BY toYYYYMM(EventDate) ORDER BY id
  • 基本语法 方法一:在指定的“database_name”数据库中创建一个名为“table_name ”的表。 如果建表语句中没有包含“database_name”,则默认使用客户端登录时选择的数据库作为数据库名称。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name [ON CLUSTER ClickHouse集群名] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = engine_name() [PARTITION BY expr_list] [ORDER BY expr_list] ClickHouse在创建表时建议携带PARTITION BY创建表分区。因为ClickHouse数据迁移工具是基于表的分区进行数据迁移,在创建表时如果不携带PARTITION BY创建表分区,则在集群内ClickHouseServer节点间数据迁移界面无法对该表进行数据迁移。 方法二:创建一个与database_name2.table_name2具有相同结构的表,同时可以对其指定不同的表引擎声明。 如果没有表引擎声明,则创建的表将与database_name2.table_name2使用相同的表引擎。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name AS [database_name2.]table_name2 [ENGINE = engine_name] 方法三:使用指定的引擎创建一个与SELECT子句的结果具有相同结构的表,并使用SELECT子句的结果填充它。 CREATE TABLE [IF NOT EXISTS] [database_name.]table_name ENGINE = engine_name AS SELECT ...
  • 前提条件 MRS集群管理员已明确业务需求,并准备一个Kafka管理员用户(属于kafkaadmin组,普通模式不需要)。 已安装Kafka客户端,客户端安装目录如“/opt/client”。 本示例需创建两个Topic,可参考7,分别命名为“test_2”和“test_3”,并创建“move-kafka-topic.json”文件,创建路径如“/opt/client/Kafka/kafka”,Topic格式内容如下: { "topics": [{"topic":"test_2"},{"topic":"test_3"}], "version":1 }
  • 操作步骤 以客户端安装用户,登录安装Kafka客户端的节点。 切换到Kafka客户端安装目录。 cd /opt/client 执行以下命令,配置环境变量。 source bigdata_env 执行以下命令,进行用户认证。(普通模式跳过此步骤) kinit 组件业务用户 执行以下命令进入Kafka客户端的bin目录。 cd Kafka/kafka/bin 执行以下命令生成执行计划。 ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --topics-to-move-json-file ../move-kafka-topic.json --broker-list "1,2,3" --generate 172.16.0.119:ZooKeeper实例的业务IP。 --broker-list "1,2,3":参数中的“1,2,3”为扩容后的所有broker_id。 执行vim ../reassignment.json创建“reassignment.json”文件并保存,保存路径为“/opt/kafkaclient/Kafka/kafka”。 拷贝6中生成的“Proposed partition reassignment configuration”下的内容至“reassignment.json”文件,如下所示: {"version":1,"partitions":[{"topic":"test","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]}]} 执行以下命令进行分区重分布。 ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --execute --throttle 50000000 --throttle 50000000:限制网络带宽为50MB。带宽可根据数据量大小及客户对均衡时间的要求进行调整,5TB数据量,使用50MB带宽,均衡时长约8小时。 执行以下命令查看迁移状态。 ./kafka-reassign-partitions.sh --zookeeper 172.16.0.119:2181/kafka --reassignment-json-file ../reassignment.json --verify
  • 基础操作 使用root用户登录集群客户端节点,执行如下命令: cd {客户端安装目录} source bigdata_env source Hudi/component_env kinit 创建的用户 执行hudi-cli.sh进入Hudi客户端, cd {客户端安装目录}/Hudi/hudi/bin/ ./hudi-cli.sh 即可执行各种Hudi命令,执行示例(仅部分命令,全部命令请参考Hudi官网:https://hudi.apache.org/docs/quick-start-guide/): 查看帮助: help //查看hudi-cli的所有命令 help 'command' //查看某一个命令的帮助及参数列表。 连接表: connect --path '/tmp/huditest/test_table' 查看表信息: desc 查看compaction计划: compactions show all 查看clean计划: cleans show 执行clean: cleans run 查看commit信息: commits show 查看commit写入的分区: commit showpartitions --commit 20210127153356 20210127153356表示commit的时间戳,下同。 查看指定commit写入的文件: commit showfiles --commit 20210127153356 比较两个表的commit信息差异: commits compare --path /tmp/hudimor/mytest100 rollback指定提交(rollback每次只允许rollback最后一次commit): commit rollback --commit 20210127164905 compaction调度: compaction schedule --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' 执行compaction compaction run --parallelism 100 --sparkMemory 1g --retry 1 --compactionInstant 20210602101315 --hoodieConfigs 'hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.BoundedIOCompactionStrategy,hoodie.compaction.target.io=1,hoodie.compact.inline.max.delta.commits=1' --propsFilePath hdfs://hacluster/tmp/default/tb_test_mor/.hoodie/hoodie.properties --schemaFilePath /tmp/default/tb_test_mor/.hoodie/compact_tb_base.json 创建savepoint savepoint create --commit 20210318155750 回滚指定的savepoint savepoint rollback --savepoint 20210318155750 若commit写入导致元数据冲突异常,执行commit rollback、savepoint rollback能回退数据,但不能回退Hive元数据,只能删除Hive表然后手动进行同步刷新。 commit rollback只能回退当前最新的一个commit,savepoint rollback只能回退到最新的一个savepoint。二者均不能随意指定进行回退。
  • Flume模块介绍 Flume客户端/服务端由一个或多个Agent组成,而每个Agent是由Source、Channel、Sink三个模块组成,数据先进入Source然后传递到Channel,最后由Sink发送到下一个Agent或目的地(客户端外部)。各模块说明见表1。 表1 模块说明 名称 说明 Source Source负责接收数据或产生数据,并将数据批量放到一个或多个Channel。Source有两种类型:数据驱动和轮询。 典型的Source样例如下: 和系统集成并接收数据的Sources:Syslog、Netcat。 自动生成事件数据的Sources:Exec、SEQ。 用于Agent和Agent之间通信的IPC Sources:Avro。 Source必须至少和一个Channel关联。 Channel Channel位于Source和Sink之间,用于缓存Source传递的数据,当Sink成功将数据发送到下一跳的Channel或最终数据处理端,缓存数据将自动从Channel移除。 不同类型的Channel提供的持久化水平也是不一样的: Memory Channel:非持久化 File Channel:基于预写式日志(Write-Ahead Logging,简称WAL)的持久化实现 JDBC Channel:基于嵌入Database的持久化实现 Channel支持事务特性,可保证简易的顺序操作,同时可以配合任意数量的Source和Sink共同工作。 Sink Sink负责将数据传输到下一跳或最终目的,成功完成后将数据从Channel移除。 典型的Sink样例如下: 存储数据到最终目的终端Sink,比如:HDFS、Kafka 自动消耗的Sinks,比如:Null Sink 用于Agent和Agent之间通信的IPC sink:Avro Sink必须关联到一个Channel。 每个Flume的Agent可以配置多个Source、Channel、Sink模块,即一个Source将数据发送给多个Channel,再由多个Sink发送到下一个Agent或目的地。 Flume支持多个Flume配置级联,即上一个Agent的Sink将数据再发送给另一个Agent的Source。
  • 补充说明 Flume可靠性保障措施。 Source与Channel、Channel与Sink之间支持事务机制。 Sink Processor支持配置failover、load_balance机制。 例如load_balance示例如下: server.sinkgroups=g1 server.sinkgroups.g1.sinks=k1 k2 server.sinkgroups.g1.processor.type=load_balance server.sinkgroups.g1.processor.backoff=true server.sinkgroups.g1.processor.selector=random Flume多客户端聚合级联时的注意事项。 级联时需要走Avro或者Thrift协议进行级联。 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。 Flume客户端可以包含多个独立的数据流,即在一个配置文件properties.properties中配置多个Source、Channel、Sink。这些组件可以链接以形成多个流。 例如在一个配置中配置两个数据流,示例如下: server.sources = source1 source2 server.sinks = sink1 sink2 server.channels = channel1 channel2 #dataflow1 server.sources.source1.channels = channel1 server.sinks.sink1.channel = channel1 #dataflow2 server.sources.source2.channels = channel2 server.sinks.sink2.channel = channel2
  • 操作场景 集群的资源竞争场景如下: 提交两个低优先级的应用Job 1和Job 2。 正在运行中的Job 1和Job 2有部分task处于running状态,但由于集群或队列资源容量有限,仍有部分task未得到资源而处于pending状态。 提交一个较高优先级的应用Job 3,此时会出现如下资源分配情况:当Job 1和Job 2中running状态的task运行结束并释放资源后,Job 3中处于pending状态的task将优先得到这部分新释放的资源。 Job 3完成后,资源释放给Job 1、Job 2继续执行。 用户可以在YARN中配置任务的优先级。任务优先级是通过ResourceManager的调度器实现的。
  • 元数据管理器使用介绍 访问Hue WebUI,请参考访问Hue WebUI界面。 查看Hive表的元数据 在左侧导航栏单击表,单击某一表名称,界面将显示Hive表的元数据信息。 管理Hive表的元数据 在Hive表的元数据信息界面: 单击右上角的“导入”可导入数据。 单击“概述”,在“属性”域可查看表文件的位置信息。 可查看Hive表各列字段的信息,并手动添加描述信息,注意此处添加的描述信息并不是Hive表中的字段注释信息(comment)。 单击“样本”可浏览数据。 管理Hive元数据表 单击左侧列表中的可在数据库中根据上传的文件创建一个新表,也可手动创建一个新表。 Hue界面主要用于文件、表等数据的查看与分析,禁止通过Hue界面对操作对象进行删除等高危管理操作。如需操作,建议在确认对业务没有影响后通过各组件的相应操作方法进行处理,例如使用HDFS客户端对HDFS文件进行操作,使用Hive客户端对Hive表进行操作。
  • 上传UDF 访问Flink WebUI,请参考访问FlinkServer WebUI界面。 单击“UDF管理”进入UDF管理页面。 单击“添加UDF”,在“本地Jar文件”参数后选择并上传本地已准备好的UDF jar文件。 填写UDF名称以及描述信息后,单击“确定”。 “UDF名称”最多可添加10项,“名称”可自定义,“类名”需与上传的UDF jar文件中UDF函数全限定类名一一对应。 上传UDF jar文件后,服务器默认保留5分钟,5分钟内单击确定则完成UDF创建,超时后单击确定则创建UDF失败并弹出错误提示:本地UDF文件路径有误。 在UDF列表中,可查看当前应用内所有的UDF信息。可在对应UDF信息的“操作”列编辑或删除UDF信息(只能删除未被使用的UDF项)。 (可选)如果需要立即运行或开发作业,可在“作业管理”进行相关作业配置,可参考创建FlinkServer作业。
  • UDF java代码及SQL样例 UDF java使用样例 package com.xxx.udf; import org.apache.flink.table.functions.ScalarFunction; public class UdfClass_UDF extends ScalarFunction { public int eval(String s) { return s.length(); } } UDF SQL使用样例 CREATE TEMPORARY FUNCTION udf as 'com.xxx.udf.UdfClass_UDF'; CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen','rows-per-second'='1'); CREATE TABLE udfSink (a VARCHAR,b int) WITH ('connector' = 'print'); INSERT INTO udfSink SELECT a, udf(a) FROM udfSource;
  • 操作步骤 启用Kerberos认证的集群,登录MRS Manager页面,创建拥有“Hive Admin Privilege”权限的角色,创建角色请参考创建角色。 创建属于“Presto”和“Hive”组的用户,同时为该用户绑定1中创建的角色,然后下载用户认证文件,参见创建用户,下载用户认证文件。 将下载的user.keytab文件和krb5.conf上传到MRS客户端所在节点。 步骤2-3仅启用Kerberos认证的集群执行,普通集群请直接从步骤4开始执行。 根据业务情况,准备好客户端,并登录安装客户端的节点。 例如在Master2节点更新客户端,则登录该节点使用客户端,具体参见使用MRS客户端。 执行以下命令切换用户。 sudo su - omm 执行以下命令,切换到客户端目录,例如“/opt/client”。 cd /opt/client 执行以下命令,配置环境变量。 source bigdata_env 连接Presto Server。根据客户端的不同,提供如下两种客户端的链接方式。 使用MRS提供的客户端。 未启用Kerberos认证的集群,执行以下命令连接本集群的Presto Server。 presto_cli.sh 未启用Kerberos认证的集群,执行以下命令连接其他集群的Presto Server,其中ip为对应集群的Presto的浮动IP(可通过在Presto配置项中搜索PRESTO_COORDINATOR_FLOAT_IP的值获得),port为Presto Server的端口号,默认为7520。 presto_cli.sh --server http://ip:port 启用Kerberos认证的集群,执行以下命令连接本集群的Presto Server。 presto_cli.sh --krb5-config-path krb5.conf文件路径 --krb5-principal 用户principal --krb5-keytab-path user.keytab文件路径 --user presto用户名 启用Kerberos认证的集群,执行以下命令连接其他集群的Presto Server,其中ip为对应集群的Presto的浮动IP(可通过在Presto配置项中搜索PRESTO_COORDINATOR_FLOAT_IP的值获得),port为Presto Server的端口号,默认为7521。 presto_cli.sh --krb5-config-path krb5.conf文件路径 --krb5-principal 用户principal --krb5-keytab-path user.keytab文件路径 --server https://ip:port --krb5-remote-service-name Presto Server name 使用原生客户端 Presto原生客户端为客户端目录下的Presto/presto/bin/presto,使用方式参见https://prestodb.io/docs/current/installation/cli.html和https://prestodb.io/docs/current/security/cli.html。 执行Query语句,如“show catalogs;”,更多语句请参阅https://prestodb.io/docs/current/sql.html。 启用Kerberos认证的集群使用Presto查询Hive Catalog的数据时,运行Presto客户端的用户需要有Hive表的访问权限,并且需要在Hive beeline中执行命令grant all on table [table_name] to group hive;,给Hive组赋权限。 查询结束后,执行以下命令退出客户端。 quit;
  • 在安全区中配置权限策略 使用Ranger安全区管理员用户登录Ranger管理页面。 在Ranger首页右上角的“Security Zone”选项的下拉列表中选择对应的安全区,即可切换至该安全区内的权限视图。 单击组件名称下的权限插件名称,即可进入组件安全访问策略列表页面。 各组件的策略列表中,系统默认生成的条目会自动继承至安全区内,用于保证集群内的部分系统默认用户或用户组的权限。 单击“Add New Policy”,根据业务场景规划配置相关用户或者用户组的资源访问策略。 例如在本章节样例中,在安全区内配置一条允许“test”用户访问“/testzone/test”目录的策略: 其他不同组件的完整访问策略配置样例参考: 添加HDFS的Ranger访问权限策略 添加HBase的Ranger访问权限策略 添加Hive的Ranger访问权限策略 添加Yarn的Ranger访问权限策略 添加Spark2x的Ranger访问权限策略 添加Kafka的Ranger访问权限策略 添加Storm的Ranger访问权限策略 策略添加后,需等待30秒左右,待系统生效。 安全区中定义的策略仅适用于区域中的资源,服务的资源被划分到安全区后,非安全区针对该资源的访问权限策略将不再生效。 如需配置针对当前安全区之外其他资源的访问策略,需在Ranger首页右上角的“Security Zone”选项中退出当前安全区后进行配置。
  • 添加安全区 使用Ranger管理员用户rangeradmin登录Ranger管理页面,具体操作可参考登录Ranger WebUI界面。 单击“Security Zone”,在区域列表页面中单击,添加安全区。 表1 安全区配置参数 参数名称 描述 示例 Zone Name 配置安全区的名称。 test Zone Description 配置安全区的描述信息。 - Admin Users/Admin Usergroups 配置安全区的管理用户/用户组,可在安全区中添加及修改相关资源的权限策略。 必须至少配置一个用户或用户组。 zone_admin Auditor Users/ Auditor Usergroups 添加审计用户/用户组,可在安全区中查看相关资源权限策略内容。 必须至少配置一个用户或用户组。 zone_user Select Tag Services 选择服务的标签信息。 - Select Resource Services 选择安全区内包含的服务及具体资源。 在“Select Resource Services”中选择服务后,需要在“Resource”列中添加具体的资源对象,例如HDFS服务器的文件目录、Yarn的队列、Hive的数据库及表、HBase的表及列。 /testzone 例如针对HDFS中的“/testzone”目录创建一个安全区,配置如下: 单击“Save”,等待安全区添加成功。 Ranger管理员可在“Security Zone”页面查看当前的所有安全区并单击“Edit”修改安全区的属性信息,当相关资源不需要在安全区中进行管理时,可单击“Delete”删除对应安全区。
共100000条