华为云用户手册

  • 应用开发操作步骤 确认产品Storm组件已经安装,且正常运行。 参考获取 MRS 应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。 工程导入后,修改样例工程的“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。 #配置JDBC服务端IP地址 JDBC_SERVER_NAME= #配置JDBC服务端端口 JDBC_PORT_NUM= #配置JDBC登录用户名 JDBC_USER_NAME= #配置JDBC登录用户密码 #密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全 JDBC_PASSWORD= #配置database表名 JDBC_BASE_TBL= 在Linux环境下安装Storm客户端。 集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
  • 功能分解 根据上述场景进行功能分解,如表1所示: 表1 在应用中开发的功能 序号 步骤 代码示例 1 创建一个Spout用来生成随机文本 请参见创建Storm Spout 2 创建一个Bolt用来将收到的随机文本拆分成一个个单词 请参见创建Storm Bolt 3 创建一个Blot用来统计收到的各单词次数 请参见创建Storm Bolt 4 创建topology 请参见创建Storm Topology 部分代码请参考开发Storm应用,完整代码请参考Storm-examples示例工程。
  • 场景说明 一个动态单词统计系统,数据源为持续生产随机文本的逻辑单元,业务处理流程如下: 数据源持续不断地发送随机文本给文本拆分逻辑,如“apple orange apple”。 单词拆分逻辑将数据源发送的每条文本按空格进行拆分,如“apple”,“orange”,“apple”,随后将每个单词逐一发给单词统计逻辑。 单词统计逻辑每收到一个单词就进行加一操作,并将实时结果打印输出,如: apple:1 orange:1 apple:2
  • 部署运行及结果查看 导出本地jar包,请参见打包Storm样例工程应用。 将4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm应用业务。 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。 执行命令提交拓扑。 storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml 如果设置业务以本地模式启动,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml 如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。 如果使用了properties文件,则提交命令如下: storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties 拓扑提交成功后请自行登录storm UI查看。
  • 准备JDBC/HCatalog开发环境 表1 JDBC/HCatalog开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.8版本,IntelliJ IDEA使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 华为提供开源镜像站,各服务样例工程依赖的Jar包通过华为开源镜像站下载,剩余所依赖的开源Jar包请直接从Maven中央库或者其他用户自定义的仓库地址下载,详情请参考配置华为开源镜像仓。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • Storm应用开发流程 本文档主要基于Java API进行Storm拓扑的开发。 开发流程中各阶段的说明如图1和表1所示: 图1 拓扑开发流程 表1 Storm应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Storm的基本概念,了解场景需求,拓扑等。 Storm应用开发常用概念 准备开发和运行环境 Storm的应用程序当前推荐使用Java语言进行开发。可使用IntelliJ IDEA工具。 Storm的运行环境即Storm客户端,请根据指导完成客户端的安装和配置。 准备Storm应用开发和运行环境 准备工程 Storm提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Storm样例工程 根据场景开发拓扑 提供了Storm拓扑的构造和Spout/Bolt开发过程。 开发Storm应用 打包IntelliJ IDEA代码 Storm样例程序是在Linux环境下运行,需要将IntelliJ IDEA中的代码打包成jar包。 打包Storm样例工程应用 打包业务 将IntelliJ IDEA代码生成的jar包与工程依赖的jar包,合并导出可提交的source.jar。 打包Storm应用业务 提交拓扑 指导用户将开发好的程序提交运行。 提交Storm拓扑 查看程序运行结果 指导用户提交拓扑后查看程序运行结果。 查看Storm应用调测结果 父主题: Storm应用开发概述
  • 操作步骤 将从IntelliJ IDEA中导出的jar包复制到Linux客户端指定目录(例如“/opt/jarsource”)。 若业务需要访问外部组件,其所依赖的配置文件请参考相关开发指引,获取到配置文件后将配置文件放在1中指定的目录下。 若业务需要访问外部组件,其所依赖的jar包请参考相关开发指引,获取到jar包后将jar包放在1中指定的目录下。 在Storm客户端安装目录“Storm/storm-1.2.1/bin”下执行打包命令,将上述jar包打成一个完整的业务jar包放入指定目录/opt/jartarget(可为任意空目录)。执行sh storm-jartool.sh /opt/jarsource/ /opt/jartarget命令后,会在“/opt/jartarget”下生成source.jar。
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluster模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • 代码样例 代码示例中请根据实际情况,修改“OOZIE_URL_DEFAULT”为实际的任意Oozie的主机名,例如“https://10-1-131-131:21003/oozie/”。 public void test(String jobFilePath) { try { runJob(jobFilePath); } catch (Exception exception) { exception.printStackTrace(); } } private void runJob(String jobFilePath) throws OozieClientException, InterruptedException { Properties conf = getJobProperties(jobFilePath); String user = PropertiesCache.getInstance().getProperty("submit_user"); conf.setProperty("user.name", user); // submit and start the workflow job String jobId = oozieClient.run(conf); logger.info("Workflow job submitted: {}" , jobId); // wait until the workflow job finishes printing the status every 10 seconds while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) { logger.info("Workflow job running ... {}" , jobId); Thread.sleep(10 * 1000); } // print the final status of the workflow job logger.info("Workflow job completed ... {}" , jobId); logger.info(String.valueOf(oozieClient.getJobInfo(jobId))); } /** * Get job.properties File in filePath * * @param filePath file path * @return job.properties * @since 2020-09-30 */ public Properties getJobProperties(String filePath) { File configFile = new File(filePath); if (!configFile.exists()) { logger.info(filePath , "{} is not exist."); } InputStream inputStream = null; // create a workflow job configuration Properties properties = oozieClient.createConfiguration(); try { inputStream = new FileInputStream(filePath); properties.load(inputStream); } catch (Exception e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException ex) { ex.printStackTrace(); } } } return properties; }
  • 操作步骤 获取样例代码。 下载样例工程的Maven工程源码和配置文件,请参见获取代码样例工程。 将样例代码导入IDEA中。 获取配置文件。 从集群的客户端中获取文件。在“$SPARK_HOME/conf”中下载hive-site.xml与spark-defaults.conf文件到本地。 在HDFS中上传数据。 在Linux中新建文本文件data,将如下数据内容保存到data文件中。 Miranda,32 Karlie,23 Candice,27 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /data(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -put data /data,上传数据文件。 在样例代码中配置相关参数。 将加载数据的sql语句改为“LOAD DATA INPATH 'hdfs:/data/data' INTO TABLE CHILD”。 在程序运行时添加运行参数,分别为hive-site.xml与spark-defaults.conf文件的路径。 运行程序。
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "os" ) func main() { p, _ := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), producer.WithRetry(2), //producer.WithTls(true), //创建实例时,如果开启了SSL,请添加此行代码。 ) err := p.Start() if err != nil { fmt.Printf("start producer error: %s", err.Error()) os.Exit(1) } msg := primitive.NewMessage("test", []byte("Hello RocketMQ Go Client!")) msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().UnixMilli()+3000, 10)) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Printf("send message success: result=%s\n", res.String()) } err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下,或者通过ProducerDelayMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; public class ProducerDelayMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class); private ProducerDelayMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourDelayTopic"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ACL_User_Name"); String secretKey = System.getenv("ACL_Secret_Key"); //ACL_User_Name为用户名,ACL_Secret_Key为用户的密钥。创建用户的步骤,请参见创建用户。用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。 SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 设置定时消息投递时间戳 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下: import java.nio.charset.StandardCharsets; import java.time.Instant; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; public class ScheduledMessageProducer1 { public static final String TOPIC_NAME = "ScheduledTopic"; public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.start(); // 定时消息投递时间戳,该消息10秒后投递 final long deliverTimestamp = Instant.now().plusSeconds(10).toEpochMilli(); // 创建消息对象 Message msg = new Message(TOPIC_NAME, "TagA", "KEY", "scheduled message".getBytes(StandardCharsets.UTF_8)); // 设置消息定时投递的时间戳属性 msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp)); // 发送消息,该消息将会在10秒后投递 SendResult sendResult = producer.send(msg); // 打印发送结果和预计投递时间 System.out.printf("%s %s%n", sendResult, UtilAll.timeMillisToHumanString2(deliverTimestamp)); producer.shutdown(); } }
  • 发送定时消息 发送定时消息的示例代码如下(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('messageKey') msg.set_tags('messageTag') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_delay_message(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() msg.set_property('__STARTDELIVERTIME', str(int(round((time.time() + 3) * 1000)))) ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_delay_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 name_srv:表示实例连接地址和端口。 set_keys:设置消息索引键,可根据关键字精确查找某条消息。 set_tags:设置消息Tag,用于消费端根据指定Tag过滤消息。
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 当使用distcp命令时,如果某些被拷贝的文件内容较大时如何处理 当使用distcp命令时,如果某些被拷贝的文件内容较大时,建议修改执行拷贝任务的mapreduce的超时时间。可以通过在distcp命令中指定mapreduce.task.timeout选项实现。例如,修改超时时间为30分钟,则命令如下: hadoop distcp -Dmapreduce.task.timeout=1800000 hdfs://cluster1/source hdfs://cluster2/target 您也可以使用选项filters,不对这种大文件进行拷贝,命令示例如下: hadoop distcp -filters /opt/client/filterfile hdfs://cluster1/source hdfs://cluster2/target 其中filterfile是本地文件,它的内容是多条用于匹配不拷贝文件路径的正则表达式,它的内容示例如下: .*excludeFile1.* .*excludeFile2.*
  • 使用dynamic策略执行distcp命令时,报错“Too many chunks created with splitRatio” 使用dynamic策略执行distcp命令时,命令异常退出,报“Too many chunks created with splitRatio”的错误。 这个问题的原因是“distcp.dynamic.max.chunks.tolerable”的值(默认值为20000)小于“distcp.dynamic.split.ratio”的值(默认为2)乘以Map数。即一般出现在Map数超过10000的情况。可以通过-m参数降低Map数小于10000: hadoop distcp -strategy dynamic -m 9500 hdfs://cluster1/source hdfs://cluster2/target 或通过-D参数指定更大的“distcp.dynamic.max.chunks.tolerable”的值: hadoop distcp -Ddistcp.dynamic.max.chunks.tolerable=30000 -strategy dynamic hdfs://cluster1/source hdfs://cluster2/target
  • 相关文档 如果使用distcp命令拷贝空文件夹报错,请参见使用distcp命令拷贝空文件夹报错。 如果使用distcp命令在安全集群上执行失败并发生异常,请参见执行distcp命令报错如何处理。 执行distcp跨集群拷贝文件时,出现部分文件拷贝失败并报错“ Source and target differ in block-size. Use -pb to preserve block-sizes during copy. ”,请参见执行distcp跨集群拷贝文件报错“Source and target differ in block-size”。 更多使用distcp命令时常见问题如下:
  • 操作步骤 登录安装客户端的节点。 执行以下命令,切换到客户端安装目录。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行distcp命令的用户所属的用户组必须为supergroup组,且执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 执行distcp命令。例如: hadoop distcp hdfs://hacluster/source hdfs://hacluster/target
  • update和overwrite选项用法说明 -update:用于被拷贝的文件在目标位置中不存在,或者更新目标位置中被拷贝文件的内容。 在使用update选项的情况下,如果被拷贝文件在目标位置中已经存在,但文件内容不同,则目标位置的文件内容会被更新。 -overwrite:用于覆盖在目标位置中已经存在的文件。 在使用overwrite选项的情况下,如果被拷贝文件在目标位置中已经存在,目标位置的文件依然会被覆盖。 如果多个源位置有相同名称的文件,则distcp命令会失败。 在不使用update和overwrite选项的情况下,如果被拷贝文件在目标位置中已经存在,则该文件会跳过。 不加选项和加两个选项中任一个选项的区别,示例如下: 假设,源位置的文件结构如下: hdfs://cluster1/source/first/1 hdfs://cluster1/source/first/2 hdfs://cluster1/source/second/10 hdfs://cluster1/source/second/20 不加选项的命令: hadoop distcp hdfs://cluster1/source/first hdfs://cluster1/source/second hdfs://cluster2/target 上述命令默认会在目标位置创建文件夹first、second,所以拷贝结果如下: hdfs://cluster2/target/first/1 hdfs://cluster2/target/first/2 hdfs://cluster2/target/second/10 hdfs://cluster2/target/second/20 加两个选项中任一个选项的命令,例如加update选项: hadoop distcp -update hdfs://cluster1/source/first hdfs://cluster1/source/second hdfs://cluster2/target 上述命令只会将源位置的内容拷贝到目标位置,所以拷贝结果如下: hdfs://cluster2/target/1 hdfs://cluster2/target/2 hdfs://cluster2/target/10 hdfs://cluster2/target/20
  • 操作步骤 可对INSERT...SELECT操作做如下的调优操作。 如果建的是Hive表,将存储类型设为Parquet,从而减少执行INSERT...SELECT语句的时间。 建议使用spark-sql或者在Beeline/JD BCS erver模式下使用spark用户来执行INSERT...SELECT操作,避免执行更改文件owner的操作,从而减少执行INSERT...SELECT语句的时间。 在Beeline/JDB CS erver模式下,executor的用户跟driver是一致的,driver是JDBCServer服务的一部分,是由spark用户启动的,因此其用户也是spark用户,且当前无法实现在运行时将Beeline端的用户透传到executor,因此使用非spark用户时需要对文件进行更改owner为Beeline端的用户,即实际用户。 如果查询的数据是大量的小文件将会产生大量map操作,从而导致输出存在大量的小文件,在执行重命名文件操作时将会耗费较多时间,此时可以通过设置“spark.sql.files.maxPartitionBytes”与“spark.files.openCostInBytes”来设置一个partition读取的最大字节,在一个partition中合并多个小文件来减少输出文件数及执行重命名文件操作的时间,从而减少执行INSERT...SELECT语句的时间。 上述优化操作并不能解决全部的性能问题,对于以下场景仍然需要较多时间: 对于动态分区表,如果其分区数非常多,那么也需要执行较长的时间。
  • 使用HDFS客户端 安装客户端,详细操作请参考使用MRS客户端。 以客户端安装用户,登录安装客户端的节点。 执行以下命令,切换到客户端安装目录,例如“/opt/client”。 cd /opt/client 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 执行HDFS Shell命令。例如: hdfs dfs -ls /
  • 如何设置HDFS客户端运行时的日志级别? HDFS客户端运行时的日志是默认输出到管理控制台的,其级别默认为INFO。如果需要开启DEBUG级别日志,可以通过导出一个环境变量来设置,命令如下: export HADOOP_ROOT_ LOG GER=DEBUG,console 在执行完上面命令后,再执行HDFS Shell命令时,即可打印出DEBUG级别日志。 如果想恢复INFO级别日志,可执行如下命令: export HADOOP_ROOT_LOGGER=INFO,console
  • 相关文档 如果HDFS客户端写文件close失败,客户端提示数据块没有足够副本数,处理方法请参见HDFS客户端写文件close失败。 如果需要调整客户端日志级别,处理方法请参见调整HDFS Shell客户端日志级别。 如果使用HDFS客户端命令删除超长目录报错,处理方法请参见HDFS客户端无法删除超长目录。 如果集群外节点安装客户端使用hdfs命令上传文件失败,报错no route to host,处理方法请参见集群外节点安装客户端上传HDFS文件失败。 更多使用HDFS客户端常见问题如下:
  • 功能介绍 AddDoublesUDF主要用来对两个及多个浮点数进行相加,在该样例中可以掌握如何编写和使用UDF。 一个普通UDF必须继承自“org.apache.hadoop.hive.ql.exec.UDF”。 一个普通UDF必须至少实现一个evaluate()方法,evaluate函数支持重载。 开发自定义函数需要在工程中添加“hive-exec-*.jar”依赖包,可从Hive服务的安装目录下获取,例如在“${BIGDATA_HOME}/components/ FusionInsight _HD_*/Hive/disaster/plugin/lib/”目录下获取。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全