云服务器内容精选

  • 示例 从Kafka中读取编码格式为csv,对象为kafkaSource的表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'csv' ); 从Kafka中读取编码格式为不含嵌套的json数据,对象为kafkaSource的表。 例如不含嵌套的json数据格式为: {"car_id": 312, "car_owner": "wang", "car_brand": "tang"} {"car_id": 313, "car_owner": "li", "car_brand": "lin"} {"car_id": 314, "car_owner": "zhao", "car_brand": "han"} 则创建表语句为: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table kafkaSource( car_id STRING, car_owner STRING, car_brand STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' ); 从Kafka中读取编码格式包含嵌套的json数据,对象为kafkaSource的表。 例如包含嵌套的json数据格式为: { "id":"1", "type":"online", "data":{ "patient_id":1234, "name":"bob1234", "age":"Bob", "gmt_create":"Bob", "gmt_modify":"Bob" } } 则创建表语句为: CREATE table kafkaSource( id STRING, type STRING, data ROW( patient_id STRING, name STRING, age STRING, gmt_create STRING, gmt_modify STRING) ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'connector.properties.group.id' = 'test-group', 'connector.startup-mode' = 'latest-offset', 'format.type' = 'json' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于kafka,需配置为'kafka'。 connector.version 是 Kafka版本,支持:'0.10'、 '0.11'。0.10或0.11版本号对应kafka版本号2.11-2.4.0及其他历史版本。 format.type 是 数据反序列化格式,支持:'csv', 'json'及'avro'等。 format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。 connector.topic 是 kafka topic名。该参数和“connector.topic-pattern”两个参数只能使用其中一个。 connector.topic-pattern 否 匹配读取kafka topic名称的正则表达式。该参数和“connector.topic”两个参数只能使用其中一个。 例如: 'topic.*' '(topic-c|topic-d)' '(topic-a|topic-b|topic-\\d*)' '(topic-a|topic-b|topic-[0-9]*)' connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔。 connector.properties.group.id 否 消费组名称 connector.startup-mode 否 consumer启动模式,支持:'earliest-offset', 'latest-offset', 'group-offsets', 'specific-offsets'及'timestamp'。默认值为'group-offsets'。 connector.specific-offsets 否 指定消费offset,'startup-mode'为'specific-offsets'时需配置,格式为: 'partition:0,offset:42;partition:1,offset:300'。 connector.startup-timestamp-millis 否 指定起始消费时间戳,'startup-mode'为'timestamp'时需配置。 connector.properties.* 否 配置kafka任意原生属性。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression) ) with ( 'connector.type' = 'kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'connector.properties.group.id' = '', 'connector.startup-mode' = '', 'format.type' = '' );
  • 分区扫描功能介绍 为了加速Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。以下参数定义了从多个任务并行读取时如何对表进行分区。 scan.partition.column:用于对输入进行分区的列名,该列的数据类型必须是数字,日期或时间戳。 scan.partition.num: 分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 建表时以上扫描分区参数必须同时存在或者同时不存在。 scan.partition.lower-bound和scan.partition.upper-bound参数仅用于决定分区步长,而不是用于过滤表中的行,表中的所有行都会被分区并返回。
  • 数据类型映射 表2 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,当前固定为'jdbc'。 url 是 无 String 数据库的URL。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 无 String 用于对输入进行分区的列名。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.num 否 无 Integer 分区的个数。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。若指定为0,则会忽略sql hint。 scan.auto-commit 否 true Boolean 是否设置自动提交,以确定事务中的每个statement是否自动提交 pwd_auth_name 否 无 String DLI 侧创建的Password类型的跨源认证名称。用户若配置该配置项则不用在SQL中配置账号和密码。
  • 语法格式 create table jbdcSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 示例 从Redis中读取数据。 create table redisSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.table-name' = 'car_info' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于redis,需配置为'redis'。 connector.host 是 redis连接地址。 connector.port 是 redis连接端口。 connector.password 否 redis认证密码。 connector.deploy-mode 否 redis部署模式,支持standalone/cluster,默认standalone。 connector.table-name 否 table存储模式下必配,redis中存储表名。在table存储模式下,数据将以hash类型存储到redis,其中key为:${table-name}:${ext-key},field名为列名。 说明: table存储模式:将connector.table-name、connector.key-column作为redis的key。redis的hash类型,每个key对应一个hashmap,hashmap的hashkey为源表的字段名,hashvalue为源表的字段值。 connector.use-internal-schema 否 table存储模式下可配置,是否使用redis中已存在schema,默认为false。 connector.key-column 否 table存储模式下可配置,将该字段值作为redis中的ext-key,未配置时,ext-key为生成的uuid。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'redis', 'connector.host' = '', 'connector.port' = '' );