华为云用户手册

  • 参考信息 表2 “安全级别”和“Facility”字段数值编码 安全级别 Facility 数值编码 Emergency kernel messages 0 Alert user-level messages 1 Critical mail system 2 Error system daemons 3 Warning security/authorization messages (note 1) 4 Notice messages generated internally by syslog 5 Informational line printer subsystem 6 Debug network news subsystem 7 - UUCP subsystem 8 - clock daemon (note 2) 9 - security/authorization messages (note 1) 10 - FTP daemon 11 - NTP subsystem 12 - log audit (note 1) 13 - log alert (note 1) 14 - clock daemon (note 2) 15 - local use 0~7 (local0 ~ local7) 16~23 表3 报文格式信息域表 信息域 描述 dn 集群名称 id 告警ID name 告警名称 serialNo 告警序列号 说明: 故障告警及其对应的恢复告警的告警序列号相同。 category 告警类型,取值范围: 0:故障告警 1:恢复告警 2:事件 occurTime 告警产生时间 clearTime 告警清除时间 isAutoClear 告警是否自动清除,取值范围: 1:是 0:否 locationInfo 告警位置信息 clearType 告警清除类型,取值范围: -1:未清除 0:自动清除 2:手动清除 level 告警级别,取值范围: 1:紧急告警 2:重要告警 3:次要告警 4:提示告警 cause 告警原因 additionalInfo 附加信息 object 告警对象
  • HDFS对接OBS 以客户端安装用户登录安装了HDFS客户端的节点。 执行以下命令,切换到客户端安装目录。 cd 客户端安装目录 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证,该用户需具有OBS目录的读写权限。普通模式集群无需执行用户认证。 kinit HDFS组件操作用户 在HDFS命令行显式添加要访问的OBS文件系统。 例如: 使用以下命令访问OBS文件系统。 hdfs dfs -ls obs://OBS并行文件系统名称/路径 使用以下命令创建OBS文件系统下的目录: hdfs dfs -mkdir obs://OBS并行文件系统名称/hadoop 使用以下命令上传客户端节点“/opt/test.txt”文件到“obs://OBS并行文件系统名称/hadoop”路径下。 hdfs dfs -put /opt/test.txt obs://OBS并行文件系统名称/hadoop OBS文件系统打印大量日志可能导致读写性能受影响,可通过调整OBS客户端日志级别优化,日志调整方式如下: cd 客户端安装目录/HDFS/hadoop/etc/hadoop vi log4j.properties 在文件中添加OBS日志级别配置 log4j.logger.org.apache.hadoop.fs.obs=WARN log4j.logger.com.obs=WARN 图1 添加OBS日志级别
  • 操作场景 该章节指导用户开启Guardian组件存算分离操作。开启后Guardian可以在存算分离场景下为HDFS、Hive、Spark、Loader、HetuEngine等服务提供访问OBS的临时认证凭据。 配置Guardian服务对接OBS主要操作如下: 创建OBS并行文件系统 创建普通账号委托 创建云服务委托并绑定集群 为Guardian组件配置访问OBS权限 开启Hive表的级联授权功能 配置回收站清理策略
  • HDFS对接OBS 以客户端安装用户登录安装了HDFS客户端的节点。 执行以下命令,切换到客户端安装目录。 cd 客户端安装目录 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证。普通模式集群无需执行用户认证。 kinit 组件业务用户 在hdfs命令行显式添加要访问的OBS文件系统。 例如: 使用以下命令访问OBS文件系统。 hdfs dfs -ls obs://OBS并行文件系统名称/路径 例如,执行以下命令访问“mrs-word001”并行文件系统,返回文件列表即表示访问OBS成功,如图1所示: hadoop fs -ls obs://mrs-word001/ 图1 Hadoop验证返回文件列表 使用以下命令上传客户端节点“/opt/test.txt”文件到OBS文件系统路径下。 hdfs dfs -put /opt/test.txt obs://OBS并行文件系统名称/路径 OBS文件系统打印大量日志可能导致读写性能受影响,可通过调整OBS客户端日志级别优化,日志调整方式如下: cd 客户端安装目录/HDFS/hadoop/etc/hadoop vi log4j.properties 在文件中添加OBS日志级别配置: log4j.logger.org.apache.hadoop.fs.obs=WARN log4j.logger.com.obs=WARN 可执行以下命令查看: tail -4 log4j.properties 图2 查看日志级别
  • Yarn对接OBS 以客户端安装用户登录安装了Yarn客户端的节点。 执行以下命令,切换到客户端安装目录。 cd 客户端安装目录 执行以下命令配置环境变量。 source bigdata_env 如果集群为安全模式,执行以下命令进行用户认证,该用户需具有OBS目录的读写权限。普通模式集群无需执行用户认证。 kinit HDFS组件操作用户 在Yarn命令行显式添加要访问的OBS文件系统。 使用以下命令访问OBS文件系统。 hdfs dfs -ls obs://OBS并行文件系统名称/路径 使用以下命令创建OBS文件系统下的目录: hdfs dfs -mkdir obs://OBS并行文件系统名称/hadoop1 执行以下Yarn任务访问OBS: yarn jar 客户端安装目录/HDFS/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi -Dmapreduce.job.hdfs-servers=NAMESERVICE -fs obs://OBS并行文件系统名称 1 1 其中“NAMESERVICE”为HDFS文件系统中的NameService,默认为“hdfs://hacluster”,如有多个NameService, 以“,”分隔。 例如: yarn jar /opt/hadoopclient/HDFS/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi -Dmapreduce.job.hdfs-servers=hdfs://hacluster -fs obs://bucketname 1 1 执行以下命令写入数据到OBS: yarn jar 客户端安装目录/HDFS/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar teragen 100 obs://OBS并行文件系统名称/hadoop1/teragen1 执行以下命令将OBS下的数据复制到HDFS: hadoop distcp obs://OBS并行文件系统名称/hadoop1/teragen1 /tmp OBS文件系统打印大量日志可能导致读写性能受影响,可通过调整OBS客户端日志级别优化,日志调整方式如下: cd 客户端安装目录/Yarn/config vi log4j.properties 在文件中添加OBS日志级别配置(应用若使用自带的log4j.properties,添加同样配置即可) log4j.logger.org.apache.hadoop.fs.obs=WARN log4j.logger.com.obs=WARN 图1 添加OBS日志级别配置
  • 备份Doris业务数据 如果数据要备份至远端HDFS中,需满足以下条件: 需要准备一个用于备份数据的备集群,认证模式需要与主集群相同。 主集群内至少需要部署一个Doris服务的DBroker实例。 如果主备集群部署为安全模式,且主备集群不是由同一个 FusionInsight Manager管理,则必须配置系统互信,请参见配置 MRS 集群间互信。如果主备集群部署为普通模式,则不需要配置互信。 备集群上的时间必须与主集群一致,而且主备集群上的NTP服务必须使用同一个时间源。 检查备集群HDFS是否有充足的空间,备份文件保存的目录建议使用用户自定义的目录。 需确保主备集群中Doris和HDFS的“hadoop.rpc.protection”配置项的值保持一致。 根据业务需要,规划备份任务的类型、周期、备份对象、备份目录等策略规格。 如果数据要备份至OBS中,需要当前Doris集群已对接OBS,并具有访问OBS的权限。
  • 导入或导出数据时缺少MySQL驱动包 若执行sqoop import或sqoop export命令报错“Could not load db driver class: com.mysql.jdbc.Driver”,如图1所示,则表示缺少MySQL驱动包,需在MySQL官网(https://downloads.mysql.com/archives/c-j/)下载对应MySQL驱动包,解压并上传至“客户端安装目录/Sqoop/sqoop/lib”目录下,再执行Sqoop导入或导出数据命令即可。 图1 缺少MySQL驱动包报错
  • Sqoop使用样例 通过sqoop import导入MySQL数据到HDFS sqoop import --connect jdbc:mysql://10.100.231.134:3306/test --username root --password xxx --query 'SELECT * FROM component where $CONDITIONS and component_id ="MRS 1.0_002"' --target-dir /tmp/component_test --delete-target-dir --fields-terminated-by "," -m 1 --as-textfile 通过sqoop export 导出OBS数据到MySQL sqoop export --connect jdbc:mysql://10.100.231.134:3306/test --username root --password xxx --table component14 -export-dir obs://obs-file-bucket/xx/part-m-00000 --fields-terminated-by ',' -m 1 通过sqoop import导入MySQL数据到OBS sqoop import --connect jdbc:mysql://10.100.231.134:3306/test --username root --password xxx --table component --target-dir obs://obs-file-bucket/xx --delete-target-dir --fields-terminated-by "," -m 1 --as-textfile 通过sqoop import导入MySQL数据到Hive OBS外表 sqoop import --connect jdbc:mysql://10.100.231.134:3306/test --username root --password xxx --table component --hive-import --hive-table component_test01 --fields-terminated-by "," -m 1 --as-textfile
  • 准备认证机制代码 在开启Kerberos认证的环境下,各个组件之间的相互通信不能够简单地互通,而需要在通信之前进行相互认证,以确保通信的安全性。Kafka应用开发需要进行Kafka、ZooKeeper、Kerberos的安全认证,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。提供了LoginUtil相关接口来完成这些配置,如下样例代码中只需要配置用户自己申请的账号名称和对应的keytab文件名称即可,由于人机账号的keytab会随用户密码过期而失效,故建议使用机机账号进行配置。 认证样例代码: 设置keytab认证文件模块 /** * 用户自己申请的账号keytab文件名称 */ private static final String USER_KEYTAB_FILE = "用户自己申请的账号keytab文件名称"; /** * 用户自己申请的账号名称 */ private static final String USER_PRINCIPAL = "用户自己申请的账号名称"; MRS服务Kerberos认证模块,如果服务没有开启kerberos认证,这块逻辑不执行 public static void securityPrepare() throws IOException { String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; String krbFile = filePath + "krb5.conf"; String userKeyTableFile = filePath + USER_KEYTAB_FILE; //windows路径下分隔符替换 userKeyTableFile = userKeyTableFile.replace("\\", "\\\\"); krbFile = krbFile.replace("\\", "\\\\"); LoginUtil.setKrb5Config(krbFile); LoginUtil.setZookeeperServerPrincipal("zookeeper/hadoop.hadoop.com"); LoginUtil.setJaasFile(USER_PRINCIPAL, userKeyTableFile); } 如果修改了集群kerberos 域名 ,需要在代码中增加kerberos.domain.name的配置,并按照hadoop.expr=toLowerCase(%{default_realm}%{KerberosServer})规则配置正确的域名信息。例如:修改域名为HUAWEI.COM,则配置为hadoop.huawei.com。
  • Presto JDBC使用样例 以下示例为Presto JDBC使用样例。 以下代码片段用于实现JDBC连接Presto TPCDS Catalog。 详情请参考PrestoJDBCExample类。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private static Connection connection; private static Statement statement; /** * Only when Kerberos authentication enabled, configurations in presto-examples/conf/presto.properties * should be set. More details please refer to https://prestodb.io/docs/0.215/installation/jdbc.html. */ private static void initConnection(String url, boolean krbsEnabled) throws SQLException { if (krbsEnabled) { String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; File proFile = new File(filePath + "presto.properties");if (proFile.exists()) { Properties props = new Properties(); try { props.load(new FileInputStream(proFile)); } catch (IOException e) { e.printStackTrace(); } connection = DriverManager.getConnection(url, props); } } else { connection = DriverManager.getConnection(url, "presto", null); } statement = connection.createStatement(); } private static void releaseConnection() throws SQLException { statement.close(); connection.close(); } public static void main(String[] args) throws SQLException { try { /** * Replace example_ip with your cluster presto server ip. * By default, Kerberos authentication disabled cluster presto service port is 7520, Kerberos * authentication enabled cluster presto service port is 7521 * The postfix /tpcds/sf1 means to use tpcds catalog and sf1 schema, you can use hive catalog as well * If Kerberos authentication enabled, set the second param to true. * see PrestoJDBCExample#initConnection(java.lang.String, boolean). */ initConnection("jdbc:presto://example_ip:7520/tpcds/sf1", false); //initConnection("jdbc:presto://example_ip:7521/tpcds/sf1", true); ResultSet resultSet = statement.executeQuery("select * from call_center"); while (resultSet.next()) { System.out.println(resultSet.getString("cc_name") + " : " + resultSet.getString("cc_employees")); } } catch (SQLException e) { e.printStackTrace(); } finally { releaseConnection(); } }
  • 配置文件介绍 登录HDFS时会使用到如表1所示的配置文件。这些文件均已导入到“hdfs-example”工程的“conf”目录。 表1 配置文件 文件名称 作用 获取地址 core-site.xml 配置HDFS详细参数。 MRS_Services_ClientConfig\HDFS\config\core-site.xml hdfs-site.xml 配置HDFS详细参数。 MRS_Services_ClientConfig\HDFS\config\hdfs-site.xml user.keytab 对于Kerberos安全认证提供HDFS用户信息。 如果是安全模式集群,您可以联系管理员获取相应账号对应权限的keytab文件和krb5文件。 krb5.conf Kerberos server配置信息。 不同集群的“user.keytab”、“krb5.conf”不能共用。 “conf”目录下的“log4j.properties”文件可根据自己的需要进行配置。
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 在Linux客户端运行应用的初始化代码,代码样例如下所示。 /** * 初始化,获取一个FileSystem实例 * * @throws IOException */ private void init() throws IOException { confLoad(); authentication(); instanceBuild(); } /** * * 如果程序运行在Linux上,则需要core-site.xml、hdfs-site.xml的路径, * 修改为在Linux下客户端文件的绝对路径。 * */ private void confLoad() throws IOException { conf = new Configuration(); // conf file conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * kerberos security authentication * 如果程序运行在Linux上,则需要krb5.conf和keytab文件的路径, * 修改为在Linux下客户端文件的绝对路径。并且需要将样例代码中的keytab文件和principal文件 * 分别修改为当前用户的keytab文件名和用户名。 * */ private void authentication() throws IOException { // 安全模式 if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } } /** * build HDFS instance */ private void instanceBuild() throws IOException { // get filesystem fSystem = FileSystem.get(conf); }
  • 操作步骤 查看运行结果获取应用运行情况 HdfsMain Linux样例程序安全集群运行结果如下所示: [root@node-master1dekG client]# hadoop jar hdfs-examples-1.0.jar com.huawei.bigdata.hdfs.examples.HdfsMain WARNING: Use "yarn jar" to launch YARN applications. 20/03/25 16:29:45 INFO security.UserGroupInformation: Login successful for user hdfsuser using keytab file user.keytab 20/03/25 16:29:45 INFO security.LoginUtil: Login success!!!!!!!!!!!!!! success to create path /user/hdfs-examples success to write. result is : hi, I am bigdata. It is successful if you can see me. success to read. success to delete the file /user/hdfs-examples/test.txt success to delete path /user/hdfs-examples success to create path /user/hdfs-examples StoragePolicy:FROZEN StoragePolicy:COLD StoragePolicy:WARM StoragePolicy:HOT StoragePolicy:ONE_SSD StoragePolicy:ALL_SSD StoragePolicy:LAZY_PERSIST succee to set Storage Policy path /user/hdfs-examples success to delete path /user/hdfs-examples HdfsMain Linux样例程序普通集群运行结果如下所示: [root@node-master2VknR client]# hadoop jar hdfs-examples-1.0.jar com.huawei.bigdata.hdfs.examples.HdfsMain WARNING: Use "yarn jar" to launch YARN applications. success to create path /user/hdfs-examplessuccess to write. result is : hi, I am bigdata. It is successful if you can see me. success to read. success to delete the file /user/hdfs-examples/test.txt success to delete path /user/hdfs-example ssuccess to create path /user/hdfs-examples StoragePolicy:FROZEN StoragePolicy:COLD StoragePolicy:WARM StoragePolicy:HOT StoragePolicy:ONE_SSD StoragePolicy:ALL_SSD StoragePolicy:LAZY_PERSIST succee to set Storage Policy path /user/hdfs-examples success to delete path /user/hdfs-examples 查看HDFS日志获取应用运行情况 您可以查看HDFS的namenode日志了解应用运行情况,并根据日志信息调整应用程序。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testScanData方法中 public void testScanData() { LOG .info("Entering testScanData."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the Caching size. scan.setCaching(1000);//注[1] // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data successfully."); } catch (IOException e) { LOG.error("Scan data failed ", e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testScanData."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testGet方法中 public void testGet() { LOG.info("Entering testGet."); // Specify the column family name. byte[] familyName = Bytes.toBytes("info"); // Specify the column name. byte[][] qualifier = {Bytes.toBytes("name"), Bytes.toBytes("address")}; // Specify RowKey. byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Get get = new Get(rowKey); // Set the column family name and column name. get.addColumn(familyName, qualifier[0]); get.addColumn(familyName, qualifier[1]); // Submit a get request. Result result = table.get(get); // Print query results. for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } LOG.info("Get data successfully."); } catch (IOException e) { LOG.error("Get data failed ", e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testGet."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的scanDataByHIndex方法中 public void scanDataByHIndex() { LOG.info("Entering HIndex-based Query."); Table table = null; ResultScanner rScanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("26")); filter.setFilterIfMissing(true); Scan scan = new Scan(); scan.setFilter(filter); rScanner = table.getScanner(scan); // Scan the data LOG.info("Scan data using indices.."); for (Result result : rScanner) { LOG.info("Scanned row is:"); for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Successfully scanned data using indices for table " + tableName + "."); } catch (IOException e) { LOG.error("Failed to scan data using indices for table " + tableName + "." + e); } finally { if (rScanner != null) { rScanner.close(); } if (table != null) { try { table.close(); } catch (IOException e) { LOG.error("failed to close table, ", e); } } } LOG.info("Entering HIndex-based Query."); }
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 /** * 删除文件 * * @throws IOException */ private void delete() throws IOException { Path beDeletedPath = new Path(DEST_PATH + File.separator + FILE_NAME); fSystem.deleteOnExit(beDeletedPath); System.out.println("succee to delete the file " + DEST_PATH + File.separator + FILE_NAME); }
  • 查看HBase应用调测结果 HBase应用程序运行完成后,可直接通过运行结果查看应用程序运行情况,也可以通过HBase日志获取应用运行情况。 运行结果会有如下成功信息: 2018-01-17 19:44:28,068 INFO [main] examples.HBaseExample: Entering dropTable. 2018-01-17 19:44:28,074 INFO [main] client.HBaseAdmin: Started disable of hbase_sample_table 2018-01-17 19:44:30,310 INFO [main] client.HBaseAdmin: Disabled hbase_sample_table 2018-01-17 19:44:31,727 INFO [main] client.HBaseAdmin: Deleted hbase_sample_table 2018-01-17 19:44:31,727 INFO [main] examples.HBaseExample: Drop table successfully. 2018-01-17 19:44:31,727 INFO [main] examples.HBaseExample: Exiting dropTable. 2018-01-17 19:44:31,727 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService 2018-01-17 19:44:31,733 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x13002d37b3933708 2018-01-17 19:44:31,736 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x13002d37b3933708 2018-01-17 19:44:31,737 INFO [main] zookeeper.ZooKeeper: Session: 0x13002d37b3933708 closed 2018-01-17 19:44:31,750 INFO [main] examples.TestMain: -----------finish HBase ------------------- 父主题: 在Linux中调测HBase应用
  • 功能介绍 下面代码片段在com.huawei.bigdata.kafka.example.SimpleConsumerDemo类中,用于实现使用新SimpleConsumer API订阅Topic,并进行消息消费。(注意:SimpleConsumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍) SimpleConsumer API属于lowlevel的Consumer API需要访问zookeeper元数据,管理消费Topic队列的offset,一般情况不推荐使用。
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发MapReduce应用程序实现如下功能。 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 功能介绍 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 主要分为三个部分。 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
  • 数据规划 首先需要把原日志文件放置在HDFS系统里。 本地新建两个文本文件,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS上建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下。 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir /tmp/input 在Linux系统HDFS客户端使用命令hdfs dfs -put local_filepath /tmp/input
  • 功能介绍 每一个Consumer实例都属于一个Consumer group,每一条消息只会被同一个Consumer group里的一个Consumer实例消费(不同的Consumer group可以同时消费同一条消息)。 下面代码片段在com.huawei.bigdata.kafka.example.Old_Consumer类中,作用在于订阅指定Topic的消息。(注意:旧Consumer API仅支持访问未设置ACL的Topic,安全接口说明见Kafka安全接口介绍)
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testFilterList方法中 public void testFilterList() { LOG.info("Entering testFilterList."); Table table = null; // Instantiate a ResultScanner object. ResultScanner rScanner = null; try { // Create the Configuration instance. table = conn.getTable(tableName); // Instantiate a Get object. Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Instantiate a FilterList object in which filters have "and" // relationship with each other. FilterList list = new FilterList(Operator.MUST_PASS_ALL); // Obtain data with age of greater than or equal to 20. list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(new Long(20)))); // Obtain data with age of less than or equal to 29. list.addFilter(new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes(new Long(29)))); scan.setFilter(list); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Filter list successfully."); } catch (IOException e) { LOG.error("Filter list failed ", e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed ", e); } } } LOG.info("Exiting testFilterList."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testModifyTable方法中 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("education"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName).build(); // Check whether the column family is specified before modification. if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor. ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(familyName); TableDescriptor td = TableDescriptorBuilder.newBuilder(admin.getDescriptor(tableName)).setColumnFamily(cfd).build(); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName);//注[1] // Submit a modifyTable request. admin.modifyTable(td); // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed ", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed ", e); } } } LOG.info("Exiting testModifyTable."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的dropTable方法中 public void dropTable() { LOG.info("Entering dropTable."); Admin admin = null; try { admin = conn.getAdmin(); if (admin.tableExists(tableName)) { // Disable the table before deleting it. admin.disableTable(tableName);//注[1] // Delete table. admin.deleteTable(tableName); } LOG.info("Drop table successfully."); } catch (IOException e) { LOG.error("Drop table failed ", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed ", e); } } } LOG.info("Exiting dropTable."); }
  • 准备运行调测环境 在弹性云服务器管理控制台,申请一个新的弹性云服务器,用于用户应用程序开发、运行、调测。 弹性云服务器的安全组需要和MRS集群Master节点的安全组相同。 弹性云服务器的VPC需要与MRS集群在同一个VPC中。 弹性云服务器的网卡需要与MRS集群在同一个网段中。 申请弹性IP,绑定新申请的弹性云主机IP,并配置安全组出入规则。 下载客户端程序,请参考下载MRS客户端。 以root用户安装集群客户端。 执行以下命令解压客户端包。 tar -xvf /opt/MRS_Services_Client.tar 执行以下命令校验安装文件包。 sha256sum -c /opt/MRS_Services_ClientConfig.tar.sha256 MRS_Services_ClientConfig.tar:OK 执行以下命令解压安装文件包。 tar -xvf /opt/MRS_Services_ClientConfig.tar 执行如下命令安装客户端到指定目录(绝对路径),例如“/opt/client”。目录会自动创建。 cd /opt/MRS_Services_ClientConfig sh install.sh /opt/client Components client installation is complete.
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseExample”类的testCreateTable方法中 public void testCreateTable() { LOG.info("Entering testCreateTable: " + tableName); // Set the column family name to info. byte [] fam = Bytes.toBytes("info"); ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(fam) // Set data encoding methods. HBase provides DIFF,FAST_DIFF,PREFIX // HBase 2.0 removed `PREFIX_TREE` Data Block Encoding from column families. .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF) // Set compression methods, HBase provides two default compression // methods:GZ and SNAPPY // GZ has the highest compression rate,but low compression and // decompression effeciency,fit for cold data // SNAPPY has low compression rate, but high compression and // decompression effeciency,fit for hot data. // it is advised to use SANPPY .setCompressionType(Compression.Algorithm.SNAPPY) .build(); TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(familyDescriptor).build(); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); if (!admin.tableExists(tableName)) { LOG.info("Creating table..."); admin.createTable(htd); LOG.info(admin.getClusterMetrics()); LOG.info(admin.listNamespaceDescriptors()); LOG.info("Table created successfully."); } else { LOG.warn("table already exists"); } } catch (IOException e) { LOG.error("Create table failed.", e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Failed to close admin ", e); } } } LOG.info("Exiting testCreateTable."); }
  • 功能简介 HBase通过org.apache.hadoop.hbase.client.Admin对象的createTable方法来创建表,并指定表名、列族名。创建表有两种方式,建议采用预分Region建表方式: 快速建表,即创建表后整张表只有一个Region,随着数据量的增加会自动分裂成多个Region。 预分Region建表,即创建表时预先分配多个Region,此种方法建表可以提高写入大量数据初期的数据写入速度。 表的列名以及列族名不能包含特殊字符,可以由字母、数字以及下划线组成。
  • 在Linux中调测Phoenix样例 在linux环境中调测Phoenix样例,需有与集群环境网络相通的E CS ,详情请参见准备本地应用开发环境。 修改样例。将样例代码TestMain中“enablePhoenix”值改为“true”,开启调用Phoenix样例程序接口。 /** * Phoenix Example * if you would like to operate hbase by SQL, please enable it, * and you can refrence the url ("https://support.huaweicloud.com/devg-mrs/mrs_06_0041.html"). * step: * 1.login * 2.operate hbase by phoenix. */ boolean enablePhoenix = false; if (enablePhoenix) { PhoenixExample phoenixExample; try { phoenixExample = new PhoenixExample(conf); phoenixExample.testSQL(); } catch (Exception e) { LOG.error("Failed to run Phoenix Example, because ", e); } } 执行mvn package生成jar包,在工程目录target目录下获取,比如:hbase-examples-mrs-2.0.jar,将获取的包上传到/opt/client/Hbase/hbase/lib目录下。 执行Jar包。 在Linux客户端下执行Jar包的时候,需要用安装用户切换到客户端目录: cd $BIGDATA_CLIENT_HOME/HBase/hbase “$BIGDATA_CLIENT_HOME”指的是客户端安装目录。 然后执行: source $BIGDATA_CLIENT_HOME/bigdata_env 将HBase Phoenix API接口介绍解压后获取其中的phoenix-hbase和phoenix-core包和“htrace-core-3.1.0-incubating.jar”包拷贝到“/opt/client/HBase/hbase/lib”下。 将2中生成的Jar包和从3.2.2-准备开发用户中获取的krb5.conf和user.keytab文件拷贝上传至客户端运行环境的Hbase/hbase/conf目录下,例如“/opt/client/HBase/hbase/conf”。然后在“/opt/client/HBase/hbase/conf”目录下创建hbaseclient.properties文件,文件中user.name对应新建的用户hbaseuser,userKeytabName和krb5ConfName值对应从3.2.2-准备开发用户中获取的认证相关文件名称,如下(未开启Kerberos认证集群可跳过此步): user.name=hbaseuser userKeytabName=user.keytab krb5ConfName=krb5.conf 执行jar包程序。 hbase com.huawei.bigdata.hbase.examples.TestMain /opt/client/HBase/hbase/conf 其中,com.huawei.bigdata.hbase.examples.TestMain为举例,具体以实际样例代码为准。 “/opt/client/HBase/hbase/conf”对应于上述中user.keytab、krb5.conf等文件路径。 若运行报“Message stream modified (41)”的错误,这可能与JDK的版本有关系,可以尝试修改运行样例代码的JDK为8u_242以下版本或删除“krb5.conf”配置文件的“renew_lifetime = 0m”配置项。 phoenix应用程序运行完成后,可直接通过运行结果查看应用程序运行情况。 2020-03-14 16:20:40,192 INFO [main] client.HBaseAdmin: Operation: CREATE, Table Name: default:TEST, procId: 923 completed 2020-03-14 16:20:40,806 INFO [main] examples.PhoenixExample: 1 2020-03-14 16:20:40,807 INFO [main] examples.PhoenixExample: John 2020-03-14 16:20:40,807 INFO [main] examples.PhoenixExample: 100000 2020-03-14 16:20:40,807 INFO [main] examples.PhoenixExample: 1980-01-01 2020-03-14 16:20:40,830 INFO [main] client.HBaseAdmin: Started disable of TEST 2020-03-14 16:20:41,574 INFO [main] client.HBaseAdmin: Operation: DISABLE, Table Name: default:TEST, procId: 925 completed 2020-03-14 16:20:41,831 INFO [main] client.HBaseAdmin: Operation: DELETE, Table Name: default:TEST, procId: 927 completed
共100000条