华为云用户手册

  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 由于Python样例代码中未给出认证信息,请在执行应用程序时通过配置项“--keytab”和“--principal”指定认证信息。 bin/spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
  • 在Linux调测程序 编译并生成Jar包,并将Jar包复制到与依赖库文件夹同级的目录“src/main/resources”下,具体步骤请参考在Linux调测程序。 运行Consumer样例工程的命令如下。 java -cp /opt/client/lib/*:/opt/client/src/main/resources com.huawei.bigdata.kafka.example.Consumer
  • 建立ClickHouse连接 以下代码片段在“ClickhouseJDBCHaDemo”类的initConnection方法中。在创建连接时传入表1中配置的user和password作为认证凭据,ClickHouse会带着用户名和密码在服务端进行安全认证。 MRS 3.3.0之前版本: clickHouseProperties.setPassword(userPass); clickHouseProperties.setUser(userName); BalancedClickhouseDataSource balancedClickhouseDataSource = new BalancedClickhouseDataSource(JDBC_PREFIX + UriList, clickHouseProperties); MRS 3.3.0及之后版本: clickHouseProperties.setProperty(ClickHouseDefaults.USER.getKey(), userName); clickHouseProperties.setProperty(ClickHouseDefaults.PASSWORD.getKey(), userPass); try { clickHouseProperties.setProperty(ClickHouseClientOption.FAILOVER.getKey(), "21"); clickHouseProperties.setProperty(ClickHouseClientOption.LOAD_BALANCING_POLICY.getKey(), "roundRobin"); balancedClickhouseDataSource = new ClickHouseDataSource(JDBC_PREFIX + UriList, clickHouseProperties); } catch (Exception e) { LOG .error("Failed to create balancedClickHouseProperties."); throw e; } 父主题: 开发ClickHouse应用
  • ClickHouse应用开发流程介绍 开发流程中各阶段的说明如图1所示。 图1 ClickHouse应用程序开发流程 表1 ClickHouse应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解ClickHouse的基本概念。 基本概念 准备开发和运行环境 ClickHouse的应用程序支持多种语言开发,主要为Java语言,推荐使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 准备ClickHouse应用开发和运行环境 根据场景开发工程 提供样例工程,帮助用户快速了解ClickHouse各部件的编程接口。 导入并配置ClickHouse样例工程 运行程序及查询结果 用户可以直接通过运行结果查看应用程序运行情况。 在本地Windows环境中调测ClickHouse应用(MRS 3.3.0之前版本) 在Linux环境中调测ClickHouse应用(MRS 3.3.0之前版本) 父主题: ClickHouse开发指南(普通模式)
  • 回答 首先查看ZooKeeper中/flink_base的目录权限是否为:'world,'anyone: cdrwa;如果不是,请修改/flink_base的目录权限为:'world,'anyone: cdrwa,然后继续根据步骤二排查;如果是,请根据步骤二排查。 由于在Flink配置文件中“high-availability.zookeeper.client.acl”默认为“creator”,即谁创建谁有权限,由于原有用户已经使用ZooKeeper上的/flink_base/flink目录,导致新创建的用户访问不了ZooKeeper上的/flink_base/flink目录。 新用户可以通过以下操作来解决问题。 查看客户端的配置文件“conf/flink-conf.yaml”。 修改配置项“high-availability.zookeeper.path.root”对应的ZooKeeper目录,例如:/flink2。 重新提交任务。
  • 功能简介 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名。创建表有两种方式(强烈建议采用预分Region建表方式): 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testCreateTable方法中。 public void testCreateTable() { LOG.info("Entering testCreateTable."); // Specify the table descriptor. TableDescriptorBuilder htd = TableDescriptorBuilder.newBuilder(tableName);(1) // Set the column family name to info. ColumnFamilyDescriptorBuilder hcd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"));(2) // Set data encoding methods, HBase provides DIFF,FAST_DIFF,PREFIX hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY // GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data // SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data. // it is advised to use SNAANPPY hcd.setCompressionType(Compression.Algorithm.SNAPPY);//注[1] htd.setColumnFamily(hcd.build()); (3) Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); (4) if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd.build());//注[2] (5) LOG.info(admin.getClusterMetrics().toString()); LOG.info(admin.listNamespaceDescriptors().toString()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin " ,e); } } } LOG.info("Exiting testCreateTable."); }
  • 注意事项 注[1] 可以设置列族的压缩方式,代码片段如下: //设置编码算法,HBase提供了DIFF,FAST_DIFF,PREFIX三种编码算法 hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); //设置文件压缩方式,HBase默认提供了GZ和SNAPPY两种压缩算法 //其中GZ的压缩率高,但压缩和解压性能低,适用于冷数据 //SNAPPY压缩率低,但压缩解压性能高,适用于热数据 //建议默认开启SNAPPY压缩 hcd.setCompressionType(Compression.Algorithm.SNAPPY); 注[2] 可以通过指定起始和结束RowKey,或者通过RowKey数组预分Region两种方式建表,代码片段如下: // 创建一个预划分region的表 byte[][] splits = new byte[4][]; splits[0] = Bytes.toBytes("A"); splits[1] = Bytes.toBytes("H"); splits[2] = Bytes.toBytes("O"); splits[3] = Bytes.toBytes("U"); admin.createTable(htd, splits);
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 数据规划 在开始开发应用前,需要创建Hive表,命名为person,并插入数据。同时,创建HBase table2表,用于将分析后的数据写入。 将原日志文件放置到HDFS系统中。 在本地新建一个空白的log1.txt文件,并在文件内写入如下内容: 1,100 在HDFS中新建一个目录/tmp/input,并将log1.txt文件上传至此目录。 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -put log1.txt /tmp/input,上传数据文件。 将导入的数据放置在Hive表里。 首先,确保JD BCS erver已启动。然后使用Beeline工具,创建Hive表,并插入数据。 执行如下命令,创建命名为person的Hive表。 create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE; 执行如下命令插入数据。 load data inpath '/tmp/input/log1.txt' into table person; 创建HBase表。 确保JDB CS erver已启动,然后使用Spark-beeline工具,创建HBase表,并插入数据。 执行如下命令,创建命名为table2的HBase表。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,执行如下命令。 put 'table2', '1', 'cf:cid', '1000'
  • 操作场景 为了运行 FusionInsight MRS产品Flink组件的SpringBoot接口样例代码,需要完成下面的操作。当前支持 GaussDB (DWS)样例工程。 该章节以在Linux环境下开发GaussDB(DWS) SpringBoot方式连接Flink服务的应用程序为例。 执行GaussDB(DWS)样例需提前登录GaussDB(DWS)所在节点创建用于接受数据的空表“test_lzh1”,创建命令如下: create table test_lzh1 (id integer not null);
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 扩展使用 配置Hive中间过程的 数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见创建Hive用户自定义函数。
  • 问题 Structured Streaming的cluster模式,在数据处理过程中终止ApplicationManager,执行应用时显示如下异常。 2017-05-09 20:46:02,393 | INFO | main | client token: Token { kind: YARN_CLIENT_TOKEN, service: } diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete hdfs://hacluster/structuredtest/checkpoint/offsets to start over.; ApplicationMaster host: 10.96.101.170 ApplicationMaster RPC port: 0 queue: default start time: 1494333891969 final status: FAILED tracking URL: https://9-96-101-191:8090/proxy/application_1493689105146_0052/ user: spark2x | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) Exception in thread "main" org.apache.spark.SparkException: Application application_1493689105146_0052 finished with failed status
  • 回答 原因分析:显示该异常是因为“recoverFromCheckpointLocation”的值判定为false,但却配置了checkpoint目录。 参数“recoverFromCheckpointLocation”的值为代码中“outputMode == OutputMode.Complete()”语句的判断结果(outputMode的默认输出方式为“append”)。 处理方法:编写应用时,用户可以根据具体情况修改数据的输出方式。 将输出方式修改为“complete”,“recoverFromCheckpointLocation”的值会判定为true。此时配置了checkpoint目录时就不会显示异常。
  • 代码样例 以租户用户为“test92”,租户ID为“92”,获取具有FlinkServer管理员权限的用户名为“flinkserveradmin”的代理访问API为例,以下代码为完整示例。 public class TestCreateTenants { public static void main(String[] args) { ParameterTool paraTool = ParameterTool.fromArgs(args); final String hostName = paraTool.get("hostName"); // 修改hosts文件,使用主机名 final String keytab = paraTool.get("keytab"); // user.keytab路径 final String krb5 = paraTool.get("krb5"); // krb5.conf路径 final String principal = paraTool.get("principal"); // 认证用户 System.setProperty("java.security.krb5.conf", krb5); String url = "https://"+hostName+":28943/flink/v1/tenants"; String jsonstr = "{" + "\n\t \"tenantId\":\"92\"," + "\n\t \"tenantName\":\"test92\"," + "\n\t \"remark\":\"test tenant remark1\"," + "\n\t \"updateUser\":\"test_updateUser1\"," + "\n\t \"createUser\":\"test_createUser1\"" + "\n}"; try { LoginClient.getInstance().setConfigure(url, principal, keytab, ""); LoginClient.getInstance().login(); // 先使用flinkserver管理员用户登录 String proxyUrl = "https://"+hostName+":28943/flink/v1/proxyUserLogin"; // 调用代理用户接口,获取普通用户token String result = HttpClientUtil.doPost(proxyUrl, "{\n" + "\t\"realUser\": \"flinkserveradmin\"\n" + "}", "utf-8", true); Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(result, JsonObject.class); String token = jsonObject.get("result").toString(); token = "hadoop_auth=" + token; System.out.println(HttpClientUtil.doPost(url, jsonstr, "utf-8", true , token)); } catch (Exception e) { System.out.println(e); } } }
  • 配置ClickHouse连接属性 在ClickhouseJDBCHaDemo、Demo、NativeJDBCHaDemo和Util文件创建connection的样例中设置连接属性,如下样例代码设置socket超时时间为60s。 ClickHouseProperties clickHouseProperties = new ClickHouseProperties(); clickHouseProperties.setSocketTimeout(60000); 如果导入并配置ClickHouse样例工程中的“clickhouse-example.properties”配置文件中“sslUsed”参数配置为“true”时,则需要在ClickhouseJDBCHaDemo、Demo、NativeJDBCHaDemo和Util文件创建connection的样例中设置如下连接属性: clickHouseProperties.setSsl(true); clickHouseProperties.setSslMode("none"); 父主题: 开发ClickHouse应用
  • 使用Python提交Flink普通作业 获取样例工程“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”和“insertData2kafka.sql”。 参考准备本地应用开发环境将准备好的Python虚拟环境打包,获取“venv.zip”文件。 zip -q -r venv.zip venv/ 以root用户登录主管理节点,将1和2获取的“venv.zip”、“pyflink-kafka.py”和“insertData2kafka.sql”文件上传至客户端环境。 per-job模式:将上述文件上传到“客户端安装目录/Flink/flink”。 yarn-application模式:将上述文件和“flink-connector-kafka-实际版本号.jar”包上传到“客户端安装目录/Flink/flink/yarnship”。 修改“pyflink-kafka.py”中的“specific_jars”路径。 per-job模式:修改为SQL文件的实际路径。如:file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-实际版本号.jar yarn-application模式:修改为:file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-实际版本号.jar 修改“pyflink-kafka.py”中的“file_path”路径。 per-job模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/insertData2kafka.sql yarn-application模式:修改为os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" 执行以下命令指定运行环境。 export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3 执行以下命令运行程序。 per-job模式: ./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_kafka -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-kafka.py 运行结果: yarn-application模式 ./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_kafka -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-kafka 运行结果: 父主题: PyFlink样例程序
  • 开发思路 写HBase: 通过参数指定“hbase-site.xml”文件的父目录,Flink Sink可以获取到HBase的Connection。 通过Connection判断表是否存在,如果不存在则创建表。 将接收到的数据转化成Put对象,写到HBase。 读HBase: 通过参数指定“hbase-site.xml”文件的父目录,Flink Source可以获取到HBase的Connection。 通过Connection判断表是否存在,如果不存在则作业失败,需要通过HBase Shell创建表或上游作业创建表。 读取HBase中的数据,将Result数据转化成Row对象发送给下游算子。
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 删除文件 * * @throws java.io.IOException */ private void delete() throws IOException { Path beDeletedPath = new Path(DEST_PATH + File.separator + FILE_NAME); if (fSystem.delete(beDeletedPath, true)) { LOG.info("success to delete the file " + DEST_PATH + File.separator + FILE_NAME); } else { LOG.warn("failed to delete the file " + DEST_PATH + File.separator + FILE_NAME); } }
  • 数据规划 确保以多主实例模式启动了JDBCServer服务,并至少有一个实例可连接客户端。在JDBCServer节点上分别创建“/home/data”文件,内容如下: Miranda,32 Karlie,23 Candice,27 确保其对启动JDBCServer的用户有读写权限。 确保客户端classpath下有“hive-site.xml”文件,且根据实际集群情况配置所需要的参数。JDBCServer相关参数详情,请参见Spark JDBCServer接口介绍。
  • 运行任务 进入Spark客户端目录,使用java -cp命令运行代码(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 运行Scala样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 集群开启ZooKeeper的SSL特性后(查看ZooKeeper服务的ssl.enabled参数),请在执行命令中添加-Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty两项参数: java -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf
  • 开发思路 写HBase: 通过参数指定“hbase-site.xml”文件的父目录,Flink Sink可以获取到HBase的Connection。 通过Connection判断表是否存在,如果不存在则创建表。 将接收到的数据转化成Put对象,写到HBase。 读HBase: 通过参数指定“hbase-site.xml”文件的父目录,Flink Source可以获取到HBase的Connection。 通过Connection判断表是否存在,如果不存在则作业失败,需要通过HBase Shell创建表或上游作业创建表。 读取HBase中的数据,将Result数据转化成Row对象发送给下游算子。
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Hive表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载数据到Hive表中。 雇员信息数据如表1所示: 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 Country1:City1 2014 3 Tom D 12000.02 personal income tax&0.09 Country2:City2 2014 4 Jack D 24000.03 personal income tax&0.09 Country3:City3 2014 6 Linda D 36000.04 personal income tax&0.09 Country4:City4 2014 8 Zhang R 9000.05 personal income tax&0.05 Country5:City5 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示: 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Hive表数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。
  • 开发流程 工作流配置文件“workflow.xml”(“coordinator.xml”是对工作流进行调度,“bundle.xml”是对一组coordinator进行管理)与“job.properties”。 如果有实现代码,需要开发对应的jar包,例如Java Action;如果是Hive,则需要开发SQL文件。 上传配置文件与jar包(包括依赖的jar包)到HDFS,上传的路径取决于“workflow.xml”中的“oozie.wf.application.path”配置的路径。 提供三种方式对工作流进行操作,详情请参见Oozie应用开发常见问题。 Shell命令 Java API Hue Oozie客户端提供了比较完整的examples示例供用户参考,包括各种类型的Action,以及Coordinator以及Bundle的使用。以客户端安装目录为“/opt/client”为例,examples具体目录为“/opt/client/Oozie/oozie-client-*/examples”。 如下通过一个Mapreduce工作流的示例演示如何配置,并通过Shell命令调用。
  • 代码样例 package com.huawei.bigdata.iotdb; import org.apache.iotdb.udf.api.UDTF; import org.apache.iotdb.udf.api.access.Row; import org.apache.iotdb.udf.api.collector.PointCollector; import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.udf.api.type.Type; import java.io.IOException; public class UDTFExample implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.INT32); } @Override public void transform(Row row, PointCollector collector) throws IOException { collector.putInt(row.getTime(), -row.getInt(0)); } }
  • Doris样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Doris相关样例工程: 表1 Doris相关样例工程 样例工程位置 描述 doris-examples/doris-example Doris数据读写操作的应用开发示例。 通过调用Doris接口可实现创建用户表、向表中插入数据、查询表数据、删除表等功能,相关业务场景介绍请参见Doris JDBC接口调用样例程序。 springboot/doris-examples Doris数据读写操作的SpringBoot应用开发示例。 提供Doris对接SpringBoot的样例,样例介绍请参见配置并导入SpringBoot样例工程。
共100000条