华为云用户手册

  • 功能介绍 针对添加了二级索引的用户表,您可以通过Filter来查询数据。其数据查询性能高于针对无二级索引用户表的数据查询。 HIndex支持的Filter类型为“SingleColumnValueFilter”,“SingleColumnValueExcludeFilter”以及“SingleColumnValuePartitionFilter”。 HIndex支持的Comparator为“BinaryComparator”,“BitComparator”,“LongComparator”,“DecimalComparator”,“DoubleComparator”,“FloatComparator”,“IntComparator”,“NullComparator”。 二级索引的使用规则如下: 针对某一列或者多列创建了单索引的场景下: 当查询时使用此列进行过滤时,不管是AND还是OR操作,该索引都会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如:Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引将不会被使用,查询性能不会因为索引得到提升。 例如:Filter_Condition(IndexCol1) AND/OR Filter_Condition(IndexCol2) OR Filter_Condition(NonIndexCol1) 针对多个列创建的联合索引场景下: 当查询时使用的列(多个),是联合索引所有对应列的一部分或者全部,且列的顺序与联合索引一致时,此索引会被利用来提升查询性能。 例如,针对C1、C2、C3列创建了联合索引,生效的场景包括: Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) Filter_Condition(IndexCol1) 不生效的场景包括: Filter_Condition(IndexCol2) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol3) Filter_Condition(IndexCol2) Filter_Condition(IndexCol3) 当查询时使用“索引列AND非索引列”过滤时,此索引会被利用来提升查询性能。 例如: Filter_Condition(IndexCol1) AND Filter_Condition(NonIndexCol1) Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2) AND Filter_Condition(NonIndexCol1) 当查询时使用“索引列OR非索引列”过滤时,此索引不会被使用,查询性能不会因为索引得到提升。 例如: Filter_Condition(IndexCol1) OR Filter_Condition(NonIndexCol1) (Filter_Condition(IndexCol1) AND Filter_Condition(IndexCol2))OR ( Filter_Condition(NonIndexCol1)) 当查询时使用多个列进行范围查询时,只有联合索引中最后一个列可指定取值范围,前面的列只能设置为“=”。 例如:针对C1、C2、C3列创建了联合索引,需要进行范围查询时,只能针对C3设置取值范围,过滤条件为“C1=XXX,C2=XXX,C3=取值范围”。 针对添加了二级索引的用户表,可以通过Filter来查询数据,在单列索引和复合列索引上进行过滤查询,查询结果都与无索引结果相同,且其数据查询性能高于无二级索引用户表的数据查询性能。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“PhoenixSample”类的testSelect方法中。 /** * Select Data */ public void testSelect() { LOG .info("Entering testSelect."); String URL = "jdbc:phoenix:" + conf.get("hbase.zookeeper.quorum"); // Query String querySQL = "SELECT * FROM TEST WHERE id = ?"; Connection conn = null; PreparedStatement preStat = null; Statement stat = null; ResultSet result = null; try { // Create Connection conn = DriverManager.getConnection(url, props); // Create Statement stat = conn.createStatement(); // Create PrepareStatement preStat = conn.prepareStatement(querySQL); // Execute query preStat.setInt(1, 1); result = preStat.executeQuery(); // Get result while (result.next()) { int id = result.getInt("id"); String name = result.getString(1); System.out.println("id: " + id); System.out.println("name: " + name); } LOG.info("Select successfully."); } catch (Exception e) { LOG.error("Select failed.", e); } finally { if (null != result) { try { result.close(); } catch (Exception e2) { LOG.error("Result close failed.", e2); } } if (null != stat) { try { stat.close(); } catch (Exception e2) { LOG.error("Stat close failed.", e2); } } if (null != conn) { try { conn.close(); } catch (Exception e2) { LOG.error("Connection close failed.", e2); } } } LOG.info("Exiting testSelect."); }
  • 代码样例 下面代码片段在com.huawei.hadoop.hbase.example包的“HBaseSample”类的testScanDataByIndex方法中: 样例:使用二级索引查找数据 public void testScanDataByIndex() { LOG.info("Entering testScanDataByIndex."); Table table = null; ResultScanner scanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, "Li Gang".getBytes()); Scan scan = new Scan(); scan.setFilter(filter); scanner = table.getScanner(scan); LOG.info("Scan indexed data."); for (Result result : scanner) { for (Cell cell : result.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Scan data by index successfully."); } catch (IOException e) { LOG.error("Scan data by index failed."); } finally { if (scanner != null) { // Close the scanner object. scanner.close(); } try { if (table != null) { table.close(); } } catch (IOException e) { LOG.error("Close table failed."); } } LOG.info("Exiting testScanDataByIndex."); }
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testModifyTable方法中 public void testModifyTable() { LOG.info("Entering testModifyTable."); // Specify the column family name. byte[] familyName = Bytes.toBytes("education"); Admin admin = null; try { // Instantiate an Admin object. admin = conn.getAdmin(); // Obtain the table descriptor. TableDescriptor htd = admin.getDescriptor(tableName); // Check whether the column family is specified before modification. if (!htd.hasColumnFamily(familyName)) { // Create the column descriptor. TableDescriptor tableBuilder = TableDescriptorBuilder.newBuilder(htd) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build()).build(); // Disable the table to get the table offline before modifying // the table. admin.disableTable(tableName);//注[1] // Submit a modifyTable request. admin.modifyTable(tableBuilder); // Enable the table to get the table online after modifying the // table. admin.enableTable(tableName); } LOG.info("Modify table successfully."); } catch (IOException e) { LOG.error("Modify table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting testModifyTable."); }
  • 回答 由于浏览器所在的计算机IP地址未加到Web访问白名单导致。用户可以通过修改客户端的配置文件“conf/flink-conf.yaml”来解决问题。 确认配置项“jobmanager.web.ssl.enabled”的值是否是“false”,若不是,请修改为“false”。 确认配置项“jobmanager.web.access-control-allow-origin”和“jobmanager.web.allow-access-address”中是否已经添加浏览器所在的计算机IP地址。如果没有添加,可以通过这两项配置项进行添加。例如: jobmanager.web.access-control-allow-origin: 浏览器所在的计算机IP地址jobmanager.web.allow-access-address: 浏览器所在的计算机IP地址
  • 注意事项 当前二级索引不支持使用SubstringComparator类定义的对象作为Filter的比较器。 例如,如下示例中的用法当前不支持: Scan scan = new Scan();filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);filterList.addFilter(new SingleColumnValueFilter(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier),CompareOperator.EQUAL, new SubstringComparator(substring)));scan.setFilter(filterList);
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testSingleColumnValueFilter方法中。 public void testSingleColumnValueFilter() { LOG.info("Entering testSingleColumnValueFilter."); Table table = null; ResultScanner rScanner = null; try { table = conn.getTable(tableName); Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); // Set the filter criteria. SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("Xu Bing")); scan.setFilter(filter); // Submit a scan request. rScanner = table.getScanner(scan); // Print query results. for (Result r = rScanner.next(); r != null; r = rScanner.next()) { for (Cell cell : r.rawCells()) { LOG.info("{}:{},{},{}", Bytes.toString(CellUtil.cloneRow(cell)), Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Single column value filter successfully."); } catch (IOException e) { LOG.error("Single column value filter failed " ,e); } finally { if (rScanner != null) { // Close the scanner object. rScanner.close(); } if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testSingleColumnValueFilter."); }
  • 样例工程运行依赖包参考信息 Flink客户端lib目录、opt目录中都有flink jar包,其中lib目录中默认是flink核心jar包,opt目录中是对接外部组件的jar包(例如flink-connector-kafka*.jar),若应用开发中需要请手动复制相关jar包到lib目录中。 针对Flink提供的几个样例工程,其对应的运行依赖包如下: 表1 样例工程运行依赖包 样例工程 依赖包 依赖包获取地址 DataStream程序 异步Checkpoint机制程序 flink-dist_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 使用Flink Jar提交SQL作业程序 FlinkServer REST API程序 flink-dist_*.jar flink-table_*.jar 可在Flink的客户端或者服务端安装路径的lib目录下获取。 向Kafka生产并消费数据程序 kafka-clients-*.jar flink-connector-kafka_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 pipeline程序 flink-connector-netty_*.jar flink-dist_*.jar flink-connector-netty_*.jar可在二次开发样例代码编译后产生的lib文件夹下获取。 flink-dist_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Stream SQL Join程序 kafka-clients-*.jar flink-connector-kafka_*.jar flink-dist_*.jar flink-table_*.jar kafka-clients-*.jar由Kafka组件发布提供,可在Kafka组件客户端或者服务端安装路径下的lib目录下获取。 flink-connector-kafka_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 Flink读写HBase程序 flink-connector-hbase*.jar flink-dist_*.jar flink-table_*.jar hbase-clients-*.jar flink-connector-hbase_*.jar可在Flink客户端或者服务端安装路径的opt目录下获取。 flink-dist_*.jar、flink-table_*.jar可在Flink的客户端或者服务端安装路径的lib目录下获取。 hbase-clients-*.jar由HBase组件发布提供,可在HBase组件客户端或者服务端安装路径下的lib目录下获取。 Flink读写Hudi程序 hbase-unsafe-*.jar 可在二次开发样例代码编译后产生的lib文件夹下获取。
  • Hive应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 Hive应用程序开发流程 表1 Hive应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Hive的基本概念。 常用概念 准备开发和运行环境 Hive的应用程序支持使用Java、Python两种语言进行开发。推荐使用IntelliJ IDEA工具,请根据指导完成不同语言的开发环境配置。 准备Hive应用开发环境 根据场景开发工程 提供了Java、Python两种不同语言的样例工程,还提供了从建表、数据加载到数据查询的样例工程。 开发Hive应用 运行程序及查看结果 指导用户将开发好的程序编译提交运行并查看结果。 调测Hive应用 父主题: Hive开发指南(普通模式)
  • Python3样例工程的命令行形式运行 赋予“python3-examples”文件夹中脚本的可执行权限。在命令行终端执行以下命令: chmod +x python3-examples -R。 在python3-examples/pyCLI_nosec.py中的host的值修改为安装HiveServer的节点的业务平面IP,port的值修改为Hive提供Thrift服务的端口(hive.server2.thrift.port),默认值为“10000”。 执行以下命令运行Python3客户端: cd python3-examples python pyCLI_nosec.py 在命令行终端查看样例代码中的HQL所查询出的结果。例如: [['default', '']] [{'comment': 'from deserializer', 'columnName': 'tab_name', 'type': 'STRING_TYPE'}] ['xx']
  • HetuEngine应用开发流程介绍 开发流程中各阶段的说明如图1所示: 图1 HetuEngine应用程序开发流程 表1 HetuEngine应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解HetuEngine的基本概念,了解场景需求等。 HetuEngine基本概念 准备开发和运行环境 HetuEngine的应用程序支持使用任何语言调用JDBC接口进行开发,当前样例主要是java语言。推荐使用IDEA工具,请根据指导完成不同语言的开发环境配置。HetuEngine的运行环境即客户端,请根据指导完成客户端的安装和配置。 准备本地应用开发环境 准备工程 HetuEngine提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个HetuEngine工程。 导入并配置HetuEngine样例工程 根据场景开发工程 提供了Java语言的样例工程,包括连接HetuEngine、SQL语句执行、结果解析,断开连接等全流程的样例工程。 开发HetuEngine应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测HetuEngine应用 查看程序运行结果 程序运行结果会根据结果解析部分的实现显示到期望显示的地方。 调测HetuEngine应用 父主题: HetuEngine开发指南(普通模式)
  • 配置样例代码 在开发环境IntelliJ IDEA中,单击“src/springboot/hbase-examples/src/main/resources”目录下的“springclient.properties”文件,按需修改如下表1中提供的参数: 表1 配置说明表 配置名称 含义 conf.path HBase配置文件所在目录,即“hbase-example\src\main\resources\conf”。
  • HetuEngine基本概念 HSBroker:HetuEngine的服务代理,用作用户租户管理校验,HetuEngine访问URL的获取等。 Coordinator:HetuEngine服务的资源协调者,负责SQL解析和优化等事务。 Worker:负责执行任务和处理数据。 Connector:HetuEngine访问数据库的接口,HetuEngine通过Connector的驱动连接数据源,读取数据源元数据和对数据进行增删改查等操作。 Catalog:HetuEngine中一个catalog配置文件对应一个数据源,一个数据源可以有多个不同catalog配置,可以通过数据源的properties文件进行配置。 Schema:对应数据库的Schema名称。 Table:对应数据库的表名。
  • HetuEngine连接方式说明 表1 HetuEngine连接方式说明 连接方式 是否支持用户名密码认证方式 是否支持Keytab认证方式 是否支持客户端跨网段访问 使用前提 HSFabric 是 是 是 确保业务侧和HetuServer服务端HSFabric所在业务节点网络互通 适用于双平面的网络场景 只需对外开放HSFabric固定的IP,端口 支持范围: MRS 3.1.3及之后版本 HSBroker 是 否 否 确保业务侧和HetuServer服务端HSBroker、Coordinator(随机分布在Yarn NodeManger)所在业务节点网络互通 需对外开放Coordinator的IP,端口 支持范围:MRS 3.1.0及之后版本
  • 代码样例 以下代码片段在“hbase-zk-example\src\main\java\com\huawei\hadoop\hbase\example”包的“TestZKSample”类中,用户主要需要关注“login”和“connectApacheZK”这两个方法。 private static void login(String keytabFile, String principal) throws IOException { conf = HBaseConfiguration.create(); //In Windows environment String confDirPath = TestZKSample.class.getClassLoader().getResource("").getPath() + File.separator;[1] //In Linux environment //String confDirPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator; // Set zoo.cfg for hbase to connect to fi zookeeper. conf.set("hbase.client.zookeeper.config.path", confDirPath + "zoo.cfg"); if (User.isHBaseSecurityEnabled(conf)) { // jaas.conf file, it is included in the client pakcage file System.setProperty("java.security.auth.login.config", confDirPath + "jaas.conf"); // set the kerberos server info,point to the kerberosclient System.setProperty("java.security.krb5.conf", confDirPath + "krb5.conf"); // set the keytab file name conf.set("username.client.keytab.file", confDirPath + keytabFile); // set the user's principal try { conf.set("username.client.kerberos.principal", principal); User.login(conf, "username.client.keytab.file", "username.client.kerberos.principal", InetAddress.getLocalHost().getCanonicalHostName()); } catch (IOException e) { throw new IOException("Login failed.", e); } } } private void connectApacheZK() throws IOException, org.apache.zookeeper.KeeperException { try { // Create apache zookeeper connection. ZooKeeper digestZk = new ZooKeeper("127.0.0.1:2181", 60000, null); LOG.info("digest directory:{}", digestZk.getChildren("/", null)); LOG.info("Successfully connect to apache zookeeper."); } catch (InterruptedException e) { LOG.error("Found error when connect apache zookeeper ", e); } }
  • 通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例 下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。 import osimport loggingimport sysfrom pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchemafrom pyflink.common.typeinfo import Typesfrom pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumerfrom pyflink.datastream import StreamExecutionEnvironmentfrom pyflink.table import TableEnvironment, EnvironmentSettingsdef read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_textdef exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute()def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test")if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka() 表1 使用Python提交普通作业参数说明 参数 说明 示例 bootstrap.servers Kafka的Broker实例业务IP和端口。 192.168.12.25:21005 specific_jars “客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar file_path “insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql SQL示例: create table kafka_sink_table ( age int, name varchar(10)) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json');create TABLE datagen_source_table ( age int, name varchar(10)) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1');INSERT INTO kafka_sink_tableSELECT *FROM datagen_source_table;
  • 通过Python API的方式提交Flink SQL作业到Yarn上代码样例 下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。 完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。 import loggingimport sysimport osfrom pyflink.table import (EnvironmentSettings, TableEnvironment)def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_textdef exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute()if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql() 表2 使用Python提交SQL作业参数说明 参数 说明 示例 file_path “datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。 说明: 当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql SQL示例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20)) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json');create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20)) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1');INSERT INTO kafka_sinkSELECT *FROM datagen_source;
  • 数据规划 发布者Job使用自定义算子每秒钟产生10000条数据 数据包含两个属性:分别是Int和String类型 配置文件 nettyconnector.registerserver.topic.storage:设置NettySink的IP、端口及并发度信息在第三方注册服务器上的路径(必填),例如: nettyconnector.registerserver.topic.storage: /flink/nettyconnector nettyconnector.sinkserver.port.range:设置NettySink的端口范围(必填),例如: nettyconnector.sinkserver.port.range: 28444-28943 nettyconnector.sinkserver.subnet:设置网络所属域,例如: nettyconnector.sinkserver.subnet: 10.162.0.0/16 接口说明 注册服务器接口 注册服务器用来保存NettySink的IP、端口以及并发度信息,以便NettySource连接使用。为用户提供以下接口: public interface RegisterServerHandler { /** * 启动注册服务器 * @param configuration Flink的Configuration类型 */void start(Configuration configuration) throws Exception;/** *注册服务器上创建Topic节点(目录) * @param topic topic节点名称 */void createTopicNode(String topic) throw Exception;/***将信息注册到某个topic节点(目录)下* @param topic 需要注册到的目录* @param registerRecord 需要注册的信息*/void register(String topic, RegisterRecord registerRecord) throws Exception;/** *删除topic节点 * @param topic 待删除topic */ void deleteTopicNode(String topic) throws Exception;/** *注销注册信息 *@param topic 注册信息所在的topic *@param recordId 待注销注册信息ID */void unregister(String topic, int recordId) throws Exception;/** * 查询信息* @param 查询信息所在的topic*@recordId 查询信息的ID*/RegisterRecord query(String topic, int recordId) throws Exception;/** * 查询某个Topic是否存在 * @param topic */Boolean isExist(String topic) throws Exception;/** *关闭注册服务器句柄 */void shutdown() throws Exception; 工程基于以上接口提供了ZookeeperRegisterHandler供用户使用。 NettySink算子 Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler,int numberOfSubscribedJobs) name:为本NettySink的名称。 topic:为本NettySink产生数据的Topic,每个不同的NettySink(并发度除外)必须使用不同的TOPIC,否则会引起订阅混乱,数据无法正常分发。 registerServerHandler:为注册服务器的句柄。 numberOfSubscribedJobs:为订阅本NettySink的作业数量,该数量必须是明确的,只有当所有订阅者都连接上NettySink,NettySink才发送数据。 NettySource算子 Class NettySource(String name,String topic,RegisterServerHandler registerServerHandler) name:为本NettySource的名称,该NettySource必须是唯一的(并发度除外),否则,连接NettySink时会出现冲突,导致无法连接。 topic:订阅的NettySink的topic。 registerServerHandler:为注册服务器的句柄。 NettySource的并发度必须与NettySource的并发度相同,否则无法正常创建连接。
  • 简介 Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。 Flink技术栈如图1所示。 图1 Flink技术栈 Flink在当前版本中重点构建如下特性,其他特性继承开源社区,不做增强。 DataStream Checkpoint 窗口 Job Pipeline 配置表
  • Flink基本概念 DataStream 数据流,是指Flink系统处理的最小数据单元。该数据单元最初由外部系统导入,可以通过Socket、Kafka和文件等形式导入,在Flink系统处理后,通过Socket、Kafka和文件等输出到外部系统,这是Flink的核心概念。 Data Transformation 数据处理单元,会将一或多个DataStream转换成一个新的DataStream。 具体可以细分如下几类: 一对一的转换:如Map。 一对0、1或多个的转换:如FlatMap。 一对0或1的转换,如Filter。 多对1转换,如Union。 多个聚合的转换,如window、keyby。 CheckPoint CheckPoint是Flink数据处理高可靠、最重要的机制。该机制可以保证应用在运行过程中出现失败时,应用的所有状态能够从某一个检查点恢复,保证数据仅被处理一次(Exactly Once)。 SavePoint Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
  • 架构 Flink架构如图2所示。 图2 Flink架构 Flink整个系统包含三个部分: Client Flink Client主要给用户提供向Flink系统提交用户任务(流式作业)的能力。 TaskManager Flink系统的业务执行节点,执行具体的用户任务。TaskManager可以有多个,各个TaskManager都平等。 JobManager Flink系统的管理节点,管理所有的TaskManager,并决策用户任务在哪些Taskmanager执行。JobManager在HA模式下可以有多个,但只有一个主JobManager。 Flink系统提供的关键能力: 低时延 提供ms级时延的处理能力。 Exactly Once 提供异步快照机制,保证所有数据真正只处理一次。 HA JobManager支持主备模式,保证无单点故障。 水平扩展能力 TaskManager支持手动水平扩展。
  • Flink开发接口简介 Flink DataStream API提供Scala和Java两种语言的开发方式,如表1所示。 表1 Flink DataStream API接口 功能 说明 Scala API 提供Scala语言的API,提供过滤、join、窗口、聚合等数据处理能力。由于Scala语言的简洁易懂,推荐用户使用Scala接口进行程序开发。 Java API 提供Java语言的API,提供过滤、join、窗口、聚合等数据处理能力。
  • 样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Flink相关样例工程,安全模式路径为“flink-examples/flink-examples-security”,普通模式路径为“flink-examples/flink-examples-normal”: 表2 Flink相关样例工程 样例工程 描述 FlinkCheckpointJavaExample 异步Checkpoint机制程序的应用开发示例。 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,并做到状态严格一致性,即:当应用出现异常并恢复后,各个算子的状态能够处于统一的状态。 相关业务场景介绍请参见Flink开启Checkpoint样例程序。 FlinkCheckpointScalaExample FlinkHBaseJavaExample 通过Flink API作业读写HBase数据的应用开发示例。 相关业务场景介绍请参见Flink读取HBase表样例程序。 FlinkHudiJavaExample 通过Flink API作业读写Hudi数据的应用开发示例。 相关业务场景介绍请参见Flink读取Hudi表样例程序。 FlinkKafkaJavaExample 向Kafka生产并消费数据程序的应用开发示例。 通过调用flink-connector-kafka模块的接口,生产并消费数据。 相关业务场景介绍请参见Flink Kafka样例程序。 FlinkKafkaScalaExample FlinkPipelineJavaExample Job Pipeline程序的应用开发示例。 相关业务场景介绍请参见Flink Job Pipeline样例程序。 发布者Job自己每秒钟产生10000条数据,然后经由该job的NettySink算子向下游发送。另外两个Job作为订阅者,分别订阅一份数据并打印输出。 FlinkPipelineScalaExample FlinkRESTAPIJavaExample 调用FlinkServer的RestAPI创建租户的应用开发示例。 相关业务场景介绍请参见FlinkServer REST API样例程序。 FlinkStreamJavaExample DataStream程序的应用开发示例。 相关业务场景介绍请参见Flink DataStream样例程序。 假定用户有某个网站周末网民网购停留时间的日志文本,另有一张网民个人信息的csv格式表,可通过Flink应用程序实现例如实时统计总计网购时间超过2个小时的女性网民信息,包含对应的个人详细信息的功能。 FlinkStreamScalaExample FlinkStreamSqlJoinExample Stream SQL Join程序的应用开发示例。 相关业务场景介绍请参见Flink Join样例程序。 假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。实现实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询的功能。 FlinkStreamSqlJoinScalaExample flink-sql 使用客户端通过jar作业提交SQL作业的应用开发示例。 相关业务场景介绍请参见Flink Jar作业提交SQL样例程序。 pyflink-example 提供Python读写Kafka作业和Python提交SQL作业的样例。 相关业务场景介绍请参见PyFlink样例程序。
  • 数据规划 创建HBase表,构造数据,列需要包含key,modify_time,valid。其中每条数据key值全表唯一,modify_time代表修改时间,valid代表是否为有效数据(该样例中'1'为有效,'0'为无效数据)。 示例:进入hbase shell,执行如下命令: create 'hbase_table','key','info' put 'hbase_table','1','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','1','info:valid','1' put 'hbase_table','2','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','2','info:valid','1' put 'hbase_table','3','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','3','info:valid','0' put 'hbase_table','4','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','4','info:valid','1' 上述数据的modify_time列可设置为当前时间之前的值。 put 'hbase_table','5','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','5','info:valid','1' put 'hbase_table','6','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','6','info:valid','1' put 'hbase_table','7','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','7','info:valid','0' put 'hbase_table','8','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','8','info:valid','1' put 'hbase_table','4','info:valid','0' put 'hbase_table','4','info:modify_time','2021-03-03 15:20:39' 上述数据的modify_time列可设置为样例程序启动后30分钟内的时间值(此处的30分钟为样例程序默认的同步间隔时间,可修改)。 put 'hbase_table','9','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','9','info:valid','1' put 'hbase_table','10','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','10','info:valid','1' put 'hbase_table','11','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','11','info:valid','0' put 'hbase_table','12','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','12','info:valid','1' 上述数据的modify_time列可设置为样例程序启动后30分钟到60分钟内的时间值,即第二次同步周期。 在sparksql中创建HBase的hive外表,命令如下: create table external_hbase_table(key string ,modify_time STRING, valid STRING) using org.apache.spark.sql.hbase.HBaseSource options(hbaseTableName "hbase_table", keyCols "key", colsMapping "modify_time=info.modify_time,valid=info.valid"); 在sparksql中创建CarbonData表: create table carbon01(key string,modify_time STRING, valid STRING) stored as carbondata; 初始化加载当前hbase表中所有数据到CarbonData表; insert into table carbon01 select * from external_hbase_table where valid='1'; 用spark-submit提交命令: spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --class com.huawei.bigdata.spark.examples.HBaseExternalHivetoCarbon /opt/example/HBaseExternalHivetoCarbon-1.0.jar
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/user.keytab”,“/opt/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/” )下。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接收到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseBulkGetExample.py bulktable yarn-cluster模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseBulkGetExample.py bulktable
  • 场景说明 假定一个广告业务,存在广告请求事件、广告展示事件、广告点击事件,广告主需要实时统计有效的广告展示和广告点击数据。 已知: 终端用户每次请求一个广告后,会生成广告请求事件,保存到kafka的adRequest topic中。 请求一个广告后,可能用于多次展示,每次展示,会生成广告展示事件,保存到kafka的adShow topic中。 每个广告展示,可能会产生多次点击,每次点击,会生成广告点击事件,保存到kafka的adClick topic中。 广告有效展示的定义如下: 请求到展示的时长超过A分钟算无效展示。 A分钟内多次展示,每次展示事件为有效展示。 广告有效点击的定义如下: 展示到点击时长超过B分钟算无效点击。 B分钟内多次点击,仅首次点击事件为有效点击。 基于此业务场景,模拟简单的数据结构如下: 广告请求事件 数据结构:adID^reqTime 广告展示事件 数据结构:adID^showID^showTime 广告点击事件 数据结构:adID^showID^clickTime 数据关联关系如下: 广告请求事件与广告展示事件通过adID关联。 广告展示事件与广告点击事件通过adID+showID关联。 数据要求: 数据从产生到到达流处理引擎的延迟时间不超过2小时 广告请求事件、广告展示事件、广告点击事件到达流处理引擎的时间不能保证有序和时间对齐
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全