云服务器内容精选

  • 验证Maxwell 登录Maxwell所在的服务器。 查看日志。如果日志里面没有ERROR日志,且有打印如下日志,表示与MySQL连接正常。 BinlogConnectorLifecycleListener - Binlog connected. 登录MySQL数据库,对测试数据进行更新/创建/删除等操作。操作语句可以参考如下示例。 -- 创建库 create database test; -- 创建表 create table test.e ( id int(10) not null primary key auto_increment, m double, c timestamp(6), comment varchar(255) charset 'latin1' ); -- 增加记录 insert into test.e set m = 4.2341, c = now(3), comment = 'I am a creature of light.'; -- 更新记录 update test.e set m = 5.444, c = now(3) where id = 1; -- 删除记录 delete from test.e where id = 1; -- 修改表 alter table test.e add column torvalds bigint unsigned after m; -- 删除表 drop table test.e; -- 删除库 drop database test; 观察Maxwell的日志输出,如果没有WARN/ERROR打印,则表示Maxwell安装配置正常。 如果要确定数据是否成功上传,可设置config.properties中的log_level为debug,则数据上传成功时会立刻打印如下JSON格式数据,具体字段含义请参考Maxwell生成的数据格式及常见字段含义。 {"database":"test","table":"e","type":"insert","ts":1541150929,"xid":60556,"commit":true,"data":{"id":1,"m":4.2341,"c":"2018-11-02 09:28:49.297000","comment":"I am a creature of light."}} …… 当整个流程调试通过之后,可以把config.properties文件中的配置项log_level修改为info,减少日志打印量,并重启Maxwell。 # log level [debug | info | warn | error] log_level=info
  • Maxwell生成的数据格式及常见字段含义 Maxwell生成的数据格式为JSON,常见字段含义如下: type:操作类型,包含database-create,database-drop,table-create,table-drop,table-alter,insert,update,delete database:操作的数据库名称 ts:操作时间,13位时间戳 table:操作的表名 data:数据增加/删除/修改之后的内容 old:数据修改前的内容或者表修改前的结构定义 sql:DDL操作的SQL语句 def:表创建与表修改的结构定义 xid:事务唯一ID commit:数据增加/删除/修改操作是否已提交
  • 启动Maxwell 登录Maxwell所在的服务器。 执行如下命令进入Maxwell安装目录。 cd /opt/maxwell-1.21.0/ 如果是初次使用Maxwell,建议将conf/config.properties中的log_level改为debug(调试级别),以便观察启动之后是否能正常从MySQL获取数据并发送到kafka,当整个流程调试通过之后,再把log_level修改为info,然后先停止再启动Maxwell生效。 # log level [debug | info | warn | error] log_level=debug 执行如下命令启动Maxwell。 source /opt/client/bigdata_env bin/Maxwell bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \ --producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=Maxwell 其中,user,password和host分别表示MySQL的用户名,密码和IP地址,这三个参数可以通过修改配置项配置也可以通过上述命令配置,kafkahost为流式集群的Core节点的IP地址。 显示类似如下信息,表示Maxwell启动成功。 Success to start Maxwell [78092].
  • 安装Maxwell 下载安装包,下载路径为https://github.com/zendesk/maxwell/releases,选择名为maxwell-XXX.tar.gz的二进制文件下载,其中XXX为版本号。 将tar.gz包上传到任意目录下(本示例路径为Master节点的/opt)。 登录部署Maxwell的服务器,并执行如下命令进入tar.gz包所在目录。 cd /opt 执行如下命令解压“maxwell-XXX.tar.gz”压缩包,并进入“maxwell-XXX”文件夹。 tar -zxvf maxwell-XXX.tar.gz cd maxwell-XXX
  • 配置Maxwell 在maxwell-XXX文件夹下如果有conf目录则配置config.properties文件,配置项说明请参见表1。如果没有conf目录,则是在maxwell-XXX文件夹下将config.properties.example修改成config.properties。 表1 Maxwell配置项说明 配置项 是否必填 说明 默认值 user 是 连接MySQL的用户名,即2中新创建的用户 - password 是 连接MySQL的密码。配置文件中包含认证密码信息可能存在安全风险,建议当前场景执行完毕后删除相关配置文件或加强安全管理。 - host 否 MySQL地址 localhost port 否 MySQL端口 3306 log_level 否 日志打印级别,可选值为 debug info warn error info output_ddl 否 是否发送DDL(数据库与数据表的定义修改)事件 true:发送DDL事件 false:不发送DDL事件 false producer 是 生产者类型,配置为kafka stdout:将生成的事件打印在日志中 kafka:将生成的事件发送到kafka stdout producer_partition_by 否 分区策略,用来确保相同一类的数据写入到kafka同一分区 database:使用数据库名称做分区,保证同一个数据库的事件写入到kafka同一个分区中 table:使用表名称做分区,保证同一个表的事件写入到kafka同一个分区中 database ignore_producer_error 否 是否忽略生产者发送数据失败的错误 true:在日志中打印错误信息并跳过错误的数据,程序继续运行 false:在日志中打印错误信息并终止程序 true metrics_slf4j_interval 否 在日志中输出上传kafka成功与失败数据的数量统计的时间间隔,单位为秒 60 kafka.bootstrap.servers 是 kafka代理节点地址,配置形式为HOST:PORT[,HOST:PORT] - kafka_topic 否 写入kafka的topic名称 maxwell dead_letter_topic 否 当发送某条记录出错时,记录该条出错记录主键的kafka topic - kafka_version 否 Maxwell使用的kafka producer版本号,不能在config.properties中配置,需要在启动命令时用-- kafka_version xxx参数传入 - kafka_partition_hash 否 划分kafka topic partition的算法,支持default或murmur3 default kafka_key_format 否 Kafka record的key生成方式,支持array或Hash Hash ddl_kafka_topic 否 当output_ddl配置为true时,DDL操作写入的topic {kafka_topic} filter 否 过滤数据库或表。 如果只想采集mydatabase的库,可以配置为 exclude: *.*,include: mydatabase.* 如果只想采集mydatabase.mytable的表,可以配置为 exclude: *.*,include: mydatabase.mytable 如果只想采集mydatabase库下的mytable,mydate_123, mydate_456表,可以配置为 exclude: *.*,include: mydatabase.mytable, include: mydatabase./mydate_\\d*/ -
  • Topic和Partition的划分关系说明 假设集群中部署了K个Kafka节点,每个节点上配置的磁盘个数为N,每块磁盘大小为M,集群共有n个Topic(T1,T2…Tn),并且其中第m个Topic的每秒输入数据总流量为X(Tm) MB/s,配置的副本数为R(Tm),配置数据保存时间为Y(Tm)小时,那么整体必须满足: 假设单个磁盘大小为M,该磁盘上有n个Partition(P0,P1……Pn),并且其中第m个Partition的每秒写入数据流量为Q(Pm) MB/s(计算方法:所属Topic的数据流量除以Partition数) 、数据保存时间为T(Pm)小时,那么单个磁盘必须满足: 根据吞吐量粗略计算,假设生产者可以达到的吞吐量为P,消费者可以达到的吞吐量为C,预期Kafka吞吐量为T,那么建议该Topic的Partition数目设置为Max(T/P , T/C)。 在Kafka集群中,分区越多吞吐量越高,但是分区过多也存在潜在影响,例如文件句柄增加、不可用性增加(如:某个节点故障后,部分Partition重选Leader后时间窗口会比较大)及端到端时延增加等。 建议:单个Partition的磁盘占用最大不超过100GB;单节点上Partition数目不超过3000;整个集群的分区总数不超过10000。