华为云用户手册

  • Structured Streaming常用概念 Input Source 输入数据源,数据源需要支持根据offset重放数据,不同的数据源有不同的容错性。 Sink 数据输出,Sink要支持幂等性写入操作,不同的sink有不同的容错性。 outputMode 结果输出模式,当前支持3种输出模: Complete Mode:整个更新的结果集都会写入外部存储。整张表的写入操作将由外部存储系统的连接器完成。 Append Mode:当时间间隔触发时,只有在Result Table中新增加的数据行会被写入外部存储。这种方式只适用于结果集中已经存在的内容不希望发生改变的情况下,如果已经存在的数据会被更新,不适合适用此种方式。 Update Mode:当时间间隔触发时,只有在Result Table中被更新的数据才会被写入外部存储系统。注意,和Complete Mode方式的不同之处是不更新的结果集不会写入外部存储。 Trigger 输出触发器,当前支持以下几种trigger: 默认:以微批模式执行,每个批次完成后自动执行下个批次。 固定间隔:固定时间间隔执行。 一次执行:只执行一次query,完成后退出。 连续模式:实验特性,可实现低至1ms延迟的流处理(推荐100ms)。 Structured Streaming支持微批模式和连续模式。微批模式不能保证对数据的低延迟处理,但是在相同时间下有更大的吞吐量;连续模式适合毫秒级的数据处理延迟,当前暂时还属于实验特性。 在当前版本中,若需要使用流流Join功能,则output模式只能选择append模式。 图6 微批模式运行过程简图 图7 连续模式运行过程简图
  • Spark Streaming常用概念 Dstream DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。 DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD是一个只读的、可分区的分布式数据集。 DStream中的每个RDD包含了一个区间的数据。如图4所示。 图4 DStream与RDD关系 应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。 图5 DStream算子转译
  • Spark简介 Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景: 数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。 迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。 数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。 流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。 查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。 本文档重点介绍Spark、Spark SQL和Spark Streaming应用开发指导。
  • Spark开发接口简介 Spark支持使用Scala、Java和Python语言进行程序开发,由于Spark本身是由Scala语言开发出来的,且Scala语言具有简洁易懂的特性,推荐用户使用Scala语言进行Spark应用程序开发。 按不同的语言分类,Spark的API接口如表1所示。 表1 Spark API接口 接口类型 说明 Scala API 提供Scala语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Scala API接口介绍。 Java API 提供Java语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Java API接口介绍。 Python API 提供Python语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Python API接口介绍。 按不同的模块分,Spark Core和Spark Streaming使用上表中的API接口进行程序开发。而SparkSQL模块,支持CLI或者JD BCS erver两种方式访问。其中JDB CS erver的连接方式也有Beeline和JDBC客户端代码两种。详情请参见Spark JDBCServer接口介绍。 spark-sql脚本、spark-shell脚本和spark-submit脚本(运行的应用中带SQL操作),不支持使用proxy user参数去提交任务。另外,由于本文档中涉及的样例程序已添加安全认证,建议不要使用proxy user参数去提交任务。
  • Spark SQL常用概念 DataSet DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象。 每个Dataset还有一个非类型视图,即由多个列组成的DataSet,称为DataFrame。 DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的data frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者是RDD。
  • 常用概念 在Doris中,数据都以表(Table)的形式进行逻辑上的描述。 Row&Column 一张表包括行(Row)和列(Column): Row:即用户的一行数据。 Column: 用于描述一行数据中不同的字段。 Column可以分为两大类:Key和Value。从业务角度看,Key和Value可以分别对应维度列和指标列。从聚合模型的角度来说,Key列相同的行,会聚合成一行。其中Value列的聚合方式由用户在建表时指定。 Tablet&Partition 在Doris的存储引擎中,用户数据被水平划分为若干个数据分片(Tablet,也称作数据分桶)。每个Tablet包含若干数据行。各个Tablet之间的数据没有交集,并且在物理上是独立存储的。 多个Tablet在逻辑上归属于不同的分区(Partition)。一个Tablet只属于一个Partition,而一个Partition包含若干个Tablet。因为Tablet在物理上是独立存储的,所以可以视为Partition在物理上也是独立。Tablet是数据移动、复制等操作的最小物理存储单元。 若干个Partition组成一个Table。Partition可以视为是逻辑上最小的管理单元。数据的导入与删除,仅能针对一个Partition进行。 数据模型 Doris的数据模型主要分为三类:Aggregate、Unique、Duplicate。 Aggregate模型 导入数据时,对于Key列相同的行会聚合成一行,而Value列会按照设置的AggregationType进行聚合。 AggregationType目前有以下四种聚合方式: SUM:求和,多行的Value进行累加。 REPLACE:替代,下一批数据中的Value会替换之前导入过的行中的Value。 MAX:保留最大值。 MIN:保留最小值。 Unique模型 在某些多维分析场景下,用户更关注的是如何保证Key的唯一性,即如何获得Primary Key唯一性约束。因此,引入了Unique数据模型。 读时合并 Unique模型的读时合并实现完全可以用Aggregate模型中的REPLACE方式替代。其内部的实现方式和数据存储方式也完全一样。 写时合并 Unique模型的写时合并实现,查询性能更接近于Duplicate模型,在有主键约束需求的场景上相比Aggregate模型有较大的查询性能优势,尤其是在聚合查询以及需要用索引过滤大量数据的查询中。 在开启了写时合并选项的Unique表上,数据在导入阶段就会去将被覆盖和被更新的数据进行标记删除,同时将新的数据写入新的文件。在查询的时候,所有被标记删除的数据都会在文件级别被过滤掉,读取出来的数据就都是最新的数据,消除掉了读时合并中的数据聚合过程,并且能够在很多情况下支持多种谓词的下推。因此在许多场景都能带来比较大的性能提升,尤其是在有聚合查询的情况下。 Duplicate模型 在某些多维分析场景下,数据既没有主键,也没有聚合需求。可以引入Duplicate数据模型来满足这类需求。 这种数据模型区别于Aggregate和Unique模型。数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。 而在建表语句中指定的DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序。 数据模型的选择建议 因为数据模型在建表时就已经确定,且无法修改。所以,选择一个合适的数据模型非常重要。 Aggregate模型可以通过预聚合,极大地降低聚合查询时所需扫描的数据量和查询的计算量,非常适合有固定模式的报表类查询场景。但是该模型对count(*) 查询不友好。同时因为固定了Value列上的聚合方式,在进行其他类型的聚合查询时,需要考虑语意正确性。 Unique模型针对需要唯一主键约束的场景,可以保证主键唯一性约束。但是无法利用ROLLUP等预聚合带来的查询优势。 对于聚合查询有较高性能需求的用户,推荐使用自1.2版本加入的写时合并实现。 Unique模型仅支持整行更新,如果用户既需要唯一主键约束,又需要更新部分列(例如将多张源表导入到一张Doris表的情形),则可以考虑使用Aggregate模型,同时将非主键列的聚合类型设置为REPLACE_IF_NOT_NULL。 Duplicate适合任意维度的Ad-hoc查询。虽然同样无法利用预聚合的特性,但是不受聚合模型的约束,可以发挥列存模型的优势(只读取相关列,而不需要读取所有Key列)。
  • Doris样例工程介绍 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Doris相关样例工程: 表1 Doris相关样例工程 样例工程位置 描述 doris-examples/doris-example Doris数据读写操作的应用开发示例。 通过调用Doris接口可实现创建用户表、向表中插入数据、查询表数据、删除表等功能,相关业务场景介绍请参见Doris JDBC接口调用样例程序。 springboot/doris-examples Doris数据读写操作的SpringBoot应用开发示例。 提供Doris对接SpringBoot的样例,样例介绍请参见配置并导入SpringBoot样例工程。
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表6 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表7 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 创建Doris连接 以下代码片段在“JDBCExample”类的“createConnection”方法中。 USER和PASSWD为在创建连接时用于进行安全认证的用户名和密码。 Class.forName(JDBC_DRIVER); String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT); connection = DriverManager.getConnection(dbUrl, USER, PASSWD); 父主题: Doris JDBC接口调用样例程序
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。 用户在开发Oozie应用程序时,某些场景下需要Oozie与Hadoop、Hive等之间进行通信。那么Oozie应用程序中需要写入安全认证代码,确保Oozie程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交Oozie应用程序运行前,在Oozie客户端执行如下命令获得认证。 kinit 组件业务用户 代码认证(Kerberos安全认证): 通过获取客户端的principal和keytab文件在应用程序中进行认证,用于Kerberos安全认证的keytab文件和principal文件您可以联系管理员创建并获取,具体使用方法在样例代码中会有详细说明。 目前样例代码统一调用LoginUtil类进行安全认证,支持Oracle JAVA平台和IBM JAVA平台。 代码示例中请根据实际情况,修改“USERNAME”为实际用户名,例如“developuser”。 private static void login(String keytabFilePath, String krb5FilePath, String user) throws IOException { Configuration conf = new Configuration(); conf.set(KERBEROS_PRINCIPAL, user); conf.set(KEYTAB_FILE, keytabFilePath); conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); conf.set(HADOOP_SECURITY_AUTHORIZATION, "true"); /* * if need to connect zk, please provide jaas info about zk. of course, * you can do it as below: * System.setProperty("java.security.auth.login.config", confDirPath + * "jaas.conf"); but the demo can help you more : Note: if this process * will connect more than one zk cluster, the demo may be not proper. you * can contact us for more help */ LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_ LOG IN_CONTEXT_NAME, user, keytabFilePath); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL); LoginUtil.login(user, keytabFilePath, krb5FilePath, conf); }
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 数据规划 在开始开发应用前,需要创建Hive表,命名为person,并插入数据。同时,创建HBase table2表,用于将分析后的数据写入。 将原日志文件放置到HDFS系统中。 在本地新建一个空白的log1.txt文件,并在文件内写入如下内容: 1,100 在HDFS中新建一个目录/tmp/input,并将log1.txt文件上传至此目录。 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -put log1.txt /tmp/input,上传数据文件。 将导入的数据放置在Hive表里。 首先,确保JDBCServer已启动。然后使用Beeline工具,创建Hive表,并插入数据。 执行如下命令,创建命名为person的Hive表。 create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE; 执行如下命令插入数据。 load data inpath '/tmp/input/log1.txt' into table person; 创建HBase表。 确保JDBCServer已启动,然后使用Spark-beeline工具,创建HBase表,并插入数据。 执行如下命令,创建命名为table2的HBase表。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,执行如下命令。 put 'table2', '1', 'cf:cid', '1000'
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表6 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表7 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 由于Python样例代码中未给出认证信息,请在执行应用程序时通过配置项“--keytab”和“--principal”指定认证信息。 bin/spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 数据规划 使用Spark-Beeline工具创建Spark和HBase表table1、table2,并通过HBase插入数据。 确保JDBCServer已启动。登录Spark2x客户端节点。 使用Spark-Beeline工具创建Spark表table1。 create table table1 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table1", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,命令如下: put 'table1', '1', 'cf:cid', '100' 使用Spark-Beeline工具创建Spark表table2。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,命令如下: put 'table2', '1', 'cf:cid', '1000'
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --jars --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn --deploy-mode client /opt/female/SparkHbasetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbase-1.0.jar,/opt/female/protobuf-java-2.5.0.jar /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbasePythonExample.py
  • 场景说明 假定HBase的table1表存储用户当天消费的金额信息,table2表存储用户历史消费的金额信息。 现table1表有记录key=1,cf:cid=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 查看Windows调测结果 Doris应用程序运行完成后,可通过如下方式查看运行情况。 通过IntelliJ IDEA运行结果查看应用程序运行情况。 通过Doris日志获取应用程序运行情况。 各样例程序运行结果如下: “doris-jdbc-example”样例运行成功后,显示信息如下: 2023-08-17 23:13:13,473 | INFO | main | Start execute doris example. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:41) 2023-08-17 23:13:13,885 | INFO | main | Start create database. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:44) 2023-08-17 23:13:13,949 | INFO | main | Database created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:46) 2023-08-17 23:13:13,950 | INFO | main | Start create table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:49) 2023-08-17 23:13:14,132 | INFO | main | Table created successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:51) 2023-08-17 23:13:14,133 | INFO | main | Start to insert data into the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:54) 2023-08-17 23:13:14,733 | INFO | main | Inserting data to the table succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:56) 2023-08-17 23:13:14,733 | INFO | main | Start to query table data. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:59) 2023-08-17 23:13:15,079 | INFO | main | Start to print query result. | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:121) 2023-08-17 23:13:15,079 | INFO | main | c1 c2 c3 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:126) 2023-08-17 23:13:15,079 | INFO | main | 0 0 0 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 1 10 100 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 2 20 200 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 3 30 300 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 4 40 400 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 5 50 500 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 6 60 600 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,080 | INFO | main | 7 70 700 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 8 80 800 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | 9 90 900 | com.huawei.bigdata.doris.example.JDBCExample.query(JDBCExample.java:134) 2023-08-17 23:13:15,081 | INFO | main | Querying table data succeeded. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:61) 2023-08-17 23:13:15,081 | INFO | main | Start to delete the table. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:64) 2023-08-17 23:13:15,114 | INFO | main | Table deleted successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:66) 2023-08-17 23:13:15,124 | INFO | main | Doris example execution successfully. | com.huawei.bigdata.doris.example.JDBCExample.main(JDBCExample.java:71) Process finished with exit code 0 Doris对接SpringBoot运行结果 在浏览器中访问链接“http://样例运行节点IP地址:8080/doris/example/executesql”,IDEA正常打印日志,请求返回如下图所示: 图7 返回样例运行信息
  • 场景说明 在安全集群环境下,各个组件之间的相互通信不能够简单的互通,而需要在通信之前进行相互认证,以确保通信的安全性。 用户在提交MapReduce应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交MapReduce的应用程序中需要写入安全认证代码,确保MapReduce程序能够正常运行。 安全认证有两种方式: 命令行认证: 提交MapReduce应用程序运行前,在MapReduce客户端执行如下命令获得认证。 kinit 组件业务用户 代码认证: 通过获取客户端的principal和keytab文件在应用程序中进行认证。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表4 Spark Streaming方法介绍 方法 说明 socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] 从TCP源主机:端口创建一个输入流。 start():Unit 启动Spark Streaming计算。 awaitTermination(timeout: long):Unit 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 终止Spark Streaming计算。 transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] 对每一个RDD应用function操作得到一个新的DStream。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(otherStream, [numTasks]) 实现不同的Spark Streaming之间做合并操作。 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。 表5 Spark Streaming增强特性接口 方法 说明 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。
  • SparkSQL常用接口 Spark SQL中常用的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集。 HiveContext:获取存储在Hive中数据的主入口。 表6 常用的Actions方法 方法 说明 collect(): Array[Row] 返回一个数组,包含DataFrame的所有列。 count(): Long 返回DataFrame中的行数。 describe(cols: String*): DataFrame 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first(): Row 返回第一行。 Head(n:Int): Row 返回前n行。 show(numRows: Int, truncate: Boolean): Unit 用表格形式显示DataFrame。 take(n:Int): Array[Row] 返回DataFrame中的前n行。 表7 基本的DataFrame Functions 方法 说明 explain(): Unit 打印出SQL语句的逻辑计划和物理计划。 printSchema(): Unit 打印schema信息到控制台。 registerTempTable(tableName: String): Unit 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 toDF(colNames: String*): DataFrame 返回一个列重命名的DataFrame。
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表5 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表6 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接受器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使三个部件的性能都最优化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果往往全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能会是最好的,实际测试上来看,DirectKafka也确实比其他两个API性能好。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • Spark Streaming常用概念 Dstream DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。 DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD一个只读的、可分区的分布式数据集。 DStream中的每个RDD包含了一个区间的数据。如图4所示。 图4 DStream与RDD关系 应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。 图5 DStream算子转译
  • Spark SQL常用概念 DataFrame DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的Data Frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者RDD。 Spark SQL的程序入口是SQLContext类(或其子类),创建SQLContext时需要一个SparkContext对象作为其构造参数。SQLContext其中一个子类是HiveContext,相较于其父类,HiveContext添加了HiveQL的parser、UDF以及读取存量Hive数据的功能等。但注意,HiveContext并不依赖运行时的Hive,只是依赖Hive的类库。 由SQLContext及其子类可以方便的创建SparkSQL中的基本数据集DataFrame,DataFrame向上提供多种多样的编程接口,向下兼容多种不同的数据源,例如Parquet、JSON、Hive数据、Database、HBase等,这些数据源都可以使用统一的语法来读取。
  • 安全认证代码(Java版) 目前样例代码统一调用LoginUtil类进行安全认证。 在Spark样例工程代码中,不同的样例工程,使用的认证代码不同,基本安全认证或带ZooKeeper认证。样例工程中使用的示例认证参数如表2所示,请根据实际情况修改对应参数值。 表2 参数描述 参数 示例参数值 描述 userPrincipal sparkuser 用户用于认证的账号Principal,您可以联系管理员获取此账号。 userKeytabPath /opt/FIclient/user.keytab 用户用于认证的Keytab文件,您可以联系管理员获取文件。 krb5ConfPath /opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf krb5.conf文件路径和文件名称。 ZKServerPrincipal zookeeper/hadoop.hadoop.com ZooKeeper服务端principal。请联系管理员获取对应账号。 基本安全认证: Spark Core和Spark SQL程序不需要访问HBase或ZooKeeper,所以使用基本的安全认证代码即可。请在程序中添加如下代码,并根据实际情况设置安全认证相关参数: String userPrincipal = "sparkuser"; String userKeytabPath = "/opt/FIclient/user.keytab"; String krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"; Configuration hadoopConf = new Configuration(); LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf); 带ZooKeeper认证: 由于“Spark Streaming”、“通过JDBC访问Spark SQL”和“Spark on HBase”样例程序,不仅需要基础安全认证,还需要添加ZooKeeper服务端Principal才能完成安全认证。请在程序中添加如下代码,并根据实际情况设置安全认证相关参数: String userPrincipal = "sparkuser"; String userKeytabPath = "/opt/FIclient/user.keytab"; String krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"; String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com"; String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client"; String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal"; Configuration hadoopConf = new Configuration(); LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeytabPath); LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal); LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
共100000条