华为云用户手册

  • Oozie应用开发常见概念 流程定义文件 描述业务逻辑的XML文件,包括“workflow.xml”、“coordinator.xml”、“bundle.xml”三类,最终由Oozie引擎解析并执行。 流程属性文件 流程运行期间的参数配置文件,对应文件名为“job.properties”,每个流程定义有且仅有一个该属性文件。 keytab文件 存放用户信息的密钥文件。在安全模式下,应用程序采用此密钥文件进行API方式认证。 Client 客户端直接面向用户,可通过Java API、Shell API、 REST API或者Web UI访问Oozie服务端。
  • 数据规划 首先需要把原日志文件放置在HDFS系统里。 本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS客户端路径下建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下: 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 进入到HDFS客户端下的“/tmp/input”目录,在Linux系统HDFS客户端使用命令在Linux系统HDFS客户端使用命令hadoop fs -put input_data1.txt /tmp/input和hadoop fs -put input_data2.txt /tmp/input,上传数据文件。
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 步骤4:验证Jupyter Notebook访问 MRS 在客户端节点执行如下命令,启动Jupyter Notebook。 PYSPARK_PYTHON=./Python/bin/python3 PYSPARK_DRIVER_PYTHON=jupyter-notebook PYSPARK_DRIVER_PYTHON_OPTS="--allow-root" pyspark --master yarn --executor-memory 2G --driver-memory 1G 在浏览器中输入“弹性IP地址:9999”地址,登录到Jupyter WebUI(保证E CS 的安全组对外放通本地公网IP和9999端口),登录密码为2设置的密码。 图2 登录Jupyter WebUI 创建代码。 创建一个新的python3任务,使用Spark读取文件。 图3 创建Python任务 登录到集群Manager界面,在Yarn的WebUI页面上查看提交的pyspark应用。 图4 查看任务运行情况 验证pandas库调用。 图5 验证pandas
  • 对接Jupyter常见问题 pandas本地import使用时,报错如下: 参考以下步骤进行处理: 执行命令python -m pip install backports.lzma安装lzma模块,如下图所示: 进入“/usr/local/python3/lib/python3.6”目录(机器不同,目录也有所不同,可以通过which命令来查找当前运行python是使用的那个目录的),然后编辑lzma.py文件。 将: from _lzma import * from _lzma import _encode_filter_properties, _decode_filter_properties 更改为: try: from _lzma import * from _lzma import _encode_filter_properties, _decode_filter_properties except ImportError: from backports.lzma import * from backports.lzma import _encode_filter_properties, _decode_filter_properties 修改前: 修改后: 保存退出,然后再次执行import。
  • 步骤3:安装Jupyter Notebook 使用root用户登录客户端节点,执行如下命令安装Jupyter Notebook。 pip3 install jupyter notebook 显示结果如下,表示安装成功: 为保障系统安全,需要生成一个密文密码用于登录Jupyter,放到Jupyter Notebook的配置文件中。 执行如下命令,需要输入两次密码:(进行到Out[3]退出) ipython [root@ecs-notebook python36]# ipython Python 3.6.6 (default, Dec 20 2021, 09:32:25) Type 'copyright', 'credits' or 'license' for more information IPython 7.16.2 -- An enhanced Interactive Python. Type '?' for help. In [1]: from notebook.auth import passwd In [2]: passwd() Enter password: Verify password: Out[2]: 'argon2:$argon2id$v=19$m=10240,t=10,p=8$g14BqLddl927n/unsyPlLQ$YmoKJzbUfNG7LcxylJzm90bgbKWUIiHy6ZV+ObTzdcA 执行如下命令生成Jupyter配置文件。 jupyter notebook --generate-config 修改配置文件。 vi ~/.jupyter/jupyter_notebook_config.py 添加如下配置: # -*- coding: utf-8 -*- c.NotebookApp.ip='*' #此处填写ecs对应的内网IP c.NotebookApp.password = u'argon2:$argon2id$v=19$m=10240,t=10,p=8$NmoAVwd8F6vFP2rX5ZbV7w$SyueJoC0a5TbCuHYzqfSx1vQcFvOTTryR+0uk2MNNZA' # 填写步骤2,Out[2]密码生成的密文 c.NotebookApp.open_browser = False # 禁止自动打开浏览器 c.NotebookApp.port = 9999 # 指定端口号 c.NotebookApp.allow_remote_access = True
  • Jupyter对接方案概述 在MRS服务中可以配合Jupyter Notebook使用PySpark,能够提高机器学习、数据探索和ETL应用开发效率。 本实践指导用户如何在MRS集群中配置Jupyter Notebook来使用Pyspark。 具体流程如下: 步骤1:在MRS集群外节点安装客户端 步骤2:安装Python3 步骤3:安装Jupyter Notebook 步骤4:验证Jupyter Notebook访问MRS 本实践仅适用于MRS 3.x及之后版本,且在集群外客户端节点中安装Python3。
  • 步骤2:安装Python3 使用root用户,登录集群外客户端节点,执行如下命令,检查是否安装了Python3。 python3 --version 是,执行8。 否,执行2。 本案例仅适用于集群外客户端节点安装Python3。 安装Python,此处以Python 3.6.6为例。 执行如下命令,安装相关依赖: yum install zlib zlib-devel zip -y yum install gcc-c++ yum install openssl-devel yum install sqlite-devel -y 如果pandas库需要额外安装如下依赖: yum install -y xz-devel yum install bzip2-devel 下载对应Python版本源码。 wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz 执行如下命令,解压python源码压缩包,例如下载在“opt”目录下。 cd /opt tar -xvf Python-3.6.6.tgz 创建Python的安装目录,此处以“/opt/python36”为例。 mkdir /opt/python36 编译Python。 cd /opt/python-3.6.6 ./configure --prefix=/opt/python36 执行成功,显示结果如下: 执行make -j8命令,执行成功,显示结果如下: 执行make install命令,执行成功,显示结果如下: 执行如下命令,配置Python环境变量。 export PYTHON_HOME=/opt/python36 export PATH=$PYTHON_HOME/bin:$PATH 执行python3 --version命令,显示结果如下,表示Python已经安装完成。 Python 3.6.6 验证Python3。 pip3 install helloword python3 import helloworld helloworld.say_hello("test") 测试安装第三方Python库(如pandas、sklearn)。 pip3 install pandas pip3 install backports.lzma pip3 install sklearn 执行命令python3 -m pip list,查看安装结果。 打包Python.zip cd /opt/python36/ zip -r python36.zip ./* 上传到HDFS指定目录。 hdfs dfs -mkdir /user/python hdfs dfs -put python36.zip /user/python 配置MRS客户端。 进入Spark客户端安装目录“/opt/client/Spark2x/spark/conf”,在“spark-defaults.conf”配置文件如下参数。 spark.pyspark.driver.python=/usr/bin/python3 spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python
  • 访问开源 ZooKeeper 使用“testConnectApacheZk”连接开源ZooKeeper的代码,只需要将以下代码中的“xxx.xxx.xxx.xxx”修改为需要连接的开源的ZooKeeper的IP,端口号按照实际情况修改。如果仅需运行访问第三方Zookeeper的样例,需注释掉main函数中的“testConnectHive”方法。 digestZK = new org.apache.zookeeper.ZooKeeper("xxx.xxx.xxx.xxx:端口号", 60000, null); ZooKeeper连接使用完后需要关闭连接,否则可能导致连接泄露。可根据业务实际情况进行处理,代码如下: //使用try-with-resources方式,try语句执行完后会自动关闭ZooKeeper连接。 try (org.apache.zookeeper.ZooKeeper digestZk = new org.apache.zookeeper.ZooKeeper("xxx.xxx.xxx.xxx:端口号", 600000, null)) { ... }
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“IDEA”,然后单击“Next”。 如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本、Scala SDK版本,然后单击“Finish”完成工程创建。 图3 填写工程信息
  • MRS 3.1.2-LTS.3 表2 MRS 3.1.2-LTS.3版本集群Maven仓库的jar版本与组件的对应关系 组件 组件版本 jar版本 Flink 1.12.0 1.12.0-hw-ei-310003 Hive 3.1.0 3.1.0-hw-ei-310003 Tez 0.9.2 0.9.1.0101-hw-ei-12 Spark 2.4.5 2.4.5-hw-ei-310003 CarbonData 2.0.1 - Hadoop 3.1.1 3.1.1-hw-ei-310003 HBase 2.2.3 2.2.3-hw-ei-310003 ZooKeeper 3.5.6 3.5.6-hw-ei-310003 Hue 4.7.0 - Oozie 5.1.0 5.1.0-hw-ei-310003 Flume 1.9.0 - Kafka 2.4.0 2.4.0-hw-ei-310003 Ranger 2.0.0 - ClickHouse 21.3.4.25 0.3.0 scala 2.12 -
  • MRS 3.2.0-LTS.1 表1 MRS 3.2.0-LTS.1版本集群Maven仓库的jar版本与组件的对应关系 组件 组件版本 jar版本 Flink 1.15.0 1.15.0-h0.cbu.mrs.320.r33 Hive 3.1.0 3.1.0-h0.cbu.mrs.320.r33 Tez 0.9.2 0.9.2-h0.cbu.mrs.320.r33 Spark2x 3.1.1 3.1.1-h0.cbu.mrs.320.r33 Hadoop 3.3.1 3.3.1-h0.cbu.mrs.320.r33 HBase 2.2.3 2.2.3-h0.cbu.mrs.320.r33 ZooKeeper 3.6.3 3.6.3-h0.cbu.mrs.320.r33 Hue 4.7.0 - IoTDB 0.14.0 0.14.0-h0.cbu.mrs.320.r33 Oozie 5.1.0 5.1.0-h0.cbu.mrs.320.r33 Flume 1.9.0 1.9.0-h0.cbu.mrs.320.r33 Kafka 2.11-2.4.0 2.4.0-h0.cbu.mrs.320.r33 Ranger 2.0.0 2.0.0-h0.cbu.mrs.320.r33 Phoenix 5.0.0 5.0.0-HBase-2.0-h0.cbu.mrs.320.r33 ClickHouse 22.3.2.2 0.3.1-h0.cbu.mrs.320.r33 Loader 1.99.3 1.99.3-h0.cbu.mrs.320.r33 DBService 2.7.0 - HetuEngine 1.2.0 1.2.0-h0.cbu.mrs.320.r33 CDL 1.0.0 1.0.0-h0.cbu.mrs.320.r33 Guardian 0.1.0 1.0.6-h0.cbu.mrs.321.r28
  • 回答 问题原因: 在IBM JDK下建立的JDBC connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用relogin也无法得到刷新。 解决措施: 通常情况下,在发现JDBC connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • HetuEngine基本概念 HSBroker: HetuEngine的服务代理,用作用户租户管理校验,HetuEngine访问URL的获取等。 Coordinator:HetuEngine服务的资源协调者,负责SQL解析和优化等事务。 Worker:负责执行任务和处理数据。 Connector:HetuEngine访问数据库的接口,HetuEngine通过Connector的驱动连接数据源,读取数据源元数据和对数据进行增删改查等操作。 Catalog:HetuEngine中一个catalog配置文件对应一个数据源,一个数据源可以有多个不同catalog配置,可以通过数据源的properties文件进行配置。 Schema:对应数据库的Schema名称。 Table:对应数据库的表名。
  • HetuEngine连接方式说明 表1 连接方式说明 连接方式 是否支持用户名密码认证方式 是否支持Keytab认证方式 是否支持客户端跨网段访问 使用前提 HSFabric 是 是 是 确保业务侧和HetuEngine服务端HSFabric所在业务节点网络互通 适用于双平面的网络场景 只需对外开放HSFabric固定的IP,端口 支持范围:MRS 3.1.3及之后版本 HSBroker 是 否 否 确保业务侧和HetuEngine服务端HSBroker、Coordinator(随机分布在Yarn NodeManger)所在业务节点网络互通 需对外开放Coordinator的IP,端口 支持范围:MRS 3.1.0及之后版本
  • 代码样例 以下代码片段是删除用户的示例,在“rest”包的“UserManager”类的main方法中。 //访问Manager接口完成删除用户 operationName = "DeleteUser"; String deleteJsonStr = "{\"userNames\":[\"user888\"]}"; operationUrl = webUrl + DELETE_USER_URL; httpManager.sendHttpDeleteRequest(httpClient, operationUrl, deleteJsonStr, operationName); LOG .info("Exit main.");
  • 操作场景 为了运行 FusionInsight MRS产品Flink组件的SpringBoot接口样例代码,需要完成下面的操作。当前支持 GaussDB (DWS)样例工程。 该章节以在Linux环境下开发GaussDB(DWS) SpringBoot方式连接Flink服务的应用程序为例。 执行GaussDB(DWS)样例需提前登录GaussDB(DWS)所在节点创建用于接受数据的空表“test_lzh1”,创建命令如下: create table test_lzh1 (id integer not null);
  • Kafka样例工程简介 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Kafka相关样例工程: 表1 Kafka相关样例工程 样例工程位置 描述 kafka-examples 单线程生产数据,相关样例请参考使用Producer API向安全Topic生产消息。 单线程消费数据,相关样例请参考使用Consumer API订阅安全Topic并消费。 多线程生产数据,相关样例请参考使用多线程Producer发送消息。 多线程消费数据,相关样例请参考使用多线程Consumer消费消息。 基于KafkaStreams实现WordCount,相关样例请参考使用KafkaStreams统计数据 父主题: Kafka开发指南(普通模式)
  • 代码样例 以下代码片段在“hbase-zk-example\src\main\java\com\huawei\hadoop\hbase\example”包的“TestZKSample”类中,用户主要需要关注“login”和“connectApacheZK”这两个方法。 private static void login(String keytabFile, String principal) throws IOException { conf = HBaseConfiguration.create(); //In Windows environment String confDirPath = TestZKSample.class.getClassLoader().getResource("").getPath() + File.separator;[1] //In Linux environment //String confDirPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; // Set zoo.cfg for hbase to connect to fi zookeeper. conf.set("hbase.client.zookeeper.config.path", confDirPath + "zoo.cfg"); if (User.isHBaseSecurityEnabled(conf)) { // jaas.conf file, it is included in the client pakcage file System.setProperty("java.security.auth.login.config", confDirPath + "jaas.conf"); // set the kerberos server info,point to the kerberosclient System.setProperty("java.security.krb5.conf", confDirPath + "krb5.conf"); // set the keytab file name conf.set("username.client.keytab.file", confDirPath + keytabFile); // set the user's principal try { conf.set("username.client.kerberos.principal", principal); User.login(conf, "username.client.keytab.file", "username.client.kerberos.principal", InetAddress.getLocalHost().getCanonicalHostName()); } catch (IOException e) { throw new IOException("Login failed.", e); } } } private void connectApacheZK() throws IOException, org.apache.zookeeper.KeeperException { try { // Create apache zookeeper connection. ZooKeeper digestZk = new ZooKeeper("127.0.0.1:2181", 60000, null); LOG.info("digest directory:{}", digestZk.getChildren("/", null)); LOG.info("Successfully connect to apache zookeeper."); } catch (InterruptedException e) { LOG.error("Found error when connect apache zookeeper ", e); } }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testGet方法中。 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = { Bytes.toBytes("name"), Bytes.toBytes("address") }; // Specify RowKey. byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Create the Table instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testGet."); }
  • 功能介绍 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 主要分为三个部分: 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
  • 查看调测结果 Spark应用程序运行完成后,可通过如下方式查看应用程序的运行情况。 通过运行结果数据查看应用程序运行情况。 结果数据存储路径和格式已经由Spark应用程序指定,可通过指定文件获取。 登录Spark WebUI查看应用程序运行情况。 Spark主要有两个Web页面。 Spark UI页面,用于展示正在执行的应用的运行情况。 页面主要包括了Jobs、Stages、Storage、Environment和Executors五个部分。Streaming应用会多一个Streaming标签页。 页面入口:在YARN的Web UI界面,查找到对应的Spark应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面。 History Server页面,用于展示已经完成的和未完成的Spark应用的运行情况。 页面包括了应用ID、应用名称、开始时间、结束时间、执行时间、所属用户等信息。单击应用ID,页面将跳转到该应用的SparkUI页面。 通过Spark日志获取应用程序运行情况。 您可以查看Spark日志了解应用运行情况,并根据日志信息调整应用程序。相关日志信息可参考Spark2x日志介绍。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testSelect方法中。 /** * Select Data */ public void testSelect() { LOG.info("Entering testSelect."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Query String querySQL = "SELECT * FROM TEST WHERE id = ?"; Connection conn = null; PreparedStatement preStat = null; Statement stat = null; ResultSet result = null; try { // Create Connection conn = DriverManager.getConnection(url, props); // Create Statement stat = conn.createStatement(); // Create PrepareStatement preStat = conn.prepareStatement(querySQL); // Execute query preStat.setInt(1, 1); result = preStat.executeQuery(); // Get result while (result.next()) { int id = result.getInt("id"); String name = result.getString(1); System.out.println("id: " + id); System.out.println("name: " + name); } LOG.info("Select successfully."); } catch (Exception e) { LOG.error("Select failed.", e); } finally { if (null != result) { try { result.close(); } catch (Exception e2) { LOG.error("Result close failed.", e2); } } if (null != stat) { try { stat.close(); } catch (Exception e2) { LOG.error("Stat close failed.", e2); } } if (null != conn) { try { conn.close(); } catch (Exception e2) { LOG.error("Connection close failed.", e2); } } } LOG.info("Exiting testSelect."); }
  • 代码样例 代码示例中请根据实际情况,修改“OOZIE_URL_DEFALUT”为实际的任意Oozie的主机名,例如“https://10-1-131-131:21003/oozie/”。 public void test(String jobFilePath) { try { runJob(jobFilePath); } catch (Exception exception) { exception.printStackTrace(); } } private void runJob(String jobFilePath) throws OozieClientException, InterruptedException { Properties conf = getJobProperties(jobFilePath); String user = PropertiesCache.getInstance().getProperty("submit_user"); conf.setProperty("user.name", user); // submit and start the workflow job String jobId = oozieClient.run(conf); logger.info("Workflow job submitted: {}" , jobId); // wait until the workflow job finishes printing the status every 10 seconds while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) { logger.info("Workflow job running ... {}" , jobId); Thread.sleep(10 * 1000); } // print the final status of the workflow job logger.info("Workflow job completed ... {}" , jobId); logger.info(String.valueOf(oozieClient.getJobInfo(jobId))); } /** * Get job.properties File in filePath * * @param filePath file path * @return job.properties * @since 2020-09-30 */ public Properties getJobProperties(String filePath) { File configFile = new File(filePath); if (!configFile.exists()) { logger.info(filePath , "{} is not exist."); } InputStream inputStream = null; // create a workflow job configuration Properties properties = oozieClient.createConfiguration(); try { inputStream = new FileInputStream(filePath); properties.load(inputStream); } catch (Exception e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException ex) { ex.printStackTrace(); } } } return properties; }
  • SpringBoot样例工程的命令行形式运行 在IDEA界面使用Maven执行install。 当输出“BUILD SUC CES S”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“flink-dws-sink-example-1.0.0-SNAPSHOT”字段的Jar包。 在Linux上进入客户端安装目录,如“/opt/client/Flink/flink/conf”作为作为运行目录,将1中生成的“target”目录下包名中含有“flink-dws-sink-example-1.0.0-SNAPSHOT”字段的Jar包放进该路径。 执行以下命令创建yarn-session。 yarn-session.sh -t ssl/ -nm "session-spring11" -d 执行以下命令启动SpringBoot服务。 执行GaussDB(DWS)样例 flink run flink-dws-sink-example.jar
  • 代码样例 以下代码片段是修改用户的示例,在“rest”包的“UserManager”类的main方法中。 //访问Manager接口完成修改用户 operationName = "ModifyUser"; String modifyUserName = "user888"; operationUrl = webUrl + MODIFY_USER_URL + modifyUserName; jsonFilePath = "./conf/modifyUser.json"; httpManager.sendHttpPutRequest(httpClient, operationUrl, jsonFilePath, operationName);
  • 解决方案 提交yarn-client模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.driver.extraClassPath配置复制出来,并将Kafka相关jar包路径追加到该配置项之后,提交结构流任务时需要通过--conf将该配置项给加上。例如:Kafka相关jar包路径为“/kafkadir”,提交任务需要增加--conf spark.driver.extraClassPath=/opt/client/Spark2x/spark/conf/:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/x86/*:/kafkadir/*。 提交yarn-cluster模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.yarn.cluster.driver.extraClassPath配置给复制出来,并将Kafka相关jar包相对路径追加到该配置项之后,提交结构流任务时需要通过--conf 将该配置项给加上。例如:kafka相关包为kafka-clients-x.x.x.jar,kafka_2.11-x.x.x.jar,提交任务需要增加--conf spark.yarn.cluster.driver.extraClassPath=/home/huawei/Bigdata/common/runtime/security:./kafka-clients-x.x.x.jar:./kafka_2.11-x.x.x.jar。 当前版本Spark结构流部分不再支持kafka2.x之前的版本,对于升级场景请继续使用旧的客户端。
  • 回答 问题原因: 在IBM JDK下建立的Hive connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用Hive relogin也无法得到刷新。 解决措施: 通常情况下,在发现Hive connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 扩展使用 配置Hive中间过程的 数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见创建Hive用户自定义函数。
共100000条