华为云用户手册

  • MapReduce应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 MapReduce应用程序开发流程 表1 MapReduce应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解MapReduce的基本概念。 MapReduce应用开发简介 准备开发和运行环境 使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 MapReduce的运行环境即MapReduce客户端,请根据指导完成客户端的安装和配置。 准备MapReduce开发和运行环境 准备工程 MapReduce提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个MapReduce工程。 导入并配置MapReduce样例工程 根据场景开发工程 提供了样例工程。 帮助用户快速了解MapReduce各部件的编程接口。 开发MapReduce应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测MapReduce应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测MapReduce应用 父主题: MapReduce开发指南(普通模式)
  • Oozie应用开发样例工程介绍 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Oozie相关样例工程: 表1 Oozie相关样例工程 样例工程位置 描述 oozie-examples/ooziesecurity-examples/OozieMapReduceExample Oozie提交MapReduce任务示例程序。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,对网站的日志文件进行离线分析。 oozie-examples/ooziesecurity-examples/OozieSparkHBaseExample 使用Oozie调度Spark访问HBase的示例程序。 oozie-examples/ooziesecurity-examples/OozieSparkHiveExample 使用Oozie调度Spark访问Hive的示例程序。 父主题: Oozie应用开发概述
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 回答 问题原因: 在IBM JDK下建立的Hive connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用Hive relogin也无法得到刷新。 解决措施: 通常情况下,在发现Hive connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • 代码样例 下面代码片段在com.huawei.storm.example.wordcount包的“WordCountTopology”类的“main”方法中,作用在于构建应用程序并提交。 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.CMD); } private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception { switch (type) { case CMD: { cmdSubmit(builder, null); break; } case REMOTE: { remoteSubmit(builder); break; } case LOCAL: { localSubmit(builder); break; } } } /** * 命令行方式远程提交 * 步骤如下: * 打包成Jar包,然后在客户端命令行上面进行提交 * 远程提交的时候,要先将该应用程序和其他外部依赖(非excemple工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 * 再通过storm客户端中storm -jar的命令进行提交 * * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录 * * 运行命令如下: * ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology */ private static void cmdSubmit(TopologyBuilder builder, Config conf) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException { if (conf == null) { conf = new Config(); } conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPO LOG Y_NAME, conf, builder.createTopology()); } private static void localSubmit(TopologyBuilder builder) throws InterruptedException { Config conf = new Config(); conf.setDebug(true); conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "替换为用户jar包地址"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } private static TopologyBuilder buildTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); return builder; } 如果拓扑开启了ack,推荐acker的数量不大于所设置的worker数量。
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/user.keytab”,“/opt/krb5.conf”。 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/” )下。
  • 数据规划 创建HBase表,构造数据,列需要包含key,modify_time,valid。其中每条数据key值全表唯一,modify_time代表修改时间,valid代表是否为有效数据(该样例中'1'为有效,'0'为无效数据)。 示例:进入hbase shell,执行如下命令: create 'hbase_table','key','info' put 'hbase_table','1','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','1','info:valid','1' put 'hbase_table','2','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','2','info:valid','1' put 'hbase_table','3','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','3','info:valid','0' put 'hbase_table','4','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','4','info:valid','1' 上述数据的modify_time列可设置为当前时间之前的值。 put 'hbase_table','5','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','5','info:valid','1' put 'hbase_table','6','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','6','info:valid','1' put 'hbase_table','7','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','7','info:valid','0' put 'hbase_table','8','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','8','info:valid','1' put 'hbase_table','4','info:valid','0' put 'hbase_table','4','info:modify_time','2021-03-03 15:20:39' 上述数据的modify_time列可设置为样例程序启动后30分钟内的时间值(此处的30分钟为样例程序默认的同步间隔时间,可修改)。 put 'hbase_table','9','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','9','info:valid','1' put 'hbase_table','10','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','10','info:valid','1' put 'hbase_table','11','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','11','info:valid','0' put 'hbase_table','12','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','12','info:valid','1' 上述数据的modify_time列可设置为样例程序启动后30分钟到60分钟内的时间值,即第二次同步周期。 在sparksql中创建HBase的hive外表,命令如下: create table external_hbase_table(key string ,modify_time STRING, valid STRING) using org.apache.spark.sql.hbase.HBaseSource options(hbaseTableName "hbase_table", keyCols "key", colsMapping "modify_time=info.modify_time,valid=info.valid"); 在sparksql中创建CarbonData表: create table carbon01(key string,modify_time STRING, valid STRING) stored as carbondata; 初始化加载当前hbase表中所有数据到CarbonData表; insert into table carbon01 select * from external_hbase_table where valid='1'; 用spark-submit提交命令: spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --class com.huawei.bigdata.spark.examples.HBaseExternalHivetoCarbon /opt/example/HBaseExternalHivetoCarbon-1.0.jar
  • 准备开发环境 在进行应用开发时,需要准备的本地开发环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Windows或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: MRS集群的服务端和客户端仅支持自带的Oracle JDK(版本为1.8),不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的,支持Oracle JDK和IBM JDK。 Oracle JDK:支持1.7和1.8版本。 IBM JDK:推荐1.7.8.10、1.7.9.40和1.8.3.0版本。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-zip 16.04版本。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testModifyTable方法中 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("education"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. TableDescriptor htd = admin.getDescriptor(tableName); // Check whether the column family is specified before modification. if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor. TableDescriptor tableBuilder = TableDescriptorBuilder.newBuilder(htd) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build()).build(); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName);//注[1] // Submit a modifyTable request. admin.modifyTable(tableBuilder); // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting testModifyTable."); }
  • 问题 Flink内核升级到1.3.0之后,当Kafka调用带有非static的KafkaPartitioner类对象为参数的FlinkKafkaProducer010去构造函数时,运行时会报错。 报错内容如下: org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.
  • Kafka样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Kafka相关样例工程: 表1 Kafka相关样例工程 样例工程位置 描述 kafka-examples 单线程生产数据,相关样例请参考使用Producer API向安全Topic生产消息。 单线程消费数据,相关样例请参考使用Consumer API订阅安全Topic并消费。 多线程生产数据,相关样例请参考使用多线程Producer发送消息。 多线程消费数据,相关样例请参考使用多线程Consumer消费消息。 基于KafkaStreams实现WordCount,相关样例请参考使用KafkaStreams统计数据 父主题: Kafka开发指南(安全模式)
  • 功能介绍 主要分为三个部分: 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。
  • 回答 原因分析:显示该异常是因为“recoverFromCheckpointLocation”的值判定为false,但却配置了checkpoint目录。 参数“recoverFromCheckpointLocation”的值为代码中“outputMode == OutputMode.Complete()”语句的判断结果(outputMode的默认输出方式为“append”)。 处理方法:编写应用时,用户可以根据具体情况修改数据的输出方式。 将输出方式修改为“complete”,“recoverFromCheckpointLocation”的值会判定为true。此时配置了checkpoint目录时就不会显示异常。
  • 问题 Structured Streaming的cluster模式,在数据处理过程中终止ApplicationManager,执行应用时显示如下异常。 2017-05-09 20:46:02,393 | INFO | main | client token: Token { kind: YARN_CLIENT_TOKEN, service: } diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete hdfs://hacluster/structuredtest/checkpoint/offsets to start over.; ApplicationMaster host: 10.96.101.170 ApplicationMaster RPC port: 0 queue: default start time: 1494333891969 final status: FAILED tracking URL: https://9-96-101-191:8090/proxy/application_1493689105146_0052/ user: spark2x | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Exception in thread "main" org.apache.spark.SparkException: Application application_1493689105146_0052 finished with failed status
  • 样例代码 使用Python方式提交数据分析任务,参考样例程序中的“hive-examples/python-examples/pyCLI_sec.py”。 导入HAConnection类。 from pyhs2.haconnection import HAConnection 声明HiveServer的IP地址列表。本例中hosts代表HiveServer的节点,xxx.xxx.xxx.xxx代表业务IP地址。 hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 如果HiveServer实例被迁移,原始的示例程序会失效。在HiveServer实例迁移之后,用户需要更新示例程序中使用的HiveServer的IP地址。 在HAConnection的第三个参数填写正确的用户名,密码可以不填写。创建连接,执行HQL,样例代码中仅执行查询所有表功能,可根据实际情况修改HQL内容,输出查询的列名和结果到控制台。 try: with HAConnection(hosts = hosts, port = 21066, authMechanism = "PLAIN", user='root', password='******') as haConn: with haConn.getConnection() as conn: with conn.cursor() as cur: # Show databases print cur.getDatabases() # Execute query cur.execute("show tables") # Return column info from query print cur.getSchema() # Fetch table results for i in cur.fetch(): print i except Exception, e: print e
  • 操作步骤 查看Spark应用运行结果数据。 结果数据存储路径和格式已经由Spark应用程序指定,可通过指定文件获取。 查看Spark应用程序运行情况。 Spark主要有两个Web页面。 Spark UI页面,用于展示正在执行的应用的运行情况。 页面主要包括了Jobs、Stages、Storage、Environment和Executors五个部分。Streaming应用会多一个Streaming标签页。 页面入口:在YARN的Web UI界面,查找到对应的Spark应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面。 History Server页面,用于展示已经完成的和未完成的Spark应用的运行情况。 页面包括了应用ID、应用名称、开始时间、结束时间、执行时间、所属用户等信息。单击应用ID,页面将跳转到该应用的SparkUI页面。 查看Spark日志获取应用运行情况。 您可以查看Spark日志了解应用运行情况,并根据日志信息调整应用程序。相关日志信息可参考Spark2x日志介绍。
  • 回答 由于浏览器所在的计算机IP地址未加到Web访问白名单导致。用户可以通过修改客户端的配置文件“conf/flink-conf.yaml”来解决问题。 确认配置项“jobmanager.web.ssl.enabled”的值是否是“false”,若不是,请修改为“false”。 确认配置项“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”中是否已经添加浏览器所在的计算机IP地址。如果没有添加,可以通过这两项配置项进行添加。例如: jobmanager.web.access-control-allow-origin: 浏览器所在的计算机IP地址 jobmanager.web.allow-access-address: 浏览器所在的计算机IP地址
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。HBase应用开发需要进行ZooKeeper和Kerberos安全认证。用于ZooKeeper认证的文件为“jaas.conf”,用于Kerberos安全认证文件为keytab文件和krb5.conf文件。具体使用方法在样例代码的“README.md”中会有详细说明。 安全认证主要采用代码认证方式。支持Oracle JAVA平台和IBM JAVA平台。 以下代码在“com.huawei.bigdata.hbase.examples”包的“TestMain”类中。
  • 配置安全登录 请根据实际情况,在“com.huawei.bigdata.hbase.examples”包的“TestMain”类中修改“userName”为实际用户名,例如“developuser”。 private static void login() throws IOException { if (User.isHBaseSecurityEnabled(conf)) { userName = "developuser"; //In Windows environment String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator; //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; /* * if need to connect zk, please provide jaas info about zk. of course, * you can do it as below: * System.setProperty("java.security.auth.login.config", confDirPath + * "jaas.conf"); but the demo can help you more : Note: if this process * will connect more than one zk cluster, the demo may be not proper. you * can contact us for more help */ LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userName, userKeytabFile); LoginUtil.login(userName, userKeytabFile, krb5File, conf); } }
  • 准备本地应用开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 Windows系统,支持Windows 7以上版本。 开发和运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 安装和配置IDEA 用于开发Oozie应用程序的工具。版本要求:支持JDK1.8以上的版本。 说明: 若使用IBM JDK,请确保IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件。 支持7-zip 16.04版本。 父主题: 准备Oozie应用开发环境
  • Oozie应用开发样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Oozie相关样例工程: 表1 Oozie相关样例工程 样例工程位置 描述 oozie-examples/oozienormal-examples/OozieMapReduceExample Oozie提交MapReduce任务示例程序。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,对网站的日志文件进行离线分析。 oozie-examples/oozienormal-examples/OozieSparkHBaseExample 使用Oozie调度Spark访问HBase的示例程序。 oozie-examples/oozienormal-examples/OozieSparkHiveExample 使用Oozie调度Spark访问Hive的示例程序。 父主题: Oozie应用开发概述
  • 数据规划 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 创建HBase表,构造数据,列需要包含key,modify_time,valid。其中每条数据key值全表唯一,modify_time代表修改时间,valid代表是否为有效数据(该样例中'1'为有效,'0'为无效数据)。 示例:进入hbase shell,执行如下命令: create 'hbase_table','key','info' put 'hbase_table','1','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','1','info:valid','1' put 'hbase_table','2','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','2','info:valid','1' put 'hbase_table','3','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','3','info:valid','0' put 'hbase_table','4','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','4','info:valid','1' 上述数据的modify_time列可设置为当前时间之前的值。 put 'hbase_table','5','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','5','info:valid','1' put 'hbase_table','6','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','6','info:valid','1' put 'hbase_table','7','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','7','info:valid','0' put 'hbase_table','8','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','8','info:valid','1' put 'hbase_table','4','info:valid','0' put 'hbase_table','4','info:modify_time','2021-03-03 15:20:39' 上述数据的modify_time列可设置为样例程序启动后30分钟内的时间值(此处的30分钟为样例程序默认的同步间隔时间,可修改)。 put 'hbase_table','9','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','9','info:valid','1' put 'hbase_table','10','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','10','info:valid','1' put 'hbase_table','11','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','11','info:valid','0' put 'hbase_table','12','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','12','info:valid','1' 上述数据的modify_time列可设置为样例程序启动后30分钟到60分钟内的时间值,即第二次同步周期。 在sparksql中创建HBase的hive外表,命令如下: create table external_hbase_table(key string ,modify_time STRING, valid STRING) using org.apache.spark.sql.hbase.HBaseSource options(hbaseTableName "hbase_table", keyCols "key", colsMapping "modify_time=info.modify_time,valid=info.valid"); 在sparksql中创建CarbonData表: create table carbon01(key string,modify_time STRING, valid STRING) stored as carbondata; 初始化加载当前hbase表中所有数据到CarbonData表; insert into table carbon01 select * from external_hbase_table where valid='1'; 用spark-submit提交命令: spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.HBaseExternalHivetoCarbon /opt/example/HBaseExternalHivetoCarbon-1.0.jar
  • MapReduce样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下MapReduce相关样例工程: 表1 MapReduce相关样例工程 样例工程位置 描述 mapreduce-example-security MapReduce统计数据的应用开发示例: 提供了一个MapReduce统计数据的应用开发示例,通过类CollectionMapper实现数据分析、处理,并输出满足用户需要的数据信息。 相关样例介绍请参见MapReduce统计样例程序。 MapReduce作业访问多组件的应用开发示例: 以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 相关样例介绍请参见MapReduce访问多组件样例程序。 父主题: MapReduce开发指南(安全模式)
  • Storm应用开发流程 本文档主要基于Java API进行Storm拓扑的开发。 开发流程中各阶段的说明如图1和表1所示: 图1 拓扑开发流程 表1 Storm应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Storm的基本概念,了解场景需求,拓扑等。 Storm应用开发常用概念 准备开发和运行环境 Storm的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 Storm的运行环境即Storm客户端,请根据指导完成客户端的安装和配置。 准备Storm应用开发和运行环境 准备工程 Storm提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Storm样例工程 根据场景开发拓扑 提供了Storm拓扑的构造和Spout/Bolt开发过程。 开发Storm应用 打包IntelliJ IDEA代码 Storm样例程序是在Linux环境下运行,需要将IntelliJ IDEA中的代码打包成jar包。 打包Strom样例工程应用 打包业务 将IntelliJ IDEA代码生成的jar包与工程依赖的jar包,合并导出可提交的source.jar。 打包Strom应用业务 提交拓扑 指导用户将开发好的程序提交运行。 提交Storm拓扑 查看程序运行结果 指导用户提交拓扑后查看程序运行结果。 查看Storm应用调测结果 父主题: Storm应用开发概述
  • 回答 首先查看ZooKeeper中/flink_base的目录权限是否为:'world,'anyone: cdrwa;如果不是,请修改/flink_base的目录权限为:'world,'anyone: cdrwa,然后继续根据步骤二排查;如果是,请根据步骤二排查。 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink_base/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink_base/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • Spark2x样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Spark2x相关样例工程: 表1 Spark2x相关样例工程 样例工程位置 描述 sparksecurity-examples/SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的Java示例程序。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 sparksecurity-examples/SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 sparksecurity-examples/SparkHbasetoHbasePythonExample sparksecurity-examples/SparkHbasetoHbaseScalaExample sparksecurity-examples/SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 sparksecurity-examples/SparkHivetoHbasePythonExample sparksecurity-examples/SparkHivetoHbaseScalaExample sparksecurity-examples/SparkJavaExample Spark Core任务的Java/Python/Scala/R示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkRExample示例不支持未开启Kerberos认证的集群。 sparksecurity-examples/SparkPythonExample sparksecurity-examples/SparkRExample sparksecurity-examples/SparkScalaExample sparksecurity-examples/SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 sparksecurity-examples/SparkLauncherScalaExample sparksecurity-examples/SparkOnClickHouseJavaExample Spark通过ClickHouse JDBC的原生接口,以及Spark JDBC驱动,实现对ClickHouse数据库和表的创建、查询、插入等操作样例代码。 sparksecurity-examples/SparkOnClickHousePythonExample sparksecurity-examples/SparkOnClickHouseScalaExample sparksecurity-examples/SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 sparksecurity-examples/SparkOnHbasePythonExample sparksecurity-examples/SparkOnHbaseScalaExample sparksecurity-examples/SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 sparksecurity-examples/SparkOnHudiPythonExample sparksecurity-examples/SparkOnHudiScalaExample sparksecurity-examples/SparkOnMultiHbaseScalaExample Spark同时访问两个集群中的HBase的Scala示例程序。 sparksecurity-examples/SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 sparksecurity-examples/SparkSQLPythonExample sparksecurity-examples/SparkSQLScalaExample sparksecurity-examples/SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 sparksecurity-examples/SparkStreamingKafka010PythonExample sparksecurity-examples/SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 sparksecurity-examples/SparkStreamingtoHbasePythonExample010 sparksecurity-examples/SparkStreamingtoHbaseScalaExample010 sparksecurity-examples/SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 sparksecurity-examples/SparkStructuredStreamingPythonExample sparksecurity-examples/SparkStructuredStreamingScalaExample sparksecurity-examples/SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JD BCS erver的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 sparksecurity-examples/SparkThriftServerScalaExample sparksecurity-examples/StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 sparksecurity-examples/StructuredStreamingStateScalaExample 在Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 父主题: Spark2x开发指南(安全模式)
  • 操作步骤 准备依赖的Jar包和配置文件。 在Linux环境新建目录,例如“/opt/test”,并创建子目录“lib”和“src/main/resources/”。将样例工程中“lib”文件夹下的Jar包上传Linux环境的“lib”目录。将样例工程中“src/main/resources”文件夹下的配置文件上传到Linux环境的“src/main/resources”目录。 在IntelliJ IDEA工程中修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换Jar文件地址。 使用remoteSubmit方式提交应用程序 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.REMOTE); } 修改userJarFilePath为Linux环境指定路径“/opt/test/lib/example.jar”。 private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "/opt/test/lib/example.jar "; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); } 导出Jar包并上传到Linux环境。 参考打包Strom样例工程应用执行打包,并将jar包命名为“example.jar”。 将导出的Jar包复制到Linux环境的“/opt/test/lib”目录下。 切换到“/opt/test”,执行以下命令,运行Jar包。 java -classpath /opt/test/lib/*:/opt/test/src/main/resources com.huawei.storm.example.wordcount.WordCountTopology
  • 配置ClickHouse连接属性 在ClickhouseJDBCHaDemo、Demo、NativeJDBCHaDemo和Util文件创建connection的样例中设置连接属性,如下样例代码设置socket超时时间为60s。 ClickHouseProperties clickHouseProperties = new ClickHouseProperties(); clickHouseProperties.setSocketTimeout(60000); 如果导入并配置ClickHouse样例工程中的“clickhouse-example.properties”配置文件中“sslUsed”参数配置为“true”时,则需要在ClickhouseJDBCHaDemo、Demo、NativeJDBCHaDemo和Util文件创建connection的样例中设置如下连接属性: clickHouseProperties.setSsl(true); clickHouseProperties.setSslMode("none"); 父主题: 开发ClickHouse应用
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户)。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaADEventProducer {BrokerList} {timeOfProduceReqEvent} {eventTimeBeforeCurrentTime} {reqTopic} {reqEventCount} {showTopic} {showEventMaxDelay} {clickTopic} {clickEventMaxDelay} 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中编包并运行Spark程序章节中导出jar包的操作步骤。 命令举例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingADScalaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaADEventProducer 10.132.190.170:21005,10.132.190.165:21005 2h 1h req 10000000 show 5m click 5m 此命令将在kafka上创建3个topic:req、show、click,在2h内生成1千万条请求事件数据,请求事件的时间取值范围为{当前时间-1h 至 当前时间},并为每条请求事件随机生成0-5条展示事件,展示事件的时间取值范围为{请求事件时间 至请求事件时间+5m },为每条展示事件随机生成0-5条点击事件,点击事件的时间取值范围为{展示事件时间 至展示事件时间+5m }
  • 场景说明 假定一个广告业务,存在广告请求事件、广告展示事件、广告点击事件,广告主需要实时统计有效的广告展示和广告点击数据。 已知: 终端用户每次请求一个广告后,会生成广告请求事件,保存到kafka的adRequest topic中。 请求一个广告后,可能用于多次展示,每次展示,会生成广告展示事件,保存到kafka的adShow topic中。 每个广告展示,可能会产生多次点击,每次点击,会生成广告点击事件,保存到kafka的adClick topic中。 广告有效展示的定义如下: 请求到展示的时长超过A分钟算无效展示。 A分钟内多次展示,每次展示事件为有效展示。 广告有效点击的定义如下: 展示到点击时长超过B分钟算无效点击。 B分钟内多次点击,仅首次点击事件为有效点击。 基于此业务场景,模拟简单的数据结构如下: 广告请求事件 数据结构:adID^reqTime 广告展示事件 数据结构:adID^showID^showTime 广告点击事件 数据结构:adID^showID^clickTime 数据关联关系如下: 广告请求事件与广告展示事件通过adID关联。 广告展示事件与广告点击事件通过adID+showID关联。 数据要求: 数据从产生到到达流处理引擎的延迟时间不超过2小时 广告请求事件、广告展示事件、广告点击事件到达流处理引擎的时间不能保证有序和时间对齐
共100000条