华为云用户手册

  • 增强特性 对比开源社区,华为还提供了两个增强特性,JD BCS erver HA方案和设置JDB CS erver连接的超时时间。 JDBCServer的HA方案,多个JDBCServer主节点同时提供服务,当其中一个节点发生故障时,新的客户端连接会分配到其他主节点上,从而保障无间断为集群提供服务。Beeline和JDBC客户端代码两种连接方式的操作相同。 设置客户端与JDBCServer连接的超时时间。 Beeline 在网络拥塞的情况下,这个特性可以避免beeline由于无限等待服务端的返回而挂起。使用方式如下: 启动beeline时,在后面追加“--socketTimeOut=n”,其中“n”表示等待服务返回的超时时长,单位为秒,默认为“0”(表示永不超时)。建议根据业务场景,设置为业务所能容忍的最大等待时长。 JDBC客户端代码 在网络拥塞的情况下,这个特性可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下: 在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。建议根据业务场景,设置为业务所能容忍的最大等待时长。
  • HBase应用开发流程 本文档主要基于Java API对HBase进行应用开发。 开发流程中各阶段的说明如图1和表1所示。 图1 HBase应用程序开发流程 表1 HBase应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HBase的基本概念,了解场景需求,设计表等。 常用概念 准备开发和运行环境 HBase的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 HBase的运行环境即HBase客户端,请根据指导完成客户端的安装和配置。 准备HBase应用开发和运行环境 准备工程 HBase提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置HBase样例工程 根据场景开发工程 提供了Java语言的样例工程,包含从建表、写入到删除表全流程的样例工程。 开发HBase应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HBase应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测HBase应用 父主题: HBase开发指南(普通模式)
  • SpringBoot样例工程的命令行形式运行 在IDEA界面使用Maven执行install。 当输出“BUILD SUC CES S”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“flink-dws-sink-example-1.0.0-SNAPSHOT”字段的Jar包。 在Linux上进入客户端安装目录,如“/opt/client/Flink/flink/conf”作为作为运行目录,将1中生成的“target”目录下包名中含有“flink-dws-sink-example-1.0.0-SNAPSHOT”字段的Jar包放进该路径。 执行以下命令创建yarn-session。 yarn-session.sh -t ssl/ -nm "session-spring11" -d 执行以下命令启动SpringBoot服务。 执行 GaussDB (DWS)样例 flink run flink-dws-sink-example.jar
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testCreateTable方法中。 /** * Create Table */ public void testCreateTable() { LOG .info("Entering testCreateTable."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Create table String createTableSQL = "CREATE TABLE IF NOT EXISTS TEST (id integer not null primary key, name varchar, " + "account char(6), birth date)"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()) { // Execute Create SQL stat.executeUpdate(createTableSQL); LOG.info("Create table successfully."); } catch (Exception e) { LOG.error("Create table failed.", e); } LOG.info("Exiting testCreateTable."); } /** * Drop Table */ public void testDrop() { LOG.info("Entering testDrop."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Delete table String dropTableSQL = "DROP TABLE TEST"; try (Connection conn = DriverManager.getConnection(url, props); Statement stat = conn.createStatement()) { stat.executeUpdate(dropTableSQL); LOG.info("Drop successfully."); } catch (Exception e) { LOG.error("Drop failed.", e); } LOG.info("Exiting testDrop."); }
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkDeleteExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等于实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseButDeleteExample.py bulktable yarn-cluster模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkDeleteExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等于实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseButDeleteExample.py bulktable
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkDeleteExample文件: def main(args: Array[String]) { # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkDeleteExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkDeleteExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseBulkDeleteExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 在Linux调测程序 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序。 运行Consumer样例工程的命令如下。 java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.Consumer
  • 提交命令 假设用例代码打包后的jar包名为 、spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample SparkOnHbaseJavaExample.jar table2 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseMapPartitionExample.py table2 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar table2 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseMapPartitionExample.py table2
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseMapPartitionExample文件: # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseMapPartitionExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseMapPartitionExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 基本概念 ResourceManager(RM) RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。 ApplicationMaster(AM) 用户提交的每个应用程序均包含一个AM,主要功能包括: 与RM调度器协商以获取资源(用Container表示)。 将得到的资源进一步分配给内部任务。 与NM通信以启动/停止任务。 监控所有任务的运行状态,并在任务运行失败时重新为任务申请资源以重启任务。 NodeManager(NM) NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它会接收并处理来自AM的Container启动/停止等各种请求。 Container Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。
  • 简介 Yarn是一个分布式的资源管理系统,用于提高分布式的集群环境下的资源利用率,这些资源包括内存、IO、网络、磁盘等。其产生的原因是为了解决原MapReduce框架的不足。最初MapReduce的committer还可以周期性的在已有的代码上进行修改,可是随着代码的增加以及原MapReduce框架设计的不足,在原MapReduce框架上进行修改变得越来越困难,所以MapReduce的committer决定从架构上重新设计MapReduce,使下一代的MapReduce(MRv2/Yarn)框架具有更好的扩展性、可用性、可靠性、向后兼容性和更高的资源利用率,以及能支持除了MapReduce计算框架外的更多的计算框架。
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发MapReduce应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 数据规划 首先需要把原日志文件放置在HDFS系统里。 在Linux系统上新建两个文本文件,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS上建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下: 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir /tmp/input 在Linux系统HDFS客户端使用命令hdfs dfs -put local_filepath /tmp/input
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkPutExample文件: # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkPutExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseBulkPutExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端$SPARK_HOME目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample.jar bulktable cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseBulkPutExample.py bulktable cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar bulktable cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseBulkPutExample.py bulktable cf1
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 查看调试结果 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/mavenlocal/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. ---- Begin executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ---- Result ---- Done executing sql: CREATE TABLE IF NOT EXISTS CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ---- ---- Begin executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD ---- Result ---- Done executing sql: LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD ---- ---- Begin executing sql: SELECT * FROM child ---- NAME AGE Miranda 32 Karlie 23 Candice 27 ---- Done executing sql: SELECT * FROM child ---- ---- Begin executing sql: DROP TABLE child ---- Result ---- Done executing sql: DROP TABLE child ---- Process finished with exit code 0
  • 编包并运行程序 获取样例代码。 下载样例工程的Maven工程源码和配置文件,请参见获取 MRS 应用开发样例工程。 将样例代码导入IDEA中。 获取配置文件。 从集群的客户端中获取文件。在“$SPARK_HOME/conf”中下载hive-site.xml与spark-defaults.conf文件到本地。 在HDFS中上传数据。 在Liunx中新建文本文件data,将如下数据内容保存到data文件中。 Miranda,32 Karlie,23 Candice,27 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /data(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -put data /data,上传数据文件。 在样例代码中配置相关参数。 将加载数据的sql语句改为“LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD”。 在程序运行时添加运行参数,分别为hive-site.xml与spark-defaults.conf文件的路径。 运行程序。
  • 获取MRS样例工程 MRS样例工程下载地址为https://github.com/huaweicloud/huaweicloud-mrs-example。 切换分支为与MRS集群相匹配的版本分支,例如“mrs-3.2.0.1”,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 图1 MRS样例工程代码下载 MRS LTS版本对应样例工程下载地址: MRS 3.3.0-LTS版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.3.0。 MRS 3.2.0-LTS.1版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0.1。 MRS 3.1.2-LTS.3版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.2。 MRS普通版本对应样例工程下载地址: MRS 3.0.2版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.0.2。 MRS 3.1.0版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0。 MRS 3.1.5版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5。 MRS 2.1.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-2.1。 MRS 1.9.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.9。 MRS 1.8.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.8。 MRS 1.8之前版本:http://mapreduceservice.obs-website.cn-north-1.myhuaweicloud.com/。
  • MRS各组件样例工程汇总 MRS样例代码库提供了各组件的基本功能样例工程供用户使用,当前版本各组件提供的样例工程汇总参见表1。 表1 MRS组件样例工程汇总 组件 样例工程位置 描述 ClickHouse clickhouse-examples 指导用户基于Java语言,实现MRS集群中的ClickHouse的数据表创建、删除以及数据的插入、查询等操作。 本工程中包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据及删除数据表等操作示例。 ClickHouseJDBC-Transaction-JavaExample ClickHouse事务开发代码样例,仅MRS 3.3.0及之后版本支持。 Doris doris-examples/doris-jdbc-example Doris数据读写操作的应用开发示例,仅MRS 3.3.0及之后版本支持。 通过调用Doris接口可实现创建用户表、向表中插入数据、查询表数据、删除表等功能 Flink 开启Kerberos认证集群的样例工程目录“flink-examples/flink-examples-security”。 未开启Kerberos认证集群的样例工程目录为“flink-examples/flink-examples-normal”。 FlinkCheckpointJavaExample Flink异步Checkpoint机制的Java/Scala示例程序。 本工程中,程序使用自定义算子持续产生数据,产生的数据为一个四元组(Long,String,String,Integer)。数据经统计后,将统计结果打印到终端输出。每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 FlinkCheckpointScalaExample FlinkHBaseJavaExample Flink API作业读写HBase数据的Java示例程序。 MRS 3.2.0及之后版本支持。 FlinkKafkaJavaExample Flink向Kafka生产并消费数据的Java/Scala示例程序。 在本工程中,假定某个Flink业务每秒就会收到1个消息记录,启动Producer应用向Kafka发送数据,然后启动Consumer应用从Kafka接收数据,对数据内容进行处理后并打印输出。 FlinkKafkaScalaExample FlinkPipelineJavaExample Flink Job Pipeline的Java/Scala示例程序。 本样例中一个发布者Job自己每秒钟产生10000条数据,另外两个Job作为订阅者,分别订阅一份数据。订阅者收到数据之后将其转化格式,并抽样打印输出。 FlinkPipelineScalaExample FlinkSqlJavaExample 使用客户端通过jar作业提交SQL作业的应用开发示例。 FlinkStreamJavaExample Flink构造DataStream的Java/Scala示例程序。 本工程示例为基于业务要求分析用户日志数据,读取文本数据后生成相应的DataStream,然后筛选指定条件的数据,并获取结果。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Flink SQL Join示例程序。 本工程示例调用flink-connector-kafka模块的接口,生产并消费数据。生成Table1和Table2,并使用Flink SQL对Table1和Table2进行联合查询,打印输出结果。 FlinkRESTAPIJavaExample 本工程示例调用FlinkServer的RestAPI创建租户。 flink-examples/flink-sql 本工程示例使用Flink Jar提交SQL作业。 flink-examples/pyflink-example pyflink-kafka 本工程示例使用Python提交普通作业,提供Python读写Kafka作业的样例。 pyflink-sql 本工程示例使用Python提交SQL作业,提供Python提交SQL作业的样例。 HBase hbase-examples hbase-example HBase数据读写操作及全局二级索引的应用开发示例。通过调用HBase接口可实现以下功能: 创建用户表、导入用户数据、增加用户信息、查询用户信息及为用户表创建二级索引等功能。 MRS 3.3.0及之后版本,可实现创建/删除全局二级索引、修改全局二级索引状态、以及基于全局二级索引查询等功能。 hbase-rest-example HBase Rest接口应用开发示例。 使用Rest接口实现查询HBase集群信息、获取表、操作NameSpace、操作表等功能。 hbase-thrift-example 访问HBase ThriftServer应用开发示例。 访问ThriftServer操作表、向表中写数据、从表中读数据。 hbase-zk-example HBase访问ZooKeeper应用开发示例。 在同一个客户端进程内同时访问MRS ZooKeeper和第三方的ZooKeeper,其中HBase客户端访问MRS ZooKeeper,客户应用访问第三方ZooKeeper。 HDFS 开启Kerberos认证集群的样例工程目录“hdfs-example-security”。 未开启Kerberos认证集群的样例工程目录为“hdfs-example-normal”。 HDFS文件操作的Java示例程序。 本工程主要给出了创建HDFS文件夹、写文件、追加文件内容、读文件和删除文件/文件夹等相关接口操作示例。 hdfs-c-example HDFS C语言开发代码样例。 本示例提供了基于C语言的HDFS文件系统连接、文件操作如创建文件、读写文件、追加文件、删除文件等。 HetuEngine 开启Kerberos认证集群的样例工程目录为“hetu-examples/hetu-examples-security”。 未开启Kerberos认证集群的样例工程目录为“hetu-examples/hetu-examples-normal”。 通过不同方式连接HetuEngine的Java、Python示例程序。 通过HSFabric、HSBroker等连接方式,使用用户名和密码连接到HetuEngine,或通过KeyTab文件认证方式连接HetuEngine,组装对应的SQL发送到HetuEngine执行,完成对Hive数据源的增删改查操作。 Hive hive-examples hive-jdbc-example Hive JDBC处理数据Java示例程序。 本工程使用JDBC接口连接Hive,在Hive中执行相关数据操作。使用JDBC接口实现创建表、加载数据、查询数据等功能,还可实现在同一个客户端进程内同时访问 FusionInsight ZooKeeper和第三方的ZooKeeper。 hive-jdbc-example-multizk hcatalog-example Hive HCatalog处理数据Java示例程序。 使用HCatalog接口实现通过Hive命令行方式对MRS Hive元数据进行数据定义和查询操作。 python-examples 使用Python连接Hive执行SQL样例。 可实现使用Python对接Hive并提交数据分析任务。 python3-examples 使用Python3连接Hive执行SQL样例。 可实现使用Python3对接Hive并提交数据分析任务。 IoTDB iotdb-examples iotdb-flink-example 通过Flink访问IoTDB数据的示例程序,包括FlinkIoTDBSink和FlinkIoTDBSource。 FlinkIoTDBSink可实现通过Flink job将时序数据写入到IoTDB中。FlinkIoTDBSource则通过Flink job将时序数据从IoTDB读取出来并且打印。 iotdb-jdbc-example IoTDB JDBC处理数据Java示例程序。 本示例演示了如何使用JDBC接口连接IoTDB,并执行IoTDB SQL语句。 iotdb-kafka-example 通过Kafka访问IoTDB数据的示例程序。 本示例演示了如何先将时序数据发送到Kafka,再使用多线程将数据写入到IoTDB中。 iotdb-session-example IoTDB Session处理数据Java示例程序。 本示例演示了如何使用Session方式连接IoTDB,并执行IoTDB SQL语句。 iotdb-udf-exmaple 该样例程序介绍了如何实现一个简单的IoTDB自定义函数(UDF)。 Kafka kafka-examples Kafka流式数据的处理Java示例程序。 本工程基于Kafka Streams完成单词统计功能,通过读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,然后将统计结果以Key-Value的形式输出。 Manager manager-examples FusionInsight Manager API接口调用示例。 本工程调用Manager API接口实现集群用户的创建、修改及删除等操作。 MapReduce 开启Kerberos认证集群的样例工程目录“mapreduce-example-security”。 未开启Kerberos认证集群的样例工程目录为“mapreduce-example-normal”。 MapReduce任务提交Java示例程序。 本工程提供了一个MapReduce统计数据的应用开发示例,实现数据分析、处理,并输出满足用户需要的数据信息。 另外以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 Oozie 开启Kerberos认证集群的样例工程目录“oozie-examples/ooziesecurity-examples”。 未开启Kerberos认证集群的样例工程目录为“oozie-examples/oozienormal-examples”。 OozieMapReduceExample Oozie提交MapReduce任务示例程序。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,对网站的日志文件进行离线分析。 OozieSparkHBaseExample 使用Oozie调度Spark访问HBase的示例程序。 OozieSparkHiveExample 使用Oozie调度Spark访问Hive的示例程序。 Spark 开启Kerberos认证集群的样例工程目录“spark-examples/sparksecurity-examples”。 未开启Kerberos认证集群的样例工程目录为“spark-examples/sparknormal-examples”。 SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的Java示例程序。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 SparkHbasetoHbasePythonExample SparkHbasetoHbaseScalaExample SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 SparkHivetoHbasePythonExample SparkHivetoHbaseScalaExample SparkJavaExample Spark Core任务的Java/Python/Scala/R示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkRExample示例不支持未开启Kerberos认证的集群。 SparkPythonExample SparkScalaExample SparkRExample SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 SparkLauncherScalaExample SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 SparkOnHbasePythonExample SparkOnHbaseScalaExample SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 SparkOnHudiPythonExample SparkOnHudiScalaExample SparkOnMultiHbaseScalaExample Spark同时访问两个集群中的HBase的Scala示例程序。 本示例不支持未开启Kerberos认证的集群。 SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkSQLPythonExample SparkSQLScalaExample SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 SparkStreamingKafka010ScalaExample SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 SparkStreamingtoHbasePythonExample010 SparkStreamingtoHbaseScalaExample010 SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 SparkStructuredStreamingPythonExample SparkStructuredStreamingScalaExample SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JDBCServer的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 SparkThriftServerScalaExample StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 StructuredStreamingStateScalaExample 在Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 SpringBoot(MRS 3.3.0及之后版本支持) clickhouse-examples clickhouse-rest-client-example SpringBoot连接ClickHouse服务应用开发示例。 本示例中,包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据等操作示例。 doris-examples doris-rest-client-example Doris数据读写操作的SpringBoot应用开发示例。 提供SpringBoot连接Doris的样例程序。 flink-examples flink-dws-read-example GaussDB(DWS) SpringBoot方式连接Flink服务的应用开发示例。 flink-dws-sink-example hbase-examples SpringBoot连接Phoenix应用开发示例。 提供SpringBoot连接HBase与Phoenix的样例程序。 hive-examples hive-rest-client-example SpringBoot连接Hive应用开发示例。 本工程使用SpringBoot方式连接Hive,在Hive中执行创建表、加载数据、查询数据、删除表等操作。 kafka-examples SpringBoot连接Kafka实现Topic生产消费的应用开发示例。
  • Flink应用程序开发流程 Flink开发流程参考如下步骤: 图1 Flink应用程序开发流程 表1 Flink应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Flink的基本概念。 基本概念 准备开发和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 导入并配置Flink样例工程 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 开发Flink应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编译并调测Flink应用 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看Flink应用调测结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 组件操作指南中的“Flink性能调优”
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户) 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中调测Spark应用章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaProducer {brokerlist} {topic} {number of events produce every 0.02s} 示例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingState-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaProducer xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005 mytopic 10
  • Python3样例工程的命令行形式运行 赋予“python3-examples”文件夹中脚本的可执行权限。在命令行终端执行以下命令: chmod +x python3-examples -R。 在python3-examples/pyCLI_nosec.py中的host的值修改为安装HiveServer的节点的业务平面IP,port的值修改为Hive提供Thrift服务的端口(hive.server2.thrift.port),默认为21066。 Hive多实例的python客户端命令行形式:“python3-examples/pyCLI_nosec.py”不仅要修改host,还需根据所使用的实例修改port,port为Hive提供Thrift服务的端口(hive.server2.thrift.port)。 执行以下命令运行Python3客户端: cd python3-examples python pyCLI_nosec.py 在命令行终端查看样例代码中的HQL所查询出的结果。例如: [['default', '']] [{'comment': 'from deserializer', 'columnName': 'tab_name', 'type': 'STRING_TYPE'}] ['xx']
  • 查看调测结果 IoTDB应用程序运行完成后,可通过IntelliJ IDEA运行结果查看应用程序运行情况。 JDBCExample样例程序运行结果如下所示: ... -------------------------- Time root.sg.d1.s1 root.company.line2.device1.temperature root.company.line2.device1.speed root.company.line2.device2.speed root.company.line2.device2.status root.company.line1.device1.spin root.company.line1.device1.status root.company.line1.device2.temperature root.company.line1.device2.power root.sg1.d1.s3 root.sg1.d1.s1 root.sg1.d1.s2 0, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 1, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 2, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 3, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 4, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 5, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 6, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 7, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 8, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 9, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 10, null, null, null, null, null, null, null, null, null, 1.0, 1.0, 1.0 -------------------------- -------------------------- count(root.sg.d1.s1) count(root.company.line2.device1.temperature) count(root.company.line2.device1.speed) count(root.company.line2.device2.speed) count(root.company.line2.device2.status) count(root.company.line1.device1.spin) count(root.company.line1.device1.status) count(root.company.line1.device2.temperature) count(root.company.line1.device2.power) count(root.sg1.d1.s3) count(root.sg1.d1.s1) count(root.sg1.d1.s2) 8237, 1, 1, 1, 1, 1, 1, 1, 1, 101, 101, 101 -------------------------- -------------------------- Time count(root.sg.d1.s1) count(root.company.line2.device1.temperature) count(root.company.line2.device1.speed) count(root.company.line2.device2.speed) count(root.company.line2.device2.status) count(root.company.line1.device1.spin) count(root.company.line1.device1.status) count(root.company.line1.device2.temperature) count(root.company.line1.device2.power) count(root.sg1.d1.s3) count(root.sg1.d1.s1) count(root.sg1.d1.s2) 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 19, 19, 19 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 20, 20 40, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 20, 20 60, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 20, 20 80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 20, 20 -------------------------- FlinkIoTDBSink样例程序运行结果如下所示: ... 19:53:41.532 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 5153e4ff24b25b13225f1bf67a4312d8. 19:53:41.800 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request. 19:53:41.800 [flink-akka.actor.default-dispatcher-10] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat request from 5153e4ff24b25b13225f1bf67a4312d8. 19:53:41.802 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from 7d6ef313-3f78-4cee-bbb1-e234dcac6d30. 19:53:42.988 [pool-3-thread-1] DEBUG org.apache.iotdb.flink.IoTDBSink - send event successfully 19:53:42.988 [pool-6-thread-1] DEBUG org.apache.iotdb.flink.IoTDBSink - send event successfully 19:53:42.990 [pool-4-thread-1] DEBUG org.apache.iotdb.flink.IoTDBSink - send event successfully 19:53:45.990 [pool-7-thread-1] DEBUG org.apache.iotdb.flink.IoTDBSink - send event successfully 19:53:45.992 [pool-9-thread-1] DEBUG org.apache.iotdb.flink.IoTDBSink - send event successfully 19:53:45.994 [pool-5-thread-1] DEBUG org.apache.iotdb.flink.IoTDBSink - send event successfully IoTDB Kafka样例程序运行结果如下所示: Producer.java ... [2022-01-15 15:12:34,221] INFO New Producer: start. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:12:39,369] INFO [Producer clientId=DemoProducer] Cluster ID: uDtuaWS_QUK02EtuZQ4Xew (org.apache.kafka.clients.Metadata) [2022-01-15 15:12:57,077] INFO The Producer have send 100 messages. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:13:04,691] INFO The Producer have send 200 messages. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:13:11,355] INFO The Producer have send 300 messages. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:13:17,758] INFO The Producer have send 400 messages. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:13:24,335] INFO The Producer have send 500 messages. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:13:30,739] INFO The Producer have send 600 messages. (com.huawei.bigdata.iotdb.Producer) [2022-01-15 15:13:37,267] INFO The Producer have send 700 messages. (com.huawei.bigdata.iotdb.Producer) KafkaConsumerMultThread.java ... [2022-01-15 15:19:27,563] INFO Consumer Thread-1 partitions:1 record: sensor_29,1642231023769,1.000000 offsets: 828 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,612] INFO Consumer Thread-1 partitions:1 record: sensor_31,1642231023769,1.000000 offsets: 829 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,612] INFO Consumer Thread-0 partitions:0 record: sensor_8,1642231023769,1.000000 offsets: 842 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,665] INFO Consumer Thread-1 partitions:1 record: sensor_32,1642231023769,1.000000 offsets: 830 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,665] INFO Consumer Thread-0 partitions:0 record: sensor_9,1642231023769,1.000000 offsets: 843 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,732] INFO Consumer Thread-1 partitions:1 record: sensor_33,1642231023769,1.000000 offsets: 831 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,732] INFO Consumer Thread-0 partitions:0 record: sensor_11,1642231023769,1.000000 offsets: 844 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,786] INFO Consumer Thread-0 partitions:0 record: sensor_12,1642231023769,1.000000 offsets: 845 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread) [2022-01-15 15:19:27,786] INFO Consumer Thread-1 partitions:1 record: sensor_35,1642231023769,1.000000 offsets: 832 (com.huawei.bigdata.iotdb.KafkaConsumerMultThread)
  • Doris应用开发流程介绍 开发流程中各阶段的说明如下图所示。 图1 Doris应用程序开发流程 表1 Doris应用开发的流程说明 阶段 说明 参考文档 准备开发环境 在进行应用开发前,需首先准备开发环境,推荐使用Java语言进行开发,使用IntelliJ IDEA工具,同时完成JDK、Maven等初始配置。 准备本地应用开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts 域名 信息。 准备连接Doris集群配置文件 配置并导入样例工程 Doris提供了不同场景下的多种样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置并导入JDBC样例工程 配置并导入SpringBoot样例工程 配置安全认证 使用JDBC或SpringBoot接口连接Doris时,需配置具有Doris管理员权限的用户进行安全认证。 根据业务场景开发程序 根据实际业务场景开发程序,调用组件接口实现对应功能。 Doris JDBC接口调用样例程序 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 调测Doris应用 父主题: Doris开发指南(安全模式)
  • 操作步骤 控制台显示运行结果会有如下成功信息: cluset status is false Warning: Could not get charToByteConverterClass! Workflow job submitted: 0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job running ...0000067-160729120057089-oozie-omm-W Workflow job completed ...0000067-160729120057089-oozie-omm-W Workflow id[0000067-160729120057089-oozie-omm-W] status[SUCCEEDED] -----------finish Oozie ------------------- 同时在HDFS上生成目录“/user/developuser/examples/output-data/map-reduce”,包括如下两个文件: _SUCCESS part-00000 可以通过Hue的文件浏览器或者通过HDFS如下命令行查看: hdfs dfs -ls /user/developuser/examples/output-data/map-reduce 在Windows下面执行的时候可能会出现下面的异常,但是不影响业务: java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
  • Kafka应用开发流程介绍 Kafka客户端角色包括Producer和Consumer两个角色,其应用开发流程是相同的。 开发流程中各个阶段的说明如图1和表1所示。 图1 Kafka客户端程序开发流程 表1 Kafka客户端开发的流程说明 阶段 说明 参考文档 准备开发环境 Kafka的客户端程序当前推荐使用java语言进行开发,可使用IntelliJ IDEA工具开发。 Kafka的运行环境即Kafka客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括集群组件信息文件以及用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 准备连接Kafka集群配置文件 配置并导入样例工程 Kafka提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Kafka样例工程 配置安全认证 如果您使用的是开启了Kerberos认证的MRS集群,需要进行安全认证。 配置Kafka应用安全认证 根据业务场景开发程序 提供了Producer和Consumer相关API的使用样例,包含了API和多线程的使用场景,帮助用户快速熟悉Kafka接口。 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 开发Kafka应用 编译与运行程序 指导用户将开发好的程序编译并提交运行并查看结果。 调测Kafka应用 父主题: Kafka开发指南(安全模式)
  • 参数解释 FS Action节点中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 name FS活动的名称 delete 删除指定的文件和目录的标签 move 将文件从源目录移动到目标目录的标签 chmod 修改文件或目录权限的标签 path 当前文件路径 source 源文件路径 target 目标文件路径 permissions 权限字符串 “${变量名}”表示:该值来自“job.properties”所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见job.properties)
共100000条