华为云用户手册

  • 代码样例 以下代码片段是添加用户的示例,在“rest”包的“UserManager”类的main方法中。 //访问Manager接口完成添加用户 operationName = "AddUser"; operationUrl = webUrl + ADD_USER_URL; jsonFilePath = "./conf/addUser.json"; httpManager.sendHttpPostRequest(httpClient, operationUrl, jsonFilePath, operationName)
  • Kafka样例工程简介 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Kafka相关样例工程: 表1 Kafka相关样例工程 样例工程位置 描述 kafka-examples 单线程生产数据,相关样例请参考使用Producer API向安全Topic生产消息。 单线程消费数据,相关样例请参考使用Consumer API订阅安全Topic并消费。 多线程生产数据,相关样例请参考使用多线程Producer发送消息。 多线程消费数据,相关样例请参考使用多线程Consumer消费消息。 基于KafkaStreams实现WordCount,相关样例请参考使用KafkaStreams统计数据 父主题: Kafka开发指南(普通模式)
  • 参数解释 “job.properties”文件中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 nameNode HDFS NameNode集群地址 resourceManager Yarn ResourceManager地址 queueName 流程任务处理时使用的MapReduce队列名 dataLoadRoot 流程任务所在目录名 oozie.coord.application.path Coordinator流程任务在HDFS上的存放路径 start 定时流程任务启动时间 end 定时流程任务终止时间 workflowAppUri Workflow流程任务在HDFS上的存放路径 可以根据业务需要,以“key=values”的格式自定义参数及值。
  • 样例代码 nameNode=hdfs://haclusterresourceManager=10.1.130.10:26004queueName=QueueAdataLoadRoot=examplesoozie.coord.application.path=${nameNode}/user/oozie_cli/${dataLoadRoot}/apps/dataLoadstart=2013-04-02T00:00Zend=2014-04-02T00:00ZworkflowAppUri=${nameNode}/user/oozie_cli/${dataLoadRoot}/apps/dataLoad
  • 场景说明 该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 该样例逻辑过程如下: 以HDFS文本文件为输入数据: log1.txt:数据输入文件 YuanJing,male,10GuoYijun,male,5 Map阶段: 获取输入数据的一行并提取姓名信息。 查询HBase一条数据。 查询Hive一条数据。 将HBase查询结果与Hive查询结果进行拼接作为Map输出。 Reduce阶段: 获取Map输出中的最后一条数据。 将数据输出到HBase。 将数据保存到HDFS。
  • 数据规划 创建HDFS数据文件。 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。 在HDFS上创建一个文件夹“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下: 登录HDFS客户端。 cd 客户端安装目录 source bigdata_env 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p/tmp/examples/multi-components/mapreduce/input/ 在Linux系统HDFS客户端使用命令hdfs dfs -putdata.txt /tmp/examples/multi-components/mapreduce/input/ 创建HBase表并插入数据。 在Linux系统HBase客户端执行source bigdata_env,并使用命令hbase shell。 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'。 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。 执行命令quit退出。 创建Hive表并载入数据。 在Linux系统Hive客户端使用命令beeline。 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH'/tmp/examples/multi-components/mapreduce/input/'OVERWRITE INTO TABLE person;。 执行命令!q退出。 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。
  • 参数解释 “coordinator.xml”中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 frequency 流程定时执行的时间间隔 start 定时流程任务启动时间 end 定时流程任务终止时间 workflowAppUri Workflow流程任务在HDFS上的存放路径 resourceManager MapReduce ResourceManager地址 queueName 任务处理时使用的Mapreduce队列名 nameNode HDFS NameNode地址 “${变量名}”表示:该值来自“job.properties”所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见job.properties)
  • 代码样例 代码示例中请根据实际情况,修改“OOZIE_URL_DEFALUT”为实际的任意Oozie的主机名,例如“https://10-1-131-131:21003/oozie/”。 public void test(String jobFilePath) { try { runJob(jobFilePath); } catch (Exception exception) { exception.printStackTrace(); } } private void runJob(String jobFilePath) throws OozieClientException, InterruptedException { Properties conf = getJobProperties(jobFilePath); String user = PropertiesCache.getInstance().getProperty("submit_user"); conf.setProperty("user.name", user); // submit and start the workflow job String jobId = oozieClient.run(conf); logger.info("Workflow job submitted: {}" , jobId); // wait until the workflow job finishes printing the status every 10 seconds while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) { logger.info("Workflow job running ... {}" , jobId); Thread.sleep(10 * 1000); } // print the final status of the workflow job logger.info("Workflow job completed ... {}" , jobId); logger.info(String.valueOf(oozieClient.getJobInfo(jobId))); } /** * Get job.properties File in filePath * * @param filePath file path * @return job.properties * @since 2020-09-30 */ public Properties getJobProperties(String filePath) { File configFile = new File(filePath); if (!configFile.exists()) { logger.info(filePath , "{} is not exist."); } InputStream inputStream = null; // create a workflow job configuration Properties properties = oozieClient.createConfiguration(); try { inputStream = new FileInputStream(filePath); properties.load(inputStream); } catch (Exception e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException ex) { ex.printStackTrace(); } } } return properties; }
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkGetExample.py bulktable yarn-cluster模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkGetExample.py bulktable
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkGetExample文件: # -*- coding:utf-8 -*-"""【说明】由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseBulkGetExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.JavaHBaseBulkGetExample().execute(spark._jsc, sys.argv)# 停止SparkSessionspark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Spark简介 Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景: 数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。 迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。 数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。 流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。 查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。 本文档重点介绍Spark、Spark SQL和Spark Streaming应用开发指导。
  • Spark Streaming常用概念 Dstream DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。 DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD是一个只读的、可分区的分布式数据集。 DStream中的每个RDD包含了一个区间的数据。如图4所示。 图4 DStream与RDD关系 应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。 图5 DStream算子转译
  • Spark SQL常用概念 DataSet DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象。 每个Dataset还有一个非类型视图,即由多个列组成的DataSet,称为DataFrame。 DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的data frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者是RDD。
  • Structured Streaming常用概念 Input Source 输入数据源,数据源需要支持根据offset重放数据,不同的数据源有不同的容错性。 Sink 数据输出,Sink要支持幂等性写入操作,不同的sink有不同的容错性。 outputMode 结果输出模式,当前支持3种输出模: Complete Mode:整个更新的结果集都会写入外部存储。整张表的写入操作将由外部存储系统的连接器完成。 Append Mode:当时间间隔触发时,只有在Result Table中新增加的数据行会被写入外部存储。这种方式只适用于结果集中已经存在的内容不希望发生改变的情况下,如果已经存在的数据会被更新,不适合适用此种方式。 Update Mode:当时间间隔触发时,只有在Result Table中被更新的数据才会被写入外部存储系统。注意,和Complete Mode方式的不同之处是不更新的结果集不会写入外部存储。 Trigger 输出触发器,当前支持以下几种trigger: 默认:以微批模式执行,每个批次完成后自动执行下个批次。 固定间隔:固定时间间隔执行。 一次执行:只执行一次query,完成后退出。 连续模式:实验特性,可实现低至1ms延迟的流处理(推荐100ms)。 Structured Streaming支持微批模式和连续模式。微批模式不能保证对数据的低延迟处理,但是在相同时间下有更大的吞吐量;连续模式适合毫秒级的数据处理延迟,当前暂时还属于实验特性。 在当前版本中,若需要使用流流Join功能,则output模式只能选择append模式。 图6 微批模式运行过程简图 图7 连续模式运行过程简图
  • Spark开发接口简介 Spark支持使用Scala、Java和Python语言进行程序开发,由于Spark本身是由Scala语言开发出来的,且Scala语言具有简洁易懂的特性,推荐用户使用Scala语言进行Spark应用程序开发。 按不同的语言分类,Spark的API接口如表1所示。 表1 Spark API接口 接口类型 说明 Scala API 提供Scala语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark scala API接口介绍。 Java API 提供Java语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Java API接口介绍。 Python API 提供Python语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Python API接口介绍。 按不同的模块分,Spark Core和Spark Streaming使用上表中的API接口进行程序开发。而SparkSQL模块,支持CLI或者JD BCS erver两种方式访问。其中JDB CS erver的连接方式也有Beeline和JDBC客户端代码两种。详情请参见Spark JDBCServer接口介绍。 spark-sql脚本、spark-shell脚本和spark-submit脚本(运行的应用中带SQL操作),不支持使用proxy user参数去提交任务。
  • 数据规划 在客户端执行hbase shell进入HBase命令行。 在HBase命令执行下面的命令创建HBase表: create 'streamingTable','cf1' 在客户端另外一个session通过linux命令构造一个端口进行接收数据(不同操作系统的机器,命令可能不同,suse尝试使用netcat -lk 9999): nc -lk 9999 在构造一个端口进行接收数据时,需要在客户端所在服务器上安装netcat
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseStreamingBulkPutExample文件: # -*- coding:utf-8 -*-"""【说明】由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseStreamingBulkPutExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.JavaHBaseStreamingBulkPutExample().execute(spark._jsc, sys.argv)# 停止SparkSessionspark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample-1.0.jar ${ip} 9999 streamingTable cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --jars SparkOnHbaseJavaExample-1.0.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip bin/spark-submit --master yarn --deploy-mode client --deploy-mode cluster --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample-1.0.jar ${ip} 9999 streamingTable cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 代码样例 以下代码片段是导出用户列表的示例,在“rest”包的“ExportUsers”类的main方法中。 String operationName = "ExportUsers"; String exportOperationUrl = webUrl + EXPORT_URL; HttpManager httpManager = new HttpManager(); //调用导出接口 String responseLineContent = httpManager .sendHttpPostRequestWithString(httpClient, exportOperationUrl, StringUtils.EMPTY, operationName); //调用下载接口 operationName = "DownloadUsers"; JSONObject jsonObj = JSON.parseObject(responseLineContent); String downloadOperationUrl = webUrl + DOWNLOAD_URL + jsonObj.getString("fileName"); httpManager.sendHttpGetRequest(httpClient, downloadOperationUrl, operationName);
  • 代码样例 以下代码片段是查找用户的示例,在“rest”包的“UserManager”类的main方法中。 //访问Manager接口完成查找用户列表 operationName = "QueryUserList"; operationUrl = webUrl + QUERY_USER_LIST_URL; String responseLineContent = httpManager.sendHttpGetRequest(httpClient, operationUrl, operationName); LOG .info("The {} response is {}.", operationName, responseLineContent);
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在 FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户) 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中调测Spark应用章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaProducer {brokerlist} {topic} {number of events produce every 0.02s} 示例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingState-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaProducer xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005 mytopic 10
  • 代码样例 以下代码片段是修改用户的示例,在“rest”包的“UserManager”类的main方法中。 //访问Manager接口完成修改用户 operationName = "ModifyUser"; String modifyUserName = "user888"; operationUrl = webUrl + MODIFY_USER_URL + modifyUserName; jsonFilePath = "./conf/modifyUser.json"; httpManager.sendHttpPutRequest(httpClient, operationUrl, jsonFilePath, operationName);
  • 问题 使用运行的Spark Streaming任务回写Kafka时,Kafka上接收不到回写的数据,且Kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“IDEA”,然后单击“Next”。 如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本、Scala SDK版本,然后单击“Finish”完成工程创建。 图3 填写工程信息
  • 运行多组件样例程序 将hive-site.xml、hbase-site.xml、hiveclient.properties放入工程的conf目录。 确保样例工程依赖的所有Hive、HBase相关jar包已正常获取。 打开MultiComponentLocalRunner.java,确认代码中System.setProperty("HADOOP_USER_NAME", "root");设置了用户为root,请确保场景说明中上传的数据的用户为root,或者在代码中将root修改为上传数据的用户名。 在IntelliJ IDEA开发环境中,选中“MultiComponentLocalRunner.java”工程,单击运行对应的应用程序工程。或者右键工程,选择“Run MultiComponentLocalRunner.main()”运行应用工程。 如果集群开启了ZooKeeper SSL,则运行该样例前,需要检查配置文件mapred-site.xml(准备运行环境中样例工程的“conf”配置文件目录中获取)的配置项“mapreduce.admin.map.child.java.opts”和“mapreduce.admin.reduce.child.java.opts”是否包含如下内容: -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty 如果不包含,将上述内容添加到配置项末尾处。
  • 操作场景 在程序代码完成开发后,您可以在Windows环境中运行应用。本地和集群业务平面网络互通时,您可以直接在本地进行调测。 MapReduce应用程序运行完成后,可通过如下方式查看应用程序的运行情况。 在IntelliJ IDEA中查看应用程序运行情况。 通过MapReduce日志获取应用程序运行情况。 登录MapReduce WebUI查看应用程序运行情况。 登录Yarn WebUI查看应用程序运行情况。 在MapReduce任务运行过程中禁止重启HDFS服务,否则可能会导致任务失败。
共99354条