云服务器内容精选
-
写数据至多个Sink表 EXECUTE STATEMENT SET BEGIN ... END; 是写数据至多个Sink表的必填语句,用于定义在同一个作业中执行多个插入数据的操作。 写数据至多个Sink表时,EXECUTE STATEMENT SET BEGIN ... END;是必填项。 语法格式 1 2 3 4 5 6 7 8 9 10 11 EXECUTE STATEMENT SET BEGIN -- 第一个DML语句 INSERT INTO your_sink1 SELECT ... FROM your_source WHERE ...; -- 第二个DML语句 INSERT INTO your_sink2 SELECT ... FROM your_source WHERE ... ... END; 示例 本例定义了源表datagen_source、Sink表print_sinkA和print_sinkB。然后使用EXECUTE STATEMENT执行两个INSERT INTO语句,分将转换后的数据写入两个不同的sink。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 --使用 datagen connector创建源表 datagen_source CREATE TABLE datagen_source ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'datagen' ); --使用print connector创建结果表 print_sinkA 和 print_sinkB CREATE TABLE print_sinkA( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'print' ); CREATE TABLE print_sinkB( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'print' ); --使用 EXECUTE STATEMENT SET BEGIN来执行两个 INSERT INTO 语句。 --第一个INSERT INTO语句将datagen_source表中的数据按需转换后写入 print_sinkA。 --第二个 INSERT INTO 语句将数据按需转换后写入 print_sinkB。。 EXECUTE STATEMENT SET BEGIN INSERT INTO print_sinkA SELECT UPPER(name), min(age) FROM datagen_source GROUP BY UPPER(name); INSERT INTO print_sinkB SELECT LOWER(name), max(age) FROM datagen_source GROUP BY LOWER(name); END;
-
写数据至一个Sink表 语法格式 1 2 INSERT INTO your_sink SELECT ... FROM your_source WHERE ... 示例 本例定义了两个表my_source 和my_sink,并使用INSERT INTO语句source表选择数据并插入到sink表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 --使用datagen connector创建源表my_source CREATE TABLE my_source ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'datagen'); --使用jdbc connector创建目标表my_sink CREATE TABLE my_sink ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxx/your-database', 'table-name' = 'your-table', 'username' = 'your-username', 'password' = 'your-password' ); --使用INSERT INTO语句从my_source表选择数据,并插入到my_sink表 INSERT INTO my_sink SELECT name, age FROM my_source;
-
示例 从Kafka源表中读取数据,将DWS表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,在 DLI 上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库实例,在DWS中创建相应的表,作为维表,表名为area_info,SQL语句如下: create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR); 连接DWS数据库实例,向DWS维表area_info中插入测试数据,其语句如下: insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作为数据源,DWS作为维表,数据输出到Kafka结果表中。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbName', 'table-name' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向kafka中source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,读取kafka中sink topic中数据,结果参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
-
常见问题 Q:若Flink作业日志中有如下报错信息,应该怎么解决? java.io.IOException: unable to open JDBC writer ... Caused by: org.postgresql.util.PSQLException: The connection attempt failed. ... Caused by: java.net.SocketTimeoutException: connect timed out A:应考虑是跨源没有绑定,或者跨源没有绑定成功。 参考增强型跨源连接章节,重新配置跨源。参考DLI跨源连接DWS失败进行问题排查。 Q:如果该DWS表在某schema下,则应该如何配置? A:如下示例是使用schema为dbuser2下的表area_info: --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'gaussdb', 'driver' = 'org.postgresql.Driver', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDbname', 'table-name' = 'dbuser2.area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookup.cache.max-rows' = '10000', 'lookup.cache.ttl' = '2h' );
-
参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'gaussdb'。 url 是 无 String jdbc连接地址。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.num 否 无 Integer 分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。默认值为0,表示不限制。 scan.auto-commit 否 true Boolean 设置自动提交标志。 它决定每一个statement是否以事务的方式自动提交。 lookup.cache.max-rows 否 无 Integer 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。 默认表示不使用该配置。 lookup.cache.ttl 否 无 Duration 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 默认表示不使用该配置。 lookup.max-retries 否 3 Integer 维表配置,数据拉取最大重试次数。
-
注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。
-
语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
-
参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,这里是'gaussdb' url 是 无 String jdbc连接地址 。 使用gsjdbc4驱动连接时,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 使用gsjdbc200驱动连接时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 table-name 是 无 String 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考常见问题说明。 driver 否 org.postgresql.Driver String jdbc连接驱动,默认为: org.postgresql.Driver。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 username 否 无 String DWS数据库认证用户名,需要和'password'一起配置 password 否 无 String DWS数据库认证密码,需要和'username'一起配置 write.mode 否 无 String 数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。 该参数与'primary key'配合使用。 未配置'primary key'时,支持copy及insert两种模式追加写入。 配置'primary key',支持copy、upsert以及insert三种模式更新写入。 注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。 sink.buffer-flush.max-rows 否 100 Integer 每次写入请求缓存的最大行数。 它能提升写入数据的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 sink.buffer-flush.interval 否 1s Duration 刷新缓存的间隔,在这段时间内以异步线程刷新数据。 它能提升写入数据库的性能,但是也可能增加延迟。 设置为 "0" 关闭此选项。 注意:"sink.buffer-flush.max-size" 和 "sink.buffer-flush.max-rows" 同时设置为 "0",并设置刷新缓存的间隔,则以完整的异步处理方式刷新缓存。 格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 sink.max-retries 否 3 Integer 写入最大重试次数。 write.escape-string-value 否 false Boolean 是否对string类型值进行转义。该参数仅用于write.mode为copy模式下。 key-by-before-sink 否 false Boolean 在sink算子前是否按指定的主键进行分区。 该参数旨在解决多并发写入的场景下且write.mode为upsert时,如果多个子任务中写入sink的一批数据具有不止一条相同的主键,并且主键相同的这些数据先后顺序不一致,就会导致两个子任务在向DWS根据主键获取行锁时发生互锁的问题。
-
示例 该示例是从kafka数据源中读取数据,并以insert模式写入DWS结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据DWS和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置DWS和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据DWS和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接DWS数据库,在DWS中创建相应的表,表名为dws_order,SQL语句参考如下: create table public.dws_order( order_id VARCHAR, order_channel VARCHAR, order_time VARCHAR, pay_amount FLOAT8, real_pay FLOAT8, pay_time VARCHAR, user_id VARCHAR, user_name VARCHAR, area_id VARCHAR); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka作业数据源,将DWS作为结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE dwsSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DWSAddress:DWSPort/DWSdbName', 'table-name' = 'dws_order', 'driver' = 'org.postgresql.Driver', 'username' = 'DWSUserName', 'password' = 'DWSPassword', 'write.mode' = 'insert' ); insert into dwsSink select * from kafkaSource; 连接Kafka集群,向Kafka中输入以下测试数据。 {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 从DWS中使用如下SQL语句查看数据结果。 select * from dws_order 数据结果参考如下: 202103241000000001 webShop 2021-03-24 10:00:00 100.0 100.0 2021-03-24 10:02:03 0001 Alice 330106
-
前提条件 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
-
注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 with参数中字段只能使用单引号,不能使用双引号。 若需要使用upsert模式,则必须在DWS结果表和该结果表连接的DWS表都定义主键。 若DWS在不同的schema中存在相同名称的表,则在flink opensource sql中需要指定相应的schema。 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 例如,使用gsjdbc4驱动连接、upsert模式写入数据到DWS中。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector' = 'gaussdb', 'url' = 'jdbc:postgresql://DwsAddress:DwsPort/DwsDatabase', 'table-name' = 'car_info', 'username' = 'DwsUserName', 'password' = 'DwsPasswrod', 'write.mode' = 'upsert' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例创建DWS结果表。 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector' = 'gaussdb', 'table-name' = 'ads_game_sdk_base.test', 'driver' = 'com.huawei.gauss200.jdbc.Driver', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDatabase', 'username' = 'DwsUserName', 'password' = 'DwsPasswrod', 'write.mode' = 'upsert' );
-
语法格式 DWS结果表中不允许指定所有属性为PRIMARY KEY。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' );
-
功能描述 DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。 推荐使用DWS服务自研的DWS Connector。 DWS-Connector的使用方法请参考dws-connector-flink。
-
语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'gaussdb', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格