华为云用户手册

  • 问题 使用HBase客户端操作表数据的时候客户端出现类似如下异常: 2015-12-15 02:41:14,054 | WARN | [task-result-getter-2] | Lost task 2.0 in stage 58.0 (TID 3288, linux-175): org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue Dec 15 02:41:14 CS T 2015, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=60303: row 'xxxxxx' on table 'xxxxxx' at region=xxxxxx,\x05\x1E\x80\x00\x00\x00\x80\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x00\x00\x00\x000\x00\x80\x00\x00\x00\x80\x00\x00\x00\x80\x00\x00, 1449912620868.6a6b7d0c272803d8186930a3bfdb10a9., hostname=xxxxxx,16020,1449941841479, seqNum=5 at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.throwEnrichedException(RpcRetryingCallerWithReadReplicas.java:275) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:223) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:323)
  • 回答 出现该问题的主要原因为RegionServer分配的内存过小、Region数量过大导致在运行过程中内存不足,服务端对客户端的响应过慢。在RegionServer的配置文件“hbase-site.xml”中需要调整如下对应的内存分配参数。 表1 RegionServer内存调整参数 参数 描述 默认值 GC_OPTS 在启动参数中给RegionServer分配的初始内存和最大内存。 -Xms8G -Xmx8G hfile.block.cache.size 分配给HFile/StoreFile所使用的块缓存的最大heap(-Xmx setting)的百分比。 当offheap关闭时,默认值为0.25。当offheap开启时,默认值是0.1。
  • 限制 EXCHANGE PARTITION: 被迁移的单个或多个分区,迁移前必须都是已存在的分区,并归属于来源表,且在目标表中不包含这些分区; 该操作涉及的表需要有相同的列定义,并且有相同的分区键; 如果表中包含索引,该操作会失败; 来源表和目标表中任意一个为事务表时,不允许Exchange partition操作; 对于目标表,在一次操作中,多个分区要么同时迁移成功,要么全部失败。对于来源表,操作成功后,所有迁移的分区都会被释放; Alter table change column不支持orc格式的表。 ALTER TABLE table_name ADD | DROP col_name命令仅对于ORC/PARQUET存储格式的非分区表可用。
  • 示例 将表名从users 修改为 people: ALTER TABLE users RENAME TO people; 在表users中增加名为zip的列: ALTER TABLE users ADD COLUMN zip varchar; 从表users中删除名为zip的列: ALTER TABLE users DROP COLUMN zip; 将表users中列名id更改为user_id: ALTER TABLE users RENAME COLUMN id TO user_id; 修改分区操作: --创建两个分区表 CREATE TABLE IF NOT EXISTS hetu_int_table5 (eid int, name String, salary String, destination String, dept String, yoj int) COMMENT 'Employee Names' partitioned by (dt timestamp,country String, year int, bonus decimal(10,3)) STORED AS TEXTFILE; CREATE TABLE IF NOT EXISTS hetu_int_table6 (eid int, name String, salary String, destination String, dept String, yoj int) COMMENT 'Employee Names' partitioned by (dt timestamp,country String, year int, bonus decimal(10,3)) STORED AS TEXTFILE; --添加分区 ALTER TABLE hetu_int_table5 ADD IF NOT EXISTS PARTITION (dt='2008-08-08 10:20:30.0', country='IN', year=2001, bonus=500.23) PARTITION (dt='2008-08-09 10:20:30.0', country='IN', year=2001, bonus=100.50) ; --查看分区 show partitions hetu_int_table5; dt | country | year | bonus -------------------------|---------|------|--------- 2008-08-09 10:20:30.000 | IN | 2001 | 100.500 2008-08-08 10:20:30.000 | IN | 2001 | 500.230 (2 rows) --删除分区 ALTER TABLE hetu_int_table5 DROP IF EXISTS PARTITION (dt=timestamp '2008-08-08 10:20:30.0', country='IN', year=2001, bonus=500.23); --查看分区 show partitions hetu_int_table5; dt | country | year | bonus -------------------------|---------|------|--------- 2008-08-09 10:20:30.000 | IN | 2001 | 100.500 (1 row) --迁移分区示例 CREATE SCHEMA part_test; CREATE TABLE hetu_exchange_partition1 (a string, b string) PARTITIONED BY (ds string); CREATE TABLE part_test.hetu_exchange_partition2 (a string, b string) PARTITIONED BY (ds string); ALTER TABLE hetu_exchange_partition1 ADD PARTITION (ds='1'); --查看分区 show partitions hetu_exchange_partition1; ds ---- 1 (1 row) show partitions part_test.hetu_exchange_partition2; ds ---- (0 rows) --迁移分区,从 T1 到 T2 ALTER TABLE part_test.hetu_exchange_partition2 EXCHANGE PARTITION (ds='1') WITH TABLE hetu_exchange_partition1; --再次查看分区,可以看到分区迁移成功 show partitions hetu_exchange_partition1; ds ---- (0 row) show partitions part_test.hetu_exchange_partition2; ds ---- 1 (1 rows) --重命名分区 CREATE TABLE IF NOT EXISTS hetu_rename_table ( eid int, name String, salary String, destination String, dept String, yoj int) COMMENT 'Employee details' partitioned by (year int) STORED AS TEXTFILE; ALTER TABLE hetu_rename_table ADD IF NOT EXISTS PARTITION (year=2001); SHOW PARTITIONS hetu_rename_table; year ------ 2001 (1 row) ALTER TABLE hetu_rename_table PARTITION (year=2001) rename to partition (year=2020); SHOW PARTITIONS hetu_rename_table; year ------ 2020 (1 row) --修改分区表 create table altercolumn4(a integer, b string) partitioned by (c integer); --修改表的文件格式 alter table altercolumn4 SET FILEFORMAT textfile; insert into altercolumn4 values (100, 'Daya', 500); alter table altercolumn4 partition (c=500) change column b empname string comment 'changed column name to empname' first; --修改分区表的存储位置(需要先在hdfs上创建目录,执行语句后,无法查到之前插入的那条数据) alter table altercolumn4 partition (c=500) set Location '/user/hive/warehouse/c500'; --修改列 b 改名为name,同时类型从integer转为string create table altercolumn1(a integer, b integer) stored as textfile; alter table altercolumn1 change column b name string; --修改altercolumn1的存储属性 ALTER TABLE altercolumn1 CLUSTERED BY(a, name) SORTED BY(name) INTO 25 BUCKETS; --查看altercolumn1的属性 describe formatted altercolumn1; Describe Formatted Table ---------------------------------------------------------------------------------------- # col_name data_type comment a integer name varchar # Detailed Table Information Database: default Owner: admintest LastAccessTime: 0 Location: hdfs://hacluster/user/hive/warehouse/altercolumn1 Table Type: MANAGED_TABLE # Table Parameters: STATS_GENERATED_VIA_STATS_TASK workaround for potential lack of HIVE-12730 numFiles 0 numRows 0 orc.compress.size 262144 orc.compression.codec GZIP orc.row.index.stride 10000 orc.stripe.size 67108864 presto_query_id 20210325_025238_00034_f63xj@default@HetuEngine presto_version rawDataSize 0 totalSize 0 transient_lastDdlTime 1616640758 # Storage Information SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Compressed: No Num Buckets: 25 Bucket Columns: [a, name] Sort Columns: [SortingColumn{columnName=name, order=ASCENDING}] Storage Desc Params: serialization.format 1 (1 row) Query 20210325_090522_00091_f63xj@default@HetuEngine, FINISHED, 1 node Splits: 1 total, 1 done (100.00%) 0:00 [0 rows, 0B] [0 rows/s, 0B/s]
  • 示例 用指定列的查询结果创建新表orders_column_aliased: CREATE TABLE orders_column_aliased (order_date, total_price) AS SELECT orderdate, totalprice FROM orders; 用表orders的汇总结果新建一个表orders_by_data: CREATE TABLE orders_by_date COMMENT 'Summary of orders by date' WITH (format = 'ORC') AS SELECT orderdate, sum(totalprice) AS price FROM orders GROUP BY orderdate; 如果表orders_by_date不存在,则创建表orders_by_date: CREATE TABLE IF NOT EXISTS orders_by_date AS SELECT orderdate, sum(totalprice) AS price FROM orders GROUP BY orderdate; 用和表orders具有相同schema创建新表empty_orders table,但是没数据: CREATE TABLE empty_orders AS SELECT * FROM orders WITH NO DATA; 使用VALUES 创建表,参考 VALUES。 分区表示例: CREATE EXTERNAL TABLE hetu_copy(corderkey, corderstatus, ctotalprice, corderdate, cds) PARTITIONED BY(cds) SORT BY (corderkey, corderstatus) COMMENT 'test' STORED AS orc LOCATION '/user/hetuserver/tmp' TBLPROPERTIES (orc_bloom_filter_fpp = 0.3, orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = 'corderstatus,ctotalprice') as select * from hetu_test; CREATE TABLE hetu_copy1(corderkey, corderstatus, ctotalprice, corderdate, cds) WITH (partitioned_by = ARRAY['cds'], bucketed_by = ARRAY['corderkey', 'corderstatus'], sorted_by = ARRAY['corderkey', 'corderstatus'], bucket_count = 16, orc_compress = 'SNAPPY', orc_compress_size = 6710422, orc_bloom_filter_columns = ARRAY['corderstatus', 'ctotalprice'], external = true, format = 'orc', location = '/user/hetuserver/tmp ') as select * from hetu_test;
  • 语法 CREATE [EXTERNAL]① TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name [ ( column_alias, ... ) ] [[PARTITIONED BY ①(col_name, ....)] [SORT BY① ([column [, column ...]])] ]① [COMMENT 'table_comment'] [ WITH ( property_name = expression [, ...] ) ]② [[STORED AS file_format]① [LOCATION 'hdfs_path']① [TBLPROPERTIES (orc_table_property = value [, ...] ) ] ]① AS query [ WITH [ NO ] DATA ]②
  • 回答 在单个父目录中创建大量的znode后,当客户端尝试在单个请求中获取所有子节点时,服务端将无法返回,因为结果将超出可存储在znode上的数据的最大长度。 为了避免这个问题,应该根据客户端应用的实际情况将“jute.maxbuffer”参数配置为一个更高的值。 “jute.maxbuffer”只能设置为Java系统属性,且没有zookeeper前缀。如果要将“jute.maxbuffer”的值设为X,在ZooKeeper客户端或服务端启动时传入以下系统属性:-Djute.maxbuffer=X。 例如,将参数值设置为4MB:-Djute.maxbuffer=0x400000。 表1 配置参数 参数 描述 默认值 jute.maxbuffer 指定可以存储在znode中的数据的最大长度。单位是Byte。默认值为0xfffff,即低于1MB。 说明: 如果更改此选项,则必须在所有服务器和客户端上设置该系统属性,否则将出现问题。 0xfffff
  • 问题 Hive同步数据时报错: Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions : __col1,__col2
  • 回答 运行包含Reduce的Mapreduce任务时,通过-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.nativetask.NativeMapOutputCollectorDelegator命令开启Native Task特性,任务在部分操作系统运行失败,日志中提示错误“version 'GLIBCXX_3.4.20' not found”。该问题原因是操作系统的GLIBCXX版本较低,导致该特性依赖的libnativetask.so.1.0.0库无法加载,进而导致任务失败。 规避手段: 设置配置项mapreduce.job.map.output.collector.class的值为org.apache.hadoop.mapred.MapTask$MapOutputBuffer。
  • 回答 当前在默认配置下,在内存中保留的Job和Stage的UI数据个数为1000个。 当前大集群优化已增加将UI数据溢出到磁盘的优化,其溢出条件是每个Stage中的UI数据大小达到最小阈值5MB。如果每个Stage的task数较小,那么其UI数据大小可能达不到该阈值,从而导致该Stage的UI数据一直缓存在内存中,直到UI数据个数到达保留的上限值(当前默认值为1000个),旧的UI数据才会在内存中被清除。 因此,在将旧的UI数据从内存中清除之前,UI数据会占用大量内存,从而导致执行10T的TPCDS测试套时出现Driver内存不足的现象。 规避措施: 根据业务需要,配置合适的需要保留的Job和Stage的UI数据个数,即配置“spark.ui.retainedJobs”和“spark.ui.retainedStages”参数。详细信息请参考常用参数中的表13。 如果需要保留的Job和Stage的UI数据个数较多,可通过配置“spark.driver.memory”参数,适当增大Driver的内存。详细信息请参考常用参数中的表10。
  • 使用Yarn客户端 安装客户端,具体请参考安装 MRS 客户端。 以客户端安装用户,登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 直接执行Yarn命令。例如: yarn application -list
  • 日志级别 Flume提供了如表2所示的日志级别。 运行日志的级别优先级从高到低分别是FATAL、ERROR、WARN、INFO、DEBUG,程序会打印高于或等于所设置级别的日志,设置的日志等级越高,打印出来的日志就越少。 表2 日志级别 日志类型 级别 描述 运行日志 FATAL FATAL表示系统运行的致命错误信息。 ERROR ERROR表示系统运行的错误信息。 WARN WARN表示当前事件处理存在异常信息。 INFO INFO表示记录系统及各事件正常运行状态信息。 DEBUG DEBUG表示记录系统及系统的调试信息。 如果您需要修改日志级别,请执行如下操作: 请参考修改集群服务配置参数,进入Flume的“全部配置”页面。 左边菜单栏中选择所需修改的角色所对应的日志菜单。 选择所需修改的日志级别。 保存配置,在弹出窗口中单击“确定”使配置生效。 配置完成后即生效,不需要重启服务。
  • 操作步骤 以omm用户登录集群主 OMS 节点,修改配置文件“${CONTROLLER_HOME}/etc/om/controller.properties”中参数“controller.backup.conf.script.execute.timeout”值为“10000000”(根据当前集群中的DBService数据量调大超时时间)。 以omm用户登录集群备OMS节点,重复执行1。 以omm用户登录主OMS节点,执行以下命令查询BackupRecoveryPluginProcess进程id,并结束此进程。 jps|grep -i BackupRecoveryPluginProcess kill -9 查询到的PID 登录到Manager页面重新执行DBService备份任务。 执行以下命令查看BackupRecoveryPluginProcess进程是否已开启。 jps|grep -i BackupRecoveryPluginProcess
  • 操作场景 用户可以根据业务需要,使用集群客户端创建Kafka Topic。启用Kerberos认证的集群,需要拥有管理Kafka主题的权限。也可以通过KafkaUI修改Topic Configs。 安全模式下,KafkaUI对修改Topic Configs场景,需保证KafkaUI登录用户属于“kafkaadmin”用户组或者单独给用户授予对应操作权限,否则将会鉴权失败。 非安全模式下,KafkaUI对所有操作不作鉴权处理。
  • 操作步骤 读数据服务端调优 参数入口: 进入HBase服务参数“全部配置”界面,具体操作请参考修改集群服务配置参数章节。 表1 影响实时读数据配置项 配置参数 描述 默认值 GC_OPTS HBase利用内存完成读写操作。提高HBase内存可以有效提高HBase性能。 GC_OPTS主要需要调整HeapSize的大小和NewSize的大小。调整HeapSize大小的时候,建议将Xms和Xmx设置成相同的值,这样可以避免JVM动态调整HeapSize大小的时候影响性能。调整NewSize大小的时候,建议把其设置为HeapSize大小的1/8。 HMaster:当HBase集群规模越大、Region数量越多时,可以适当调大HMaster的GC_OPTS参数。 RegionServer:RegionServer需要的内存一般比HMaster要大。在内存充足的情况下,HeapSize可以相对设置大一些。 说明: 主HMaster的HeapSize为4G的时候,HBase集群可以支持100000 region数的规模。根据经验值,集群每增加35000个region,HeapSize增加2G,主HMaster的HeapSize不建议超过32GB。 HMaster -server -Xms4G -Xmx4G -XX:NewSize=512M -XX:MaxNewSize=512M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M Region Server -server -Xms6G -Xmx6G -XX:NewSize=1024M -XX:MaxNewSize=1024M -XX:MetaspaceSize=128M -XX:MaxMetaspaceSize=512M -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=65 -XX:+PrintGCDetails -Dsun.rmi.dgc.client.gcInterval=0x7FFFFFFFFFFFFFE -Dsun.rmi.dgc.server.gcInterval=0x7FFFFFFFFFFFFFE -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M hbase.regionserver.handler.count 表示RegionServer在同一时刻能够并发处理多少请求。如果设置过高会导致激烈线程竞争,如果设置过小,请求将会在RegionServer长时间等待,降低处理能力。根据资源情况,适当增加处理线程数。 建议根据CPU的使用情况,可以选择设置为100至300之间的值。 200 hfile.block.cache.size HBase缓存区大小,主要影响查询性能。根据查询模式以及查询记录分布情况来决定缓存区的大小。如果采用随机查询使得缓存区的命中率较低,可以适当降低缓存区大小。 当offheap关闭时,默认值为0.25。当offheap开启时,默认值是0.1。 如果同时存在读和写的操作,这两种操作的性能会互相影响。如果写入导致的flush和Compaction操作频繁发生,会占用大量的磁盘IO操作,从而影响读取的性能。如果写入导致阻塞较多的Compaction操作,就会出现Region中存在多个HFile的情况,从而影响读取的性能。所以如果读取的性能不理想的时候,也要考虑写入的配置是否合理。 读数据客户端调优 Scan数据时需要设置caching(一次从服务端读取的记录条数,默认是1),如果使用默认值读性能会降到极低。 当不需要读一条数据所有的列时,需要指定读取的列,以减少网络IO。 只读取RowKey时,可以为Scan添加一个只读取RowKey的filter(FirstKeyOnlyFilter或KeyOnlyFilter)。 读数据表设计调优 表2 影响实时读数据相关参数 配置参数 描述 默认值 COMPRESSION 配置数据的压缩算法,这里的压缩是HFile中block级别的压缩。对于可以压缩的数据,配置压缩算法可以有效减少磁盘的IO,从而达到提高性能的目的。 说明: 并非所有数据都可以进行有效压缩。例如一张图片的数据,因为图片一般已经是压缩后的数据,所以压缩效果有限。常用的压缩算法是SNAPPY,因为它有较好的Encoding/Decoding速度和可以接受的压缩率。 NONE BLOCKSIZE 配置HFile中block块的大小,不同的block块大小,可以影响HBase读写数据的效率。越大的block块,配合压缩算法,压缩的效率就越好;但是由于HBase的读取数据是以block块为单位的,所以越大的block块,对于随机读的情况,性能可能会比较差。 如果要提升写入的性能,一般扩大到128KB或者256KB,可以提升写数据的效率,也不会影响太大的随机读性能。单位:字节。 65536 DATA_BLOCK_ENCODING 配置HFile中block块的编码方法。当一行数据中存在多列时,一般可以配置为“FAST_DIFF”,可以有效的节省数据存储的空间,从而提供性能。 NONE
  • 示例 CREATE SCHEMA web; DESCRIBE SCHEMA web; Describe Schema ------------------------------------------------------------------------- web hdfs://hacluster/user/hive/warehouse/web.db admintest USER (1 row)
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使该三个部件的性能都更优化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能更优,实际测试上来看,DirectKafka也确实比其他两个API性能好了不少。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Spark Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Spark Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Spark Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Spark Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。 日志归档调优 参数“spark.eventLog.group.size”用来设置一个应用的JobHistory日志按照指定job个数分组,每个分组会单独创建一个文件记录日志,从而避免应用长期运行时形成单个过大日志造成JobHistory无法读取的问题,设置为“0”时表示不分组。 大部分Spark Streaming任务属于小型job,而且产生速度较快,会导致频繁的分组,产生大量日志小文件消耗磁盘I/O。建议增大此值,例如改为“1000”或更大值。
  • 回答 在这种场景下,CarbonData会给每个节点分配一个INSERT INTO或LOAD DATA任务。如果Executor不是不同的节点分配的,CarbonData将会启动较少的task。 解决措施: 您可以适当增大Executor内存和Executor核数,以便YARN可以在每个节点上启动一个Executor。具体的配置方法如下: 配置Executor核数。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 回答 对于Hash shuffle,在shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个reduce分区的数据写到各自的磁盘文件中。 这样带来的问题是如果reduce分区的数量比较大的话,将会产生大量的磁盘文件(比如:该问题中将产生1000000 * 100000 = 10^11个shuffle文件)。如果磁盘文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量多,序列化以及压缩等操作需要占用非常大的临时内存空间,对内存的使用和GC带来很大的压力,从而容易造成Executor无法响应Driver。 因此,建议使用Sort shuffle,而不使用Hash shuffle。
  • 示例 describe formatted show_table1 a; Describe Formatted Column ------------------------------ col_name a data_type integer min max num_nulls distinct_count 0 avg_col_len max_col_len num_trues num_falses comment (1 row)
  • 配置参数 在Spark客户端的“spark-defaults.conf”配置文件中进行设置。 参数 说明 默认值 spark.executor.execute.shutdown.cleaner 配置为true后,支持executor退出时执行自定义代码。 false spark.executor.execute.shutdown.cleaner.max.timeout executor执行自定义代码的超时时间。 240s
  • 回答 正常情况下,相同rowkey值的数据加载到HBase是有先后顺序的,HBase以最近的时间戳的数据为最新数据,一般的默认查询中,没有指定时间戳的,就会对相同rowkey值的数据仅返回最新数据。 使用bulkload加载数据,由于数据在内存中处理生成HFile,速度是很快的,很可能出现相同rowkey值的数据具有相同时间戳,从而造成查询结果混乱的情况。 建议在建表和数据加载时,设计好rowkey值,尽量避免在同一个数据文件中存在相同rowkey值的情况。
  • Maxwell生成的数据格式及常见字段含义 Maxwell生成的数据格式为JSON,常见字段含义如下: type:操作类型,包含database-create,database-drop,table-create,table-drop,table-alter,insert,update,delete database:操作的数据库名称 ts:操作时间,13位时间戳 table:操作的表名 data:数据增加/删除/修改之后的内容 old:数据修改前的内容或者表修改前的结构定义 sql:DDL操作的SQL语句 def:表创建与表修改的结构定义 xid:事务唯一ID commit:数据增加/删除/修改操作是否已提交
  • 配置Maxwell 在maxwell-XXX文件夹下如果有conf目录则配置config.properties文件,配置项说明请参见表1。如果没有conf目录,则是在maxwell-XXX文件夹下将config.properties.example修改成config.properties。 表1 Maxwell配置项说明 配置项 是否必填 说明 默认值 user 是 连接MySQL的用户名,即2中新创建的用户 - password 是 连接MySQL的密码。配置文件中包含认证密码信息可能存在安全风险,建议当前场景执行完毕后删除相关配置文件或加强安全管理。 - host 否 MySQL地址 localhost port 否 MySQL端口 3306 log_level 否 日志打印级别,可选值为 debug info warn error info output_ddl 否 是否发送DDL(数据库与数据表的定义修改)事件 true:发送DDL事件 false:不发送DDL事件 false producer 是 生产者类型,配置为kafka stdout:将生成的事件打印在日志中 kafka:将生成的事件发送到kafka stdout producer_partition_by 否 分区策略,用来确保相同一类的数据写入到kafka同一分区 database:使用数据库名称做分区,保证同一个数据库的事件写入到kafka同一个分区中 table:使用表名称做分区,保证同一个表的事件写入到kafka同一个分区中 database ignore_producer_error 否 是否忽略生产者发送数据失败的错误 true:在日志中打印错误信息并跳过错误的数据,程序继续运行 false:在日志中打印错误信息并终止程序 true metrics_slf4j_interval 否 在日志中输出上传kafka成功与失败数据的数量统计的时间间隔,单位为秒 60 kafka.bootstrap.servers 是 kafka代理节点地址,配置形式为HOST:PORT[,HOST:PORT] - kafka_topic 否 写入kafka的topic名称 maxwell dead_letter_topic 否 当发送某条记录出错时,记录该条出错记录主键的kafka topic - kafka_version 否 Maxwell使用的kafka producer版本号,不能在config.properties中配置,需要在启动命令时用-- kafka_version xxx参数传入 - kafka_partition_hash 否 划分kafka topic partition的算法,支持default或murmur3 default kafka_key_format 否 Kafka record的key生成方式,支持array或Hash Hash ddl_kafka_topic 否 当output_ddl配置为true时,DDL操作写入的topic {kafka_topic} filter 否 过滤数据库或表。 如果只想采集mydatabase的库,可以配置为 exclude: *.*,include: mydatabase.* 如果只想采集mydatabase.mytable的表,可以配置为 exclude: *.*,include: mydatabase.mytable 如果只想采集mydatabase库下的mytable,mydate_123, mydate_456表,可以配置为 exclude: *.*,include: mydatabase.mytable, include: mydatabase./mydate_\\d*/ -
  • 验证Maxwell 登录Maxwell所在的服务器。 查看日志。如果日志里面没有ERROR日志,且有打印如下日志,表示与MySQL连接正常。 BinlogConnectorLifecycleListener - Binlog connected. 登录MySQL数据库,对测试数据进行更新/创建/删除等操作。操作语句可以参考如下示例。 -- 创建库 create database test; -- 创建表 create table test.e ( id int(10) not null primary key auto_increment, m double, c timestamp(6), comment varchar(255) charset 'latin1' ); -- 增加记录 insert into test.e set m = 4.2341, c = now(3), comment = 'I am a creature of light.'; -- 更新记录 update test.e set m = 5.444, c = now(3) where id = 1; -- 删除记录 delete from test.e where id = 1; -- 修改表 alter table test.e add column torvalds bigint unsigned after m; -- 删除表 drop table test.e; -- 删除库 drop database test; 观察Maxwell的日志输出,如果没有WARN/ERROR打印,则表示Maxwell安装配置正常。 如果要确定数据是否成功上传,可设置config.properties中的log_level为debug,则数据上传成功时会立刻打印如下JSON格式数据,具体字段含义请参考Maxwell生成的数据格式及常见字段含义。 {"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}} …… 当整个流程调试通过之后,可以把config.properties文件中的配置项log_level修改为info,减少日志打印量,并重启Maxwell。 # log level [debug | info | warn | error] log_level=info
  • 启动Maxwell 登录Maxwell所在的服务器。 执行如下命令进入Maxwell安装目录。 cd /opt/maxwell-1.21.0/ 如果是初次使用Maxwell,建议将conf/config.properties中的log_level改为debug(调试级别),以便观察启动之后是否能正常从MySQL获取数据并发送到kafka,当整个流程调试通过之后,再把log_level修改为info,然后先停止再启动Maxwell生效。 # log level [debug | info | warn | error] log_level=debug 执行如下命令启动Maxwell。 source /opt/client/bigdata_env bin/Maxwell bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \ --producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=Maxwell 其中,user,password和host分别表示MySQL的用户名,密码和IP地址,这三个参数可以通过修改配置项配置也可以通过上述命令配置,kafkahost为流式集群的Core节点的IP地址。 显示类似如下信息,表示Maxwell启动成功。 Success to start Maxwell [78092].
  • 安装Maxwell 下载安装包,下载路径为https://github.com/zendesk/maxwell/releases,选择名为maxwell-XXX.tar.gz的二进制文件下载,其中XXX为版本号。 将tar.gz包上传到任意目录下(本示例路径为Master节点的/opt)。 登录部署Maxwell的服务器,并执行如下命令进入tar.gz包所在目录。 cd /opt 执行如下命令解压“maxwell-XXX.tar.gz”压缩包,并进入“maxwell-XXX”文件夹。 tar -zxvf maxwell-XXX.tar.gz cd maxwell-XXX
  • 使用Payload Spark建表时指定Payload create table hudi_test(id int, comb int, price string, name string, par string) using hudi options( primaryKey = "id", preCombineField = "comb", payloadClass="org.apache.hudi.common.model.OverwriteWithLatestAvroPayload") partitioned by (par); Datasource方式写入时指定Payload data.write.format("hudi"). option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL). option("hoodie.datasource.write.precombine.field", "comb"). option("hoodie.datasource.write.recordkey.field", "id"). option("hoodie.datasource.write.partitionpath.field", "par"). option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.DefaultHoodieRecordPayload"). option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator"). option("hoodie.datasource.write.operation", "upsert"). option("hoodie.datasource.hive_sync.enable", "true"). option("hoodie.datasource.hive_sync.partition_fields", "par"). option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor"). option("hoodie.datasource.hive_sync.table", "hudi_test"). option("hoodie.datasource.hive_sync.use_jdbc", "false"). option("hoodie.upsert.shuffle.parallelism", 4). option("hoodie.datasource.write.hive_style_partitioning", "true"). option("hoodie.table.name", "hudi_test").mode(Append).save(s"/tmp/hudi_test")
  • 常用Payload DefaultHoodieRecordPayload Hudi中默认使用DefaultHoodieRecordPayload,该Payload通过比较增量数据与存量数据的preCombineField字段值的大小来决定同主键的存量数据是否能被同主键的增量数据更新。在同主键的增量数据的preCombineField字段值绝对大于同主键的存量数据的preCombineField字段值时,同主键的增量数据将会被更新。 OverwriteWithLatestAvroPayload 该Payload保证同主键的增量数据永远都会更新至同主键的存量数据中。 PartialUpdateAvroPayload 该Payload继承了OverwriteNonDefaultsWithLatestAvroPayload,它可以保证在任何场景下增量数据中的null值不会覆盖存量数据。
  • Payload介绍 Payload是Hudi实现数据增量更新和删除的关键,它可以帮助Hudi在 数据湖 中高效的管理数据变更。Hudi Payload的格式是基于Apache Avro的,它使用了Avro的schema来定义数据的结构和类型。Payload可以被序列化和反序列化,以便在Hudi中进行数据的读取和写入。总之,Hudi Payload是Hudi的一个重要组成部分,它提供了一种可靠的、高效的、可扩展的方式来管理大规模数据湖中的数据变更。
共100000条