云服务器内容精选
-
回答 该应用程序中使用了DStream中的print算子来显示结果,该算子会调用RDD中的take算子来实现底层的计算。 Take算子会以Partition为单位多次触发计算。 在该问题中,由于Shuffle操作,导致take算子默认有两个Partition,Spark首先计算第一个Partition,但由于没有数据输入,导致获取结果不足10个,从而触发第二次计算,因此会出现RDD的DAG结构打印两次的现象。 在代码中将print算子修改为foreach(collect),该问题则不会出现。
-
配置场景 当Spark Streaming应用与Kafka对接,Spark Streaming应用异常终止并从checkpoint恢复重启后,对于进入Kafka数据的任务,系统默认优先处理应用终止前(A段时间)未完成的任务和应用终止到重启完成这段时间内(B段时间)进入Kafka数据生成的任务,最后再处理应用重启完成后(C段时间)进入Kafka数据生成的任务。并且对于B段时间进入Kafka的数据,Spark将按照终止时间(batch时间)生成相应个数的任务,其中第一个任务读取全部数据,其余任务可能不读取数据,造成任务处理压力不均匀。 如果A段时间的任务和B段时间任务处理得较慢,则会影响C段时间任务的处理。针对上述场景,Spark提供Kafka后进先出功能。 图1 Spark Streaming应用重启时间轴 开启此功能后,Spark将优先调度C段时间内的任务,如果存在多个C段任务,则按照任务产生的先后顺序调度执行,再执行A段时间和B段时间的任务。另外,对于B段时间进入Kafka的数据,Spark除了按照终止时间生成相应任务,还将这个期间进入Kafka的所有数据均匀分配到各个任务,避免任务处理压力不均匀。 约束条件: 目前该功能只适用于Spark Streaming中的Direct方式,且执行结果与上一个batch时间处理结果没有依赖关系(即无state操作,如updatestatebykey)。对多条数据输入流,需要相对独立无依赖的状态,否则可能导致数据切分后结果发生变化。 Kafka后进先出功能的开启要求应用只能对接Kafka输入源。 如果提交应用的同时开启Kafka后进先出和流控功能,对于B段时间进入Kafka的数据,将不启动流控功能,以确保读取这些数据的任务调度优先级最低。应用重新启动后C段时间的任务启用流控功能。
-
配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false spark.streaming.kafka010.inputstream.class 获取解耦在 FusionInsight 侧的类。 org.apache.spark.streaming.kafka010.xxDirectKafkaInputDStream
-
回答 经过定位发现,导致这个问题的原因是:Spark Streaming的计算核数少于Receiver的个数,导致部分Receiver启动以后,系统已经没有资源去运行计算任务,导致第一个任务一直在等待,后续任务一直在排队。从现象上看,就是如问题中的图1中所示,会有两个任务一直在等待。 因此,当Web出现两个任务一直在等待的情况,首先检查Spark的核数是否大于Receiver的个数。 Receiver在Spark Streaming中是一个常驻的Spark Job,Receiver对于Spark是一个普通的任务,但它的生命周期和Spark Streaming任务相同,并且占用一个核的计算资源。 在调试和测试等经常使用默认配置的场景下,要时刻注意核数与Receiver个数的关系。
-
问题 运行一个Spark Streaming任务,确认有数据输入后,发现没有任何处理的结果。打开Web界面查看Spark Job执行情况,发现如下图所示:有两个Job一直在等待运行,但一直无法成功运行。 图1 Active Jobs 继续查看已经完成的Job,发现也只有两个,说明Spark Streaming都没有触发数据计算的任务(Spark Streaming默认有两个尝试运行的Job,就是图中两个) 图2 Completed Jobs
-
问题 在Spark Streaming应用执行过程中重启Kafka时,应用无法从Kafka获取topic offset,从而导致生成Job失败。如图1所示,其中2017/05/11 10:57:00~2017/05/11 10:58:00为Kafka重启时间段。2017/05/11 10:58:00重启成功后对应的“Input Size”的值显示为“0 records”。 图1 Web UI界面部分batch time对应Input Size为0 records
-
回答 Kafka重启成功后应用会按照batch时间把2017/05/11 10:57:00~2017/05/11 10:58:00缺失的RDD补上(如图2所示),尽管UI界面上显示读取的数据个数为“0”,但实际上这部分数据在补的RDD中进行了处理,因此,不存在数据丢失。 Kafka重启时间段的数据处理机制如下。 Spark Streaming应用使用了state函数(例如:updateStateByKey),在Kafka重启成功后,Spark Streaming应用生成2017/05/11 10:58:00 batch任务时,会按照batch时间把2017/05/11 10:57:00~2017/05/11 10:58:00缺失的RDD补上(Kafka重启前Kafka上未读取完的数据,属于2017/05/11 10:57:00之前的batch),如图2所示。 图2 重启时间段缺失数据处理机制
-
配置场景 当Spark Streaming应用与Kafka对接,Spark Streaming应用异常终止并从checkpoint恢复重启后,对于进入Kafka数据的任务,系统默认优先处理应用终止前(A段时间)未完成的任务和应用终止到重启完成这段时间内(B段时间)进入Kafka数据生成的任务,最后再处理应用重启完成后(C段时间)进入Kafka数据生成的任务。并且对于B段时间进入Kafka的数据,Spark将按照终止时间(batch时间)生成相应个数的任务,其中第一个任务读取全部数据,其余任务可能不读取数据,造成任务处理压力不均匀。 若A段时间的任务和B段时间任务处理得较慢,则会影响C段时间任务的处理。针对上述场景,Spark提供Kafka后进先出功能。 图1 Spark Streaming应用重启时间轴 开启此功能后,Spark将优先调度C段时间内的任务,若存在多个C段任务,则按照任务产生的先后顺序调度执行,再执行A段时间和B段时间的任务。另外,对于B段时间进入Kafka的数据,Spark除了按照终止时间生成相应任务,还将这个期间进入Kafka的所有数据均匀分配到各个任务,避免任务处理压力不均匀。 约束条件: 目前该功能只适用于Spark Streaming中的Direct方式,且执行结果与上一个batch时间处理结果没有依赖关系(即无state操作,如updatestatebykey)。对多条数据输入流,需要相对独立无依赖的状态,否则可能导致数据切分后结果发生变化。 Kafka后进先出功能的开启要求应用只能对接Kafka输入源。 若提交应用的同时开启Kafka后进先出和流控功能,对于B段时间进入Kafka的数据,将不启动流控功能,以确保读取这些数据的任务调度优先级最低。应用重新启动后C段时间的任务启用流控功能。
-
配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false spark.streaming.kafka010.inputstream.class 获取解耦在FusionInsight侧的类 org.apache.spark.streaming.kafka010.HWDirectKafkaInputDStream
-
数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath除样例工程jar包路径外,还应包含Spark客户端Kafka jar包的绝对路径,例如:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}
-
数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath除样例jar包路径外,还应包含Spark客户端Kafka jar包的绝对路径,例如:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}
-
打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
-
数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath应包含Spark客户端Kafka jar包的绝对路径,如/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*
-
打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。 准备依赖包,将下列jar包上传到Spark客户端所在服务器,“$SPARK_HOME/jars/streamingClient010”目录下。 spark-streaming-kafkaWriter-0-10_2.12-3.1.1-hw-ei-311001.jar kafka-clients-xxx.jar kafka_2.12-xxx.jar spark-sql-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 版本号中包含hw-ei的依赖包请从华为开源镜像站下载。 版本号中不包含hw-ei的依赖包都来自开源仓库,请从Maven中心仓获取。
-
数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath应包含Spark客户端Kafka jar包的绝对路径,如/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格