华为云用户手册

  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 操作步骤 进入工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包。 将导出的Jar包上传至Linux运行环境的任意目录下,例如“/optclient”。 将工程中的“lib”文件夹和“conf”文件夹上传至和Jar包相同的Linux运行环境目录下,例如“/opt/client”(其中“lib”目录汇总包含了工程中依赖的所有的Jar包,“conf”目录包含运行jar包所需的集群相关配置文件,请参考准备运行环境)。 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令运行Jar包。 java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.HdfsExample java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample:时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 操作步骤 进入工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包。 将导出的Jar包上传至Linux运行环境的任意目录下,例如“/opt/client”。 将工程中的“lib”文件夹和“conf”文件夹上传至和Jar包相同的Linux运行环境目录下(其中“lib”目录汇总包含了工程中依赖的所有的Jar包,“conf”目录包含运行jar包所需的集群相关配置文件,请参考准备运行环境)。 执行如下命令运行Jar包。 java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.HdfsExample java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.ColocationExample 在运行“com.huawei.bigdata.hdfs.examples.ColocationExample”时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 吞吐量大场景下使用MiniBatch聚合增加吞吐量 MiniBatch聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个key只需一个操作即可访问状态,可以很大程度减少状态开销并获得更好的吞吐量。但是可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理,这是吞吐量和延迟之间的权衡。默认未开启该功能。 API方式: // instantiate table environmentTableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration(); // set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimizationconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input recordsconfiguration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task 资源文件方式(flink-conf.yaml): table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency : 5 s table.exec.mini-batch.size: 5000
  • TM的Slot数和TM的CPU数成倍数关系 在Flink中,每个Task被分解成SubTask,SubTask作为执行的线程单位运行在TM上,在不开启Slot Sharing Group的情况下,一个SubTask是部署在一个slot上的。即使开启了Slot Sharing Group,大部分情况下Slot中拥有的SubTask也是负载均衡的。所以可以理解为TM上的Slot个数代表了上面运行的任务线程数。 合理的Slots数量应该和CPU核数相同,在使用超线程时,每个Slot将占用2个或更多的硬件线程。 【示例】建议配置TM Slot个数为CPU Core个数的2~4倍: taskmanager.numberOfTaskSlots: 4 taskmanager.cpu.cores: 2
  • 数据倾斜状态下可以使用localglobal优化策略 【示例】 #开启mini-batch优化 table.exec.mini-batch.enabled:true #最长等待时间 table.exec.mini-batch.allow-latency: 20ms #最大缓存记录数 table.exec.mini-batch.size:8000 #开启两阶段聚合 table.optimizer.agg-phase-strategy:TWO_PHASE
  • RocksDB作为状态后端时通过多块磁盘提升IO性能 RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。当一个 TaskManager包含三个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘IO,导致三个并行度的吞吐量都会下降。可以通过指定多个不同的硬盘从而减少IO竞争。 【示例】Rockdb配置Checkpoint目录放在不同磁盘(flink-conf.yaml): state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb
  • 基于序列化性能尽量使用POJO和Avro等简单的数据类型 使用API编写Flink程序时需要考虑Java对象的序列化,大多数情况下Flink都可以高效的处理序列化。SQL中无需考虑,SQL中数据都为ROW类型,都采用了Flink内置的序列化器,能很高效的进行序列化。 表1 序列化 序列化器 Opts/s PojoSeriallizer 813 Kryo 294 Avro(Reflect API) 114 Avro(SpecificRecord API) 632
  • 网络通信调优 Flink通信主要依赖Netty网络,所以在Flink应用执行过程中,Netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。 【示例】 # netty的服务端线程数目(-1表示默认参数numOfSlot) taskmanager.network.netty.server.numThreads -1(numOfSlot) # netty的客户端线程数目(-1表示默认参数numofSlot) taskmanager.network.netty.client.numThreads : -1 # netty的客户端连接超时时间 taskmanager.network.netty.client.connectTimeoutSec:120s # netty的发送和接受缓冲区的大小(0表示netty默认参数,4MB) taskmanager.network.netty.sendReceiveBufferSize: 0 # netty的传输方式,默认方式会根据运行的平台选择合适的方式 taskmanager.network.netty.transport:auto
  • 内存总体调优 Flink内部对内存进行了划分,整体上划分成为了堆内存和堆外内存两部分。Java堆内存是通过Java程序创建时指定的,这也是JVM可自动GC的部分内存。堆外内存可细分为可被JVM管理的和不可被JVM管理的,可被JVM管理的有Managed Memory、Direct Memory,这部分是调优的重点,不可被JVM管理的有JVM Metaspace、JVM Overhead,这部分是native memory。 图1 内存 表2 相关参数 参数 配置 注释 说明 Total Memory taskmanager.memory.flink.size: none 总体Flink管理的内存大小,没有默认值,不包含Metaspace和Overhead,Standalone模式时设置。 整体内存。 taskmanager.memory.process.size: none 整个Flink进程使用的内存大小,容器模式时设置。 FrameWork taskmanager.memory.framework.heap.size: 128mb runtime占用的heap的大小,一般来说不用修改,占用空间相对固定。 RUNTIME底层占用的内存,一般不用做较大改变。 taskmanager.memory.framework.off-heap.size: 128mb runtime占用的off-heap的大小,一般来说不用修改,占用空间相对固定。 Task taskmanager.memory.task.heap.size:none 没有默认值,flink.size减去框架、托管、网络等得到。 算子逻辑,用户代码(如UDF)正常对象占用内存的地方。 taskmanager.memory.task.off-heap.size:0 默认值为0,task使用的off heap内存。 Managed Memory taskmanager.memory.managed.fraction: 0.4 托管内存占taskmanager.memory.flink.size的比例,默认0.4。 managed内存用于中间结果缓存、排序、哈希等(批计算),以及RocksDB state backend(流计算),该内存在批模式下一开始就申请固定大小内存,而流模式下会按需申请。 taskmanager.memory.managed.size: 0 托管内存大小,一般不指定,默认为0,内存大小由上面计算出来。若指定了则覆盖比例计算的内存。 Network taskmanager.memory.network.min:64mb 网络缓存的最小值。 用于taskmanager之间shuffle、广播以及与network buffer。 taskmanager.memory.network.max:1gb 网络缓存的最大值。( MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) taskmanager.memory.network.fraction:0.1 network memory占用taskmanager.memory.flink.size的大小,默认0.1,会被限制在network.min和network.max之间。 用于taskmanager之间shuffle、广播以及与network buffer。 Others taskmanager.memory.jvm-metaspace.size:256M metaspace空间的最大值,默认值256MB。 用户自己管理的内存。 taskmanager.memory.jvm-overhead.min:192M jvm额外开销的最小值,默认192MB。 taskmanager.memory.jvm-overhead.max:1G jvm额外开销的最大值,默认1GB。 taskmanager.memory.jvm-overhead.fraction:0.1 jvm额外开销占taskmanager.memory.process.size的比例,默认0.1,算出来后会被限制在jvm-overhead.min和jvm-overhead.max之间。 3.3.1及之后版本无需修改taskmanager.memory.network.max网络缓存的最大值
  • 使用local-global两阶段聚合减少数据倾斜 Local-Global聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于MapReduce中的 Combine + Reduce模式。 数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同key的输入数据累加到单个累加器中。全局聚合将仅接收reduce后的累加器,而不是大量的原始输入数据,这可以很大程度减少网络shuffle和状态访问的成本。每次本地聚合累积的输入数据量基于mini-batch间隔,这意味着local-global聚合依赖于启用了mini-batch优化。 API方式: // instantiate table environmentTableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); configuration.setString("table.exec.mini-batch.size", "5000"); configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation 资源文件方式: table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency : 5 s table.exec.mini-batch.size: 5000 table.optimizer.agg-phase-strategy: TWO_PHASE
  • 数据量大并发数高且有Shuffle时可调整网络内存 在并发数高和数据量大时,发生shuffle后会发生大量的网络IO,提升网络缓存内存可以扩大一次性读取的数据量,从而提升IO速度。 【示例】 # 网络占用内存占整个进程内存的比例 taskmanager.memory.network.fraction: 0.6 # 网络缓存内存的最小值 taskmanager.memory.network.min: 1g # 网络缓存内存的最大值(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) taskmanager.memory.network.max: 20g
  • 如果不能使用broardcast join应该尽量减少shuffle数据 不能broadcast join那么必定会发生shuffle,可通过各种手段来减少发生shuffle的数据量,例如谓词下推,Runtime Filter等等。 【示例】 # Runtime filter配置 table.exec.runtime-filter.enabled: true # 下推 table.optimizer.source.predicate-pushdown-enabled: true
  • 大状态Checkpoint优先从本地状态恢复 为了快速的状态恢复,每个task会同时写Checkpoint数据到本地磁盘和远程分布式存储,也就是说这是一份双复制。只要task本地的Checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的Checkpoint数据,这样就很大程度减少了远程拉取状态数据的过程。 【示例】配置Checkpoint优先从本地恢复(flink-conf.yaml): state.backend.local-recovery: true
  • 非状态计算提升性能的资源优化 Flink计算操作分为如下两类: 无状态计算操作:该部分算子不需要保存计算状态,例如:filter、union all、lookup join。 有状态计算操作:该部分算子要根据数据前后状态变化进行计算,例如:join,union、window、group by、聚合算子等。 对于非状态计算主要调优为TaskManager的Heap Size与NetWork。 例如作业仅进行数据的读和写,TaskManage无需增加额外的vCore,off-Heap和Overhead默认为1GB,内存主要给Heap和Network。
  • 通过调整对应算子并行度提升性能 读写Hudi可以通过配置读写并发提升读写性能。 读算子的并行度调整参数:read.tasks 写算子的并行度调整参数:write.tasks 采用状态索引在作业重启的时候(非Checkpoint重启),需要读目标表重建索引,可以增大该算子并行度提升性能。 加载索引的并行度调整参数:write.index_bootstrap.tasks 采用状态索引写数据需要进行主键唯一性检查,分配具体写入文件,提升该算子并行度提升性能。 写算子索引检测算子调整参数:write.bucket_assign.tasks
  • 通过表级JTL进行状态后端优化 本章节适用于MRS 3.3.0及以后版本。 在Flink双流inner Join场景下,若Join业务允许join一次就可以剔除后端中的数据时,可以使用该特性。 该特性只适用于流流inner join。 可通过使用Hint方式单独为左表和右表设置不同join次数: Hint方式格式: table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral 在SQL语句中配置示例: CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t JOIN -- 为左表和右表设置不同的JTL关联次数 /*+ OPTIONS('eliminate-state.left.threshold'='1','eliminate-state.right.threshold'='1') */ user_score as d ON t.user_id = d.user_id;
  • 通过表级TTL进行状态后端优化 本章节适用于MRS 3.3.0及以后版本。 在Flink双流Join场景下,若Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。 可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒: Hint方式格式: table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral 在SQL语句中配置示例: CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT ) WITH ('connector' = 'print'); CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv' ); INSERT INTO print SELECT t.user_id, t.user_name, d.score FROM user_info as t LEFT JOIN -- 为左表和右表设置不同的TTL时间 /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ user_score as d ON t.user_id = d.user_id;
  • 前提条件 已安装客户端时: 已安装HBase客户端。 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。 未安装HBase客户端时: Linux环境已安装JDK,版本号需要和IntelliJ IDEA导出Jar包使用的JDK版本一致。 当Linux环境所在主机不是集群中的节点时,需要在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。
  • 通过Java API提交Oozie作业开发思路 通过典型场景,用户可以快速学习和掌握Oozie的开发过程,并且对关键的接口函数有所了解。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,代码示例只涉及了MapReduce作业,其他作业的API调用代码是一样的,只是job配置“job.properties”与工作流配置“workflow.xml”不一样。 完成导入并配置Oozie样例工程操作后即可执行通过Java API提交MapReduce作业和查询作业状态。 父主题: 通过Java API提交Oozie作业
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 打印metric信息说明 表2 基本指标项 Metric名称 描述 日志级别 total_request_count 周期时间内查询总次数 INFO active_success_count 周期时间内主集群查询成功次数 INFO active_error_count 周期时间内主集群查询失败次数 INFO active_timeout_count 周期时间内主集群查询超时次数 INFO standby_success_count 周期时间内备集群查询成功次数 INFO standby_error_count 周期时间内备集群查询失败次数 INFO Active Thread pool 周期打印请求主集群的执行线程池信息 DEBUG Standby Thread pool 周期打印请求备集群的执行线程池信息 DEBUG Clear Thread pool 周期打印释放资源的执行线程池信息 DEBUG 表3 针对GET、BatchGET、SCAN请求,分别打印Histogram指标项 Metric名称 描述 日志级别 averageLatency(ms) 平均时延 INFO minLatency(ms) 最小时延 INFO maxLatency(ms) 最大时延 INFO 95thPercentileLatency(ms) 95%请求的最大时延 INFO 99thPercentileLatency(ms) 99%请求的最大时延 INFO 99.9PercentileLatency(ms) 99.9%请求的最大时延 INFO 99.99PercentileLatency(ms) 99.99%请求的最大时延 INFO
  • 将主备集群相关配置设置到HBaseMultiClusterConnection中 该操作仅适用于MRS 3.3.0及之后版本。 创建双读Configuration,取消“com.huawei.bigdata.hbase.examples”包的“TestMain”类main方法中的testHBaseDualReadSample注释,确保“com.huawei.bigdata.hbase.examples”包的“HBaseDualReadSample”类中的“IS_CREATE_CONNECTION_BY_XML”值为“false”。 在“HBaseDualReadSample”类的addHbaseDualXmlParam方法中添加相关配置,相关配置项可参考HBase双读操作相关配置项说明。 private void addHbaseDualXmlParam(Configuration conf) { // We need to set the optional parameters contained in hbase-dual.xml to conf // when we use configuration transfer solution conf.set(CONNECTION_IMPL_KEY, DUAL_READ_CONNECTION); // conf.set("", ""); } 在“HBaseDualReadSample”类的initActiveConf方法中添加主集群客户端相关配置: private void initActiveConf() { // The hbase-dual.xml configuration scheme is used to generate the client configuration of the active cluster. // In actual application development, you need to generate the client configuration of the active cluster. String activeDir = HBaseDualReadSample.class.getClassLoader().getResource(Utils.CONF_DIRECTORY).getPath() + File.separator + ACTIVE_DIRECTORY + File.separator; Configuration activeConf = Utils.createConfByUserDir(activeDir); HBaseMultiClusterConnection.setActiveConf(activeConf); } 在“HBaseDualReadSample”类initStandbyConf方法中添加备集群客户端相关配置: private void initStandbyConf() { // The hbase-dual.xml configuration scheme is used to generate the client configuration of the standby cluster. // In actual application development, you need to generate the client configuration of the standby cluster. String standbyDir = HBaseDualReadSample.class.getClassLoader().getResource(Utils.CONF_DIRECTORY).getPath() + File.separator + STANDBY_DIRECTORY + File.separator; Configuration standbyConf = Utils.createConfByUserDir(standbyDir); HBaseMultiClusterConnection.setStandbyConf(standbyConf); } 确定数据来源的集群。 GET请求,以下代码片段在“com.huawei.bigdata.hbase.examples”包的“HBaseSample”类的testGet方法中添加。 Result result = table.get(get); if (result instanceof DualResult) { LOG .info(((DualResult)result).getClusterId()); } Scan请求,以下代码片段在“com.huawei.bigdata.hbase.examples”包的“HBaseSample”类的testScanData方法中添加。 ResultScanner rScanner = table.getScanner(scan); if (rScanner instanceof HBaseMultiScanner) { LOG.info(((HBaseMultiScanner)rScanner).getClusterId()); } 客户端支持打印metric信息。 “log4j.properties”文件中增加如下内容,客户端将metric信息输出到指定文件。指标项信息可参考打印metric信息说明。 log4j.logger.DUAL=debug,DUAL log4j.appender.DUAL=org.apache.log4j.RollingFileAppender log4j.appender.DUAL.File=/var/log/dual.log //客户端本地双读日志路径,根据实际路径修改,但目录要有写入权限 log4j.additivity.DUAL=false log4j.appender.DUAL.MaxFileSize=${hbase.log.maxfilesize} log4j.appender.DUAL.MaxBackupIndex=${hbase.log.maxbackupindex} log4j.appender.DUAL.layout=org.apache.log4j.PatternLayout log4j.appender.DUAL.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n
  • 操作场景 HBase客户端应用通过自定义加载主备集群配置项,实现了双读能力。HBase双读作为提高HBase集群系统高可用性的一个关键特性,适用于使用Get读取数据、使用批量Get读取数据、使用Scan读取数据,以及基于二级索引查询。它能够同时读取主备集群数据,减少查询毛刺,具体表现为: 高成功率:双并发读机制,保证每一次读请求的成功率。 可用性:单集群故障时,查询业务不中断。短暂的网络抖动也不会导致查询时间变长。 通用性:双读特性不支持双写,但不影响原有的实时写场景。 易用性:客户端封装处理,业务侧不感知。 HBase双读使用约束: HBase双读特性基于Replication实现,备集群读取的数据可能和主集群存在差异,因此只能实现最终一致性。 目前HBase双读功能仅用于查询。主集群故障时,最新数据无法同步,备集群可能查询不到最新数据。 HBase的Scan操作可能分解为多次RPC。由于相关session信息在不同集群间不同步,数据不能保证完全一致,因此双读只在第一次RPC时生效,ResultScanner close之前的请求会固定访问第一次RPC时使用的集群。 HBase Admin接口、实时写入接口只会访问主集群。所以主集群故障后,不能提供Admin接口功能和实时写入接口功能,只能提供Get、Scan查询服务。
  • HBase双读操作相关配置项说明 表1 hbase-dual.xml配置项 配置项名称 配置项详解 默认值 级别 hbase.dualclient.active.cluster.configuration.path 主集群HBase客户端配置目录 无 必选配置 hbase.dualclient.standby.cluster.configuration.path 备集群HBase客户端配置目录 无 必选配置 dual.client.schedule.update.table.delay.second 更新开启容灾表列表的周期时间 5 可选配置 hbase.dualclient.glitchtimeout.ms 可以容忍主集群的最大毛刺时间 50 可选配置 hbase.dualclient.slow.query.timeout.ms 慢查询告警日志 180000 可选配置 hbase.dualclient.active.cluster.id 主集群id ACTIVE 可选配置 hbase.dualclient.standby.cluster.id 备集群id STANDBY 可选配置 hbase.dualclient.active.executor.thread.max 请求主集群的线程池max大小 100 可选配置 hbase.dualclient.active.executor.thread.core 请求主集群的线程池core大小 100 可选配置 hbase.dualclient.active.executor.queue 请求主集群的线程池queue大小 256 可选配置 hbase.dualclient.standby.executor.thread.max 请求备集群的线程池max大小 100 可选配置 hbase.dualclient.standby.executor.thread.core 请求备集群的线程池core大小 100 可选配置 hbase.dualclient.standby.executor.queue 请求备集群的线程池queue大小 256 可选配置 hbase.dualclient.clear.executor.thread.max 清理资源线程池max大小 30 可选配置 hbase.dualclient.clear.executor.thread.core 清理资源线程池core大小 30 可选配置 hbase.dualclient.clear.executor.queue 清理资源线程池queue大小 Integer. MAX_VALUE 可选配置 dual.client.metrics.enable 客户端metric信息是否打印 true 可选配置 dual.client.schedule.metrics.second 客户端metric信息打印周期 300 可选配置 dual.client.asynchronous.enable 是否异步请求主备集群 false 可选配置
  • 基本认证(Basic Authentication) 在HTTP中,基本认证是一种用来允许Web浏览器或其他客户端程序在请求时提供用户名和密码形式的身份凭证的一种登录验证方式。 在请求发送之前,用Basic加一个空格标识基本认证,以用户名追加一个冒号然后串接上密码,再将此字符串用Base64算法编码。 例如: 用户名是admin、密码是Asd#smSisn$123,则拼接后的字符串就是admin:Asd#smSisn$123,然后进行Base64编码,得到YWRtaW46QWRtaW5AMTIz,加上基本认证标识,得到Basic YWRtaW46QWRtaW5AMTIz,最终将编码后的字符串发送出去,由接收者解码得到一个由冒号分隔的用户名和密码的字符串。
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 创建用户并授权 IAM ESM FullAccess策略 登录 统一身份认证 服务(IAM)控制台。 在IAM控制台创建用户组,并授予ESM服务的权限“ESM FullAccess”。 在IAM控制台创建用户,并将其加入2中创建的用户组。 用户登录并验证权限。 新创建的用户登录控制台,验证ESM服务的权限:在主菜单中选择 “组织”,进入“租户管理”页面。单击“创建租户”,若能正常创建租户,表示“ESM FullAccess”已生效。
  • 响应消息体(可选) 该部分可选。响应消息体通常以结构化格式(如JSON或XML)返回,与响应消息头中Content-Type对应,传递除响应消息头之外的内容。 对于创建查询任务接口,返回如下消息体。 { "job_id": "********" } 当接口调用出错时,会返回错误码及错误信息说明,错误响应的Body体格式如下所示。 { "error_msg": "The format of message is error", "error_code": "AS.0001" } 其中,error_code表示错误码,error_msg表示错误描述信息。
共100000条