华为云用户手册

  • 场景说明 该样例以MapReduce访问HDFS、HBase、Hive为例,介绍如何编写MapReduce作业访问多个服务组件。帮助用户理解认证、配置加载等关键使用方式。 该样例逻辑过程如下: 以HDFS文本文件为输入数据: log1.txt:数据输入文件 YuanJing,male,10 GuoYijun,male,5 Map阶段: 获取输入数据的一行并提取姓名信息。 查询HBase一条数据。 查询Hive一条数据。 将HBase查询结果与Hive查询结果进行拼接作为Map输出。 Reduce阶段: 获取Map输出中的最后一条数据。 将数据输出到HBase。 将数据保存到HDFS。
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建文件,写文件 * * @throws java.io.IOException * @throws com.huawei.bigdata.hdfs.examples.ParameterException */ private void write() throws IOException { final String content = "hi, I am bigdata. It is successful if you can see me."; FSDataOutputStream out = null; try { out = fSystem.create(new Path(DEST_PATH + File.separator + FILE_NAME)); out.write(content.getBytes()); out.hsync(); LOG .info("success to write."); } finally { // make sure the stream is closed finally. IOUtils.closeStream(out); } }
  • 常用概念 Colocation 同分布(Colocation)功能是将存在关联关系的数据或可能要进行关联操作的数据存储在相同的存储节点上。HDFS文件同分布的特性是,将那些需进行关联操作的文件存放在相同的数据节点上,在进行关联操作计算时,避免了到别的数据节点上获取数据的动作,大大降低了网络带宽的占用。 Client HDFS Client主要包括五种方式:JAVA API、C API、Shell、HTTP REST API、WEB UI五种方式,可参考HDFS常用API介绍、HDFS Shell命令介绍。 JAVA API 提供HDFS文件系统的应用接口,本开发指南主要介绍如何使用Java API进行HDFS文件系统的应用开发。 C API 提供HDFS文件系统的应用接口,使用C语言开发的用户可参考C接口的描述进行应用开发。 Shell 提供shell命令完成HDFS文件系统的基本操作。 HTTP REST API 提供除Shell、Java API和C API以外的其他接口,可通过此接口监控HDFS状态等信息。 WEB UI 提供Web可视化组件管理界面。
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 删除文件 * * @throws java.io.IOException */ private void delete() throws IOException { Path beDeletedPath = new Path(DEST_PATH + File.separator + FILE_NAME); if (fSystem.delete(beDeletedPath, true)) { LOG.info("success to delete the file " + DEST_PATH + File.separator + FILE_NAME); } else { LOG.warn("failed to delete the file " + DEST_PATH + File.separator + FILE_NAME); } }
  • 架构 Flink架构如图2所示。 图2 Flink架构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • 组件介绍 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • 基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,在通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 样例工程介绍 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Flink相关样例工程,安全模式路径为“flink-examples/flink-examples-security”,普通模式路径为“flink-examples/flink-examples-normal”: 表2 Flink相关样例工程 样例工程 描述 FlinkCheckpointJavaExample 异步Checkpoint机制程序的应用开发示例。 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。 相关业务场景介绍请参见Flink开启Checkpoint样例程序。 FlinkCheckpointScalaExample FlinkHBaseJavaExample 通过Flink API作业读写HBase数据的应用开发示例。 相关业务场景介绍请参见Flink读取HBase表样例程序。 FlinkHudiJavaExample 通过Flink API作业读写Hudi数据的应用开发示例。 相关业务场景介绍请参见Flink读取Hudi表样例程序。 FlinkKafkaJavaExample 向Kafka生产并消费数据程序的应用开发示例。 通过调用flink-connector-kafka模块的接口,生产并消费数据。 相关业务场景介绍请参见Flink Kafka样例程序。 FlinkKafkaScalaExample FlinkPipelineJavaExample Job Pipeline程序的应用开发示例。 相关业务场景介绍请参见Flink Job Pipeline样例程序。 发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据并打印输出。 FlinkPipelineScalaExample FlinkRESTAPIJavaExample 调用FlinkServer的RestAPI创建租户的应用开发示例。 相关业务场景介绍请参见FlinkServer REST API样例程序。 FlinkStreamJavaExample DataStream程序的应用开发示例。 相关业务场景介绍请参见Flink DataStream样例程序。 假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,可通过Flink应用程序实现例如实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息的功能。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Stream SQL Join程序的应用开发示例。 相关业务场景介绍请参见Flink Join样例程序。 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。实现实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询的功能。 FlinkStreamSqlJoinScalaExample flink-sql 使用客户端通过jar作业提交SQL作业的应用开发示例。 相关业务场景介绍请参见Flink Jar作业提交SQL样例程序。 pyflink-example 提供Python读写Kafka作业和Python提交SQL作业的样例。 相关业务场景介绍请参见PyFlink样例程序。
  • REST API增强 SQL相关的命令:获取所有SQL语句和执行时间最长的SQL语句 SparkUI命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 可以在命令后的url路径增加相应的参数设置,搜索对应的SQL语句。 例如,查看100条sql语句: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?limit=100" 查看正在运行的参数: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?completed=false" JobHistory命令: curl -k -i --negotiate -u: "https://192.168.227.16:4040/api/v1/applications/application_1478570725074_0004/SQL" 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1478570725074_0004为应用ID。 结果: SparkUI命令和JobHistory命令的查询结果均为: { "longestDurationOfCompletedSQL" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] } ], "sqls" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] }] } 结果分析: 通过这个命令,可以查询当前应用的所有SQL语句的信息(即结果中“sqls”的部分),执行时间最长的SQL语句的信息(即结果中“longestDurationOfCompletedSQL”的部分)。每个SQL语句的信息如下表3。 表3 SQL的常用信息 参数 描述 id SQL语句的ID status SQL语句的执行状态,有RUNNING、COMPLETED、FAILED三种 runningJobs SQL语句产生的job中,正在执行的job列表 successedJobs SQL语句产生的job中,执行成功的job列表 failedJobs SQL语句产生的job中,执行失败的job列表 JDBC Server相关的命令:获取连接数,正在执行的SQL数,所有session信息,所有SQL的信息 命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/sqlserver" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 结果: { "sessionNum" : 1, "runningSqlNum" : 0, "sessions" : [ { "user" : "spark", "ip" : "192.168.169.84", "sessionId" : "9dfec575-48b4-4187-876a-71711d3d7a97", "startTime" : "2016/10/29 15:21:10", "finishTime" : "", "duration" : "1 minute 50 seconds", "totalExecute" : 1 } ], "sqls" : [ { "user" : "spark", "jobId" : [ ], "groupId" : "e49ff81a-230f-4892-a209-a48abea2d969", "startTime" : "2016/10/29 15:21:13", "finishTime" : "2016/10/29 15:21:14", "duration" : "555 ms", "statement" : "show tables", "state" : "FINISHED", "detail" : "== Parsed Logical Plan ==\nShowTablesCommand None\n\n== Analyzed Logical Plan ==\ntableName: string, isTemporary: boolean\nShowTablesCommand None\n\n== Cached Logical Plan ==\nShowTablesCommand None\n\n== Optimized Logical Plan ==\nShowTablesCommand None\n\n== Physical Plan ==\nExecutedCommand ShowTablesCommand None\n\nCode Generation: true" } ] } 结果分析: 通过这个命令,可以查询当前JDBC应用的session连接数,正在执行的SQL数,所有的session和SQL信息。每个session的信息如下表4,每个SQL的信息如下表5。 表4 session常用信息 参数 描述 user 该session连接的用户 ip session所在的节点IP sessionId session的ID startTime session开始连接的时间 finishTime session结束连接的时间 duration session连接时长 totalExecute 在该session上执行的SQL数 表5 sql常用信息 参数 描述 user SQL执行的用户 jobId SQL语句包含的job id列表 groupId SQL所在的group id startTime SQL开始时间 finishTime SQL结束时间 duration SQL执行时长 statement 对应的语句 detail 对应的逻辑计划,物理计划 JDBC api增强通过beeline里面获取的executionID 取消当前正在执行的SQL 命令: curl -k -i --negotiate -X PUT -u: "https://192.168.195.232:8090/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/cancel/execution?executionId=8" 结果: 取消executionId 执行序号为8的job任务。 补充说明: spark-beeline里面执行SQL语句,如果该SQL语句产生spark任务,该SQL的executionId将会被打印在beeline里面,这个时候如果想取消这条sql的执行,可以用上述命令。 Streaming相关的命令:获取平均输入频率,平均调度时延,平均执行时长,总时延平均值 命令: curl -k -i --negotiate -u: "https://192.168.195.232:8090/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/streaming/statistics" 其中192.168.195.232为ResourceManager主节点的业务IP,8090为ResourceManager的端口号,application_1477722033672_0008为在YARN中的应用ID。 结果: { "startTime" : "2018-12-25T08:58:10.836GMT", "batchDuration" : 1000, "numReceivers" : 1, "numActiveReceivers" : 1, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 373, "numRetainedCompletedBatches" : 373, "numActiveBatches" : 0, "numProcessedRecords" : 1, "numReceivedRecords" : 1, "avgInputRate" : 0.002680965147453083, "avgSchedulingDelay" : 14, "avgProcessingTime" : 47, "avgTotalDelay" : 62 } 结果分析: 通过这个命令,可以查询当前Streaming应用的平均输入频率(events/sec),平均调度时延(ms),平均执行时长(ms),总时延平均值(ms)。
  • 功能简介 Spark的REST API以JSON格式展现Web UI的一些指标,提供用户一种更简单的方法去创建新的展示和监控的工具,并且支持查询正在运行的app和已经结束的app的相关信息。开源的Spark REST接口支持对Jobs、Stages、Storage、Environment和Executors的信息进行查询, FusionInsight 版本中添加了查询SQL、JDBC Server和Streaming的信息的REST接口。开源REST接口完整和详细的描述请参考官网上的文档以了解其使用方法:https://spark.apache.org/docs/3.1.1/monitoring.html#rest-api。
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建文件,写文件 * * @throws java.io.IOException * @throws com.huawei.bigdata.hdfs.examples.ParameterException */ private void write() throws IOException { final String content = "hi, I am bigdata. It is successful if you can see me."; FSDataOutputStream out = null; try { out = fSystem.create(new Path(DEST_PATH + File.separator + FILE_NAME)); out.write(content.getBytes()); out.hsync(); LOG.info("success to write."); } finally { // make sure the stream is closed finally. IOUtils.closeStream(out); } }
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 追加文件内容 * * @throws java.io.IOException */ private void append() throws IOException { final String content = "I append this content."; FSDataOutputStream out = null; try { out = fSystem.append(new Path(DEST_PATH + File.separator + FILE_NAME)); out.write(content.getBytes()); out.hsync(); LOG.info("success to append."); } finally { // make sure the stream is closed finally. IOUtils.closeStream(out); } }
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程 安装Maven 开发环境基本配置。用于项目管理,贯穿软件开发生命周期。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 准备开发用户 参考获取MRS应用开发样例工程进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 代码样例 /** * read file * @throws java.io.IOException */ private void read() throws IOException { AlluxioURI path = new AlluxioURI(testFilePath); FileInStream in = null; try{ in = fSystem.openFile(path); byte[] buffer = new byte[1024]; int len; String content = ""; while((len = in.read(buffer)) != -1){ String bufferStr = new String(buffer,0, len); content += bufferStr; } System.out.println(content); } catch (Exception e){ System.out.println("Failed to read file. Exception:" + e); } finally { close(in); } }
  • 架构 Flink架构如图2所示。 图2 Flink架构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • Flink开发接口简介 Flink DataStream API提供Scala和Java两种语言的开发方式,如表1所示。 表1 Flink DataStream API接口 功能 说明 Scala API 提供Scala语言的API,提供过滤、join、窗口、聚合等数据处理能力。由于Scala语言的简洁易懂,推荐用户使用Scala接口进行程序开发。 Java API 提供Java语言的API,提供过滤、join、窗口、聚合等数据处理能力。
  • 样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Flink相关样例工程,安全模式路径为“flink-examples/flink-examples-security”,普通模式路径为“flink-examples/flink-examples-normal”: 表2 Flink相关样例工程 样例工程 描述 FlinkCheckpointJavaExample 异步Checkpoint机制程序的应用开发示例。 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。 相关业务场景介绍请参见Flink开启Checkpoint样例程序。 FlinkCheckpointScalaExample FlinkHBaseJavaExample 通过Flink API作业读写HBase数据的应用开发示例。 相关业务场景介绍请参见Flink读取HBase表样例程序。 FlinkHudiJavaExample 通过Flink API作业读写Hudi数据的应用开发示例。 相关业务场景介绍请参见Flink读取Hudi表样例程序。 FlinkKafkaJavaExample 向Kafka生产并消费数据程序的应用开发示例。 通过调用flink-connector-kafka模块的接口,生产并消费数据。 相关业务场景介绍请参见Flink Kafka样例程序。 FlinkKafkaScalaExample FlinkPipelineJavaExample Job Pipeline程序的应用开发示例。 相关业务场景介绍请参见Flink Job Pipeline样例程序。 发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据并打印输出。 FlinkPipelineScalaExample FlinkRESTAPIJavaExample 调用FlinkServer的RestAPI创建租户的应用开发示例。 相关业务场景介绍请参见FlinkServer REST API样例程序。 FlinkStreamJavaExample DataStream程序的应用开发示例。 相关业务场景介绍请参见Flink DataStream样例程序。 假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,可通过Flink应用程序实现例如实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息的功能。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Stream SQL Join程序的应用开发示例。 相关业务场景介绍请参见Flink Join样例程序。 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。实现实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询的功能。 FlinkStreamSqlJoinScalaExample flink-sql 使用客户端通过jar作业提交SQL作业的应用开发示例。 相关业务场景介绍请参见Flink Jar作业提交SQL样例程序。 pyflink-example 提供Python读写Kafka作业和Python提交SQL作业的样例。 相关业务场景介绍请参见PyFlink样例程序。
  • 基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,在通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • 功能介绍 本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式。 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。
  • 简介 ThriftServer是Hive中的HiveServer2的另外一个实现,它底层使用了Spark SQL来处理SQL语句,从而比Hive拥有更高的性能。 ThriftServer是一个JDBC接口,用户可以通过JDBC连接ThriftServer来访问SparkSQL的数据。ThriftServer在启动的时候,会启动一个SparkSQL的应用程序,而通过JDBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据。ThriftServer启动时还会开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置ThriftServer的时候,至少要配置ThriftServer的主机名和端口,如果要使用Hive数据的话,还要提供Hive Metastore的URIs。 ThriftServer默认在安装节点上的10000端口起一个JDBC服务,可以通过Beeline或者JDBC客户端代码来连接它,从而执行SQL命令。 如果您需要了解ThriftServer的其他信息,请参见Spark官网:http://spark.apache.org/docs/1.5.1/sql-programming-guide.html#distributed-sql-engine。
  • 创建ClickHouse数据库 通过on cluster语句在集群中创建表1中以databaseName参数值为数据库名的数据库。 private void createDatabase(String databaseName, String clusterName) throws Exception { String createDbSql = "create database if not exists " + databaseName + " on cluster " + clusterName; util.exeSql(createDbSql); } 父主题: 开发ClickHouse应用
  • Flink应用开发常用概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过socket、Kafka和文件等形式导入,在Flink系统处理后,在通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 Topology 一个Topology代表用户的一个执行任务。一个Topology由输入(如kafka soruce)、输出(如kafka sink)和多个Data Transformation组成。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。 父主题: Flink应用开发概述
  • 功能简介 本小节介绍了如何使用Impala SQL建内部表、外部表的基本操作。创建表主要有以下三种方式。 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Impala完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具(如Pig等)共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples的HdfsExample类。 在Linux客户端运行应用和在Windows环境下运行应用的初始化代码相同,代码样例如下所示。 // 完成初始化和认证 confLoad(); authentication(); // 创建一个用例 HdfsExample hdfs_examples = new HdfsExample("/user/hdfs-examples", "test.txt"); /** * * 如果程序运行在Linux上,则需要core-site.xml、hdfs-site.xml的路径修改为在Linux下客户端文件的绝对路径 * * */ private static void confLoad() throws IOException { conf = new Configuration(); // conf file conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); // conf.addResource(new Path(PATH_TO_SMALL_SITE_XML)); } /** *安全认证 * */ private static void authentication() throws IOException { // security mode if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } } /** *创建用例 */ public HdfsExample(String path, String fileName) throws IOException { this.DEST_PATH = path; this.FILE_NAME = fileName; instanceBuild(); } private void instanceBuild() throws IOException { fSystem = FileSystem.get(conf); }
  • 简介 JD BCS erver是Hive中的HiveServer2的另外一个实现,它底层使用了Spark SQL来处理SQL语句,从而比Hive拥有更高的性能。 JDB CS erver是一个JDBC接口,用户可以通过JDBC连接JDBCServer来访问SparkSQL的数据。JDBCServer在启动的时候,会启动一个sparkSQL的应用程序,而通过JDBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据。JDBCServer启动时还会开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置JDBCServer的时候,至少要配置JDBCServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。 JDBCServer默认在安装节点上的22550端口起一个JDBC服务(通过参数hive.server2.thrift.port配置),可以通过Beeline或者JDBC客户端代码来连接它,从而执行SQL命令。 如果您需要了解JDBCServer的其他信息,请参见Spark官网:http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#distributed-sql-engine。
  • 增强特性 对比开源社区,华为还提供了两个增强特性,JDBCServer HA方案和设置JDBCServer连接的超时时间。 JDBCServer的HA方案,多个JDBCServer主节点同时提供服务,当其中一个节点发生故障时,新的客户端连接会分配到其他主节点上,从而保障无间断为集群提供服务。Beeline和JDBC客户端代码两种连接方式的操作相同。 设置客户端与JDBCServer连接的超时时间。 Beeline 在网络拥塞的情况下,这个特性可以避免beeline由于无限等待服务端的返回而挂起。使用方式如下: 启动beeline时,在后面追加“--socketTimeOut=n”,其中“n”表示等待服务返回的超时时长,单位为秒,默认为“0”(表示永不超时)。建议根据业务场景,设置为业务所能容忍的最大等待时长。 JDBC客户端代码 在网络拥塞的情况下,这个特性可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下: 在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。建议根据业务场景,设置为业务所能容忍的最大等待时长。
  • 安全认证代码 目前样例代码统一调用LoginUtil类进行安全认证。 在HDFS样例工程代码中,不同的样例工程,使用的认证代码不同,包括基本安全认证和带ZooKeeper认证。 基本安全认证: com.huawei.bigdata.hdfs.examples包的HdfsExample类样例程序不需要访问HBase或ZooKeeper,所以使用基本的安全认证代码即可。示例代码如下: ... private static final String PATH_TO_HDFS_SITE_XML = HdfsExample.class.getClassLoader().getResource("hdfs-site.xml").getPath(); private static final String PATH_TO_CORE_SITE_XML = HdfsExample.class.getClassLoader().getResource("core-site.xml").getPath(); private static final String PRNCIPAL_NAME = "hdfsDeveloper"; private static final String PATH_TO_KEYTAB = HdfsExample.class.getClassLoader().getResource("user.keytab").getPath(); private static final String PATH_TO_KRB5_CONF = HdfsExample.class.getClassLoader().getResource("krb5.conf").getPath(); private static Configuration conf = null; } ... private static void authentication() throws IOException { // security mode if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } } 带ZooKeeper认证: com.huawei.bigdata.hdfs.examples包的“ColocationExample”类样例程序不仅需要基础安全认证,还需要添加ZooKeeper服务端Principal才能完成安全认证。示例代码如下: ... private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"; private static final String PRINCIPAL = "username.client.kerberos.principal"; private static final String KEYTAB = "username.client.keytab.file"; private static final String PRNCIPAL_NAME = "hdfsDeveloper"; private static final String LOGIN_CONTEXT_NAME = "Client"; private static final String PATH_TO_KEYTAB = System.getProperty("user.dir") + File.separator + "conf" + File.separator + "user.keytab"; private static final String PATH_TO_KRB5_CONF = ColocationExample.class.getClassLoader().getResource("krb5.conf") .getPath(); private static String zookeeperDefaultServerPrincipal = null; private static Configuration conf = new Configuration(); private static DFSColocationAdmin dfsAdmin; private static DFSColocationClient dfs; private static void init() throws IOException { LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); LoginUtil.setJaasConf(LOGIN_CONTEXT_NAME, PRNCIPAL_NAME, PATH_TO_KEYTAB); zookeeperDefaultServerPrincipal = "zookeeper/hadoop." + KerberosUtil.getKrb5DomainRealm().toLowerCase(); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, zookeeperDefaultServerPrincipal); } ...
  • 场景说明 访问安全集群环境中的服务,需要先通过Kerberos安全认证。所以HDFS应用程序中需要写入安全认证代码,确保HDFS程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交HDFS应用程序运行前,在HDFS客户端执行如下命令进行认证。 kinit 组件业务用户 该方式仅适用于Linux操作系统,且安装了HDFS的客户端。 代码认证: 通过获取客户端的principal和keytab文件进行认证。 注意修改代码中的PRINCIPAL_NAME变量为实际使用的值。 private static final String PRNCIPAL_NAME = "hdfsDeveloper";
  • 数据规划 创建HDFS数据文件。 在Linux系统上新建文本文件,将log1.txt中的内容复制保存到data.txt。 在HDFS上创建一个文件夹,“/tmp/examples/multi-components/mapreduce/input/”,并上传data.txt到此目录,命令如下: 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir -p /tmp/examples/multi-components/mapreduce/input/ 在Linux系统HDFS客户端使用命令hdfs dfs -put data.txt /tmp/examples/multi-components/mapreduce/input/ 创建HBase表并插入数据。 在Linux系统HBase客户端执行source bigdata_env,并使用命令hbase shell。 在HBase shell交互窗口创建数据表table1,该表有一个列族cf,使用命令create 'table1', 'cf'。 插入一条rowkey为1、列名为cid、数据值为123的数据,使用命令put 'table1', '1', 'cf:cid', '123'。 执行命令quit退出。 创建Hive表并载入数据。 在Linux系统Hive客户端使用命令beeline。 在Hive beeline交互窗口创建数据表person,该表有3个字段:name/gender/stayTime,使用命令CREATE TABLE person(name STRING, gender STRING, stayTime INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' stored as textfile;。 在Hive beeline交互窗口加载数据文件,LOAD DATA INPATH '/tmp/examples/multi-components/mapreduce/input/' OVERWRITE INTO TABLE person;。 执行命令!q退出。 由于Hive加载数据将HDFS对应数据目录清空,所以需再次执行1。
共100000条