华为云用户手册

  • 准备运行调测环境 在弹性云服务器管理控制台,申请一个新的弹性云服务器,用于应用开发、运行、调测。 弹性云服务器的安全组需要和 MRS 集群Master节点的安全组相同。 弹性云服务器的VPC需要与MRS集群在同一个VPC中。 弹性云服务器的网卡需要与MRS集群在同一个网段中。 申请弹性IP,绑定新申请的E CS 的IP,并配置安全组出入规则。 下载客户端程序,请参考下载MRS客户端。 登录存放下载的客户端的节点,再安装客户端。 执行以下命令解压客户端包: cd /opt tar -xvf /opt/MRS_Services_Client.tar 执行以下命令校验安装文件包: sha256sum -c /opt/MRS_Services_ClientConfig.tar.sha256 MRS_Services_ClientConfig.tar:OK 执行以下命令解压安装文件包: tar -xvf /opt/MRS_Services_ClientConfig.tar 执行如下命令安装客户端到指定目录(绝对路径),例如“/opt/client”。目录会自动创建。 cd /opt/MRS_Services_ClientConfig sh install.sh /opt/client Components client installation is complete.
  • Flink应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Flink应用程序开发流程 表1 Flink应用开发流程说明 阶段 说明 参考文档 了解基本概念 开始开发应用前,需要了解Flink的基本概念。 Flink应用开发常用概念 准备开发环境和运行环境 Flink的应用程序支持使用Scala、Java两种语言进行开发。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。Flink的运行环境即Flink客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 Flink提供了样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个Flink工程。 配置并导入Flink样例工程 根据场景开发工程 提供了Scala、Java两种不同语言的样例工程,帮助用户快速了解Flink各部件的编程接口。 Flink DataStream应用开发思路 编译并运行程序 指导用户将开发好的程序编译并提交运行。 编译并运行Flink应用 查看程序运行结果 程序运行结果会写在用户指定的路径下,用户还可以通过UI查看应用运行情况。 查看Flink应用运行结果 调优程序 您可以根据程序运行情况,对程序进行调优,使其性能满足业务场景需求。 调优完成后,请重新进行编译和运行。 Flink应用性能调优建议 父主题: Flink应用开发概述
  • 操作步骤 对于Java开发环境,推荐使用IDEA工具,安装要求如下。 JDK使用1.7版本(或1.8版本) IntelliJ IDEA(版本:13.1.6) Spark不支持当客户端程序使用IBM JDK 1.7运行时,使用yarn-client模式向服务端提交Spark任务。 Oracle JDK需进行安全加固,具体操作如下。 到Oracle官方网站获取与JDK版本对应的JCE(Java Cryptography Extension)文件。JCE文件解压后包含“local_policy.jar”和“US_export_policy.jar”。拷贝jar包到如下路径。 Linux:JDK安装目录/jre/lib/security Windows:JDK安装目录\jre\lib\security 将“客户端安装目录/JDK/jdk/jre/lib/ext/”目录下“ SMS 4JA.jar”拷贝到如下路径。 Linux:JDK安装目录/jre/lib/ext/ Windows:JDK安装目录\jre\lib\ext\ 安装IntelliJ IDEA和JDK工具,并进行相应的配置。 安装JDK。 安装IntelliJ IDEA工具。 在IntelliJ IDEA中配置JDK。 打开IntelliJ IDEA,选择“Configure”。 图1 Quick Start 在“Configure”页面中选择的“Project Defaults”。 图2 Configure 在“Project Defaults”页面中,选择“Project Structure”。 图3 Project Defaults 在打开的“Project Structure”页面中,选择“SDKs”,单击绿色加号添加JDK。 图4 添加JDK 在弹出的“Select Home Directory for JDK”窗口,选择对应的JDK目录,然后单击“OK”。 图5 选择JDK目录 完成JDK选择后,单击“OK”完成配置。 图6 完成JDK配置
  • 基本概念 data point:时间序列数据点,包括metric、timestamp、value和tag。表示某个metric在某个时间点的数值。 metric:指标项。例如,在系统监控中的CPU使用率、内存、IO等指标。 timestamp:UNIX时间戳(自Epoch以来的秒或毫秒),即value产生的时间。 value:某个metric的值,是JSON格式的事件或直方图/摘要。 tag:标签,是由Tagk和Tagv组成的键值对。用于描述该点所属的时间序列。 标签允许您从不同的源或相关实体中分离出类似的数据点,因此您可以轻松地单独或成组地绘制它们。标签的一个常见用法是使用生成数据点的机器名称以及机器所属的集群或池的名称来注释数据点。这使您可以轻松地制作显示每个服务器的服务状态的仪表盘,以及显示跨逻辑服务器池的聚合状态的仪表盘。
  • 数据规划 Flink样例工程的数据存储在Kafka组件中。Flink向Kafka组件发送数据(需要有kafka权限用户),并从Kafka组件获取数据。 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。 创建Topic。 在服务端配置用户创建topic的权限。 开启Kerberos认证的安全集群将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。配置完后重启kafka服务。未开启Kerberos认证的普通集群无需此配置。 用户使用Linux命令创建topic,如果是安全集群,用户执行命令前需要使用kinit命令进行人机认证,如:kinit flinkuser。 flinkuser需要用户自己创建,并拥有创建Kafka的topic权限。具体操作请参考准备Flink应用开发用户。 创建topic的命令格式: bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic} 表1 参数说明 参数名 说明 {zkQuorum} ZooKeeper集群信息,格式为IP:port。 {partitionNum} topic的分区数。 {replicationNum} topic中每个partition数据的副本数。 {Topic} Topic名称。 示例:在Kafka的客户端路径下执行命令,此处以ZooKeeper集群的IP:port是10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,Topic名称为topic1的数据为例。 bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181/kafka --partitions 5 --replication-factor 1 --topic topic1 如果集群开启了kerberos, 执行该步骤进行安全认证,否则跳过该步骤。 Kerberos认证配置 客户端配置。 在Flink配置文件“flink-conf.yaml”中,增加kerberos认证相关配置(主要在“contexts”项中增加“KafkaClient”),示例如下: security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytab security.kerberos.login.principal: admin security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.use-ticket-cache: false 运行参数。 关于“SASL_PLAINTEXT”协议的运行参数示例如下: --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka //10.96.101.32:21007表示kafka服务器的IP:port
  • Eclipse代码样例 创建Topology。 private static final String DEFAULT_FS_URL = "obs://mybucket"; public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // 分隔符格式,当前采用“|”代替默认的“,”对tuple中的field进行分隔 // HdfsBolt必选参数 RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); // 同步策略,每1000个tuple对文件系统进行一次同步 // HdfsBolt必选参数 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // 文件大小循环策略,当文件大小到达5M时,从头开始写 // HdfsBolt必选参数 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.KB); // 写入HDFS的目的文件 // HdfsBolt必选参数 FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/user/foo/"); //创建HdfsBolt HdfsBolt bolt = new HdfsBolt() .withFsUrl(DEFAULT_FS_URL) .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); //Spout生成随机语句 builder.setSpout("spout", new RandomSentenceSpout(), 1); builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); builder.setBolt("count", bolt, 1).fieldsGrouping("split", new Fields("word")); Config conf = new Config(); //命令行提交拓扑 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }
  • 部署运行及结果查看 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。 执行命令提交拓扑。 提交命令示例(拓扑名为obs-test)。 storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.obs.SimpleOBSTopology obs://my-bucket obs-test 拓扑提交成功后请登录OBS Browser查看。
  • 功能介绍 下面代码片段在com.huawei.bigdata.kafka.example.SimpleConsumerDemo类中,用于实现使用新SimpleConsumer API订阅Topic,并进行消息消费。(注意:SimpleConsumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍) SimpleConsumer API属于lowlevel的Consumer API需要访问zookeeper元数据,管理消费Topic队列的offset,一般情况不推荐使用。
  • 常用命令 Shell命令执行方法: 进入Spark客户端目录。 初始化环境变量。 source /opt/client/bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户。如果当前集群未启用Kerberos认证,则无需执行此命令。当前用户为准备Spark应用开发用户时增加的开发用户。 kinit MRS集群用户 例如: 开发用户为“机机”用户时请执行:kinit -kt user.keytab sparkuser 开发用户为“人机”用户时请执行:kinit sparkuser 执行Spark shell命令。 Spark常用的命令如下所示:
  • 功能介绍 每一个Consumer实例都属于一个Consumer group,每一条消息只会被同一个Consumer group里的一个Consumer实例消费(不同的Consumer group可以同时消费同一条消息)。 下面代码片段在com.huawei.bigdata.kafka.example.Old_Consumer类中,作用在于订阅指定Topic的消息。(注意:旧Consumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍)
  • 应用开发操作步骤 确认Storm和HBase组件已经安装,并正常运行。 将storm-examples导入到Eclipse开发环境,请参见导入并配置Storm样例工程。 如果集群启用了安全服务,按登录方式分为以下两种。 keytab方式:需要从管理员处获取一个“人机”用户,用于认证,并且获取到该用户的keytab文件。 票据方式:从管理员处获取一个“人机”用户,用于后续的安全登录,开启Kerberos服务的renewable和forwardable开关并且设置票据刷新周期,开启成功后重启kerberos及相关组件。 获取的用户需要属于storm组。 Kerberos服务的renewable、forwardable开关和票据刷新周期的设置在Kerberos服务的配置页面的“系统”标签下,票据刷新周期的修改可以根据实际情况修改“kdc_renew_lifetime”和“kdc_max_renewable_life”的值。 下载并安装HBase客户端程序。 获取相关配置文件。获取方法如下。 在安装好的hbase客户端目录下找到目录“/opt/client/HBase/hbase/conf”,在该目录下获取到core-site.xml、hdfs-site.xml、hbase-site.xml配置文件。将这些文件拷贝到示例工程的 src/main/resources目录。 如果使用keytab登录方式,按3获取keytab文件;如果使用票据方式,则无需获取额外的配置文件。 获取到的keytab文件默认文件名为user.keytab,若用户需要修改,可直接修改文件名,但在提交任务时需要额外上传修改后的文件名作为参数。
  • 部署运行及结果查看 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。 执行命令提交拓扑。 keytab方式下,若用户修改了keytab文件名,如修改为“huawei.keytab”,则需要在命令中增加第二个参数进行说明,提交命令示例(拓扑名为hbase-test): storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.hbase.SimpleHBaseTopology hbase-test huawei.keytab 安全模式下在提交source.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。 因为示例中的HBaseBolt并没有建表功能,在提交之前确保hbase中存在相应的表,若不存在需要手动建表,hbase shell建表语句如下create 'WordCount', 'cf'。 安全模式下hbase需要用户有相应表甚至列族和列的访问权限,因此首先需要在hbase所在集群上使用hbase管理员用户登录,之后在hbase shell中使用grant命令给提交用户申请相应表的权限,如示例中的WordCount,成功之后再使用提交用户登录并提交拓扑。 拓扑提交成功后请自行登录HBase集群查看WordCount表是否有数据生成。 如果使用票据登录,则需要使用命令行定期上传票据,具体周期由票据刷新截止时间而定,步骤如下。 在安装好的storm客户端目录的Storm/storm-0.10.0/conf/storm.yaml文件尾部新起一行添加如下内容。 topology.auto-credentials: - backtype.storm.security.auth.kerberos.AutoTGT 执行命令:./storm upload-credentials hbase-test
  • 代码样例 如下是代码片段,详细代码请参考ExampleClient类。 /** * load configurations from alluxio-site.properties * @throws IOException */ private void loadConf() throws IOException { InputStream fileInputStream = null; alluxioConf = new Properties(); File propertiesFile = new File(PATH_TO_ALLUXIO_SITE_PROPERTIES); try { fileInputStream = new FileInputStream(propertiesFile); alluxioConf.load(fileInputStream); } catch (FileNotFoundException e) { System.out.println(PATH_TO_ALLUXIO_SITE_PROPERTIES + "does not exist. Exception: " + e); } catch (IOException e) { System.out.println("Failed to load configuration file. Exception: " + e); } finally{ close(fileInputStream); } } /** * build Alluxio instance */ private void instanceBuild() throws IOException { // get filesystem InstancedConfiguration conf = new InstancedConfiguration(ConfigurationUtils.defaults()); conf.set(PropertyKey.MASTER_RPC_ADDRESSES, alluxioConf.get("alluxio.master.rpc.addresses")); FileSystemContext fsContext = FileSystemContext.create(conf); fSystem = FileSystem.Factory.create(fsContext); }
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类和HdfsWriter类。 /** * 追加文件内容 * * @throws IOException */ private void append() throws Exception { final String content = "I append this content."; InputStream in = (InputStream) new ByteArrayInputStream( content.getBytes()); try { HdfsWriter writer = new HdfsWriter(fSystem, DEST_PATH + File.separator + FILE_NAME); writer.doAppend(in); System.out.println("success to append."); } finally { //务必要关闭流资源. close(in); } }
  • 场景说明 假定用户开发一个应用程序,用于管理企业中的使用A业务的用户信息,如表1所示,A业务操作流程如下: 创建用户信息表。 在用户信息中新增用户的学历、职称等信息。 根据用户编号查询用户姓名和地址。 根据用户姓名进行查询。 查询年龄段在[20–29]之间的用户信息。 数据统计,统计用户信息表的人员数、年龄最大值、年龄最小值、平均年龄。 用户销户,删除用户信息表中该用户的数据。 A业务结束后,删除用户信息表。 表1 用户信息 编号 姓名 性别 年龄 地址 12005000201 Zhang San Male 19 Shenzhen City, Guangdong Province 12005000202 Li Wanting Female 23 Hangzhou City, Zhejiang Province 12005000203 Wang Ming Male 26 Ningbo City, Zhejiang Province 12005000204 Li Gang Male 18 Xiangyang City, Hubei Province 12005000205 Zhao Enru Female 21 Shangrao City, Jiangxi Province 12005000206 Chen Long Male 32 Zhuzhou City, Hunan Province 12005000207 Zhou Wei Female 29 Nanyang City, Henan Province 12005000208 Yang Yiwen Female 30 Wenzhou City, Zhejiang Province 12005000209 Xu Bing Male 26 Weinan City, Shaanxi Province 12005000210 Xiao Kai Male 25 Dalian City, Liaoning Province
  • 功能分解 根据上述的业务场景进行功能分解,需要开发的功能点如表2所示。 表2 在HBase中开发的功能 序号 步骤 代码实现 1 根据表1中的信息创建表。 请参见创建HBase表。 2 导入用户数据。 请参见插入HBase数据。 3 增加“教育信息”列族,在用户信息中新增用户的学历、职称等信息。 请参见修改HBase表。 4 根据用户编号查询用户姓名和地址。 请参见使用Get读取HBase数据。 5 根据用户姓名进行查询。 请参见使用HBase过滤器Filter。 6 用户销户,删除用户信息表中该用户的数据。 请参见删除HBase数据。 7 A业务结束后,删除用户信息表。 请参见删除HBase表。
  • 准备运行调测环境 在弹性云服务器管理控制台,申请一个新的弹性云服务器,用于用户应用程序开发、运行、调测。 弹性云服务器的安全组需要和MRS集群Master节点的安全组相同。 弹性云服务器的VPC需要与MRS集群在同一个VPC中。 弹性云服务器的网卡需要与MRS集群在同一个网段中。 申请弹性IP,绑定新申请的弹性云主机IP,并配置安全组出入规则。 下载客户端程序,请参考下载MRS客户端。 以root用户安装集群客户端。 执行以下命令解压客户端包。 tar -xvf /opt/MRS_Services_Client.tar 执行以下命令校验安装文件包。 sha256sum -c /opt/MRS_Services_ClientConfig.tar.sha256 MRS_Services_ClientConfig.tar:OK 执行以下命令解压安装文件包。 tar -xvf /opt/MRS_Services_ClientConfig.tar 执行如下命令安装客户端到指定目录(绝对路径),例如“/opt/client”。目录会自动创建。 cd /opt/MRS_Services_ClientConfig sh install.sh /opt/client Components client installation is complete.
  • 操作步骤 运行结果会有如下成功信息: ... 2020-01-09 10:43:49,338 INFO [main] examples.HBaseExample: Entering dropTable. 2020-01-09 10:43:49,341 INFO [main] client.HBaseAdmin: Started disable of hbase_sample_table 2020-01-09 10:43:50,080 INFO [main] client.HBaseAdmin: Operation: DISABLE, Table Name: default:hbase_sample_table, procId: 41 completed 2020-01-09 10:43:50,550 INFO [main] client.HBaseAdmin: Operation: DELETE, Table Name: default:hbase_sample_table, procId: 43 completed 2020-01-09 10:43:50,550 INFO [main] examples.HBaseExample: Drop table successfully. 2020-01-09 10:43:50,550 INFO [main] examples.HBaseExample: Exiting dropTable. 2020-01-09 10:43:50,550 INFO [main] client.ConnectionImplementation: Closing master protocol: MasterService 2020-01-09 10:43:50,556 INFO [main] examples.TestMain: -----------finish to test HBase API------------------- 在Windows环境运行样例代码时会出现下面的异常,但是不影响业务: java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 日志说明 日志级别默认为INFO,可以通过调整日志打印级别(DEBUG,INFO,WARN,ERROR,FATAL)来显示更详细的信息。可以通过修改log4j.properties文件来实现,如: hbase.root.logger=INFO,console log4j.logger.org.apache.zookeeper=INFO #log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG log4j.logger.org.apache.hadoop.hbase=INFO # Make these two classes DEBUG-level. Make them DEBUG to see more zk debug. log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
  • 代码样例 /** * create file,write file */ private void write() throws IOException { final String content = "hi, I am bigdata. It is successful if you can see me."; FileOutStream out = null; try { AlluxioURI path = new AlluxioURI(testFilePath); out = fSystem.createFile(path); out.write(content.getBytes()); } catch (Exception e){ System.out.println("Failed to write file. Exception:" + e); } finally { close(out); } }
  • Storm接口介绍 Storm采用的接口同开源社区版本保持一致,详情请参见: http://storm.apache.org/documentation/Home.html。 Storm-HDFS采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-hdfs。 Storm-HBase采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-hbase。 Storm-Kafka采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-kafka。 Storm-JDBC采用的接口同开源社区版本保持一致,详情参见: https://github.com/apache/storm/tree/v0.10.0/external/storm-jdbc。 父主题: Storm应用开发常见问题
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类和HdfsWriter类。 /** * 创建文件,写文件 * * @throws IOException * @throws ParameterException */ private void write() throws IOException, ParameterException { final String content = "hi, I am bigdata. It is successful if you can see me."; InputStream in = (InputStream) new ByteArrayInputStream( content.getBytes()); try { HdfsWriter writer = new HdfsWriter(fSystem, DEST_PATH + File.separator + FILE_NAME); writer.doWrite(in); System.out.println("success to write."); } finally { //务必要关闭流资源 close(in); } }
  • Hive应用开发环境简介 在进行应用开发时,要准备的本地开发环境如表1所示。同时需要准备运行调测的Linux环境,用于验证应用程序运行正常。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Linux系统。 安装JDK 开发和运行环境的基本配置。版本要求如下: MRS集群的服务端和客户端仅支持自带的Oracle JDK(版本为1.8),不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的,支持Oracle JDK和IBM JDK。 Oracle JDK:支持1.7和1.8版本。 IBM JDK:推荐1.7.8.10、1.7.9.40和1.8.3.0版本。 说明: 在HCatalog的开发环境中,基于安全考虑,MRS服务端只支持TLS 1.1和TLS 1.2加密协议,IBM JDK默认TLS只支持1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS1.0/1.1/1.2。 详情请参见:https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置Eclipse 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.7版本,Eclipse使用3.7.1及以上版本。 JDK使用1.8版本,Eclipse使用4.3.2及以上版本。 说明: 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。 网络 确保客户端与Hive服务主机在网络上互通。 父主题: 准备Hive应用开发环境
  • OpenTSDB应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Opentsdb应用程序开发流程 表1 Opentsdb应用开发流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解OpenTSDB的基本概念,了解场景需求,设计表等。 OpenTSDB应用开发常用概念 准备开发环境和运行环境 OpenTSDB的应用程序当前推荐使用Java语言进行开发。可使用Eclipse工具。OpenTSDB的运行环境即OpenTSDB客户端,请根据指导完成客户端的安装和配置。 OpenTSDB应用开发环境简介 准备工程 OpenTSDB提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个OpenTSDB工程。 导入并配置OpenTSDB样例工程 根据场景开发工程 提供了Java语言的样例工程,包含从创建metric、写入到查询流程的样例工程。 OpenTSDB样例程序开发思路 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测OpenTSDB应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看导入数据的状态。 查看OpenTSDB应用调测结果 父主题: OpenTSDB应用开发概述
  • 运行结果观察方式 样例程序工程jar包运行结果可以在logs目录下的client.log观察,默认状态下的log4j.properties没有将运行状态输出,若需要观察程序运行的信息,需将log4j.properties按如下方式配置: # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. kafka.logs.dir=logs log4j.rootLogger=INFO, stdout, kafkaAppender log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.kafka=INFO, kafkaAppender log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH log4j.appender.kafkaAppender.File=${kafka.logs.dir}/client.log log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n # Turn on all our debugging info #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender #log4j.logger.kafka.perf=DEBUG, kafkaAppender #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG 将kafkaAppender添加到rootLogger,并将日志级别调整到需要观察的级别。
  • 示例:Maven工程打包到Linux下运行样例 执行mvn package生成jar包,在工程目录target目录下获取,比如:kafka-examples-1.6.0.jar。 执行mvn dependency:copy-dependencies -DoutputDirectory=kafka-examples-lib -DincludeScope=compile,导出kafka样例工程依赖的jar包,比如放到kafka-examples-lib目录。 在第一步指定的目录下生成一个Jar包和一个存放lib的文件夹。 将刚才生成的依赖库文件夹(此处为“kafka-examples-lib”)拷贝到MRS服务的某个Linux环境上任意目录下,例如:“/opt/example”,然后将刚才生成的jar包拷贝到“/opt/example/kafka-examples-lib”目录下。 将样例工程的conf目录拷贝到与依赖库文件夹同级目录下,即“/opt/example”目录下,并创建logs目录,用于记录jar包运行日志。 切换到root用户,将拷贝进去的conf,kafka-examples-lib,logs目录修改为omm:wheel用户组所有,执行以下命令切换用户。 sudo su - root chown -R omm:wheel /opt/example/* 切换为omm用户,进入拷贝目录下“/opt/example”,首先确保conf目录下和依赖库文件目录下的所有文件,对当前用户均具有可读权限;同时保证已安装jdk并已设置java相关环境变量,然后执行命令,如java -cp .:/opt/example/conf:/opt/example/kafka-examples-lib/* com.huawei.bigdata.kafka.example.Producer,运行样例工程。 su - omm chmod 750 /opt/example cd /opt/example java -cp .:/opt/example/conf:/opt/example/kafka-examples-lib/* com.huawei.bigdata.kafka.example.Producer
  • 在Linux中调测Impala JDBC应用 在运行调测环境上创建一个目录作为运行目录,如“/opt/impala_examples”(Linux环境),并在该目录下创建子目录“conf”。 执行mvn package ,在工程target目录下获取jar包,比如: impala-examples-mrs-2.1-jar-with-dependencies.jar ,拷贝到“/opt/impala_examples”下 开启Kerberos认证的安全集群下把从4获取的user.keytab和krb5.conf拷贝到/opt/impala_examples/conf下。普通集群可跳过该步骤。 在Linux环境下执行如下命令运行样例程序。 chmod +x /opt/impala_examples -R cd /opt/impala_examples java -cp impala-examples-mrs-2.1-jar-with-dependencies.jar com.huawei.bigdata.impala.example.ExampleMain 在命令行终端查看样例代码中的Impala SQL所查询出的结果。 Linux环境运行成功结果会有如下信息。 Create table success! _c0 0 Delete table success! 父主题: 调测Impala应用
  • 获取MRS样例工程 MRS样例工程下载地址为https://github.com/huaweicloud/huaweicloud-mrs-example。 切换分支为与MRS集群相匹配的版本分支,例如“mrs-3.2.0.1”,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 图1 MRS样例工程代码下载 MRS LTS版本对应样例工程下载地址: MRS 3.3.0-LTS版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.3.0。 MRS 3.2.0-LTS.1版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0.1。 MRS 3.1.2-LTS.3版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.2。 MRS普通版本对应样例工程下载地址: MRS 3.0.2版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.0.2。 MRS 3.1.0版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0。 MRS 3.1.5版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5。 MRS 2.1.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-2.1。 MRS 1.9.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.9。 MRS 1.8.x版本:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-1.8。 MRS 1.8之前版本:http://mapreduceservice.obs-website.cn-north-1.myhuaweicloud.com/。
  • MRS各组件样例工程汇总 MRS样例代码库提供了各组件的基本功能样例工程供用户使用,当前版本各组件提供的样例工程汇总参见表1。 表1 MRS组件样例工程汇总 组件 样例工程位置 描述 ClickHouse clickhouse-examples 指导用户基于Java语言,实现MRS集群中的ClickHouse的数据表创建、删除以及数据的插入、查询等操作。 本工程中包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据及删除数据表等操作示例。 ClickHouseJDBC-Transaction-JavaExample ClickHouse事务开发代码样例,仅MRS 3.3.0及之后版本支持。 Doris doris-examples/doris-jdbc-example Doris数据读写操作的应用开发示例,仅MRS 3.3.0及之后版本支持。 通过调用Doris接口可实现创建用户表、向表中插入数据、查询表数据、删除表等功能 Flink 开启Kerberos认证集群的样例工程目录“flink-examples/flink-examples-security”。 未开启Kerberos认证集群的样例工程目录为“flink-examples/flink-examples-normal”。 FlinkCheckpointJavaExample Flink异步Checkpoint机制的Java/Scala示例程序。 本工程中,程序使用自定义算子持续产生数据,产生的数据为一个四元组(Long,String,String,Integer)。数据经统计后,将统计结果打印到终端输出。每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 FlinkCheckpointScalaExample FlinkHBaseJavaExample Flink API作业读写HBase数据的Java示例程序。 MRS 3.2.0及之后版本支持。 FlinkKafkaJavaExample Flink向Kafka生产并消费数据的Java/Scala示例程序。 在本工程中,假定某个Flink业务每秒就会收到1个消息记录,启动Producer应用向Kafka发送数据,然后启动Consumer应用从Kafka接收数据,对数据内容进行处理后并打印输出。 FlinkKafkaScalaExample FlinkPipelineJavaExample Flink Job Pipeline的Java/Scala示例程序。 本样例中一个发布者Job自己每秒钟产生10000条数据,另外两个Job作为订阅者,分别订阅一份数据。订阅者收到数据之后将其转化格式,并抽样打印输出。 FlinkPipelineScalaExample FlinkSqlJavaExample 使用客户端通过jar作业提交SQL作业的应用开发示例。 FlinkStreamJavaExample Flink构造DataStream的Java/Scala示例程序。 本工程示例为基于业务要求分析用户日志数据,读取文本数据后生成相应的DataStream,然后筛选指定条件的数据,并获取结果。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Flink SQL Join示例程序。 本工程示例调用flink-connector-kafka模块的接口,生产并消费数据。生成Table1和Table2,并使用Flink SQL对Table1和Table2进行联合查询,打印输出结果。 FlinkRESTAPIJavaExample 本工程示例调用FlinkServer的RestAPI创建租户。 flink-examples/flink-sql 本工程示例使用Flink Jar提交SQL作业。 flink-examples/pyflink-example pyflink-kafka 本工程示例使用Python提交普通作业,提供Python读写Kafka作业的样例。 pyflink-sql 本工程示例使用Python提交SQL作业,提供Python提交SQL作业的样例。 HBase hbase-examples hbase-example HBase数据读写操作及全局二级索引的应用开发示例。通过调用HBase接口可实现以下功能: 创建用户表、导入用户数据、增加用户信息、查询用户信息及为用户表创建二级索引等功能。 MRS 3.3.0及之后版本,可实现创建/删除全局二级索引、修改全局二级索引状态、以及基于全局二级索引查询等功能。 hbase-rest-example HBase Rest接口应用开发示例。 使用Rest接口实现查询HBase集群信息、获取表、操作NameSpace、操作表等功能。 hbase-thrift-example 访问HBase ThriftServer应用开发示例。 访问ThriftServer操作表、向表中写数据、从表中读数据。 hbase-zk-example HBase访问ZooKeeper应用开发示例。 在同一个客户端进程内同时访问MRS ZooKeeper和第三方的ZooKeeper,其中HBase客户端访问MRS ZooKeeper,客户应用访问第三方ZooKeeper。 HDFS 开启Kerberos认证集群的样例工程目录“hdfs-example-security”。 未开启Kerberos认证集群的样例工程目录为“hdfs-example-normal”。 HDFS文件操作的Java示例程序。 本工程主要给出了创建HDFS文件夹、写文件、追加文件内容、读文件和删除文件/文件夹等相关接口操作示例。 hdfs-c-example HDFS C语言开发代码样例。 本示例提供了基于C语言的HDFS文件系统连接、文件操作如创建文件、读写文件、追加文件、删除文件等。 HetuEngine 开启Kerberos认证集群的样例工程目录为“hetu-examples/hetu-examples-security”。 未开启Kerberos认证集群的样例工程目录为“hetu-examples/hetu-examples-normal”。 通过不同方式连接HetuEngine的Java、Python示例程序。 通过HSFabric、HSBroker等连接方式,使用用户名和密码连接到HetuEngine,或通过KeyTab文件认证方式连接HetuEngine,组装对应的SQL发送到HetuEngine执行,完成对Hive数据源的增删改查操作。 Hive hive-examples hive-jdbc-example Hive JDBC处理数据Java示例程序。 本工程使用JDBC接口连接Hive,在Hive中执行相关数据操作。使用JDBC接口实现创建表、加载数据、查询数据等功能,还可实现在同一个客户端进程内同时访问 FusionInsight ZooKeeper和第三方的ZooKeeper。 hive-jdbc-example-multizk hcatalog-example Hive HCatalog处理数据Java示例程序。 使用HCatalog接口实现通过Hive命令行方式对MRS Hive元数据进行数据定义和查询操作。 python-examples 使用Python连接Hive执行SQL样例。 可实现使用Python对接Hive并提交数据分析任务。 python3-examples 使用Python3连接Hive执行SQL样例。 可实现使用Python3对接Hive并提交数据分析任务。 IoTDB iotdb-examples iotdb-flink-example 通过Flink访问IoTDB数据的示例程序,包括FlinkIoTDBSink和FlinkIoTDBSource。 FlinkIoTDBSink可实现通过Flink job将时序数据写入到IoTDB中。FlinkIoTDBSource则通过Flink job将时序数据从IoTDB读取出来并且打印。 iotdb-jdbc-example IoTDB JDBC处理数据Java示例程序。 本示例演示了如何使用JDBC接口连接IoTDB,并执行IoTDB SQL语句。 iotdb-kafka-example 通过Kafka访问IoTDB数据的示例程序。 本示例演示了如何先将时序数据发送到Kafka,再使用多线程将数据写入到IoTDB中。 iotdb-session-example IoTDB Session处理数据Java示例程序。 本示例演示了如何使用Session方式连接IoTDB,并执行IoTDB SQL语句。 iotdb-udf-exmaple 该样例程序介绍了如何实现一个简单的IoTDB自定义函数(UDF)。 Kafka kafka-examples Kafka流式数据的处理Java示例程序。 本工程基于Kafka Streams完成单词统计功能,通过读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,然后将统计结果以Key-Value的形式输出。 Manager manager-examples FusionInsight Manager API接口调用示例。 本工程调用Manager API接口实现集群用户的创建、修改及删除等操作。 MapReduce 开启Kerberos认证集群的样例工程目录“mapreduce-example-security”。 未开启Kerberos认证集群的样例工程目录为“mapreduce-example-normal”。 MapReduce任务提交Java示例程序。 本工程提供了一个MapReduce统计数据的应用开发示例,实现数据分析、处理,并输出满足用户需要的数据信息。 另外以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 Oozie 开启Kerberos认证集群的样例工程目录“oozie-examples/ooziesecurity-examples”。 未开启Kerberos认证集群的样例工程目录为“oozie-examples/oozienormal-examples”。 OozieMapReduceExample Oozie提交MapReduce任务示例程序。 本示例演示了如何通过Java API提交MapReduce作业和查询作业状态,对网站的日志文件进行离线分析。 OozieSparkHBaseExample 使用Oozie调度Spark访问HBase的示例程序。 OozieSparkHiveExample 使用Oozie调度Spark访问Hive的示例程序。 Spark 开启Kerberos认证集群的样例工程目录“spark-examples/sparksecurity-examples”。 未开启Kerberos认证集群的样例工程目录为“spark-examples/sparknormal-examples”。 SparkHbasetoCarbonJavaExample Spark同步HBase数据到CarbonData的Java示例程序。 本示例工程中,应用将数据实时写入HBase,用于点查业务。数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。 SparkHbasetoHbaseJavaExample Spark从HBase读取数据再写入HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现两个HBase表数据的分析汇总。 SparkHbasetoHbasePythonExample SparkHbasetoHbaseScalaExample SparkHivetoHbaseJavaExample Spark从Hive读取数据再写入到HBase的Java/Scala/Python示例程序。 本示例工程中,Spark应用程序实现分析处理Hive表中的数据,并将结果写入HBase表。 SparkHivetoHbasePythonExample SparkHivetoHbaseScalaExample SparkJavaExample Spark Core任务的Java/Python/Scala/R示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkRExample示例不支持未开启Kerberos认证的集群。 SparkPythonExample SparkScalaExample SparkRExample SparkLauncherJavaExample 使用Spark Launcher提交作业的Java/Scala示例程序。 本工程应用程序通过org.apache.spark.launcher.SparkLauncher类采用Java/Scala命令方式提交Spark应用。 SparkLauncherScalaExample SparkOnHbaseJavaExample Spark on HBase场景的Java/Scala/Python示例程序。 本工程应用程序以数据源的方式去使用HBase,将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。 SparkOnHbasePythonExample SparkOnHbaseScalaExample SparkOnHudiJavaExample Spark on Hudi场景的Java/Scala/Python示例程序。 本工程应用程序使用Spark操作Hudi执行插入数据、查询数据、更新数据、增量查询、特定时间点查询、删除数据等操作。 SparkOnHudiPythonExample SparkOnHudiScalaExample SparkOnMultiHbaseScalaExample Spark同时访问两个集群中的HBase的Scala示例程序。 本示例不支持未开启Kerberos认证的集群。 SparkSQLJavaExample Spark SQL任务的Java/Python/Scala示例程序。 本工程应用程序实现从HDFS上读取文本数据并计算分析。 SparkSQLPythonExample SparkSQLScalaExample SparkStreamingKafka010JavaExample Spark Streaming从Kafka接收数据并进行统计分析的Java/Scala示例程序。 本工程应用程序实时累加计算Kafka中的流数据,统计每个单词的记录总数。 SparkStreamingKafka010ScalaExample SparkStreamingtoHbaseJavaExample010 Spark Streaming读取Kafka数据并写入HBase的Java/Scala/Python示例程序。 本工程应用程序每5秒启动一次任务,读取Kafka中的数据并更新到指定的HBase表中。 SparkStreamingtoHbasePythonExample010 SparkStreamingtoHbaseScalaExample010 SparkStructuredStreamingJavaExample 在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。 SparkStructuredStreamingPythonExample SparkStructuredStreamingScalaExample SparkThriftServerJavaExample 通过JDBC访问Spark SQL的Java/Scala示例程序。 本示例中,用户自定义JD BCS erver的客户端,使用JDBC连接来进行表的创建、数据加载、查询和删除。 SparkThriftServerScalaExample StructuredStreamingADScalaExample 使用Structured Streaming,从kafka中读取广告请求数据、广告展示数据、广告点击数据,实时获取广告有效展示统计数据和广告有效点击统计数据,将统计结果写入kafka中。 StructuredStreamingStateScalaExample 在Spark结构流应用中,跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;同时输出本批次被更新状态的session。 SpringBoot(MRS 3.3.0及之后版本支持) clickhouse-examples clickhouse-rest-client-example SpringBoot连接ClickHouse服务应用开发示例。 本示例中,包含了建立服务端连接、创建数据库、创建数据表、插入数据、查询数据等操作示例。 doris-examples doris-rest-client-example Doris数据读写操作的SpringBoot应用开发示例。 提供SpringBoot连接Doris的样例程序。 flink-examples flink-dws-read-example GaussDB (DWS) SpringBoot方式连接Flink服务的应用开发示例。 flink-dws-sink-example hbase-examples SpringBoot连接Phoenix应用开发示例。 提供SpringBoot连接HBase与Phoenix的样例程序。 hive-examples hive-rest-client-example SpringBoot连接Hive应用开发示例。 本工程使用SpringBoot方式连接Hive,在Hive中执行创建表、加载数据、查询数据、删除表等操作。 kafka-examples SpringBoot连接Kafka实现Topic生产消费的应用开发示例。
  • 场景说明 假定某个业务Kafka每30秒就会收到5个用户的消费记录。Hbase的table1表存储用户历史消费的金额信息。 现table1表有10条记录,表示有用户名分别为1-10的用户,用户的历史消费金额初始化都是0元。 基于某些业务要求,开发的Spark应用程序实现如下功能: 实时累加计算用户的消费金额信息:即用户总消费金额=用户的消费金额(kafka数据) + 用户历史消费金额(table1表的值),更新到table1表。
  • 数据规划 创建HBase表,并插入数据。 通过HBase创建名为table1的表,命令如下。 create 'table1', 'cf' 通过HBase执行如下命令,将数据插入table1表中。 put 'table1', '1', 'cf:cid', '0' put 'table1', '2', 'cf:cid', '0' put 'table1', '3', 'cf:cid', '0' put 'table1', '4', 'cf:cid', '0' put 'table1', '5', 'cf:cid', '0' put 'table1', '6', 'cf:cid', '0' put 'table1', '7', 'cf:cid', '0' put 'table1', '8', 'cf:cid', '0' put 'table1', '9', 'cf:cid', '0' put 'table1', '10', 'cf:cid', '0' Spark Streaming样例工程的数据存储在Kafka中。 确保集群安装完成,包括HDFS、Yarn、Spark。 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动样例代码的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行Spark应用章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient/*:{JAR_PATH} com.huawei.bigdata.spark.examples.streaming.StreamingExampleProducer {BrokerList} {Topic} 如果开启了kerberos认证,需要将客户端的配置文件“spark-defaults.conf”和sparkJDBC服务端中的配置项spark.yarn.security.credentials.hbase.enabled置为true。 {zkQuorum}格式为zkIp:2181。 JAR_PATH为程序jar包所在路径。 brokerlist格式为brokerIp:9092。
共100000条