华为云用户手册

  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在$SPARK_HOME目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample-1.0.jar bulktable cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkPutExample.py bulktable cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample-1.0.jar bulktable cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkPutExample.py bulktable cf1
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkPutExample文件: # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkPutExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseBulkPutExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“IDEA”,然后单击“Next”。 如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本、Scala SDK版本,然后单击“Finish”完成工程创建。 图3 填写工程信息
  • 原因分析 以自定义UDF为例: 报错信息显示是找不到类。 首先需要确认的是这个类属于的jar包是否在jvm的classpath里面, spark自带的jar都在“spark客户端目录/jars/”。 确认是否存在多个jar包拥有这个类。 如果是其他依赖包,可能是没有使用--jars添加到任务里面。 如果是已经添加到任务里面,但是依旧没有取到,可能是因为配置文件的driver或者executor的classpath配置不正确,可以查看日志确认是否加载到环境。 另外可能报错是类初始化失败导致后面使用这个类的时候出现上述报错,需要确认是否在之前就有初始化失败或者其他报错的情况发生。 报错信息显示找不到方法。 确认这个方法对应的类所在的jar包是否加载到jvm的classpath里面,spark自带的类都在“spark客户端目录/jars/”。 确认是否有多个jar包包含这个类(尤其注意相同工具的不同版本)。 如果报错是Hadoop相关的包,有可能是因为使用的Hadoop版本不一致导致部分方法已经更改。 如果报错的是三方包里面的类,可能是因为Spark已经自带了相关的jar包,但是和代码中使用的版本不一致。
  • 问题现象 Spark能对接很多的第三方工具,因此在使用过程中经常会依赖一堆的三方包。而有一些包 MRS 已经自带,这样就有可能造成代码使用的jar包版本和集群自带的jar包版本不一致,在使用过程中就有可能出现jar包冲突的情况。 常见的jar包冲突报错有: 1、报错类找不到:java.lang.NoClassDefFoundError 2、报错方法找不到:java.lang.NoSuchMethodError
  • 准备开发环境 Hive组件可以使用JDBC/Python/Python3接口进行应用开发,要准备的开发和运行环境分别如下表所示。 表1 JDBC开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。 表2 Python开发环境 准备项 说明 操作系统 开发环境和运行环境:Linux系统。 安装Python 用于开发Hive应用程序的工具,版本要求不低于2.6.6,最高不超过2.7.13。 安装setuptools Python开发环境的基本配置,版本要求5.0以上。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 Python开发工具的详细安装配置可参见配置Hive Python样例工程。 表3 Python3开发环境 准备项 说明 操作系统 开发环境和运行环境:Linux系统。 安装Python3 用于开发Hive应用程序的工具,版本要求不低于3.6,最高不超过3.8。 安装setuptools Python3开发环境的基本配置,版本要求为47.3.1。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 Python3开发工具的详细安装配置可参见配置Hive Python3样例工程。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkGetExample文件: # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkGetExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseBulkGetExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseBulkGetExample.py bulktable yarn-cluster模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseBulkGetExample.py bulktable
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • ClickHouse简介 ClickHouse是面向联机分析处理的列式数据库,支持SQL查询,且查询性能好,特别是基于大宽表的聚合分析查询性能非常优异,比其他分析型数据库速度快一个数量级。 ClickHouse的设计优点: 数据压缩比高 多核并行计算 向量化计算引擎 支持嵌套数据结构 支持稀疏索引 支持数据Insert和Update ClickHouse的应用场景: 实时数仓场景 使用流式计算引擎(如Flink)把实时数据写入ClickHouse,借助ClickHouse的优异查询性能,在亚秒级内响应多维度、多模式的实时查询分析请求。 离线查询场景 把规模庞大的业务数据导入到ClickHouse,构造数亿至数百亿记录规模、数百以上的维度的大宽表,随时进行个性化统计和持续探索式查询分析,辅助商业决策,具有非常好的查询体验。
  • IoTDB样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下IoTDB相关样例工程: 表1 IoTDB相关样例工程 样例工程位置 描述 iotdb-examples/iotdb-flink-example 通过Flink访问IoTDB数据的示例程序,包括FlinkIoTDBSink和FlinkIoTDBSource。 FlinkIoTDBSink可实现通过Flink job将时序数据写入到IoTDB中。FlinkIoTDBSource则通过Flink job将时序数据从IoTDB读取出来并且打印,,相关样例介绍请参见IoTDB Flink样例程序。 iotdb-examples/iotdb-jdbc-example IoTDB JDBC处理数据Java示例程序。 本示例演示了如何使用JDBC接口连接IoTDB,并执行IoTDB SQL语句,相关样例介绍请参见IoTDB JDBC样例程序。 iotdb-examples/iotdb-kafka-example 通过Kafka访问IoTDB数据的示例程序。 本示例演示了如何先将时序数据发送到Kafka,再使用多线程将数据写入到IoTDB中,相关样例介绍请参见IoTDB Kafka样例程序。 iotdb-examples/iotdb-session-example IoTDB Session处理数据Java示例程序。 本示例演示了如何使用Session方式连接IoTDB,并执行IoTDB SQL语句,相关样例介绍请参见IoTDB Session样例程序。 iotdb-examples/iotdb-udf-exmaple 该样例程序介绍了如何实现一个简单的IoTDB自定义函数(UDF),相关样例介绍请参见IoTDB自定义函数(UDF)样例程序。
  • 常用概念 以电力场景为例,说明如何在IoTDB中创建一个正确的数据模型。 图1 电力场景属性层级组织结构 如图1所示,即“电力集团层-电厂层-设备层-传感器层”。其中ROOT为根节点,传感器层的每一个节点为叶子节点。IoTDB的语法规定,ROOT节点到叶子节点的路径以“.”连接,以此完整路径命名IoTDB中的一个时间序列。例如,图1最左侧路径对应的时间序列名称为“ROOT.ln.wf01.wt01.status”。 基本概念: 设备 设备指的是在实际场景中拥有传感器的装置。在IoTDB当中,所有的传感器都应有其对应的归属的设备。 传感器 传感器是指在实际场景中的一种检测装置,它能感受到被测量的信息,并能将感受到的信息按一定规律变换成为电信号或其他所需形式的信息输出并发送给IoTDB。在IoTDB当中,存储的所有的数据及路径,都是以传感器为单位进行组织。 存储组 用户可以将任意前缀路径设置成存储组。如果有4条时间序列,如“root.vehicle.d1.s1”、“root.vehicle.d1.s2”、“root.vehicle.d2.s1”和“root.vehicle.d2.s2”,路径“root.vehicle”下的两个设备d1、d2可能属于同一个业主或者同一个厂商,因此关系紧密。这时候就可以将前缀路径“root.vehicle”指定为一个存储组,这将使得IoTDB将其下的所有设备的数据存储在同一个文件夹下。如果以后“root.vehicle”路径下增加了新的设备,也将属于该存储组。 设置合理数量的存储组可以带来性能的提升。既不会因为产生过多的存储文件(夹)导致频繁切换IO降低系统速度(并且会占用大量内存且出现频繁的内存-文件切换),也不会因为过少的存储文件夹(降低了并发度)导致写入命令阻塞。 用户应根据自己的数据规模和使用场景,平衡存储文件的存储组设置,以达到更好的系统性能。 时间序列 时间序列是IoTDB中的核心概念。时间序列可以被看作产生时序数据的传感器所在的完整路径,在IoTDB中所有的时间序列必须以root开始、以传感器作为结尾。
  • 典型接口说明 以下仅对Manager REST API开发过程中的典型方法进行描述。 表1 restApiDemo.src.rest.BasicAuthAcces 方法 描述 loginAndAccess (String webUrl,String userName,String password,String userTLSVersion) webUrl 集群首页地址,从配置文件“UserInfo.properties”中获取。 userName 登录 FusionInsight 系统的用户名,从配置文件“UserInfo.properties”中获取。 password userName对应的密码,从配置文件“UserInfo.properties”中获取。 userTLSVersion TSL的版本。 返回类型:HttpClient 返回:httpClient 该接口实现Basic认证登录,并返回登录后的HttpClient,登录过程中用户只需要调用一个接口,简化了使用过程。 该接口的入参是从配置文件“UserInfo.properties”中获取的,该文件中的参数需要用户填写,该接口还会调用BasicAuthAccess类内部的多个方法。 表2 restApiDemo.src.rest.BasicAuthAcces.HttpManager 方法 描述 sendHttpGetRequest(HttpClient httpClient, String operationUrl, String operationName) 参数:HttpClient httpCient,登录认证完成后的返回结果。 operationUrl httpGet操作对应的URL。 operationName 具体操作的名称。 sendHttpPostRequest(HttpClient httpClient, String operationUrl, String jsonFilePath, String operationName) 参数:HttpClient httpClient,登录认证完成后的返回结果。 operationUrl httpPost操作对应的URL。 jsonFilePath httpPost操作对应的json文件。 operationName 具体操作名称。 sendHttpPostRequestWithString(HttpClient httpClient, String operationUrl, String jsonString, String operationName) 参数:HttpClient httpClient,登录认证完成后的返回结果。 operationUrl httpPost操作对应的URL。 jsonString httpPost操作对应的json的String格式。 operationName 具体操作名称。 sendHttpPutRequest(HttpClient httpclient, String operationUrl, String jsonFilePath, String operationName) 参数:HttpClient httpClient,登录认证完成后的返回结果。 operationUrl httpPut操作对应的URL。 jsonFilePath httpPut操作对应的json文件。 operationName 具体操作名称。 sendHttpPutRequestWithString(HttpClient httpclient, String operationUrl, String jsonString, String operationName) 参数:HttpClient httpClient,登录认证完成后的返回结果。 operationUrl httpPut操作对应的URL。 jsonString httpPut操作对应的json的String格式。 operationName 具体操作名称。 sendHttpDeleteRequest(HttpClient httpClient, String operationUrl, String jsonString, String operationName) 参数:HttpClient httpCient,登录认证完成后的返回结果。 operationUrl httpDelete操作对应的URL。 jsonString httpDelete操作对应的json的String格式。 operationName 具体操作名称。 以上接口的功能是发送http请求。调用以上接口,用户只需提供每类操作对应的URL,以及操作对应的json文件或json的String格式,无需编写中间的执行代码,减少了代码编写量,简化了用户执行各类操作的步骤。 以上接口会返回请求对应的命令编号,方便用户根据命令编号查询命令执行的进度。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testCreateTable方法中。 /** * Create Table */ public void testCreateTable() { LOG .info("Entering testCreateTable."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Create table String createTableSQL = "CREATE TABLE IF NOT EXISTS TEST (id integer not null primary key, name varchar, " + "account char(6), birth date)"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()) { // Execute Create SQL stat.executeUpdate(createTableSQL); LOG.info("Create table successfully."); } catch (Exception e) { LOG.error("Create table failed.", e); } LOG.info("Exiting testCreateTable."); } /** * Drop Table */ public void testDrop() { LOG.info("Entering testDrop."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Delete table String dropTableSQL = "DROP TABLE TEST"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()) { stat.executeUpdate(dropTableSQL); LOG.info("Drop successfully."); } catch (Exception e) { LOG.error("Drop failed.", e); } LOG.info("Exiting testDrop."); }
  • 场景说明 假定用户有某个网站周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Flink的DataStream应用程序实现如下功能: DataStream应用程序可以在Windows环境和Linux环境中运行。 实时统计总计网购时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志。该日志文件在该样例程序中的data目录下获取。 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 数据规划 DataStream样例工程的数据存储在文本中。 将log1.txt和log2.txt放置在指定路径下,例如"/opt/log1.txt"和"/opt/log2.txt"。 数据文件若存放在本地文件系统,需在所有部署Yarn NodeManager的节点指定目录放置,并设置运行用户访问权限。 若将数据文件放置于HDFS,需指定程序中读取文件路径HDFS路径,例如"hdfs://hacluster/path/to/file"。
  • 前提条件 已按照准备本地应用开发环境章节准备好开发用户,例如developuser,并下载用户的认证凭据文件到本地。 用户需要具备Oozie的普通用户权限,HDFS访问权限,Hive表读写权限,HBase读写权限以及Yarn的队列提交权限。 已在Linux环境中安装了完整的集群客户端。 获取Oozie服务器URL(任意节点),这个URL将是客户端提交流程任务的目标地址。 URL格式为:https://Oozie节点业务IP:21003/oozie。端口为“OOZIE_HTTPS_PORT”参数对应值,默认为21003。 例如,“https://10.10.10.176:21003/oozie”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseForEachPartitionExample文件: # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseForEachPartitionExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseForEachPartitionExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample SparkOnHbaseJavaExample.jar table2 cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseForEachPartitionExample.py table2 cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar table2 cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseForEachPartitionExample.py table2 cf1
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample SparkOnHbaseJavaExample-1.0.jar table2 cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseForEachPartitionExample.py table2 cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample SparkOnHbaseJavaExample-1.0.jar table2 cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseForEachPartitionExample.py table2 cf1
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseForEachPartitionExample文件: # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseForEachPartitionExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseForEachPartitionExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 回答 Spark部署时,如下jar包存放在客户端的“${SPARK_HOME}/jars/streamingClient010”目录以及服务端的“${BIGDATA_HOME}/FusionInsight_Spark2x_8.1.0.1/install/FusionInsight-Spark2x-3.1.1/spark/jars/streamingClient010”目录: kafka-clients-xxx.jar kafka_2.12-xxx.jar spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 由于“$SPARK_HOME/jars/streamingClient010/*”默认没有添加到classpath,所以需要手动配置。 在提交应用程序运行时,在命令中添加如下参数即可,详细示例可参考在Linux环境中调测Spark应用。 --jars $SPARK_CLIENT_HOME/jars/streamingClient010/kafka-clients-2.4.0.jar,$SPARK_CLIENT_HOME/jars/streamingClient010/kafka_2.12-*.jar,$SPARK_CLIENT_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 用户自己开发的应用程序以及样例工程都可使用上述命令提交。 但是Spark开源社区提供的KafkaWordCount等样例程序,不仅需要添加--jars参数,还需要配置其他,否则会报“ClassNotFoundException”错误,yarn-client和yarn-cluster模式下稍有不同。 yarn-client模式下 在除--jars参数外,在客户端“spark-defaults.conf”配置文件中,将“spark.driver.extraClassPath”参数值中添加客户端依赖包路径,如“$SPARK_HOME/jars/streamingClient010/*”。 yarn-cluster模式下 除--jars参数外,还需要配置其他,有三种方法任选其一即可,具体如下: 在客户端spark-defaults.conf配置文件中,在“spark.yarn.cluster.driver.extraClassPath”参数值中添加服务端的依赖包路径,如“${BIGDATA_HOME}/FusionInsight_Spark2x_8.1.0.1/install/FusionInsight-Spark2x-3.1.1/spark/jars/streamingClient010/*”。 将各服务端节点的“original-spark-examples_2.12-3.1.1-xxx.jar”包删除。 在客户端“spark-defaults.conf”配置文件中,修改或增加配置选项“spark.driver.userClassPathFirst” = “true”。
  • 操作步骤 参考导入并配置HetuEngine Python3样例工程章节,获取样例代码,获取hetu-jdbc-XXX.jar文件,并复制到自定义目录中。 参考通过HSFabric的KeyTab认证实现查询HetuEngine SQL任务章节,获取“user.keytab”和“krb5.conf”文件,并放置到自定义目录中。 编辑样例代码,根据集群实际情况修改url、user、password等信息,并根据实际路径修改“jdbc_location”。 Windows系统路径填写示例:"D:\\hetu-examples-python3\\hetu-jdbc-XXX.jar" Linux系统路径示例:"/opt/hetu-examples-python3/hetu-jdbc-XXX.jar" 运行python3样例代码。 Windows直接通过pycharm或者Python IDLE运行py脚本。 Linux运行样例代码需要已安装Java。 进入样例代码路径并执行py脚本,样例代码路径如“/opt/hetu-examples-python3”: cd /opt/hetu-examples-python3 python3 JDBCExampleBroker.py 运行结果。 Windows系统中,在console界面可以看到运行结果示例: 图1 Windows系统运行结果 Linux系统中运行结果示例: Aug 12, 2023 5:19:53 PM io.XXX.jdbc.$internal.airlift.log.Logger info INFO: hsbroker finalUri is https://192.168.43.223:29860 Aug 12, 2023 5:19:53 PM io.XXX.jdbc.$internal.airlift.log.Logger info INFO: The final connection url is: //192.168.43.161:29888/hive/default Aug 12, 2023 5:19:53 PM io.XXX.jdbc.$internal.airlift.log.Logger info INFO: coordinator uri is //192.168.43.161:29888/hive/default [('default',), ('information_schema',)]
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据 数据包含两个属性:分别是Int和String类型 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */ void start(Configuration configuration) throws Exception; /** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */ void createTopicNode(String topic) throw Exception; /** *将信息注册到某个topic节点(目录)下 * @param topic 需要注册到的目录 * @param registerRecord 需要注册的信息 */ void register(String topic, RegisterRecord registerRecord) throws Exception; /** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception; /** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */ void unregister(String topic, int recordId) throws Exception; /** * 查寻信息 * @param 查询信息所在的topic *@recordId 查询信息的ID */ RegisterRecord query(String topic, int recordId) throws Exception; /** * 查询某个Topic是否存在 * @param topic */ Boolean isExist(String topic) throws Exception; /** *关闭注册服务器句柄 */ void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。
  • 权限要求 表1 操作权限要求 操作类型/作用对象 操作 权限要求 DATABASE CREATE DATABASE dbname [LOCATION "hdfs_path"] 如果指定了HDFS路径hdfs_path,需要是路径hdfs_path的所有者和具有RWX权限。 DROP DATABASE dbname 拥有数据库dbname的所有权。 ALTER DATABASE dbnameSET OWNERuser_or_role 具有admin权限。 TABLE CREATE TABLE table_a 拥有数据库的CREATE权限。 CREATE TABLE table_aAS SELECTtable_b 拥有数据库的CREATE权限,对表table_b拥有SELECT权限。 CREATE TABLE table_a LIKEtable_b 拥有数据库的CREATE权限。 CREATE [EXTERNAL] TABLE table_a LOCATION "hdfs_path" 拥有数据库的CREATE权限,是HDFS上的数据路径hdfs_path的所有者和具有RWX权限。 DROP TABLE table_a 是表table_a的所有者。 ALTER TABLE table_a SET LOCATION "hdfs_path" 是表table_a的所有者,是HDFS上的数据路径hdfs_path的所有者和具有RWX权限。 ALTER TABLE table_aSETFILEFORMAT 是表table_a的所有者。 TRUNCATE TABLE table_a 是表table_a的所有者。 ANALYZE TABLE table_a COMPUTE STATIS TICS 对表table_a拥有SELECT和INSERT权限。 SHOW TBLPROPERTIES table_a 对表table_a拥有SELECT权限。 SHOW CREATE TABLE table_a 对表table_a拥有SELECT且带有WITH GRANT OPTION的权限。 Alter ALTER TABLE table_a ADD COLUMN 是表table_a的所有者。 ALTER TABLE table_a REPLACE COLUMN 是表table_a的所有者。 ALTER TABLE table_a RENAME 是表table_a的所有者。 ALTER TABLE table_a SET SERDE 是表table_a的所有者。 ALTER TABLE table_a CLUSTER BY 是表table_a的所有者。 PARTITION ALTER TABLE table_a ADD PARTITIONpartition_spec LOCATION "hdfs_path" 对表table_a拥有INSERT权限,是HDFS上的数据路径hdfs_path的所有者和具有RWX权限。 ALTER TABLE table_a DROP PARTITIONpartition_spec 对表table_a拥有DELETE权限。 ALTER TABLE table_a PARTITIONpartition_spec SET LOCATION "hdfs_path" 是表table_a的所有者,是HDFS上的数据路径hdfs_path的所有者和具有RWX权限。 ALTER TABLE table_aPARTITIONpartition_spec SET FILEFORMAT 是表table_a的所有者。 LOAD LOAD INPATH 'hdfs_path' INTO TABLE table_a 对表table_a拥有INSERT权限,是HDFS上的数据路径hdfs_path的所有者和具有RWX权限。 INSERT INSERT TABLE table_a SELECT FROMtable_b 对表table_a拥有INSERT权限,对表table_b拥有SELECT权限。拥有Yarn的default队列的Submit权限。 SELECT SELECT * FROM table_a 对表table_a拥有SELECT权限。 SELECT FROM table_aJOINtable_b 对表table_a、表table_b拥有SELECT权限,拥有Yarn的default队列的Submit权限。 SELECT FROM (SELECT FROM table_aUNION ALL SELECT FROMtable_b) 对表table_a、表table_b拥有SELECT权限。拥有Yarn的default队列的Submit权限。 EXPLAIN EXPLAIN [EXTENDED|DEPENDENCY] query 对相关表目录具有RX权限。 VIEW CREATE VIEW view_name AS SELECT ... 对相关表拥有SELECT且带有WITH GRANT OPTION的权限。 ALTER VIEW view_name RENAME TOnew_view_name 是视图view_name的所有者。 DROP VIEW view_name 是视图view_name的所有者。 FUNCTION CREATE [TEMPORARY] FUNCTION function_name AS 'class_name' 具有admin权限。 DROP [TEMPORARY] function_name 具有admin权限。 MACRO CREATE TEMPORARY MACRO macro_name ... 具有admin权限。 DROP TEMPORARY MACRO macro_name 具有admin权限。 以上所有的操作只要拥有Hive的admin权限以及对应的HDFS目录权限就能做相应的操作。 如果当前组件使用了Ranger进行权限控制,需基于Ranger配置相关策略进行权限管理,具体操作可参考添加Hive的Ranger访问权限策略章节。
  • Hive介绍 Hive是一个开源的,建立在Hadoop上的 数据仓库 框架,提供类似SQL的HQL语言操作结构化数据,其基本原理是将HQL语言自动转换成Mapreduce任务或Spark任务,从而完成对Hadoop集群中存储的海量数据进行查询和分析。 Hive主要特点如下: 通过HQL语言非常容易的完成数据提取、转换和加载(ETL)。 通过HQL完成海量结构化数据分析。 灵活的数据存储格式,支持JSON、 CS V、TEXTFILE、RCFILE、ORCFILE、SEQUENCEFILE等存储格式,并支持自定义扩展。 多种客户端连接方式,支持JDBC接口。 Hive主要应用于海量数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。 为保证Hive服务的高可用性、用户数据的安全及访问服务的可控制,在开源社区的Hive-3.1.0版本基础上,Hive新增如下特性: 基于Kerberos技术的安全认证机制。 数据文件加密机制。 完善的权限管理。 开源社区的Hive特性,请参见https://cwiki.apache.org/confluence/display/hive/designdocs。
  • 常用概念 keytab文件 存放用户信息的密钥文件。应用程序采用此密钥文件在MRS产品中进行API方式认证。 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Hive的相关操作。 HQL语言 Hive Query Language,类SQL语句。 HCatalog HCatalog是建立在Hive元数据之上的一个表信息管理层,吸收了Hive的DDL命令。为Mapreduce提供读写接口,提供Hive命令行接口来进行数据定义和元数据查询。基于MRS的HCatalog功能,Hive、Mapreduce开发人员能够共享元数据信息,避免中间转换和调整,能够提升数据处理的效率。 WebHCat WebHCat运行用户通过Rest API来执行Hive DDL,提交Mapreduce任务,查询Mapreduce任务执行结果等操作。
共100000条