华为云用户手册

  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streaming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在 FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 问题 使用运行的Spark Streaming任务回写Kafka时,Kafka上接收不到回写的数据,且Kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 操作步骤 运行结果会有如下成功信息: SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. ---- Begin executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ---- Result ---- Done executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ---- ---- Begin executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD ---- Result ---- Done executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD ---- ---- Begin executing sql: SELECT * FROM child ---- NAME AGE Miranda 32 Karlie 23 Candice 27 ---- Done executing sql: SELECT * FROM child ---- ---- Begin executing sql: DROP TABLE child ---- Result ---- Done executing sql: DROP TABLE child ---- Process finished with exit code 0
  • 应用开发操作步骤 确认华为 MRS 产品Storm和Kafka组件已经安装,并正常运行。 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。 在Linux环境下安装Storm客户端。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。 下载并安装Kafka客户端程序。
  • 操作场景 本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。 本章节代码样例基于Kafka新API,对应IntelliJ IDEA工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。 本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 准备JDBC/HCatalog开发环境 表1 JDBC/HCatalog开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streaming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 问题 使用运行的Spark Streaming任务回写Kafka时,Kafka上接收不到回写的数据,且Kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68) 53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 样例代码 使用Python方式提交数据分析任务,参考样例程序中的“hive-examples/python-examples/pyCLI_sec.py”。 导入HAConnection类。 from pyhs2.haconnection import HAConnection 声明HiveServer的IP地址列表。本例中hosts代表HiveServer的节点,xxx.xxx.xxx.xxx代表业务IP地址。 hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 如果HiveServer实例被迁移,原始的示例程序会失效。在HiveServer实例迁移之后,用户需要更新示例程序中使用的HiveServer的IP地址。 在HAConnection的第三个参数填写正确的用户名,密码可以不填写。创建连接,执行HQL,样例代码中仅执行查询所有表功能,可根据实际情况修改HQL内容,输出查询的列名和结果到控制台。 try: with HAConnection(hosts = hosts, port = 10000, authMechanism = "PLAIN", user='root', password='******') as haConn: with haConn.getConnection() as conn: with conn.cursor() as cur: # Show databases print cur.getDatabases() # Execute query cur.execute("show tables") # Return column info from query print cur.getSchema() # Fetch table results for i in cur.fetch(): print i except Exception, e: print e
  • 代码样例 下面代码片段在com.huawei.storm.example.common包的“SplitSentenceBolt”类的“execute”方法中,作用在于拆分每条语句为单个单词并发送。 /** * {@inheritDoc} */ @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); if (!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } } 下面代码片段在com.huawei.storm.example.wordcount.WordCountBolt类中,作用在于统计收到的每个单词的数量。 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.println("word: " + word + ", count: " + count); }
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluster模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Storm Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Storm Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Storm Bolt 4 创建topology 请参见创建Storm Topology 部分代码请参考开发Storm应用,完整代码请参考Storm-examples示例工程。
  • 操作步骤 修改WordCountTopology.java类,使用remoteSubmit方式提交应用程序。并替换Jar文件地址。 使用remoteSubmit方式提交应用程序 public static void main(String[] args) throws Exception { TopologyBuilder builder = buildTopology(); /* * 任务的提交认为三种方式 * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交 * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在IntelliJ IDEA中运行main方法提交 * 3、本地提交 ,在本地执行应用程序,一般用来测试 * 命令行方式和远程方式安全和普通模式都支持 * 本地提交仅支持普通模式 * * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可 */ submitTopology(builder, SubmitType.REMOTE); } 根据实际情况修改userJarFilePath为实际的拓扑Jar包地址 private static void remoteSubmit(TopologyBuilder builder) throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, IOException { Config config = createConf(); String userJarFilePath = "D:\\example.jar"; System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); //安全模式下的一些准备工作 if (isSecurityModel()) { securityPrepare(config); } config.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(TOPO LOG Y_NAME, config, builder.createTopology()); } 执行WordCountTopology.java类的Main方法提交应用程序。
  • 操作场景 本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。 本章节只适用Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 应用开发操作步骤 确认产品Storm组件已经安装,且正常运行。 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。 工程导入后,修改样例工程的“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。 #配置JDBC服务端IP地址 JDBC_SERVER_NAME= #配置JDBC服务端端口 JDBC_PORT_NUM= #配置JDBC登录用户名 JDBC_USER_NAME= #配置JDBC登录用户密码 #密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全 JDBC_PASSWORD= #配置database表名 JDBC_BASE_TBL= 在Linux环境下安装Storm客户端。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
  • 数据库配置—Derby数据库配置过程 首先应下载一个数据库,可根据具体场景选择最适合的数据库。 该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。 Derby数据库的获取。在官网下载最新版的Derby数据库,将下载下来的数据库将传入Linux客户端(如"/opt"),并解压。 在Derby的安装目录下,进入bin目录,输入如下命令: export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:. export DERBY_HOME=/opt/db-derby-10.12.1.1-bin . setNetworkServerCP ./startNetworkServer -h 主机名 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定) CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT ); CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT ); INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Storm Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Storm Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Storm Bolt 4 创建topology 请参见创建Storm Topology 部分代码请参考开发Storm应用,完整代码请参考Storm-examples示例工程。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 部署运行及结果查看 导出本地jar包,请参见打包Storm样例工程应用。 将4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm应用业务。 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml 如果设置业务以本地模式启动,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml 如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。 如果使用了properties文件,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties 拓扑提交成功后请自行登录storm UI查看。
  • 准备JDBC/HCatalog开发环境 表1 JDBC/HCatalog开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • Storm应用开发流程 本文档主要基于Java API进行Storm拓扑的开发。 开发流程中各阶段的说明如图1和表1所示: 图1 拓扑开发流程 表1 Storm应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Storm的基本概念,了解场景需求,拓扑等。 Storm应用开发常用概念 准备开发和运行环境 Storm的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 Storm的运行环境即Storm客户端,请根据指导完成客户端的安装和配置。 准备Storm应用开发和运行环境 准备工程 Storm提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Storm样例工程 根据场景开发拓扑 提供了Storm拓扑的构造和Spout/Bolt开发过程。 开发Storm应用 打包IntelliJ IDEA代码 Storm样例程序是在Linux环境下运行,需要将IntelliJ IDEA中的代码打包成jar包。 打包Storm样例工程应用 打包业务 将IntelliJ IDEA代码生成的jar包与工程依赖的jar包,合并导出可提交的source.jar。 打包Storm应用业务 提交拓扑 指导用户将开发好的程序提交运行。 提交Storm拓扑 查看程序运行结果 指导用户提交拓扑后查看程序运行结果。 查看Storm应用调测结果 父主题: Storm应用开发概述
  • 操作步骤 将从IntelliJ IDEA中导出的jar包复制到Linux客户端指定目录(例如“/opt/jarsource”)。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在Storm客户端安装目录“Storm/storm-1.2.1/bin”下执行打包命令,将上述jar包打成一个完整的业务jar包放入指定目录/opt/jartarget(可为任意空目录)。执行sh storm-jartool.sh /opt/jarsource/ /opt/jartarget命令后,会在“/opt/jartarget”下生成source.jar。
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluster模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 代码样例 代码示例中请根据实际情况,修改“OOZIE_URL_DEFAULT”为实际的任意Oozie的主机名,例如“https://10-1-131-131:21003/oozie/”。 public void test(String jobFilePath) { try { runJob(jobFilePath); } catch (Exception exception) { exception.printStackTrace(); } } private void runJob(String jobFilePath) throws OozieClientException, InterruptedException { Properties conf = getJobProperties(jobFilePath); String user = PropertiesCache.getInstance().getProperty("submit_user"); conf.setProperty("user.name", user); // submit and start the workflow job String jobId = oozieClient.run(conf); logger.info("Workflow job submitted: {}" , jobId); // wait until the workflow job finishes printing the status every 10 seconds while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) { logger.info("Workflow job running ... {}" , jobId); Thread.sleep(10 * 1000); } // print the final status of the workflow job logger.info("Workflow job completed ... {}" , jobId); logger.info(String.valueOf(oozieClient.getJobInfo(jobId))); } /** * Get job.properties File in filePath * * @param filePath file path * @return job.properties * @since 2020-09-30 */ public Properties getJobProperties(String filePath) { File configFile = new File(filePath); if (!configFile.exists()) { logger.info(filePath , "{} is not exist."); } InputStream inputStream = null; // create a workflow job configuration Properties properties = oozieClient.createConfiguration(); try { inputStream = new FileInputStream(filePath); properties.load(inputStream); } catch (Exception e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException ex) { ex.printStackTrace(); } } } return properties; }
  • 操作步骤 获取样例代码。 下载样例工程的Maven工程源码和配置文件,请参见获取代码样例工程。 将样例代码导入IDEA中。 获取配置文件。 从集群的客户端中获取文件。在“$SPARK_HOME/conf”中下载hive-site.xml与spark-defaults.conf文件到本地。 在HDFS中上传数据。 在Linux中新建文本文件data,将如下数据内容保存到data文件中。 Miranda,32 Karlie,23 Candice,27 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /data(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -put data /data,上传数据文件。 在样例代码中配置相关参数。 将加载数据的sql语句改为“LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD”。 在程序运行时添加运行参数,分别为hive-site.xml与spark-defaults.conf文件的路径。 运行程序。
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), //producer.WithTls(true), //创建实例时,如果开启了SSL,请添加此行代码。 ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+3000, 10)) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全