云服务器内容精选

  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel:StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel:StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking:Boolean = true),该函数功能如下: 将该RDD从持久化列表中移除,RDD对应的数据进入可回收状态; 将RDD的存储级别重新设置为StorageLevel.NONE。
  • 在对性能要求比较高的场景下,可以使用Kryo优化序列化性能 Spark提供了两种序列化实现: org.apache.spark.serializer.KryoSerializer:性能好,兼容性差 org.apache.spark.serializer.JavaSerializer:性能一般,兼容性好 使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 为什么不默认使用Kryo序列化? Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介 绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
  • 在业务情况允许的情况下使用高性能算子 使用reduceByKey/aggregateByKey替代groupByKey。 所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。 map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。 使用mapPartitions替代普通map。 mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。 但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重! 使用filter之后进行coalesce操作。 通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即 可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。 使用repartitionAndSortWithinPartitions替代repartition与sort类操作。 repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在 repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions 算子。因为该算子 可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。 使用foreachPartitions替代foreach。 原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数 据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写 MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。
  • HQL语法规则之判空 判断字段是否为“空”,即没有值,使用“is null”;判断不为空,即有值,使用“is not null”。 要注意的是,在HQL中String类型的字段若是空字符串, 即长度为0,那么对它进行IS NULL的判断结果是False。此时应该使用“col = '' ”来判断空字符串;使用“col != '' ”来判断非空字符串。 正确示例: select * from default.tbl_src where id is null; select * from default.tbl_src where id is not null; select * from default.tbl_src where name = ''; select * from default.tbl_src where name != ''; 错误示例: select * from default.tbl_src where id = null; select * from default.tbl_src where id != null; select * from default.tbl_src where name is null; select * from default.tbl_src where name is not null; 注:表tbl_src的id字段为Int类型,name字段为String类型。
  • 客户端配置参数需要与服务端保持一致 当集群的Hive、YARN、HDFS服务端配置参数发生变化时,客户端程序对应的参数会被改变,用户需要重新审视在配置参数变更之前提交到HiveServer的配置参数是否和服务端配置参数一致,如果不一致,需要用户在客户端重新调整并提交到HiveServer。例如下面的示例中,如果修改了集群中的YARN配置参数时,Hive客户端、示例程序都需要审视并修改之前已经提交到HiveServer的配置参数: 初始状态: 集群YARN的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 客户端的参数配置如下: mapreduce.reduce.java.opts=-Xmx2048M 集群YARN修改后,参数配置如下: mapreduce.reduce.java.opts=-Xmx1024M 如果此时客户端程序不做调整修改,则还是以客户端参数有效,会导致reducer内存不足而使MR运行失败。
  • Hive JDBC驱动的加载 客户端程序以JDBC的形式连接HiveServer时,需要首先加载Hive的JDBC驱动类org.apache.hive.jdbc.HiveDriver。 故在客户端程序的开始,必须先使用当前类加载器加载该驱动类。 如果classpath下没有相应的jar包,则客户端程序抛出Class Not Found异常并退出。 如下: Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
  • 关闭数据库连接 客户端程序在执行完HQL之后,注意关闭数据库连接,以免内存泄露,同时这是一个良好的编程习惯。 需要关闭JDK的两个对象statement和connection。 如下: finally { if (null != statement) { statement.close(); } // 关闭JDBC连接 if (null != connection) { connection.close(); } }
  • 使用WebHCat的REST接口以Streaming方式提交MR任务的前置条件 本接口需要依赖hadoop的streaming包,在以Streaming方式提交MR任务给WebHCat前,需要将“hadoop-streaming-2.7.0.jar”包上传到HDFS的指定路径下:“hdfs:///apps/templeton/hadoop-streaming-2.7.0.jar”。首先登录到安装有客户端和Hive服务的节点上,以客户端安装路径为“/opt/client”为例: source /opt/client/bigdata_env 使用kinit登录人机用户或者机机用户。 hdfs dfs -put ${BIGDATA_HOME}/ FusionInsight _HD_8.1.0.1/FusionInsight-Hadoop-*/hadoop/share/hadoop/tools/lib/hadoop-streaming-*.jar /apps/templeton/ 其中/apps/templeton/需要根据不同的实例进行修改,默认实例使用/apps/templeton/,Hive1实例使用/apps1/templeton/,以此类推。
  • 获取数据库连接 使用JDK的驱动管理类java.sql.DriverManager来获取一个Hive的数据库连接。 Hive的数据库URL为url="jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.hadoop.com@HADOOP.COM;user.principal=hive/hadoop.hadoop.com;user.keytab=conf/hive.keytab"; 以上已经经过安全认证,所以Hive数据库的用户名和密码为null或者空。 如下: // 建立连接 connection = DriverManager.getConnection(url, "", "");
  • 执行HQL 执行HQL,注意HQL不能以";"结尾。 正确示例: String sql = "SELECT COUNT(*) FROM employees_info"; Connection connection = DriverManager.getConnection(url, "", ""); PreparedStatement statement = connection.prepareStatement(sql); resultSet = statement.executeQuery(); 错误示例: String sql = "SELECT COUNT(*) FROM employees_info;"; Connection connection = DriverManager.getConnection(url, "", ""); PreparedStatement statement = connection.prepareStatement(sql); resultSet = statement.executeQuery();
  • 多线程安全登录方式 如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。 login的代码样例: private Boolean login(Configuration conf){ boolean flag = false; UserGroupInformation.setConfiguration(conf); try { UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB)); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; } relogin的代码样例: public Boolean relogin(){ boolean flag = false; try { UserGroupInformation.getLoginUser().reloginFromKeytab(); System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased()); flag = true; } catch (IOException e) { e.printStackTrace(); } return flag; }
  • 对有更新操作的数据流进行聚合计算时要注意数据准确性问题 在针对更新数据进行聚合需要选择合适的解决方案,否则聚合结果会是错误的。 例如: Create table t1( id int, partid int, value int ); select partid,sum(value) from t1 group by partid; 第一批数据:[1,1,10],[2,1,11],[3,2,8] 聚合结果:[1,21],[2,8] 第二批数据:[2,1,12] //对ID=2的记录进行更新。 错误结果:[1,33],[2,8] //若是无法识别是对ID=2的数据进行了更新。 聚合结果:[1,22],[2,8] //识别为更新操作可以得到正确结果。 对于如何识别是更新数据有三种方式: 通过状态后端解决 通过状态后端存储所有原始数据,新来的数据根据状态来判断是否是更新操作,进而通过Flink聚合回撤机制实现聚合结果数据的更新。 优点:可以解决聚合准确性问题,而且对用户友好,对数据没有要求。 缺点:大数据量情况下状态后端存储的数据比较多。 通过CDC格式数据解决 CDC格式数据是指更新操作记录中会同时包含更新前数据和更新后数据。通过更新前的内容来回撤掉之前的聚合结果,通过更新后的数据更新最新的计算结果。 优点:不需要有大的状态后端存储,整体计算资源压力要小于基于状态后端的方案。 缺点:需要依赖于数据格式,常见的方式通过CDC采集工具,将数据采集到Kafka,然后Flink读Kafka数据进行计算。 通过changelog数据解决 changelog与CDC格式的数据类似,只不过存储的方式不同,CDC格式数据会将更新前和更新后的数据在一行记录,而changelog数据会将更新数据拆分成两行,一行是对更新前数据的删除操作,一行是更新后的数据插入操作记录。Flink在计算的时候会将基于更新数据的聚合结果删除,在将基于更新后数据的计算结果插入。changelog可以基于Hudi表实现,基于CDC格式的数据可以转为changelog数据存储到Hudi的MOR表的log文件中,也可以基于状态后端生成Hudi的changelog数据。 优点:可以基于湖存储实现更新数据聚合一致性保证。 缺点: Hudi的MOR表中仅在log文件中存在changelog数据,如果Flink作业计算延迟导致上游数据积压,而Hudi又清理了log文件,就会导致changelog丢失。针对这种情况需要保留版本数多一点,且给Flink作业合理的资源配置避免数据积压周期超过了清理周期。 基于状态后端生成changelog也是依赖于状态后端的,状态后端通常是会配置TTL时间的,不会永久保留。这种场景下更新操作是任意更新,没有一定时间周期限制。例如更新近一个月的数据,TTL设置大于一个月即可;若更新全部数据,就需要设置TTL为永久,不适用于大表。 目前changelog的MOR表,仅支持Flink引擎进行compaction处理,不支持Spark引擎。
  • 多流Join场景事实流表个数不超过三个 当Join表过多时,状态后端压力太大会导致端到端时延增加。 【示例】实时Join维表数3个: CREATE TABLE table1(id int, param1 string) with(...); CREATE TABLE table2(id int, param2 string) with(...); CREATE TABLE table3(id int, param3 string) with(...); CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); select o.*, t1.param1, t2.param2, t3.param3 from orders AS o JOIN table1 AS t1 ON o.order_id = t1.id JOIN table2 AS t2 ON o.order_id = t2.id JOIN table3 AS t3 ON o.order_id = t3.id;
  • 关联嵌套层级不超过三层 嵌套层级越多,回撤流的的数据量越大。 【示例】关联嵌套3层: SELECT * FROM table1 WHERE column1 IN ( SELECT column1 FROM table2 WHERE column2 IN ( SELECT column2 FROM table3 WHERE column3 = 'value' ) )
  • 维表lookup join场景维度表个数不超过五个 Hudi维度表都在TM heap中,当维表过多时heap中保存的维表数据过多,TM会不断GC,导致作业性能下降。 【示例】lookup join维表数5个: CREATE TABLE table1(id int, param1 string) with(...); CREATE TABLE table2(id int, param2 string) with(...); CREATE TABLE table3(id int, param3 string) with(...); CREATE TABLE table4(id int, param4 string) with(...); CREATE TABLE table5(id int, param5 string) with(...); CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), currency STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */); select o.*, t1.param1, t2.param2, t3.param3, t4.param4, t5.param5 from orders AS o JOIN table1 FOR SYSTEM_TIME AS OF o.proc_time AS t1 ON o.order_id = t1.id JOIN table2 FOR SYSTEM_TIME AS OF o.proc_time AS t2 ON o.order_id = t2.id JOIN table3 FOR SYSTEM_TIME AS OF o.proc_time AS t3 ON o.order_id = t3.id JOIN table4 FOR SYSTEM_TIME AS OF o.proc_time AS t4 ON o.order_id = t4.id JOIN table5 FOR SYSTEM_TIME AS OF o.proc_time AS t5 ON o.order_id = t5.id;