canal 与 Kafka 本地运行记录

本文主要记录 canal 与 kafka 本地运行的过程

环境安装

  • canal
  • kafka
  • zookeeper

文件配置

  • kafka 配置

/config/server.properties

1
2
3
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.119:9092 # 本机 ip + 端口
zookeeper.connect=192.168.1.119:2181 # 本机 ip + 端口

  • canal.properties 配置
1
2
canal.serverMode = kafka
canal.destinations= example # 主题
  • mq.yml 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
servers: 192.168.1.119:9092 #for rocketmq: means the nameserver
retries: 0
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432

# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canalBatchSize: 50
# Canal get数据的超时时间, 单位: 毫秒, 0为不限超时
canalGetTimeout: 100
flatMessage: true

canalDestinations:
- canalDestination: example
topic: example
partition: # 不填分区,否则会报错,Invalid partition given with record
# #对应topic分区数量
# partitionsNum: 3
# partitionHash:
# #库名.表名: 唯一主键
# mytest.person: id
  • instance.properties
1
2
3
4
5
canal.instance.master.address=127.0.0.1:3306  # 数据库地址端口
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test

启动

  1. 启动 Zookeeper
1
2
3
4
zkServer.cmd or ./zkServer.sh start

out:
2018-11-17 13:11:07,366 [myid:] - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
  1. 启动 canal,查看 canal.log 和 example.log 没报错即可
1
2
3
4
startup.bat

out:
2018-11-17 13:13:30.933 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
  1. 启动 kafka
1
kafka-server-start.bat ../../config/server.properties
  1. 新增 kafka 消费者,订阅主题为 example
1
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic example
  1. 变更数据库数据,查看输出。
1
{"data":[{"id":"123"}],"database":"test","es":1542431895000,"id":3,"isDdl":false,"mysqlType":{"id":"int(11)"},"old":null,"sql":"","sqlType":{"id":4},"table":"q","ts":1542431896029,"type":"INSERT"}

异常

1
2
3
4
5
6
canal 控制台输出:ERROR com.alibaba.otter.canal.server.CanalMQStarter - ack error , clientId:1001 batchId:3209 is not exist , please check

canal.log 输出:2018-11-17 12:39:59.317 [pool-4-thread-1] ERROR com.alibaba.otter.canal.server.CanalMQStarter - ack error , clientId:1001 batchId:2882 is not exist , please check
com.alibaba.otter.canal.server.exception.CanalServerException: ack error , clientId:1001 batchId:2882 is not exist , please check
2018-11-17 12:39:59.417 [pool-4-thread-1] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - Invalid partition given with record: 1 is not in the range [0...1).
org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).

解决:mq.yml partition 不填