华为云用户手册

  • Structured Streaming可靠性说明 Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。 从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下: 不允许source的个数或者类型发生变化。 source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如: 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...) 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic") sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如: File sink允许变更为kafka sink,kafka中只处理新数据。 kafka sink不允许变更为file sink。 kafka sink允许变更为foreach sink,反之亦然。 sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如: 不允许file sink的输出路径发生变更。 允许Kafka sink的输出topic发生变更。 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。 Projection、filter和map-like操作变更,局部场景下能够支持,例如: 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...) Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。 状态操作的变更,在部分场景下会导致状态恢复失败: Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。 Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。 Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。 Source的容错性支持列表 Sources 支持的Options 容错支持 说明 File source path:必填,文件路径 maxFilesPerTrigger:每次trigger最大文件数(默认无限大) latestFirst:是否有限处理新文件(默认值: false) fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false) 支持 支持通配符路径,但不支持以逗号分隔的多个路径。 文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。 Socket Source host:连接的节点ip,必填 port:连接的端口,必填 不支持 - Rate Source rowsPerSecond:每秒产生的行数,默认值1 rampUpTime:在达到rowsPerSecond速度之前的上升时间 numPartitions:生成数据行的并行度 支持 - Kafka Source 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html 支持 - Sink的容错性支持列表 Sinks 支持的output模式 支持Options 容错性 说明 File Sink Append Path:必须指定 指定的文件格式,参见DataFrameWriter中的相关接口 exactly-once 支持写入分区表,按时间分区用处较大 Kafka Sink Append, Update, Complete 参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html at-least-once 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html Foreach Sink Append, Update, Complete None 依赖于ForeachWriter实现 参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach ForeachBatch Sink Append, Update, Complete None 依赖于算子实现 参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Console Sink Append, Update, Complete numRows:每轮打印的行数,默认20 truncate:输出太长时是否清空,默认true 不支持容错 - Memory Sink Append, Complete None 不支持容错,在complete模式下,重启query会重建整个表 -
  • Structured Streaming支持的功能 支持对流式数据的ETL操作。 支持流式DataFrames或Datasets的schema推断和分区。 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。 支持基于Event Time的聚合计算,支持对迟到数据的处理。 支持对流式数据的去除重复数据操作。 支持状态计算。 支持对流处理任务的监控。 支持批流join,流流join。 当前JOIN操作支持列表如下: 左表 右表 支持的Join类型 说明 Static Static 全部类型 即使在流处理中,不涉及流数据的join操作也能全部支持 Stream Static Inner 支持,但是无状态 Left Outer 支持,但是无状态 Right Outer 不支持 Full Outer 不支持 Stream Stream Inner 支持,左右表可选择使用watermark或者时间范围进行状态清理 Left Outer 有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围 Right Outer 有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围 Full Outer 不支持
  • Structured Streaming不支持的功能 不支持多个流聚合。 不支持limit、first、take这些取N条Row的操作。 不支持Distinct。 只有当output mode为complete时才支持排序操作。 有条件地支持流和静态数据集之间的外连接。 不支持部分DataSet上立即运行查询并返回结果的操作: count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。 foreach():使用ds.writeStream.foreach(...)代替。 show():使用输出console sink代替。
  • 代码样例 完整样例代码可参考com.huawei.bigdata.hdfs.examples.ColocationExample。 在运行Colocation工程时,需要将HDFS用户绑定supergroup用户组。 在运行Colocation工程时,HDFS的配置项fs.defaultFS不能配置为viewfs://ClusterX。 初始化 使用Colocation前需要进行kerberos安全认证。 private static void init() throws IOException { conf.set(KEYTAB, PATH_TO_KEYTAB); conf.set(PRINCIPAL, PRNCIPAL_NAME); LoginUtil.setJaasConf( LOG IN_CONTEXT_NAME, PRNCIPAL_NAME, PATH_TO_KEYTAB); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } 获取实例 样例:Colocation的操作使用DFSColocationAdmin和DFSColocationClient实例,在进行创建group等操作前需获取实例。 dfsAdmin = new DFSColocationAdmin(conf); dfs = new DFSColocationClient(); dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf); 创建group 样例:创建一个gid01组,组中包含3个locator。 /** * 创建group * * @throws java.io.IOException */ private static void createGroup() throws IOException { dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01, Arrays.asList(new String[] { "lid01", "lid02", "lid03" })); } 写文件,写文件前必须创建对应的group 样例:写入testfile.txt文件。 /** * 创建并写入文件 * * @throws java.io.IOException */ private static void put() throws IOException { FSDataOutputStream out = dfs.create(new Path(TESTFILE_TXT), true, COLOCATION_GROUP_GROUP01, "lid01"); // 待写入HDFS的数据 byte[] readBuf = "Hello World".getBytes("UTF-8"); out.write(readBuf, 0, readBuf.length); out.close(); } 删除文件 样例:删除testfile.txt文件。 /** * 删除文件 * * @throws java.io.IOException */ @SuppressWarnings("deprecation") private static void delete() throws IOException { dfs.delete(new Path(TESTFILE_TXT)); } 删除group 样例:删除gid01。 /** * 删除group * * @throws java.io.IOException */ private static void deleteGroup() throws IOException { dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01); }
  • 功能简介 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性,将那些需进行关联操作的文件存放在相同数据节点上,在进行关联操作计算时避免了到别的数据节点上获取数据,大大降低网络带宽的占用。 在使用Colocation功能之前,建议用户对Colocation的内部机制有一定了解,包括: Colocation分配节点原理 扩容与Colocation分配 Colocation与数据节点容量 Colocation分配节点原理 Colocation为locator分配数据节点的时候,locator的分配算法会根据已分配的情况,进行均衡的分配数据节点。 locator分配算法的原理是,查询目前存在的所有locators,读取所有locators所分配的数据节点,并记录其使用次数。根据使用次数,对数据节点进行排序,使用次数少的排在前面,优先选择排在前面的节点。每次选择一个节点后,计数加1,并重新排序,选择后续的节点。 扩容与Colocation分配 集群扩容之后,为了平衡地使用所有的数据节点,使新的数据节点的分配频率与旧的数据节点趋于一致,有如下两种策略可以选择,如表1所示。 表1 分配策略 编号 策略 说明 1 删除旧的locators,为集群中所有数据节点重新创建locators。 在未扩容之前分配的locators,平衡的使用了所有数据节点。当扩容后,新加入的数据节点并未分配到已经创建的locators中,所以使用Colocation来存储数据的时候,只会往旧的数据节点存储数据。 由于locators与特定数据节点相关,所以当集群进行扩容的时候,就需要对Colocation的locators分配进行重新规划。 2 创建一批新的locators,并重新规划数据存放方式。 旧的locators使用的是旧的数据节点,而新创建的locators偏重使用新的数据节点,所以需要根据实际业务对数据的使用需求,重新规划locators的使用。 一般的,建议用户在进行集群扩容之后采用策略1来重新分配locators,可以避免数据偏重使用新的数据节点。 Colocation与数据节点容量 由于使用Colocation进行存储数据的时候,会固定存储在指定的locators所对应的数据节点上面,所以如果不对locator进行规划,会造成数据节点容量不均衡。下面总结了保证数据节点容量均衡的两个主要的使用原则,如表2所示。 表2 使用原则 编号 使用原则 说明 1 所有的数据节点在locators中出现的频率一样。 如何保证频率一样:假如数据节点有N个,则创建locators的数量应为N的整数倍(N个、2N个……)。 2 对于所有locators的使用需要进行合理的数据存放规划,让数据均匀的分布在这些locators中。 无 HDFS的二次开发过程中,可以获取DFSColocationAdmin和DFSColocationClient实例,进行从location创建group、删除group、写文件和删除文件的操作。 使用Colocation功能,用户指定了DataNode,会造成某些节点上数据量很大。数据倾斜严重,导致HDFS写任务失败。 由于数据倾斜,导致MapReduce只会在某几个节点访问,造成这些节点上负载很大,而其他节点闲置。 针对单个应用程序任务,只能使用一次DFSColocationAdmin和DFSColocationClient实例。如果每次对文件系统操作都获取此实例,会创建过多HDFS链接,消耗HDFS资源。 Colocation提供了文件同分布的功能,执行集群Balancer或Mover操作时,会移动数据块,使Colocation功能失效。因此,使用Colocation功能时,建议将HDFS配置项dfs.datanode.block-pinning.enabled设置为true,此时执行集群Balancer或Mover操作时,使用Colocation写入的文件将不会被移动,从而保证了文件同分布。
  • 问题 向YARN服务器提交MapReduce任务后,客户端提示如下信息后长时间无响应。 16/03/03 16:44:56 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 44 for admin on ha-hdfs:hacluster 16/03/03 16:44:56 INFO security.TokenCache: Got dt for hdfs://hacluster; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hacluster, Ident: (HDFS_DELEGATION_TOKEN token 44 for admin) 16/03/03 16:44:56 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to 53 16/03/03 16:44:57 INFO input.FileInputFormat: Total input files to process : 200 16/03/03 16:44:57 INFO mapreduce.JobSubmitter: number of splits:200 16/03/03 16:44:57 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1456738266914_0005 16/03/03 16:44:57 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hacluster, Ident: (HDFS_DELEGATION_TOKEN token 44 for admin) 16/03/03 16:44:57 INFO impl.YarnClientImpl: Submitted application application_1456738266914_0005 16/03/03 16:44:57 INFO mapreduce.Job: The url to track the job: https://linux2:8090/proxy/application_1456738266914_0005/ 16/03/03 16:44:57 INFO mapreduce.Job: Running job: job_1456738266914_0005
  • 已安装客户端时编译并运行程序 进入样例工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包,例如“HDFSTest-XXX.jar”,jar包名称以实际打包结果为准。 将导出的Jar包上传至Linux客户端运行环境的任意目录下,例如“/opt/client”。 配置环境变量: cd /opt/client source bigdata_env 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令,运行Jar包。 hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.HdfsExample hadoop jar HDFSTest-XXX.jar com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 未安装客户端时编译并运行程序 进入工程本地根目录,在Windows命令提示符窗口中执行下面命令进行打包。 mvn -s "{maven_setting_path}" clean package 上述打包命令中的{maven_setting_path}为本地Maven的“settings.xml”文件路径。 打包成功之后,在工程根目录的target子目录下获取打好的jar包。 将导出的Jar包上传至Linux运行环境的任意目录下,例如“/optclient”。 将工程中的“lib”文件夹和“conf”文件夹上传至和Jar包相同的Linux运行环境目录下,例如“/opt/client”(其中“lib”目录汇总包含了工程中依赖的所有的Jar包,“conf”目录包含运行jar包所需的集群相关配置文件,请参考准备运行环境)。 运行此样例代码需要设置运行用户,设置运行用户有两种方式,添加环境变量HADOOP_USER_NAME或者修改代码设置运行用户。若在没有修改代码的场景下,执行以下语句添加环境变量: export HADOOP_USER_NAME=test 用户可向管理员咨询运行用户。test在这里只是举例,若需运行Colocation相关操作的样例代码,则此用户需属supergroup用户组。 执行如下命令运行Jar包。 java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.HdfsExample java -cp HDFSTest-XXX.jar:conf/:lib/* com.huawei.bigdata.hdfs.examples.ColocationExample 在运行com.huawei.bigdata.hdfs.examples.ColocationExample:时,HDFS的配置项“fs.defaultFS”不能配置为“viewfs://ClusterX”。
  • 前提条件 已安装客户端时: 已安装HDFS客户端。 当客户端所在主机不是集群中的节点时,需要在客户端所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。 未安装客户端时: Linux环境已安装JDK,版本号需要和IDEA导出Jar包使用的JDK版本一致。 当Linux环境所在主机不是集群中的节点时,需要在Linux环境所在节点的hosts文件中设置主机名和IP地址映射。主机名和IP地址请保持一一对应。
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建目录 * * @throws java.io.IOException */ private void mkdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!createPath(destPath)) { LOG.error("failed to create destPath " + DEST_PATH); return; } LOG.info("success to create path " + DEST_PATH); } /** * create file path * * @param filePath * @return * @throws java.io.IOException */ private boolean createPath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { fSystem.mkdirs(filePath); } return true; }
  • 建立ClickHouse连接 以下代码片段在“ClickhouseJDBCHaDemo”类的initConnection方法中。在创建连接时传入表1中配置的user和password作为认证凭据,ClickHouse会带着用户名和密码在服务端进行安全认证。 MRS 3.3.0之前版本: clickHouseProperties.setPassword(userPass); clickHouseProperties.setUser(userName); BalancedClickhouseDataSource balancedClickhouseDataSource = new BalancedClickhouseDataSource(JDBC_PREFIX + UriList, clickHouseProperties); MRS 3.3.0及之后版本: clickHouseProperties.setProperty(ClickHouseDefaults.USER.getKey(), userName); clickHouseProperties.setProperty(ClickHouseDefaults.PASSWORD.getKey(), userPass); try { clickHouseProperties.setProperty(ClickHouseClientOption.FAILOVER.getKey(), "21"); clickHouseProperties.setProperty(ClickHouseClientOption.LOAD_BALANCING_POLICY.getKey(), "roundRobin"); balancedClickhouseDataSource = new ClickHouseDataSource(JDBC_PREFIX + UriList, clickHouseProperties); } catch (Exception e) { LOG.error("Failed to create balancedClickHouseProperties."); throw e; } 父主题: 开发ClickHouse应用
  • 建立ClickHouse连接 以下代码片段在“ClickhouseJDBCHaDemo”类的initConnection方法中。在创建连接时传入表1中配置的user和password作为认证凭据,ClickHouse会带着用户名和密码在服务端进行安全认证。 MRS 3.3.0之前版本: clickHouseProperties.setPassword(userPass); clickHouseProperties.setUser(userName); BalancedClickhouseDataSource balancedClickhouseDataSource = new BalancedClickhouseDataSource(JDBC_PREFIX + UriList, clickHouseProperties); MRS 3.3.0及之后版本: clickHouseProperties.setProperty(ClickHouseDefaults.USER.getKey(), userName); clickHouseProperties.setProperty(ClickHouseDefaults.PASSWORD.getKey(), userPass); try { clickHouseProperties.setProperty(ClickHouseClientOption.FAILOVER.getKey(), "21"); clickHouseProperties.setProperty(ClickHouseClientOption.LOAD_BALANCING_POLICY.getKey(), "roundRobin"); balancedClickhouseDataSource = new ClickHouseDataSource(JDBC_PREFIX + UriList, clickHouseProperties); } catch (Exception e) { LOG.error("Failed to create balancedClickHouseProperties."); throw e; } 父主题: 开发ClickHouse应用
  • 功能介绍 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 主要分为三个部分: 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
  • 常用概念 Colocation 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性是,将那些需进行关联操作的文件存放在相同的数据节点上,在进行关联操作计算时,避免了到别的数据节点上获取数据的动作,大大降低了网络带宽的占用。 Client HDFS Client主要包括五种方式:JAVA API、C API、Shell、HTTP REST API、WEB UI五种方式,可参考HDFS常用API介绍、HDFS Shell命令介绍。 JAVA API 提供HDFS文件系统的应用接口,本开发指南主要介绍如何使用Java API进行HDFS文件系统的应用开发。 C API 提供HDFS文件系统的应用接口,使用C语言开发的用户可参考C接口的描述进行应用开发。 Shell 提供shell命令完成HDFS文件系统的基本操作。 HTTP REST API 提供除Shell、Java API和C API以外的其他接口,可通过此接口监控HDFS状态等信息。 WEB UI 提供Web可视化组件管理界面。 keytab文件 存放用户信息的密钥文件,应用程序采用此密钥文件在组件中进行API方式认证。
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 删除目录 * * @throws java.io.IOException */ private void rmdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!deletePath(destPath)) { LOG.error("failed to delete destPath " + DEST_PATH); return; } LOG.info("success to delete path " + DEST_PATH); } /** * delete file path * * @param filePath * @return * @throws java.io.IOException */ private boolean deletePath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { return false; } // fSystem.delete(filePath, true); return fSystem.delete(filePath, true); }
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 追加文件内容 * * @throws java.io.IOException */ private void append() throws IOException { final String content = "I append this content."; FSDataOutputStream out = null; try { out = fSystem.append(new Path(DEST_PATH + File.separator + FILE_NAME)); out.write(content.getBytes()); out.hsync(); LOG.info("success to append."); } finally { // make sure the stream is closed finally. IOUtils.closeStream(out); } }
  • HDFS应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 HDFS应用程序开发流程 表1 HDFS应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HDFS的基本概念。 HDFS应用开发简介 准备开发和运行环境 使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 HDFS的运行环境即HDFS客户端,请根据指导完成客户端的安装和配置。 准备HDFS应用开发和运行环境 准备工程 HDFS提供了不同场景下的样例程序,可以导入样例工程进行程序学习。 导入并配置HDFS样例工程 根据场景开发工程 提供样例工程,帮助用户快速了解HDFS各部件的编程接口。 开发HDFS应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HDFS应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测HDFS应用 父主题: HDFS开发指南(普通模式)
  • 增强特性 对比开源社区,华为还提供了两个增强特性,JD BCS erverHA方案和设置JDB CS erver连接的超时时间。 JDBCServerHA方案,多个JDBCServer主节点同时提供服务,当其中一个节点发生故障时,新的客户端连接会分配到其他主节点上,从而保障无间断为集群提供服务。Beeline和JDBC客户端代码两种连接方式的操作相同。 设置客户端与JDBCServer连接的超时时间。 Beeline 在网络拥塞的情况下,这个特性可以避免Beeline由于无限等待服务端的返回而挂起。使用方式如下: 启动Beeline时,在后面追加“--socketTimeOut=n”,其中“n”表示等待服务返回的超时时长,单位为秒,默认为“0”(表示永不超时)。建议根据业务场景,设置为业务所能容忍的最大等待时长。 JDBC客户端代码 在网络拥塞的情况下,这个特性可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下: 在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。建议根据业务场景,设置为业务所能容忍的最大等待时长。
  • 简介 JDBCServer是Hive中的HiveServer2的另外一个实现,它底层使用了Spark SQL来处理SQL语句,从而比Hive拥有更高的性能。 JDBCServer是一个JDBC接口,用户可以通过JDBC连接JDBCServer来访问SparkSQL的数据。JDBCServer在启动的时候,会启动一个sparkSQL的应用程序,而通过JDBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据。JDBCServer启动时还会开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置JDBCServer的时候,至少要配置JDBCServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。 JDBCServer默认在安装节点上的22550端口起一个JDBC服务(通过参数hive.server2.thrift.port配置),可以通过Beeline或者JDBC客户端代码来连接它,从而执行SQL命令。 如果您需要了解JDBCServer的其他信息,请参见Spark官网:http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#distributed-sql-engine。
  • 基于API的Glob路径模式以获取LocatedFileStatus和从FileStatus打开文件 在DistributedFileSystem中添加了以下API,以获取具有块位置的FileStatus,并从FileStatus对象打开文件。这些API将减少从客户端到Namenode的RPC调用的数量。 表6 FileSystem API接口说明 Interface接口 Description说明 public LocatedFileStatus[] globLocatedStatus(Path, PathFilter, boolean) throws IOException 返回一个LocatedFileStatus对象数组,其对应文件路径符合路径过滤规则。 public FSDataInputStream open(FileStatus stat) throws IOException 如果stat对象是LocatedFileStatusHdfs的实例,该实例已具有位置信息,则直接创建InputStream而不联系Namenode。
  • 代码样例 hbase.root.logger=INFO,console,RFA //hbase客户端日志输出配置,console:输出到控制台;RFA:输出到日志文件 hbase.security.logger=DEBUG,console,RFAS //hbase客户端安全相关的日志输出配置,console:输出到控制台;RFAS:输出到日志文件 hbase.log.dir=/var/log/Bigdata/hbase/client/ //日志路径,根据实际路径修改,但目录要有写入权限 hbase.log.file=hbase-client.log //日志文件名 hbase.log.level=INFO //日志级别,如果需要更详细的日志定位问题,需要修改为DEBUG,修改完需要重启进程才能生效 hbase.log.maxbackupindex=20 //最多保存的日志文件数目 # Security audit appender hbase.security.log.file=hbase-client-audit.log //审计日志文件命令
  • REST接口 通过以下命令可跳过REST接口过滤器获取相应的应用信息。 普通模式下,JobHistory仅支持http协议,故在如下命令的url中请使用http协议。 获取JobHistory中所有应用信息: 命令: curl http://192.168.227.16:4040/api/v1/applications?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号。 结果: [ { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ] }, { " id" : "application_1517290848707_0145", "name" : "Spark shell", "attempts" : [ { "startTime" : "2018-01-31T15:20:31.286CST", "endTime" : "1970-01-01T07:59:59.999CST", "lastUpdated" : "2018-01-31T15:20:47.086CST", "duration" : 0, "sparkUser" : "admintest", "completed" : false, "startTimeEpoch" : 1517383231286, "endTimeEpoch" : -1, "lastUpdatedEpoch" : 1517383247086 } ] }] 结果分析: 通过这个命令,可以查询当前集群中所有的Spark应用(包括正在运行的应用和已经完成的应用),每个应用的信息如下表1。 表1 应用常用信息 参数 描述 id 应用的ID name 应用的Name attempts 应用的尝试,包含了开始时间、结束时间、执行用户、是否完成等信息 获取JobHistory中某个应用的信息: 命令: curl http://192.168.227.16:4040/api/v1/applications/application_1517290848707_0008?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1517290848707_0008为应用的id。 结果: { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ] } 结果分析: 通过这个命令,可以查询某个Spark应用的信息,显示的信息如表1所示。 获取正在执行的某个应用的Executor信息: 针对alive executor命令: curl http://192.168.169.84:8088/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/executors?mode=monitoring --insecure 针对全部executor(alive&dead)命令: curl http://192.168.169.84:8088/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/allexecutors?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1478570725074_0046为在YARN中的应用ID。 结果: [{ "id" : "driver", "hostPort" : "192.168.169.84:23886", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 278019440, "executorLogs" : { } }, { "id" : "1", "hostPort" : "192.168.169.84:23902", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 1, "maxTasks" : 1, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 139, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 555755765, "executorLogs" : { "stdout" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stdout?start=-4096", "stderr" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stderr?start=-4096" } } ] 结果分析: 通过这个命令,可以查询当前应用的所有Executor信息(包括Driver),每个Executor的信息包含如下表2所示的常用信息。 表2 Executor常用信息 参数 描述 id Executor的ID hostPort Executor所在节点的ip:端口 executorLogs Executor的日志查看路径
  • 功能简介 Spark的REST API以JSON格式展现Web UI的一些指标,提供用户一种更简单的方法去创建新的展示和监控的工具,并且支持查询正在运行的app和已经结束的app的相关信息。开源的Spark REST接口支持对Jobs、Stages、Storage、Environment和Executors的信息进行查询, FusionInsight 版本中添加了查询SQL、JDBC Server和Streaming的信息的REST接口。开源REST接口完整和详细的描述请参考官网上的文档以了解其使用方法:https://spark.apache.org/docs/3.1.1/monitoring.html#rest-api。
  • REST API增强 SQL相关的命令:获取所有SQL语句和执行时间最长的SQL语句 SparkUI命令: curl http://192.168.195.232:8088/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 JobHistory命令: curl http://192.168.227.16:4040/api/v1/applications/application_1478570725074_0004/SQL?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1478570725074_0004为应用ID。 结果: SparkUI命令和JobHistory命令的查询结果均为: { "longestDurationOfCompletedSQL" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] } ], "sqls" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] }] } 结果分析: 通过这个命令,可以查询当前应用的所有SQL语句的信息(即结果中“sqls”的部分),执行时间最长的SQL语句的信息(即结果中“longestDurationOfCompletedSQL”的部分)。每个SQL语句的信息如下表3。 表3 SQL的常用信息 参数 描述 id SQL语句的ID status SQL语句的执行状态,有RUNNING、COMPLETED、FAILED三种 runningJobs SQL语句产生的job中,正在执行的job列表 successedJobs SQL语句产生的job中,执行成功的job列表 failedJobs SQL语句产生的job中,执行失败的job列表 JDBC Server相关的命令:获取连接数,正在执行的SQL数,所有session信息,所有SQL的信息 命令: curl http://192.168.195.232:8088/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/sqlserver?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 结果: { "sessionNum" : 1, "runningSqlNum" : 0, "sessions" : [ { "user" : "spark", "ip" : "192.168.169.84", "sessionId" : "9dfec575-48b4-4187-876a-71711d3d7a97", "startTime" : "2016/10/29 15:21:10", "finishTime" : "", "duration" : "1 minute 50 seconds", "totalExecute" : 1 } ], "sqls" : [ { "user" : "spark", "jobId" : [ ], "groupId" : "e49ff81a-230f-4892-a209-a48abea2d969", "startTime" : "2016/10/29 15:21:13", "finishTime" : "2016/10/29 15:21:14", "duration" : "555 ms", "statement" : "show tables", "state" : "FINISHED", "detail" : "== Parsed Logical Plan ==\nShowTablesCommand None\n\n== Analyzed Logical Plan ==\ntableName: string, isTemporary: boolean\nShowTablesCommand None\n\n== Cached Logical Plan ==\nShowTablesCommand None\n\n== Optimized Logical Plan ==\nShowTablesCommand None\n\n== Physical Plan ==\nExecutedCommand ShowTablesCommand None\n\nCode Generation: true" } ] } 结果分析: 通过这个命令,可以查询当前JDBC应用的session连接数,正在执行的SQL数,所有的session和SQL信息。每个session的信息如下表4,每个SQL的信息如下表5。 表4 session常用信息 参数 描述 user 该session连接的用户 ip session所在的节点IP sessionId session的ID startTime session开始连接的时间 finishTime session结束连接的时间 duration session连接时长 totalExecute 在该session上执行的SQL数 表5 sql常用信息 参数 描述 user SQL执行的用户 jobId SQL语句包含的job id列表 groupId SQL所在的group id startTime SQL开始时间 finishTime SQL结束时间 duration SQL执行时长 statement 对应的语句 detail 对应的逻辑计划,物理计划 Streaming相关的命令:获取平均输入频率,平均调度时延,平均执行时长,总时延平均值 命令: curl http://192.168.195.232:8088/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/streaming/statistics?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1477722033672_0008为在YARN中的应用ID。 结果: { "startTime" : "2018-12-25T08:58:10.836GMT", "batchDuration" : 1000, "numReceivers" : 1, "numActiveReceivers" : 1, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 373, "numRetainedCompletedBatches" : 373, "numActiveBatches" : 0, "numProcessedRecords" : 1, "numReceivedRecords" : 1, "avgInputRate" : 0.002680965147453083, "avgSchedulingDelay" : 14, "avgProcessingTime" : 47, "avgTotalDelay" : 62 } 结果分析: 通过这个命令,可以查询当前Streaming应用的平均输入频率(events/sec),平均调度时延(ms),平均执行时长(ms),总时延平均值(ms)。
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 删除目录 * * @throws java.io.IOException */ private void rmdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!deletePath(destPath)) { LOG.error("failed to delete destPath " + DEST_PATH); return; } LOG.info("success to delete path " + DEST_PATH); } /** * * @param filePath * @return * @throws java.io.IOException */ private boolean deletePath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { return false; } // fSystem.delete(filePath, true); return fSystem.delete(filePath, true); }
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 删除文件 * * @throws java.io.IOException */ private void delete() throws IOException { Path beDeletedPath = new Path(DEST_PATH + File.separator + FILE_NAME); if (fSystem.delete(beDeletedPath, true)) { LOG.info("success to delete the file " + DEST_PATH + File.separator + FILE_NAME); } else { LOG.warn("failed to delete the file " + DEST_PATH + File.separator + FILE_NAME); } }
  • 代码样例 如下是读文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 读文件 * * @throws java.io.IOException */ private void read() throws IOException { String strPath = DEST_PATH + File.separator + FILE_NAME; Path path = new Path(strPath); FSDataInputStream in = null; BufferedReader reader = null; StringBuffer strBuffer = new StringBuffer(); try { in = fSystem.open(path); reader = new BufferedReader(new InputStreamReader(in)); String sTempOneLine; // write file while ((sTempOneLine = reader.readLine()) != null) { strBuffer.append(sTempOneLine); } LOG.info("result is : " + strBuffer.toString()); LOG.info("success to read."); } finally { // make sure the streams are closed finally. IOUtils.closeStream(reader); IOUtils.closeStream(in); } }
  • 授权性能测试服务创建私有资源组 使用租户账号登录性能测试服务控制台,在左侧导航栏单击“测试资源”,进入私有资源组页面。 单击“创建私有资源组”,进入授权页面。 勾选“我已阅读并同意《华为云用户协议》”,单击“立即授权”。 界面右上角显示“授权成功”,表示已授权性能测试服务创建私有资源组。 授权成功后,在 IAM 控制台的“委托”界面,系统会自动创建一条名为“cpts_admin_trust”的委托,此委托包含“CCE Administrator”和“VPCEndpoint Administrator”权限。
  • 为IAM用户配置相关的权限 使用租户账号登录IAM控制台,为IAM用户配置如下权限,具体操作请参考 统一身份认证 服务。 编号 场景 配置权限 1 租户级操作CodeArts PerfTest资源场景 CodeArts PerfTest Administrator。 2 用户级操作CodeArts PerfTest资源场景 CodeArts PerfTest Developer,如果需要使用私有资源组,还需要配置CodeArts PerfTest Resource Developer。 3 创建CCE场景 基于场景1或场景2,还需要配置CCE Administrator,ECS CommonOperations,VPC FullAccess,详见 CCE权限管理 。 4 创建私有资源组场景 如果没有创建CCE,需要先按照场景3配置权限,创建CCE。 首次创建私有资源组,需要使用租户账号/管理员账号授权性能测试服务创建/修改租户的CCE和VPCEndPoint。 如果已经配置了CodeArts PerfTest Administrator,无需其他用户权限;如果已经配置了CodeArts PerfTest Developer,还需要配置CodeArts PerfTest Resource Admnistrator。 5 购买套餐包场景 基于场景1或场景2,还需要配置BSS Finance。 6 智能分析场景 应用监控:场景1已集成所需权限;基于场景2,还需要配置 APM FullAccess。 主机监控:场景1或场景2已集成,无需另外添加。
  • 怎样确定压测任务顺序读取全局变量的值? 如果您想确认压测任务是否会顺序读取全局变量的值,可以通过以下步骤测试: 设置全局变量,取值数量建议在10个以内(例如设置取值为6、5、4、3、2、1),方便快速测试。 设置一个用例,在此用例报文的body体中引用1中设置的全局变量,执行阶段设为按次数方式,并发为1、并发次数为10,启动此用例。 在性能报告中的“各项测试指标”中,单击操作图标“”,单击“下载请求日志”,查看请求体中全局变量的取值是否和设置的顺序一致。 通过日志可确认后续的取值是否顺序读取,当读取到最后一个值时,返回读取第一个值。 父主题: 压测工程管理
共100000条