华为云用户手册

  • 创建一张表或Scan时设定blockcache为true HBase客户端建表和scan时,设置blockcache=true。需要根据具体的应用需求来设定它的值,这取决于有些数据是否会被反复的查询到,如果存在较多的重复记录,将这个值设置为true可以提升效率,否则,建议关闭。 建议按默认配置,默认就是true,只要不强制设置成false就可以,例如: HColumnDescriptor fieldADesc = new HColumnDescriptor("value".getBytes());fieldADesc.setBlockCacheEnabled(false);
  • Scan时指定StartKey和EndKey 一个有确切范围的Scan,在性能上会带来较大的好处。 代码示例: Scan scan = new Scan();scan.addColumn(Bytes.toBytes("familyname"),Bytes.toBytes("columnname"));scan.setStartRow( Bytes.toBytes("rowA")); // 假设起始Key为rowAscan.setStopRow( Bytes.toBytes("rowB")); // 假设EndKey为rowBfor(Result result : demoTable.getScanner(scan)) {// process Result instance}
  • 不要调用Admin的closeRegion方法关闭一个Region Admin中,提供了关闭一个Region的接口: public void closeRegion(final String regionname, final String serverName) 通过该方法关闭一个Region,HBase Client端会直接发RPC请求到Region所在的RegionServer上,整个流程对Master而言,是不感知的。也就是说,尽管RegionServer关闭了这个Region,但是,在Master侧,还以为该Region是在该RegionServer上面打开的。假如,在执行Balance的时候,Master计算出恰好要转移这个Region,那么,这个Region将无法被关闭,本次转移操作将无法完成(关于这个问题,在当前的HBase版本中的处理的确还欠缺妥当)。 因此,暂时不建议使用该方法关闭一个Region。
  • 不要关闭WAL WAL是Write-Ahead-Log的简称,是指数据在入库之前,首先会写入到日志文件中,借此来确保数据的安全性。 WAL功能默认是开启的,但是,在Put类中提供了关闭WAL功能的接口: public void setWriteToWAL(boolean write) 因此,不建议调用该方法将WAL关闭(即将writeToWAL设置为False),因为可能会造成最近1S(该值由RegionServer端的配置参数“hbase.regionserver.optionallogflushinterval”决定,默认为1S)内的数据丢失。但如果在实际应用中,对写入的速率要求很高,并且可以容忍丢失最近1S内的数据的话,可以将该功能关闭。
  • 业务表设计建议 预分Region,使Region分布均匀,提高并发 避免过多的热点Region。根据应用场景,可考虑将时间因素引入Rowkey。 同时访问的数据尽量连续存储。同时读取的数据相邻存储;同时读取的数据存放在同一行;同时读取的数据存放在同一cell。 查询频繁属性放在Rowkey前面部分。Rowkey的设计在排序上必须与主要的查询条件契合。 离散度较好的属性作为RowKey组成部分。分析数据离散度特点以及查询场景,综合各种场景进行设计。 存储冗余信息,提高检索性能。使用二级索引,适应更多查询场景。 利用过期时间、版本个数设置等操作,让表能自动清除过期数据。 在HBase中,一直在繁忙写数据的Region被称为热点Region。
  • 表引擎选择建议 自助报表分析、行为数据分析,在不涉及重复数据聚合的情况下,建议使用ReplicatedMergeTree表引擎。 涉及到物化视图等聚合函数的场景,建议使用ReplicatedAggregatingMergeTree表引擎。 经常有数据去重或有update修改数据的场景下,建议使用ReplacingMergeTree表引擎,配合使用argMax函数获取最新数据。 表1 应用场景列表 引擎名称 应用场景 MergeTree ClickHouse中最重要的引擎,基于分区键(partitioning key)的数据分区分块存储、前缀稀疏索引(order by和primary key)。 ReplacingMergeTree 相对于MergeTree,它会用最新的数据覆盖具有相同主键的重复项。 删除老数据的操作是在分区异步merge的时候进行处理,只有同一个分区的数据才会被去重,分区间及shard间重复数据不会被去重,所以应用侧想要获取到最新数据,需要配合argMax函数一起使用。 SummingMergeTree 当合并SummingMergeTree表的数据片段时,ClickHouse会把所有具有相同主键的行进行汇总,将同一主键的行替换为包含sum后的一行记录。 如果主键的组合方式使得单个键值对应于大量的行,则可以显著地减少存储空间并加快数据查询的速度。 AggregatingMergeTree 该引擎继承自MergeTree,并改变了数据片段的合并逻辑。 ClickHouse会将一个数据片段内所有具有相同主键(准确的说是排序键)的行替换成一行,这一行会存储一系列聚合函数的状态。可以使用AggregatingMergeTree表引擎来做增量数据的聚合统计,包括物化视图的数据聚合。 CollapsingMergeTree 在创建时与MergeTree基本一样,除了最后多了一个参数,需要指定Sign位(必须是Int8类型)。 CollapsingMergeTree会异步地删除(折叠)除了特定列Sign1和-1值以外的所有字段的值重复的行。 VersionedCollapsingMergeTree 是CollapsingMergeTree的升级,使用不同的collapsing算法,该算法允许使用多个线程以任何顺序插入数据。 Replicated*MergeTree 只有Replicated*MergeTree系列引擎是上面介绍的引擎的多副本版本,为了提升数据和服务的可靠性,建议使用副本引擎: ReplicatedMergeTree ReplicatedSummingMergeTree ReplicatedReplacingMergeTree ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree ReplicatedVersionedCollapsingMergeTree ReplicatedGraphiteMergeTree
  • ClickHouse物化视图概述 由于TTL规则不会从原始表中同步到物化视图表,因此源表中带有TTL规则时,物化视图表同样需要配置TTL规则,并且建议与源表保持一致。 表1 普通物化视图与projection对比 物化视图类型 原表数据与物化视图一致性 灵活性 物化视图开发及维护复杂度 普通物化视图 数据从原表同步到物化视图需要时间窗。 灵活性较高,有新的业务可开发新的物化视图。 可开发复杂逻辑SQL语句的物化视图。 复杂度较高,需要开发很多物化视图,每个物化视图都需要单独去管理和维护。 projection 数据实时同步,数据写入即可查询到物化视图最新数据。 创建表时指定的物化视图语法,新的SQL业务需要修改表结构。 不需要开发很多物化视图,任意查询SQL会自动重写命中物化视图。 Projection仅在 MRS 3.2.0及以上的版本集群中支持。 父主题: ClickHouse物化视图设计
  • HDFS的读写文件注意点 HDFS不支持随机读和写。 HDFS追加文件内容只能在文件末尾添加,不能随机添加。 只有存储在HDFS文件系统中的数据才支持append,edit.log以及数据元文件不支持Append。Append追加文件时,需要将“hdfs-site.xml”中的“dfs.support.append”参数值设置为true。 “dfs.support.append”参数在开源社区版本中默认值是关闭,在 FusionInsight 版本默认值是开启。 该参数为服务器端参数。建议开启,开启后才能使用Append功能。 不适用HDFS场景可以考虑使用其他方式来存储数据,如HBase。
  • HDFS提高读取写入性能方式 写入数据流程:HDFS Client收到业务数据后,从NameNode获取到数据块编号、位置信息后,联系DataNode,并将需要写入数据的DataNode建立起流水线,完成后,客户端再通过自有协议写入数据到Datanode1,再由DataNode1复制到DataNode2、DataNode3(三备份)。写完的数据,将返回确认信息给HDFS Client。 合理设置块大小,如设置dfs.blocksize为 268435456(即256MB)。 对于一些不可能重用的大数据,缓存在操作系统的缓存区是无用的。可将以下两参数设置为false: dfs.datanode.drop.cache.behind.reads和dfs.datanode.drop.cache.behind.writes
  • HDFS文件操作API概述 Hadoop中关于文件操作类基本上全部是在“org.apache.hadoop.fs”包中,这些API能够支持的操作包含:打开文件,读写文件,删除文件等。Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个: static FileSystem get(Configuration conf); 该类封装了几乎所有的文件操作,例如mkdir,delete等。综上基本可以得出操作文件的程序库框架: operator() { 得到Configuration对象 得到FileSystem对象 进行文件操作 }
  • HDFS初始化方法 HDFS初始化是指在使用HDFS提供的API之前,需要做的必要工作。 大致过程为:加载HDFS服务配置文件,并进行Kerberos安全认证,认证通过后再实例化Filesystem,之后使用HDFS的API。此处Kerberos安全认证需要使用到的keytab文件,请提前准备。 正确示例: private void init() throws IOException { Configuration conf = new Configuration(); // 读取配置文件 conf.addResource("user-hdfs.xml"); // 安全模式下,先进行安全认证 if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { String PRINCIPAL = "username.client.kerberos.principal"; String KEYTAB = "username.client.keytab.file"; // 设置keytab密钥文件 conf.set(KEYTAB, System.getProperty("user.dir") + File.separator + "conf" + File.separator + conf.get(KEYTAB)); // 设置kerberos配置文件路径 */ String krbfilepath = System.getProperty("user.dir") + File.separator + "conf" + File.separator + "krb5.conf"; System.setProperty("java.security.krb5.conf", krbfilepath); // 进行登录认证 */ SecurityUtil.login(conf, KEYTAB, PRINCIPAL); } // 实例化文件系统对象 fSystem = FileSystem.get(conf); }
  • HDFS上传本地文件 通过FileSystem.copyFromLocalFile(Path src,Patch dst)可将本地文件上传到HDFS的指定位置上,其中src和dst均为文件的完整路径。 正确示例: public class CopyFile { public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); //本地文件 Path src =new Path("D:\\HebutWinOS"); //HDFS为止 Path dst =new Path("/"); hdfs.copyFromLocalFile(src, dst); System.out.println("Upload to"+conf.get("fs.default.name")); FileStatus files[]=hdfs.listStatus(dst); for(FileStatus file:files){ System.out.println(file.getPath()); } } }
  • 查看HDFS文件的最后修改时间 通过FileSystem.getModificationTime()可查看指定HDFS文件的修改时间。 正确示例: public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path fpath =new Path("/user/hadoop/test/file1.txt"); FileStatus fileStatus=hdfs.getFileStatus(fpath); long modiTime=fileStatus.getModificationTime(); System.out.println("file1.txt的修改时间是"+modiTime); }
  • HDFS需要开启DataNode数据存储路径 DataNode默认存储路径配置为:${BIGDATA_DATA_HOME}/hadoop/dataN/dn/datadir(N≥1),N为数据存放的目录个数。 例如:${BIGDATA_DATA_HOME}/hadoop/data1/dn/datadir、${BIGDATA_DATA_HOME}/hadoop/data2/dn/datadir 设置后,数据会存储到节点上每个挂载磁盘的对应目录下面。
  • HDFS创建文件 通过"FileSystem.mkdirs(Path f)"可在HDFS上创建文件夹,其中f为文件夹的完整路径。 正确示例: public class CreateDir { public static void main(String[] args) throws Exception{ Configuration conf=new Configuration(); FileSystem hdfs=FileSystem.get(conf); Path dfs=new Path("/TestDir"); hdfs.mkdirs(dfs); } }
  • 多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } 多次重复登录会导致后建立的会话对象覆盖掉之前登录建立的,将会导致之前建立的会话无法被维护监控,最终导致会话超期后部分功能不可用。
  • MapReduce中间文件存放路径 MapReduce默认中间文件夹存放路径只有一个,${hadoop.tmp.dir}/mapred/local,建议修改为每个磁盘下均可存放中间文件。 例如:/hadoop/hdfs/data1/mapred/local、/hadoop/hdfs/data2/mapred/local、/hadoop/hdfs/data3/mapred/local等,不存在的目录会自动忽略。
  • 通过表级TTL进行状态后端优化 本章节适用于MRS 3.3.0及以后版本。 在Flink双流Join场景下,若Join的左表和右表其中一个表数据变化快,需要较短时间的过期时间,而另一个表数据变化较慢,需要较长时间的过期时间。目前Flink只有表级别的TTL(Time To Live:生存时间),为了保证Join的准确性,需要将表级别的TTL设置为较长时间的过期时间,此时状态后端中保存了大量的已经过期的数据,给状态后端造成了较大的压力。为了减少状态后端的压力,可以单独为左表和右表设置不同的过期时间。不支持where子句。 可通过使用Hint方式单独为左表和右表设置不同的过期时间,如左表(state.ttl.left)设置TTL为60秒,右表(state.ttl.right)设置TTL为120秒: Hint方式格式: table_path /*+ OPTIONS(key=val [, key=val]*) */ key: stringLiteral val: stringLiteral 在SQL语句中配置示例: CREATE TABLE user_info (`user_id` VARCHAR, `user_name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'user_info_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv');CREATE table print( `user_id` VARCHAR, `user_name` VARCHAR, `score` INT) WITH ('connector' = 'print');CREATE TABLE user_score (user_id VARCHAR, score INT) WITH ( 'connector' = 'kafka', 'topic' = 'user_score_001', 'properties.bootstrap.servers' = '192.168.64.138:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv');INSERT INTO printSELECT t.user_id, t.user_name, d.scoreFROM user_info as t LEFT JOIN -- 为左表和右表设置不同的TTL时间 /*+ OPTIONS('state.ttl.left'='60S', 'state.ttl.right'='120S') */ user_score as d ON t.user_id = d.user_id;
  • 数据量大并发数高且有Shuffle时可调整网络内存 在并发数高和数据量大时,发生shuffle后会发生大量的网络IO,提升网络缓存内存可以扩大一次性读取的数据量,从而提升IO速度。 【示例】 # 网络占用内存占整个进程内存的比例taskmanager.memory.network.fraction: 0.6# 网络缓存内存的最小值taskmanager.memory.network.min: 1g# 网络缓存内存的最大值(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE)taskmanager.memory.network.max: 20g
  • 如果不能使用broardcast join应该尽量减少shuffle数据 不能broadcast join那么必定会发生shuffle,可通过各种手段来减少发生shuffle的数据量,例如谓词下推,Runtime Filter等等。 【示例】 # Runtime filter配置table.exec.runtime-filter.enabled: true# 下推table.optimizer.source.predicate-pushdown-enabled: true
  • 大状态Checkpoint优先从本地状态恢复 为了快速的状态恢复,每个task会同时写Checkpoint数据到本地磁盘和远程分布式存储,也就是说这是一份双复制。只要task本地的Checkpoint数据没有被破坏,系统在应用恢复时会首先加载本地的Checkpoint数据,这样就很大程度减少了远程拉取状态数据的过程。 【示例】配置Checkpoint优先从本地恢复(flink-conf.yaml): state.backend.local-recovery: true
  • 基于序列化性能尽量使用POJO和Avro等简单的数据类型 使用API编写Flink程序时需要考虑Java对象的序列化,大多数情况下Flink都可以高效的处理序列化。SQL中无需考虑,SQL中数据都为ROW类型,都采用了Flink内置的序列化器,能很高效的进行序列化。 表1 序列化 序列化器 Opts/s PojoSeriallizer 813 Kryo 294 Avro(Reflect API) 114 Avro(SpecificRecord API) 632
  • 网络通信调优 Flink通信主要依赖Netty网络,所以在Flink应用执行过程中,Netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。 【示例】 # netty的服务端线程数目(-1表示默认参数numOfSlot)taskmanager.network.netty.server.numThreads -1(numOfSlot)# netty的客户端线程数目(-1表示默认参数numofSlot)taskmanager.network.netty.client.numThreads : -1# netty的客户端连接超时时间taskmanager.network.netty.client.connectTimeoutSec:120s# netty的发送和接受缓冲区的大小(0表示netty默认参数,4MB)taskmanager.network.netty.sendReceiveBufferSize: 0# netty的传输方式,默认方式会根据运行的平台选择合适的方式taskmanager.network.netty.transport:auto
  • 内存总体调优 Flink内部对内存进行了划分,整体上划分成为了堆内存和堆外内存两部分。Java堆内存是通过Java程序创建时指定的,这也是JVM可自动GC的部分内存。堆外内存可细分为可被JVM管理的和不可被JVM管理的,可被JVM管理的有Managed Memory、Direct Memory,这部分是调优的重点,不可被JVM管理的有JVM Metaspace、JVM Overhead,这部分是native memory。 图1 内存 表2 相关参数 参数 配置 注释 说明 Total Memory taskmanager.memory.flink.size: none 总体Flink管理的内存大小,没有默认值,不包含Metaspace和Overhead,Standalone模式时设置。 整体内存。 taskmanager.memory.process.size: none 整个Flink进程使用的内存大小,容器模式时设置。 FrameWork taskmanager.memory.framework.heap.size: 128mb runtime占用的heap的大小,一般来说不用修改,占用空间相对固定。 RUNTIME底层占用的内存,一般不用做较大改变。 taskmanager.memory.framework.off-heap.size: 128mb runtime占用的off-heap的大小,一般来说不用修改,占用空间相对固定。 Task taskmanager.memory.task.heap.size:none 没有默认值,flink.size减去框架、托管、网络等得到。 算子逻辑,用户代码(如UDF)正常对象占用内存的地方。 taskmanager.memory.task.off-heap.size:0 默认值为0,task使用的off heap内存。 Managed Memory taskmanager.memory.managed.fraction: 0.4 托管内存占taskmanager.memory.flink.size的比例,默认0.4。 managed内存用于中间结果缓存、排序、哈希等(批计算),以及RocksDB state backend(流计算),该内存在批模式下一开始就申请固定大小内存,而流模式下会按需申请。 taskmanager.memory.managed.size: 0 托管内存大小,一般不指定,默认为0,内存大小由上面计算出来。若指定了则覆盖比例计算的内存。 Network taskmanager.memory.network.min:64mb 网络缓存的最小值。 用于taskmanager之间shuffle、广播以及与network buffer。 taskmanager.memory.network.max:1gb 网络缓存的最大值。(MRS 3.3.1及之后版本无需修改该值,默认值已为Long#MAX_VALUE) taskmanager.memory.network.fraction:0.1 network memory占用taskmanager.memory.flink.size的大小,默认0.1,会被限制在network.min和network.max之间。 用于taskmanager之间shuffle、广播以及与network buffer。 Others taskmanager.memory.jvm-metaspace.size:256M metaspace空间的最大值,默认值256MB。 用户自己管理的内存。 taskmanager.memory.jvm-overhead.min:192M jvm额外开销的最小值,默认192MB。 taskmanager.memory.jvm-overhead.max:1G jvm额外开销的最大值,默认1GB。 taskmanager.memory.jvm-overhead.fraction:0.1 jvm额外开销占taskmanager.memory.process.size的比例,默认0.1,算出来后会被限制在jvm-overhead.min和jvm-overhead.max之间。 3.3.1及之后版本无需修改taskmanager.memory.network.max网络缓存的最大值
  • 使用local-global两阶段聚合减少数据倾斜 Local-Global聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于MapReduce中的 Combine + Reduce模式。 数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同key的输入数据累加到单个累加器中。全局聚合将仅接收reduce后的累加器,而不是大量的原始输入数据,这可以很大程度减少网络shuffle和状态访问的成本。每次本地聚合累积的输入数据量基于mini-batch间隔,这意味着local-global聚合依赖于启用了mini-batch优化。 API方式: // instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value optionsconfiguration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabledconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s");configuration.setString("table.exec.mini-batch.size", "5000");configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation 资源文件方式: table.exec.mini-batch.enabled: truetable.exec.mini-batch.allow-latency : 5 stable.exec.mini-batch.size: 5000table.optimizer.agg-phase-strategy: TWO_PHASE
  • 吞吐量大场景下使用MiniBatch聚合增加吞吐量 MiniBatch聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个key只需一个操作即可访问状态,可以很大程度减少状态开销并获得更好的吞吐量。但是可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理,这是吞吐量和延迟之间的权衡。默认未开启该功能。 API方式: // instantiate table environmentTableEnvironment tEnv = ...// access flink configurationConfiguration configuration = tEnv.getConfig().getConfiguration();// set low-level key-value optionsconfiguration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimizationconfiguration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input recordsconfiguration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task 资源文件方式(flink-conf.yaml): table.exec.mini-batch.enabled: truetable.exec.mini-batch.allow-latency : 5 stable.exec.mini-batch.size: 5000
  • 非状态计算提升性能的资源优化 Flink计算操作分为如下两类: 无状态计算操作:该部分算子不需要保存计算状态,例如:filter、union all、lookup join。 有状态计算操作:该部分算子要根据数据前后状态变化进行计算,例如:join,union、window、group by、聚合算子等。 对于非状态计算主要调优为TaskManager的Heap Size与NetWork。 例如作业仅进行数据的读和写,TaskManage无需增加额外的vCore,off-Heap和Overhead默认为1GB,内存主要给Heap和Network。
  • TM的Slot数和TM的CPU数成倍数关系 在Flink中,每个Task被分解成SubTask,SubTask作为执行的线程单位运行在TM上,在不开启Slot Sharing Group的情况下,一个SubTask是部署在一个slot上的。即使开启了Slot Sharing Group,大部分情况下Slot中拥有的SubTask也是负载均衡的。所以可以理解为TM上的Slot个数代表了上面运行的任务线程数。 合理的Slots数量应该和CPU核数相同,在使用超线程时,每个Slot将占用2个或更多的硬件线程。 【示例】建议配置TM Slot个数为CPU Core个数的2~4倍: taskmanager.numberOfTaskSlots: 4taskmanager.cpu.cores: 2
  • 数据倾斜状态下可以使用localglobal优化策略 【示例】 #开启mini-batch优化table.exec.mini-batch.enabled:true#最长等待时间table.exec.mini-batch.allow-latency: 20ms#最大缓存记录数table.exec.mini-batch.size:8000#开启两阶段聚合table.optimizer.agg-phase-strategy:TWO_PHASE
  • RocksDB作为状态后端时通过多块磁盘提升IO性能 RocksDB使用内存加磁盘的方式存储数据,当状态比较大时,磁盘占用空间会比较大。如果对RocksDB有频繁的读取请求,那么磁盘IO会成为Flink任务瓶颈。当一个 TaskManager包含三个slot时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘IO,导致三个并行度的吞吐量都会下降。可以通过指定多个不同的硬盘从而减少IO竞争。 【示例】Rockdb配置Checkpoint目录放在不同磁盘(flink-conf.yaml): state.backend.rocksdb.localdir:/data1/flink/rocksdb,/data2/flink/rocksdb
共99354条