华为云用户手册

  • 样例代码 -- 从本地文件系统/opt/hive_examples_data/目录下将employee_info.txt加载进employees_info表中. LOAD DATA LOCAL INPATH '/opt/hive_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; -- 从HDFS上/user/hive_examples_data/employee_info.txt加载进employees_info表中. LOAD DATA INPATH '/user/hive_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; 加载数据的实质是将数据拷贝到HDFS上指定表的目录下。 “LOAD DATA LOCAL INPATH”命令可以完成从本地文件系统加载文件到Hive的需求,但是当指定“LOCAL”时,这里的路径指的是当前连接的“HiveServer”的本地文件系统的路径,同时由于当前的“HiveServer”是集群式部署的,客户端在连接时是随机连接所有“HiveServer”中的一个,需要注意当前连接的“HiveServer”的本地文件系统中是否存在需要加载的文件。在无法确定当前连接的是哪一个“HiveServer”的情况下建议在所有的“HiveServer”对应路径下放置相应文件,并注意文件的权限是否正确。
  • Savepoints相关问题解决方案 用户必须为job中的所有算子均分配ID吗? 严格的说,用户只给有状态的算子分配IDs即可,因为在savepoint中仅包括有状态的算子的状态,没有状态的算子并不包含在savepoint中。 在实际应用中,强烈建议用户给所有的算子均分配ID,因为有些Flink的内置算子,如window算子是有状态的。具体哪个算子是有状态的,哪个算子是无状态的,不是十分明显。如果用户十分确定某个算子是无状态的,该算子可以不调用uid()方法分配ID。 如果用户在升级作业时新添加一个有状态的算子有什么影响? 当用户在作业中新添加一个有状态的算子时,由于该算子是新添加的,无保存的旧状态,因此无状态恢复,从0开始运行。 如果用户在升级作业时从作业中删除一个有状态的算子有什么影响? 默认情况下,savepoint会尝试将所有保存的状态恢复。如果用户使用的savepoint中包含已经删除算子的状态,恢复将会失败。 用户可以通过--allowNonRestoredState(简写为-n)参数跳过恢复已经删除的算子的状态: $ bin/flink run -s savepointPath -n [runArgs] 如果用户重新编排有状态的算子的顺序有什么影响? 如果用户已经给这些算子分配IDs,那么这些状态会正常恢复。 如果用户没有给这些算子分配IDs, 这些算子将会按新的顺序自动分配新的ID,这将导致状态恢复失败。 如果用户在作业中删除或添加或更改无状态算子的顺序有什么影响? 如果用户已经给有状态的算子分配ID,那么无状态的算子并不会影响从savepoint进行状态恢复。 如果用户没有分配IDs,有状态算子的IDs由于顺序变化可能会被分配新的IDs,这将导致状态恢复失败。 如果用户在状态恢复时改变了算子的并发度会有什么影响? 如果Flink版本高于1.2.0且不使用已经废弃的状态API,如checkpointed,用户可以从savepoint中进行状态恢复。否则,无法恢复。 父主题: Flink应用开发常见问题
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testCreateTable方法中 public void testCreateTable() { LOG .info("Entering testCreateTable: " + tableName); // Set the column family name to info. byte [] fam = Bytes.toBytes("info"); ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam) // Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX // HBase 2.0 removed `PREFIX_TREE` Data Block Encoding from column families. .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY // GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data // SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data. // it is advised to use SANPPY .setCompressionType(Compression.Algorithm.SNAPPY) .build(); TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(familyDescriptor).build(); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd); LOG.info(admin.getClusterMetrics()); LOG.info(admin.listNamespaceDescriptors()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed.", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting testCreateTable."); }
  • 功能简介 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名。创建表有两种方式,建议采用预分Region建表方式: 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 扫描OpenTSDB的指标数据 tsdb命令可以使用“tsdb query”命令批量查询导入的指标数据,例如执行tsdb query 0 1h-ago sum sys.cpu.user host=web01命令。 Start run net.opentsdb.tools.CliQuery, args: 0 1h-ago sum sys.cpu.user host=web01 sys.cpu.user 1356998400000 41 {host=web01, cpu=0} sys.cpu.user 1356998401000 42 {host=web01, cpu=0} sys.cpu.user 1356998402000 44 {host=web01, cpu=0} sys.cpu.user 1356998403000 47 {host=web01, cpu=0} sys.cpu.user 1356998404000 42 {host=web01, cpu=0} sys.cpu.user 1356998405000 42 {host=web01, cpu=0}
  • 在Linux中调测Phoenix样例 在linux环境中调测Phoenix样例,需有与集群环境网络相通的E CS ,详情请参见准备本地应用开发环境。 修改样例。将样例代码TestMain中“enablePhoenix”值改为“true”,开启调用Phoenix样例程序接口。 /** * Phoenix Example * if you would like to operate hbase by SQL, please enable it, * and you can refrence the url ("https://support.huaweicloud.com/devg-mrs/mrs_06_0041.html"). * step: * 1.login * 2.operate hbase by phoenix. */ boolean enablePhoenix = false; if (enablePhoenix) { PhoenixExample phoenixExample; try { phoenixExample = new PhoenixExample(conf); phoenixExample.testSQL(); } catch (Exception e) { LOG.error("Failed to run Phoenix Example, because ", e); } } 执行mvn package生成jar包,在工程目录target目录下获取,比如:hbase-examples-mrs-2.0.jar,将获取的包上传到/opt/client/Hbase/hbase/lib目录下。 执行Jar包。 在Linux客户端下执行Jar包的时候,需要用安装用户切换到客户端目录: cd $BIGDATA_CLIENT_HOME/HBase/hbase “$BIGDATA_CLIENT_HOME”指的是客户端安装目录。 然后执行: source $BIGDATA_CLIENT_HOME/bigdata_env 将HBase Phoenix API接口介绍解压后获取其中的phoenix-hbase和phoenix-core包和“htrace-core-3.1.0-incubating.jar”包拷贝到“/opt/client/HBase/hbase/lib”下。 将2中生成的Jar包和从3.2.2-准备开发用户中获取的krb5.conf和user.keytab文件拷贝上传至客户端运行环境的Hbase/hbase/conf目录下,例如“/opt/client/HBase/hbase/conf”。然后在“/opt/client/HBase/hbase/conf”目录下创建hbaseclient.properties文件,文件中user.name对应新建的用户hbaseuser,userKeytabName和krb5ConfName值对应从3.2.2-准备开发用户中获取的认证相关文件名称,如下(未开启Kerberos认证集群可跳过此步): user.name=hbaseuser userKeytabName=user.keytab krb5ConfName=krb5.conf 执行jar包程序。 hbase com.huawei.bigdata.hbase.examples.TestMain /opt/client/HBase/hbase/conf 其中,com.huawei.bigdata.hbase.examples.TestMain为举例,具体以实际样例代码为准。 “/opt/client/HBase/hbase/conf”对应于上述中user.keytab、krb5.conf等文件路径。 若运行报“Message stream modified (41)”的错误,这可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。 phoenix应用程序运行完成后,可直接通过运行结果查看应用程序运行情况。 2020-03-14 16:20:40,192 INFO [main] client.HBaseAdmin: Operation: CREATE, Table Name: default:TEST, procId: 923 completed 2020-03-14 16:20:40,806 INFO [main] examples.PhoenixExample: 1 2020-03-14 16:20:40,807 INFO [main] examples.PhoenixExample: John 2020-03-14 16:20:40,807 INFO [main] examples.PhoenixExample: 100000 2020-03-14 16:20:40,807 INFO [main] examples.PhoenixExample: 1980-01-01 2020-03-14 16:20:40,830 INFO [main] client.HBaseAdmin: Started disable of TEST 2020-03-14 16:20:41,574 INFO [main] client.HBaseAdmin: Operation: DISABLE, Table Name: default:TEST, procId: 925 completed 2020-03-14 16:20:41,831 INFO [main] client.HBaseAdmin: Operation: DELETE, Table Name: default:TEST, procId: 927 completed
  • 代码样例 下面代码片段在com.huawei.storm.example.common.RandomSentenceSpout类中,作用在于将收到的字符串拆分成单词。 /** * {@inheritDoc} */ @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[] {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[random.nextInt(sentences.length)]; collector.emit(new Values(sentence)); }
  • MapReduce应用开发简介 Hadoop MapReduce是一个使用简易的并行计算软件框架,基于它写出来的应用程序能够运行在由上千个服务器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个MapReduce作业(application/job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式来处理。框架会对map的输出先进行排序,然后把结果输入给reduce任务,最后返回给客户端。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。 MapReduce主要特点如下: 大规模并行计算 适用于大型数据集 高容错性和高可靠性 合理的资源调度 父主题: MapReduce应用开发概述
  • 代码样例 如下是实例化FileSystem的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 /** * * Add configuration file if the application run on the linux ,then need make * the path of the core-site.xml and hdfs-site.xml to in the linux client file * */ private void confLoad() throws IOException { conf = new Configuration(); // conf file conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); conf.set("fs.obs.access.key","*** Provide your Access Key ***"); conf.set("fs.obs.secret.key","*** Provide your Secret Key ***"); } /** * build HDFS instance */ private void instanceBuild() throws IOException { // get filesystem // fSystem = FileSystem.get(conf); fSystem = FileSystem.get(URI.create("obs://[BuketName]"),conf); }
  • HDFS应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 HDFS应用程序开发流程 表1 HDFS应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HDFS的基本概念。 HDFS应用开发常用概念 准备开发环境 使用Eclipse工具,请根据指导完成开发环境配置。 准备Eclipse与JDK 准备运行环境 HDFS的运行环境即HDFS客户端,请根据指导完成客户端的安装和配置。 准备HDFS应用运行环境 下载并导入样例工程 HDFS提供了不同场景下的样例程序,可以导入样例工程进行程序学习。 导入并配置HDFS样例工程 根据场景开发工程 提供样例工程,帮助用户快速了解HDFS各部件的编程接口。 HDFS样例程序开发思路 编译并运行程序 指导用户将开发好的程序编译并提交运行。 Linux:在Linux环境中调测HDFS应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 Linux:查看HDFS应用调测结果 父主题: HDFS应用开发概述
  • Alluxio开发环境简介 在进行应用开发时,要准备的本地开发环境如表1所示。同时需要准备运行调测的Linux环境,用于验证应用程序运行是否正常。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Linux系统。 安装JDK和Maven 开发环境的基本配置:Java JDK 8或以上、Maven 3.3.9或以上 安装和配置Eclipse或IntelliJ IDEA 用于开发Alluxio应用程序的工具。 网络 确保客户端与Alluxio服务主机在网络上互通。 父主题: 准备Alluxio应用开发环境
  • 准备Eclipse与JDK 选择Windows开发环境下,安装Eclipse,安装JDK。 开发环境安装Eclipse程序,安装要求Eclipse使用4.2或以上版本。 开发环境安装JDK程序,安装要求JDK使用1.8版本。 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。 父主题: 准备MapReduce应用开发环境
  • 样例代码 -- 从本地文件系统/opt/impala_examples_data/目录下将employee_info.txt加载进employees_info表中. LOAD DATA LOCAL INPATH '/opt/impala_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; -- 从HDFS上/user/impala_examples_data/employee_info.txt加载进employees_info表中. LOAD DATA INPATH '/user/impala_examples_data/employee_info.txt' OVERWRITE INTO TABLE employees_info; 加载数据的实质是将数据拷贝到HDFS上指定表的目录下。 “LOAD DATA LOCAL INPATH”命令可以完成从本地文件系统加载文件到Impala的需求,但是当指定“LOCAL”时,这里的路径指的是当前连接的“Impalad”的本地文件系统的路径。
  • Kafka应用开发环境简介 Kafka开发应用时,需要准备的开发环境如下表所示: 表1 开发环境 准备项 说明 操作系统 Windows系统,推荐Windows 7以上版本。 安装JDK和Maven 开发环境的基本配置。JDK版本要求:1.7或者1.8。Maven版本要求:3.3.0及以上 安装和配置Eclipse或IntelliJ IDEA 用于开发Kafka应用程序的工具。 网络 确保本地与Kafka服务所在的VPC的至少一个节点在网络上互通。 访问云服务器的安全认证 本地可以通过密钥或密码方式登录访问Linux弹性云服务器 父主题: 准备Kafka应用开发环境
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 例如代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi lib/spark-examples*.jar 10
  • 操作步骤 打开IDEA工具,选择“Create New Project”。 图1 创建工程 在“New Project”页面,选择“Scala”开发环境,并选择“Scala Module”,然后单击“Next”。 如果您需要新建Java语言的工程,选择对应参数即可。 图2 选择开发环境 在工程信息页面,填写工程名称和存放路径,设置JDK版本和Scala SDK,然后单击“Finish”完成工程创建。 图3 填写工程信息
  • HBase BulkLoad和Put应用场景说明 HBase支持使用bulkload和put方式加载数据,在大部分场景下bulkload提供了更快的数据加载速度,但bulkload并不是没有缺点的,在使用时需要关注bulkload和put适合在哪些场景使用。 bulkload是通过启动MapReduce任务直接生成HFile文件,再将HFile文件注册到HBase,因此错误的使用bulkload会因为启动MapReduce任务而占用更多的集群内存和CPU资源,也可能会生成大量很小的HFile文件频繁的触发Compaction,导致查询速度急剧下降。 错误的使用put,会造成数据加载慢,当分配给RegionServer内存不足时会造成RegionServer内存溢出从而导致进程退出。 下面给出bulkload和put适合的场景: bulkload适合的场景: 大量数据一次性加载到HBase。 对数据加载到HBase可靠性要求不高,不需要生成WAL文件。 使用put加载大量数据到HBase速度变慢,且查询速度变慢时。 加载到HBase新生成的单个HFile文件大小接近HDFS block大小。 put适合的场景: 每次加载到单个Region的数据大小小于HDFS block大小的一半。 数据需要实时加载。 加载数据过程不会造成用户查询速度急剧下降。 父主题: HBase应用开发常见问题
  • 配置进程参数 Flink on YARN模式下,有JobManager和TaskManager两种进程。在任务调度和运行的过程中,JobManager和TaskManager承担了很大的责任。 因而JobManager和TaskManager的参数配置对Flink应用的执行有着很大的影响意义。用户可通过如下操作对Flink集群性能做优化。 配置JobManager内存。 JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务平行度增大时,JobManager内存都需要相应增大。 您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。 在使用yarn-session命令时,添加“-jm MEM”参数设置内存。 在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。 配置TaskManager个数。 每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。 在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。 在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。 配置TaskManager Slot数。 每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。 在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。 在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。 配置TaskManager内存。 TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。 将在使用yarn-session命令时,添加“-tm MEM”参数设置内存。 将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。
  • 配置netty网络通信 Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。 以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。 “taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。 “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。 “taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。 “taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。 “taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。
  • 经验总结 数据倾斜 当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。 需要重新设计key,以更小粒度的key使得task大小合理化。 修改并行度。 调用rebalance操作,使数据分区均匀。 缓冲区超时设置 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。 示例可以参考如下: env.setBufferTimeout(timeoutMillis); env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
  • 回答 Flink引入了第三方软件包RocksDB的缺陷问题导致该现象的发生。建议用户将checkpoint设置为FsStateBackend方式。 用户需要在应用代码中将checkpoint设置为FsStateBackend。例如: env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"));
  • 环境准备 安装支持环境。(开发环境请参考Spark应用开发环境简介准备) 执行以下命令安装编译工具: yum install cyrus-sasl-devel -y yum install gcc-c++ -y 安装相应的python模块。 需要安装sasl,thrift,thrift-sasl,PyHive。 pip install sasl pip install thrift pip install thrift-sasl pip install PyHive 安装python连接zookeeper工具。 pip install kazoo 从 MRS 集群上获取相应参数。 zookeeper的IP和PORT: 可以查看配置文件/opt/client/Spark/spark/conf/hive-site.xml中的配置项spark.deploy.zookeeper.url zookeeper 上存放JD BCS erver主节点的IP和PORT: 可以查看配置文件/opt/client/Spark/spark/conf/hive-site.xml中的配置项spark.thriftserver.zookeeper.dir(默认是/thriftserver),在此znode子节点(active_thriftserver)上存放了JDBCServer主节点的IP和PORT
  • 样例代码 from kazoo.client import KazooClient zk = KazooClient(hosts='ZookeeperHost') zk.start() result=zk.get("/thriftserver/active_thriftserver") result=result[0].decode('utf-8') JDBCServerHost=result[0].split(":")[0] JDBCServerPort=result[0].split(":")[1] from pyhive import hive conn = hive.Connection(host=JDBCServerHost, port=JDBCServerPort,database='default') cursor=conn.cursor() cursor.execute("select * from test") for result in cursor.fetchall(): print result 其中,ZookeeperHost使用4获取到的zookeeper IP和PORT替换。
  • Impala应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Impala应用程序开发流程 表1 Impala应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Impala的基本概念。 Impala应用开发常用概念 准备开发和运行环境 Impala的应用程序支持使用Java、Python两种语言进行开发。推荐使用Eclipse工具,请根据指导完成不同语言的开发环境配置。 Impala应用开发环境简介 根据场景开发工程 提供了Java、Python两种不同语言的样例工程,还提供了从建表、数据加载到数据查询的样例工程。 Impala样例程序开发思路 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 在Linux中调测Impala JDBC应用 父主题: Impala应用开发概述
  • MRS组件应用开发流程说明 通常MRS组件应用开发流程如下所示,各组件应用的开发编译操作可参考组件开发指南对应章节。 图1 MRS组件应用开发流程 表1 MRS组件应用开发流程说明 阶段 说明 准备开发环境 在进行应用开发前,需首先准备开发环境,推荐使用IntelliJ IDEA工具,同时本地需完成JDK、Maven等初始配置。 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通。 配置并导入样例工程 MRS提供了不同组件场景下的多种样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置安全认证 连接开启了Kerberos认证的MRS集群时,应用程序中需配置具有相关资源访问权限的用户进行安全认证。 根据业务场景开发程序 根据实际业务场景开发程序,调用组件接口实现对应功能。 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。
  • Hive应用开发常用概念 客户端 客户端直接面向用户,可通过Java API、Thrift API访问服务端进行Hive的相关操作。本文中的Hive客户端特指Hive client的安装目录,里面包含通过Java API访问Hive的样例代码。 HiveQL语言 Hive Query Language,类SQL语句。 HCatalog HCatalog是建立在Hive元数据之上的一个表信息管理层,吸收了Hive的DDL命令。为MapReduce提供读写接口,提供Hive命令行接口来进行数据定义和元数据查询。基于Hive的HCatalog功能,Hive、MapReduce开发人员能够共享元数据信息,避免中间转换和调整,能够提升数据处理的效率。 WebHCat WebHCat运行用户通过Rest API来执行Hive DDL,提交MapReduce任务,查询MapReduce任务执行结果等操作。 父主题: Hive应用开发概述
  • HTTP REST API Master REST API:https://docs.alluxio.io/os/restdoc/2.0/master/index.html Worker REST API:https://docs.alluxio.io/os/restdoc/2.0/worker/index.html Proxy REST API:https://docs.alluxio.io/os/restdoc/2.0/proxy/index.html Job REST API:https://docs.alluxio.io/os/restdoc/2.0/job/index.html
  • 应用开发操作步骤 确认MRS产品Storm和Kafka组件已经安装,并正常运行。 已搭建Storm示例代码工程,将storm-examples导入到Eclipse开发环境,参见导入并配置Storm样例工程。 用WinScp工具将Storm客户端安装包导入Linux环境并安装客户端,参见准备Linux客户端环境。 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于认证,并且获取到该用户的keytab文件。将获取到的文件拷贝到示例工程的 src/main/resources目录。 获取的用户需要同时属于storm组和kafka组。 下载并安装Kafka客户端程序,参见《Kafka应用开发》。
  • 操作场景 本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。 本章节代码样例基于Kafka新API,对应Eclipse工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。 本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 部署运行及结果查看 获取相关配置文件,获取方式如下。 安全模式:参见4获取keytab文件。 普通模式:无。 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。 使用Kafka客户端创建拓扑中所用到的Topic,执行命令。 ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。 安全模式下,需要kafka管理员用户创建Topic。 在Linux系统中完成拓扑的提交。提交命令示例(拓扑名为kafka-test)。 storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test 安全模式下,在提交storm-examples-1.0.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。 安全模式下,kafka需要用户有相应Topic的访问权限,因此首先需要给用户赋权,再提交拓扑。 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。 在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令: ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --new-consumer --consumer.config ../../../Kafka/kafka/config/consumer.properties 同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令: ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../../../Kafka/kafka/config/producer.properties 向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。
共100000条