云服务器内容精选

  • Spark使用说明 MRS 3.3.0-LTS及之后的版本中,Spark2x服务改名为Spark,服务包含的角色名也有差异,例如JobHistory2x变更为JobHistory。 相关涉及服务名称、角色名称的描述和操作请以实际版本为准。 Spark是一个开源的,并行数据处理框架,能够帮助用户简单、快速的开发大数据应用,对数据进行离线处理、流式处理、交互式分析等。 相比于Hadoop,Spark拥有明显的性能优势。 父主题: 使用Spark/Spark2x
  • 回答 打开 FusionInsight Manager页面,看到Yarn服务的业务IP地址为192网段。 从Yarn的日志看到,Yarn读取的Spark Web UI地址为http://10.120.169.53:23011,是10网段的IP地址。由于192网段的IP和10网段的IP不能互通,所以导致访问Spark Web UI界面失败。 修改方案: 登录10.120.169.53客户端机器,修改/etc/hosts文件,将10.120.169.53更改为相对应的192网段的IP地址。再重新运行Spark应用,这时就可以打开Spark Web UI界面。
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下要求: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 问题 在Spark2x的spark-shell上执行如下语句失败: val acctId = List(("49562", "Amal", "Derry"), ("00000", "Fred", "Xanadu")) val rddLeft = sc.makeRDD(acctId) val dfLeft = rddLeft.toDF("Id", "Name", "City") //dfLeft.show val acctCustId = List(("Amal", "49562", "CO"), ("Dave", "99999", "ZZ")) val rddRight = sc.makeRDD(acctCustId) val dfRight = rddRight.toDF("Name", "CustId", "State") //dfRight.show val dfJoin = dfLeft.join(dfRight, dfLeft("Id") === dfRight("CustId"), "outer") dfJoin.show dfJoin.repartition(1).write.format("com.databricks.spark.csv").option("delimiter", "\t").option("header", "true").option("treatEmptyValuesAsNulls", "true").option("nullValue", "").save("/tmp/outputDir")
  • Spark2x开源新特性说明 Spark2x版本相对于Spark 1.5版本新增了一些开源特性。 具体特性或相关概念如下: DataSet,详见SparkSQL和DataSet原理。 Spark SQL Native DDL/DML,详见SparkSQL和DataSet原理。 SparkSession,详见SparkSession原理。 Structured Streaming,详见Structured Streaming原理。 小文件优化。 聚合算法优化。 Datasource表优化。 合并CBO优化。 父主题: Spark2x开源增强特性
  • 实现方案 多租户模式的HA方案原理如图1所示。 图1 Spark JD BCS erver多租户 ProxyServer在启动时,向ZooKeeper注册自身消息,在指定目录中写入节点信息,节点信息包含了该实例对应的IP,端口,版本号和序列号等信息(多节点信息之间以逗号隔开)。 多租户模式下,MRS页面上的JDB CS erver实例是指ProxyServer(JDBCServer代理)。 示例如下: serverUri=192.168.169.84:22550 ;version=8.1.0.1;sequence=0000001244,serverUri=192.168.195.232:22550 ;version=8.1.0.1;sequence=0000001242,serverUri=192.168.81.37:22550 ;version=8.1.0.1;sequence=0000001243, 客户端连接ProxyServer时,需要指定Namespace,即访问ZooKeeper哪个目录下的ProxyServer实例。在连接的时候,会根据当前租户名的Hash值与Zookeeper下的Namespace实例个数取模获取连接的实例,详细URL参见URL连接介绍。 客户端成功连接ProxyServer服务,ProxyServer服务首先确认是否有该租户的JDBCServer存在,如果有,直接将Beeline连上真正的JDBCServer;如果没有,则以YARN-Cluster模式启动一个新的JDBCServer。JDBCServer启动成功后,ProxyServer会获取JDBCServer的地址,并将Beeline连上JDBCServer。 客户端发送SQL语句给ProxyServer,ProxyServer将语句转交给真正连上的JDBCServer处理。最后JDBCServer服务将结果返回给ProxyServer,ProxyServer再将结果返回给客户端。 在HA方案中,每个ProxyServer服务(即实例)都是独立且等同的,当其中一个实例在升级或者业务中断时,其他的实例也能接受客户端的连接请求。
  • 背景介绍 JDBCServer多主实例方案中,JDBCServer的实现使用YARN-Client模式,但YARN资源队列只有一个,为了解决这种资源局限的问题,引入了多租户模式。 多租户模式是将JDBCServer和租户绑定,每一个租户对应一个或多个JDBCServer,而一个JDBCServer只给一个租户提供服务。不同的租户可以配置不同的YARN队列,从而达到资源隔离,且JDBCServer根据需求动态启动,可避免浪费资源。
  • Spark Streaming常用概念 Dstream DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。 DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD是一个只读的、可分区的分布式数据集。 DStream中的每个RDD包含了一个区间的数据。如图4所示。 图4 DStream与RDD关系 应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。 图5 DStream算子转译
  • Spark简介 Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言(Scala/Java/Python)的应用开发。 适用以下场景: 数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。 迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。 数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。 流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。 查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。 本文档重点介绍Spark、Spark SQL和Spark Streaming应用开发指导。
  • Spark SQL常用概念 DataSet DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象。 每个Dataset还有一个非类型视图,即由多个列组成的DataSet,称为DataFrame。 DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的data frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者是RDD。
  • Structured Streaming常用概念 Input Source 输入数据源,数据源需要支持根据offset重放数据,不同的数据源有不同的容错性。 Sink 数据输出,Sink要支持幂等性写入操作,不同的sink有不同的容错性。 outputMode 结果输出模式,当前支持3种输出模: Complete Mode:整个更新的结果集都会写入外部存储。整张表的写入操作将由外部存储系统的连接器完成。 Append Mode:当时间间隔触发时,只有在Result Table中新增加的数据行会被写入外部存储。这种方式只适用于结果集中已经存在的内容不希望发生改变的情况下,如果已经存在的数据会被更新,不适合适用此种方式。 Update Mode:当时间间隔触发时,只有在Result Table中被更新的数据才会被写入外部存储系统。注意,和Complete Mode方式的不同之处是不更新的结果集不会写入外部存储。 Trigger 输出触发器,当前支持以下几种trigger: 默认:以微批模式执行,每个批次完成后自动执行下个批次。 固定间隔:固定时间间隔执行。 一次执行:只执行一次query,完成后退出。 连续模式:实验特性,可实现低至1ms延迟的流处理(推荐100ms)。 Structured Streaming支持微批模式和连续模式。微批模式不能保证对数据的低延迟处理,但是在相同时间下有更大的吞吐量;连续模式适合毫秒级的数据处理延迟,当前暂时还属于实验特性。 在当前版本中,若需要使用流流Join功能,则output模式只能选择append模式。 图6 微批模式运行过程简图 图7 连续模式运行过程简图
  • Spark开发接口简介 Spark支持使用Scala、Java和Python语言进行程序开发,由于Spark本身是由Scala语言开发出来的,且Scala语言具有简洁易懂的特性,推荐用户使用Scala语言进行Spark应用程序开发。 按不同的语言分,Spark的API接口如表1所示。 表1 Spark API接口 功能 说明 Scala API 提供Scala语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Scala API接口介绍。由于Scala语言的简洁易懂,推荐用户使用Scala接口进行程序开发。 Java API 提供Java语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Java API接口介绍。 Python API 提供Python语言的API,Spark Core、SparkSQL和Spark Streaming模块的常用接口请参见Spark Python API接口介绍。 按不同的模块分,Spark Core和Spark Streaming使用上表中的API接口进行程序开发。而SparkSQL模块,支持CLI或者JDBCServer两种方式访问。其中JDBCServer的连接方式也有Beeline和JDBC客户端代码两种。详情请参见Spark JDBCServer接口介绍。 spark-sql脚本、spark-shell脚本和spark-submit脚本(运行的应用中带SQL操作),不支持使用proxy user参数去提交任务。另外,由于本文档中涉及的样例程序已添加安全认证,建议不要使用proxy user参数去提交任务。
  • Structured Streaming常用概念 Input Source 输入数据源,数据源需要支持根据offset重放数据,不同的数据源有不同的容错性。 Sink 数据输出,Sink要支持幂等性写入操作,不同的sink有不同的容错性。 outputMode 结果输出模式,当前支持3种输出模: Complete Mode:整个更新的结果集都会写入外部存储。整张表的写入操作将由外部存储系统的连接器完成。 Append Mode:当时间间隔触发时,只有在Result Table中新增加的数据行会被写入外部存储。这种方式只适用于结果集中已经存在的内容不希望发生改变的情况下,如果已经存在的数据会被更新,不适合适用此种方式。 Update Mode:当时间间隔触发时,只有在Result Table中被更新的数据才会被写入外部存储系统。注意,和Complete Mode方式的不同之处是不更新的结果集不会写入外部存储。 Trigger 输出触发器,当前支持以下几种trigger: 默认:以微批模式执行,每个批次完成后自动执行下个批次。 固定间隔:固定时间间隔执行。 一次执行:只执行一次query,完成后退出。 连续模式:实验特性,可实现低至1ms延迟的流处理(推荐100ms)。 Structured Streaming支持微批模式和连续模式。微批模式不能保证对数据的低延迟处理,但是在相同时间下有更大的吞吐量;连续模式适合毫秒级的数据处理延迟,当前暂时还属于实验特性。 在当前版本中,若需要使用流流Join功能,则output模式只能选择append模式。 图6 微批模式运行过程简图 图7 连续模式运行过程简图
  • Spark SQL常用概念 DataSet DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象。 每个Dataset还有一个非类型视图,即由多个列组成的DataSet,称为DataFrame。 DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的data frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者是RDD。
  • Spark Streaming常用概念 Dstream DStream(又称Discretized Stream)是Spark Streaming提供的抽象概念。 DStream表示一个连续的数据流,是从数据源获取或者通过输入流转换生成的数据流。从本质上说,一个DStream表示一系列连续的RDD。RDD是一个只读的、可分区的分布式数据集。 DStream中的每个RDD包含了一个区间的数据。如图4所示。 图4 DStream与RDD关系 应用到DStream上的所有算子会被转译成下层RDD的算子操作,如图5所示。这些下层的RDD转换会通过Spark引擎进行计算。DStream算子隐藏大部分的操作细节,并且提供了方便的High-level API给开发者使用。 图5 DStream算子转译