华为云用户手册

  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 订阅顺序消息 只需要在订阅普通消息的代码基础上增加consumer.WithConsumerOrder(true),参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 package main import ( "context" "fmt" "os" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithGroupName("testGroup"), consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.1:8100"})), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset), consumer.WithConsumerOrder(true), ) err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { orderlyCtx, _ := primitive.GetOrderlyCtx(ctx) fmt.Printf("orderly context: %v\n", orderlyCtx) fmt.Printf("subscribe orderly callback: %v \n", msgs) return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } // Note: start after subscribe err = c.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) err = c.Shutdown() if err != nil { fmt.Printf("Shutdown Consumer error: %s", err.Error()) } } 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 testGroup:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 test:表示Topic名称。
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • 发送定时消息 发送定时消息的示例代码如下,或者通过ProducerDelayMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; public class ProducerDelayMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerDelayMessageExample.class); private ProducerDelayMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourDelayTopic"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a delay message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 设置定时消息投递时间戳 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送定时消息 发送定时消息的示例代码如下: import java.nio.charset.StandardCharsets; import java.time.Instant; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; public class ScheduledMessageProducer1 { public static final String TOPIC_NAME = "ScheduledTopic"; public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.start(); // 定时消息投递时间戳,该消息10秒后投递 final long deliverTimestamp = Instant.now().plusSeconds(10).toEpochMilli(); // 创建消息对象 Message msg = new Message(TOPIC_NAME, "TagA", "KEY", "scheduled message".getBytes(StandardCharsets.UTF_8)); // 设置消息定时投递的时间戳属性 msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(deliverTimestamp)); // 发送消息,该消息将会在10秒后投递 SendResult sendResult = producer.send(msg); // 打印发送结果和预计投递时间 System.out.printf("%s %s%n", sendResult, UtilAll.timeMillisToHumanString2(deliverTimestamp)); producer.shutdown(); } }
  • 注意事项 定时消息的最大延迟时间为1年,延迟超过1年的消息将会发送失败。 定时消息的定时时间如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。 在理想情况下,定时消息设定的时间与实际发送时间的误差在0.1s以内。但在定时消息投递压力过大时,会触发定时消息投递流控机制,精度会变差。 在0.1s的精度内,不保证消息投递的顺序性。即如果两条定时消息的定时时间差距小于0.1s,他们投递的顺序与他们发送的顺序无法确保一致。 无法确保定时消息仅投递一次,定时消息可能会重复投递。 定时消息的定时时间是服务端开始向消费端投递的时间。如果消费者当前有消息堆积,那么定时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差,以服务端时间为准。 设置定时消息的投递时间后,依然受消息老化时间限制,默认消息老化时间为2天。例如,设置定时消息5天后才能被消费,如果第5天后一直没被消费,那么这条消息将在第7天被删除。 定时消息将占用普通消息约3倍的存储空间,大量使用定时消息时需要注意存储空间占用。
  • 发送事务消息 参考如下示例代码,或者通过TransactionProducer.java获取更多示例代码。 import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.io.UnsupportedEncodingException; public class Main { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { TransactionListener transactionListener = new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("开始执行本地事务: " + message); return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("收到回查,重新查询事务状态: " + messageExt); return LocalTransactionState.COMMIT_MESSAGE; } }; TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); //填入连接地址 producer.setNamesrvAddr("192.168.0.1:8100"); //producer.setUseTLS(true); //创建实例时,如果开启了SSL,请增加此行代码。 producer.setTransactionListener(transactionListener); producer.start(); Message msg = new Message("TopicTest", "TagA", "KEY", "Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); producer.shutdown(); }}
  • 发送事务消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import Message, TransactionMQProducer, TransactionStatus topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def check_callback(msg): print('check: ' + msg.body.decode('utf-8')) return TransactionStatus.COMMIT def local_execute(msg, user_args): print('local: ' + msg.body.decode('utf-8')) return TransactionStatus.UNKNOWN def send_transaction_message(count): producer = TransactionMQProducer(gid, check_callback) producer.set_name_server_address(name_srv) producer.start() for n in range(count): msg = create_message() ret = producer.send_message_in_transaction(msg, local_execute, None) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) print('send transaction message done') while True: time.sleep(3600) if __name__ == '__main__': send_transaction_message(10)
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码,或者通过ProducerNormalMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; public class ProducerNormalMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerNormalMessageExample.class); public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourNormalTopics"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 同步发送 同步发送是指消息发送方发出一条消息到服务端,服务端接收并处理消息,然后返回响应给发送方,发送方收到响应后才会发送下一条消息的通讯方式。 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 from rocketmq.client import Producer, Message topic = 'TopicTest' gid = 'test' name_srv = '192.168.0.1:8100' def create_message(): msg = Message(topic) msg.set_keys('XXX') msg.set_tags('XXX') msg.set_property('property', 'test') msg.set_body('message body') return msg def send_message_sync(): producer = Producer(gid) producer.set_name_server_address(name_srv) producer.start() msg = create_message() ret = producer.send_sync(msg) print('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id) producer.shutdown() if __name__ == '__main__': send_message_sync() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 topic:表示Topic名称。 gid:表示生产者组名称,请根据业务实际情况输入生产者组名称。 name_srv:表示实例连接地址和端口。
  • 订阅普通消息 参考如下示例代码(以下加粗内容需要替换为实例自有信息,请根据实际情况替换)。 import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body, msg.get_property('property')) return ConsumeStatus.CONSUME_SUC CES S def start_consume_message(): consumer = PushConsumer('consumer_group') consumer.set_name_server_address('192.168.0.1:8100') consumer.subscribe('TopicTest', callback) print('start consume message') consumer.start() while True: time.sleep(3600) if __name__ == '__main__': start_consume_message() 示例代码中的参数说明如下,请参考收集连接信息获取参数值。 consumer_group:表示消费组名称。 192.168.0.1:8100:表示实例连接地址和端口。 TopicTest:表示Topic名称。
  • 发送顺序消息 参考如下示例代码,或者通过ProducerFifoMessageExample.java获取更多示例代码。 import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.SessionCredentialsProvider; import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; public class ProducerFifoMessageExample { private static final Logger log = LoggerFactory.getLogger(ProducerFifoMessageExample.class); public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = "yourNormalTopics"; // 填入grpc连接地址/grpc公网连接地址 String endpoints = "yourEndpoints"; // 填入用户名/密钥,用户名和密钥直接硬编码到代码中或者明文存储都存在很大的风险,建议在配置文件或者环境变量中密文存放,使用时解密。创建实例时,如果开启了ACL才需要添加以下代码。 String accessKey = System.getenv("ROCKETMQ_AK"); String secretKey = System.getenv("ROCKETMQ_SK"); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) // .enableSsl(false) // 创建实例时,如果将“SSL”配置为“PLAINTEXT”,则请增加此行代码。如果将“SSL”配置为“PERMISSIVE”,则请根据实际情况选择是否增加此行代码。 // .setCredentialProvider(sessionCredentialsProvider) // 创建实例时,如果开启了ACL,请添加此行代码。 .build(); final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) .build(); byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey") // 指定消息组,相同消息组的消息会被分配到同一个队列。 .setMessageGroup("yourMessageGroup0") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message", t); } // 不再使用后,手动关闭producer。 producer.close(); } }
  • 准备环境 执行以下命令,检查是否已安装Go。 go version 返回如下回显时,说明Go已经安装。 go version go1.16.5 linux/amd64 如果未安装Go,请下载并安装。 进入Go脚本所在的bin目录下。 执行“touch go.mod”命令新建一个“go.mod”,并增加以下代码,添加依赖。 module rocketmq-example-go go 1.13 require ( github.com/apache/rocketmq-client-go/v2 v2.1.2 ) 执行如下命令增加代理。 export GOPROXY=https://goproxy.cn,direct 执行如下命令下载依赖。 go mod tidy
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部联接当前仅支持文本常量 TRUE 作为谓词。 示例 如果表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 如果表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • 示例 从Kafka源表中读取数据,将Redis表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,根据Redis和Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Redis和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Redis的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录Redis客户端,通过如下命令向Redis发送如下数据: HMSET 330102 area_province_name a1 area_province_name b1 area_county_name c1 area_street_name d1 region_name e1 HMSET 330106 area_province_name a1 area_province_name b1 area_county_name c2 area_street_name d2 region_name e1 HMSET 330108 area_province_name a1 area_province_name b1 area_county_name c3 area_street_name d3 region_name e1 HMSET 330110 area_province_name a1 area_province_name b1 area_county_name c4 area_street_name d4 region_name e1 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。该作业脚本将Kafka为数据源,Redis作为维表,数据写入到Kafka结果表中。 如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string, primary key (area_id) not enforced -- redis的key ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'kafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"appShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,在Kafka的sink topic读取数据,结果数据参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"appshop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 语法格式 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项 hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值: fields:适用于所有数据类型 fields-scores:适用于sorted set数据类型 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 retry-count 是 5 Integer 设置每个连接请求的队列大小。如果超过队列大小,则命令调用将导致RedisException。将requestQueueSize设置为较低的值将导致在过载期间或连接处于断开状态时更早出现异常。更高的值意味着达到边界需要更长的时间,但可能会有更多的请求排队,并使用更多的堆空间。默认请设置为2147483647。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 scan-keys-count 否 1000 Integer 每次扫描时读取的数量。 default-score 否 0 Double 当data-type设置为“sorted-set”数据类型的默认score。 deserialize-error-policy 否 fail-job Enum 数据解析失败时的处理方式。 枚举类型,包含以下值: fail-job:作业失败 skip-row:跳过当前数据 null-field:设置当前数据为null skip-null-values 否 true Boolean 是否跳过null。 lookup.async 否 false Boolean 作为redis维表时,是否使用异步 I/O。 lookup.parallelism 否 无 int 定义查找联接运算符的自定义并行度。默认情况下,如果未定义此选项,则规划器将通过考虑全局配置(如果定义了选项“lookup.parallelism”)来推导并行度,否则将考虑输入运算符的并行度。 lookup.batch.interval 否 1s Duration 批量查找联接可以使用最大延迟来缓冲输入记录。批量查找联接可以使用最大延迟来缓冲输入记录。 lookup.batch.size 否 100L long 可以缓冲的最大输入记录数,以便进行批量查找联接。 lookup.batch 否 false Boolean 指定是否启用批量查找优化。如果启用,用户必须同时设置 lookup.batch.interval 和 lookup.batch.size 选项。此外,由于底层批处理间隔干扰机制的实现,用户必须在 flink 配置中显式启用 table.exec.batch-lookup.enabled' 选项 ignore-retractions 否 false Boolean 连接器应忽略更新插入/撤回流模式下的收回消息。 key-column 否 无 String Redis 表schema的key
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部联接当前仅支持文本常量 TRUE 作为谓词。 示例 若表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 若表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部联接当前仅支持文本常量 TRUE 作为谓词。 示例 若表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 若表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • 示例 使用Spark SQL创建Hive语法OBS表,并插入10条数据。模拟数据源。 CREATE TABLE IF NOT EXISTS demo.student( name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS PARQUET LOCATION 'obs://demo/spark.db/student'; INSERT INTO demo.student PARTITION(classNo=1) VALUES ('Alice', 90.0), ('Bob', 80.0), ('Charlie', 70.0), ('David', 60.0), ('Eve', 50.0), ('Frank', 40.0), ('Grace', 30.0), ('Hank', 20.0), ('Ivy', 10.0), ('Jack', 0.0); 使用Flink SQL展示使用批的方式,从Hive语法OBS表demo.student中读取数据,并打印。需要开启checkpoint。 CREATE CATA LOG myhive WITH ( 'type' = 'hive', 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table if not exists print ( name STRING, score DOUBLE, classNo INT) with ('connector' = 'print'); insert into print select * from student; 结果(taskmanager的out日志): +I[Alice, 90.0, 1] +I[Bob, 80.0, 1] +I[Charlie, 70.0, 1] +I[David, 60.0, 1] +I[Eve, 50.0, 1] +I[Frank, 40.0, 1] +I[Grace, 30.0, 1] +I[Hank, 20.0, 1] +I[Ivy, 10.0, 1] +I[Jack, 0.0, 1] 使用Flink SQL展示使用流的方式,从Hive语法OBS表demo.student中读取数据,并打印。 CREATE CATALOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; create table if not exists print ( name STRING, score DOUBLE, classNo INT) with ('connector' = 'print'); insert into print select * from student /*+ OPTIONS('streaming-source.enable' = 'true', 'streaming-source.monitor-interval' = '3 m') */;
  • 参数说明 请参考使用Hive语法创建OBS表,和Hive 文档了解每个DDL语句的语义。 表1 TBLPROPERTIES 参数说明 参数 是否必选 默认参数 数据类型 说明 streaming-source.enable 否 false Boolean 是否启用流源。 注意: 请确保每个分区/文件都应该以原子方式写入,否则读取器可能会得到不完整的数据。 streaming-source.partition.include 否 all String 设置分区读取的选项,支持的选项是 'all' 和 'latest'。默认情况下,该选项为 'all' 。 'all' 表示读取所有分区; 'latest'仅在流式处理 Hive 源表用作temporal table时才有效。'latest' 表示按'streaming-source.partition.order'的顺序读取最新的分区。 Flink 支持对最新的 hive 分区进行临时连接,通过启用 'streaming-source.enable',并将 'streaming-source.partition.include' 设置为 'latest'。同时,用户可以通过配置以下分区相关选项来分配分区比较顺序和数据更新间隔。 streaming-source.monitor-interval 否 None Duration 连续监视分区/文件的时间间隔。注意:Hive 流式处理读取的默认间隔为'1 min',Hive 流式处理temporal join的默认间隔为 '60 min',这是因为在当前 Hive 流式处理临时联接实现中,每个 TM 都会访问 Hive metaStore,这可能会对 metaStore 产生压力,这将在未来得到改善。 streaming-source.partition-order 否 partition-name String 流源的分区顺序,支持 create-time、partition-time 和 partition-name。 create-time 比较分区/文件创建时间,这不是 Hive metaStore 中的分区创建时间,而是文件系统中的文件夹/文件修改时间,如果分区文件夹以某种方式更新,例如将新文件添加到文件夹中,可能会影响数据的使用方式。 partition-time 比较从分区名称中提取的时间。 partition-name 比较分区名称的字母顺序。 对于非分区表,此值应始终为“create-time”。 默认情况下,该值为 partition-name。该选项与已弃用的选项“streaming-source.consume-order”相等。 streaming-source.consume-start-offset 否 None String 流式处理消费的起始偏移量。如何解析和比较偏移量取决于您的订单。对于 create-time 和 partition-time,应为时间戳字符串 (yyyy-[m]m-[d]d [hh:mm:ss])。 对于partition-time,将使用分区时间提取器从分区中提取时间。对于 partition-name,是分区名称字符串(例如 pt_year=2020/pt_mon=10/pt_day=01)。 is_lakehouse 否 无 Boolean 如果使用hive语法的 DLI Lakehouse表,则需要设置is_lakehouse为true。 Source Parallelism Inference 默认情况下,Flink 会根据文件数量和每个文件中的块数来推断其 Hive 读取器的最佳并行度。 Flink 支持灵活配置并行推理策略。您可以在 TableConfig 中配置以下参数(请注意,这些参数会影响作业的所有源): Key Default Type Description table.exec.hive.infer-source-parallelism true Boolean 如果为 true,则根据拆分数推断源并行度。如果为 false,则源的并行度由 config 设置。 table.exec.hive.infer-source-parallelism.max 1000 Integer 设置源运算符的最大推断并行度。 Load Partition Splits 多线程用于拆分 hive 的分区。您可以使用 table.exec.hive.load-partition-splits.thread-num 来配置线程号。默认值为 3,配置的值应大于 0。 Key Default Type Description table.exec.hive.load-partition-splits.thread-num 3 Integer 配置的值应大于0。 SQL 提示可用于将配置应用于 Hive 表,而无需更改其在 Hive 元存储中的定义。Hints | Apache Flink Vectorized Optimization upon Read 当满足以下条件时,Flink 会自动对 Hive 表进行矢量化读取: 格式:ORC 或 Parquet。 没有复杂数据类型的列,如配置单元类型:List、Map、Struct、Union。 默认情况下,此功能处于启用状态。可以使用以下配置禁用它。 table.exec.hive.fallback-mapred-reader=true
  • 功能描述 本节介绍利用Flink来读写Hive的表。Hive源表的定义,以及创建源表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write Flink支持在BATCH 和 STREAMING模式下从Hive读取数据。当作为BATCH应用程序运行时,Flink将在执行查询的时间点对表的状态执行查询。STREAMING读取将持续监控表,并在新数据可用时以增量方式获取新数据。默认情况下,Flink会读取有界的表。 STREAMING读取支持同时使用分区表和非分区表。对于分区表,Flink将监控新分区的生成,并在可用时增量读取它们。对于未分区的表,Flink 会监控文件夹中新文件的生成情况,并增量读取新文件。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE EXTERNAL TABLE [IF NOT EXISTS] table_name [(col_name data_type [column_constraint] [COMMENT col_comment], ... [table_constraint])] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [ [ROW FORMAT row_format] [STORED AS file_format] ] [LOCATION obs_path] [TBLPROPERTIES (property_name=property_value, ...)] row_format: : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)] file_format: : SEQUENCEFILE | TEXTFILE | RCFILE | ORC | PARQUET | AVRO | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname column_constraint: : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] table_constraint: : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]]
  • 简介 Apache Hive 已经成为了 数据仓库 生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。 Flink与Hive的集成包含两个层面,一是利用了Hive的MetaStore作为持久化的Catalog,二是利用Flink来读写Hive的表。Overview | Apache Flink 从Flink 1.11.0开始,在使用 Hive方言时,Flink允许用户用Hive语法来编写SQL语句。通过提供与Hive语法的兼容性,改善与Hive的互操作性,并减少用户需要在Flink和Hive之间切换来执行不同语句的情况。详情可参考:Apache Flink Hive 方言 使用HiveCatalog,Apache Flink可以用于统一处理Apache Hive表的BATCH和STREAM。Flink可以作为Hive批处理引擎的更高效的替代方案,或者用于连续读写Hive表,以支持实时数据仓库应用程序。Apache Flink Hive Read & Write
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的DLI Lakehouse表。 使用Hive语法创建OBS表 defalut方言: with 属性中需要设置hive.is-external为true。 使用hive 方言:建表语句需要使用EXTERNAL关键字。 使用hive语法的DLI Lakehouse表 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。 开启checkpoint功能。 建议切换到Hive方言来创建Hive兼容表。如果您想用默认的方言创建Hive兼容表,确保在您的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。注意,如果使用Hive方言,就不需要connector属性。 监视策略是扫描当前位置路径中的所有目录/文件。许多分区可能会导致性能下降。 对未分区表进行流式读取时,要求将每个文件以原子方式写入目标目录。 分区表的流式读取要求在 hive 元存储的视图中以原子方式添加每个分区。否则,将使用添加到现有分区的新数据。 流式读取不支持 Flink DDL 中的水印语法。这些表不能用于窗口运算符。
  • 示例 该示例展示了一个经典的业务流水线,维度表来自 Hive,每天通过批处理流水线作业或 Flink 作业更新一次,kafka流来自实时在线业务数据或日志,需要与维度表联接以扩充流。 使用spark sql 创建 hive obs 外表,并插入数据。 CREATE TABLE if not exists dimension_hive_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING ) STORED AS PARQUET LOCATION 'obs://demo/spark.db/dimension_hive_table' PARTITIONED BY ( create_time STRING ); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9'); INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联hive维表对数据进行打宽,并输出到print。 如下脚本中的加粗参数请根据实际环境修改。 CREATE CATALOG myhive WITH ( 'type' = 'hive' , 'default-database' = 'demo', 'hive-conf-dir' = '/opt/flink/conf' ); USE CATALOG myhive; CREATE TABLE if not exists ordersSource ( product_id STRING, user_name string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'TOPIC', 'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table if not exists print ( product_id STRING, user_name string, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP, update_user STRING, create_time STRING ) with ( 'connector' = 'print' ); insert into print select orders.product_id, orders.user_name, dim.product_name, dim.unit_price, dim.pv_count, dim.like_count, dim.comment_count, dim.update_time, dim.update_user, dim.create_time from ordersSource orders left join dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 m') */ for system_time as of orders.proctime as dim on orders.product_id = dim.product_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"product_id": "product_id_11", "user_name": "name11"} {"product_id": "product_id_12", "user_name": "name12"} 查看print结果表数据。 +I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
共100000条