
No cursor in zookeeper, or it only updates after a long interval; the cursor is only updated at the canal.zookeeper.flush.period frequency when canal.destinations is configured #2616

Closed
Fanduzi opened this issue Mar 9, 2020 · 19 comments

Fanduzi commented Mar 9, 2020

Question

canal 1.1.4
mysql 5.7.26
zookeeper 3.4.14
kafka 2.12-2.4.0

Single node. I found there is no cursor in zookeeper — no position info is stored at all.

[zk: localhost:2181(CONNECTED) 10] ls /otter/canal 
[cluster, destinations]
[zk: localhost:2181(CONNECTED) 11] ls /otter/canal/cluster
[172.16.23.8:11111]
[zk: localhost:2181(CONNECTED) 12] ls /otter/canal/destinations
[fanboshi]
[zk: localhost:2181(CONNECTED) 13] ls /otter/canal/destinations/fanboshi
[running, cluster, 1001]
[zk: localhost:2181(CONNECTED) 14] ls /otter/canal/destinations/fanboshi/running
[]
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/fanboshi/running
{"active":true,"address":"172.16.2xx.8:11111"}
cZxid = 0x10000053b
ctime = Mon Mar 09 18:30:00 CST 2020
mZxid = 0x10000053b
mtime = Mon Mar 09 18:30:00 CST 2020
pZxid = 0x10000053b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100719607a90008
dataLength = 45
numChildren = 0
[zk: localhost:2181(CONNECTED) 16] ls /otter/canal/destinations/fanboshi/cluster 
[172.16.23.8:11111]
[zk: localhost:2181(CONNECTED) 17] ls /otter/canal/destinations/fanboshi/1001   
[]

canal.properties

#################################################
#########               common argument         #############
#################################################
# tcp bind ip
canal.ip = 
# register ip to zookeeper
canal.register.ip = 
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = ip1:2181,ip2:2181,ip3:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## memory store get mode: MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# maximum supported transaction size; transactions larger than this are split into multiple deliveries
canal.instance.transaction.size =  1024
# fallback interval (seconds) when reconnecting to a new master after a switch
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########               destinations            #############
#################################################
canal.destinations = 
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########                    MQ                      #############
##################################################
canal.mq.servers = ip1:9092,ip2:9092,ip3:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

instance.properties

$cat fanboshi/instance.properties 
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=true

# position info
canal.instance.master.address=ip:port
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2051921

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
canal.instance.tsdb.url=jdbc:mysql://ip:port/canal_tsdb_fanboshi
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal_r
canal.instance.dbPassword=superpassword
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=fanboshi\\..*,sysbench\\..*
# table black regex
canal.instance.filter.black.regex=.*\\.\\_.*\\_ghc,.*\\.\\_.*\\_gho,.*\\.\\_.*\\_del
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=default_topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic=.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
Fanduzi commented Mar 9, 2020

Hm. After waiting quite a while, the cursor finally showed up...
What controls how often it is persisted to zookeeper?
canal.zookeeper.flush.period? It doesn't feel like that single parameter controls it,
because from what I observed the mtime intervals vary widely with no pattern.

[zk: localhost:2181(CONNECTED) 107] get /otter/canal/destinations/fanboshi/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"centos-1","port":3306}},"postion":{"gtid":"c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2099946","included":false,"journalName":"mysql-bin.000006","position":120492651,"serverId":23133311,"timestamp":1583755304000}}
cZxid = 0x100000584
ctime = Mon Mar 09 19:10:01 CST 2020
mZxid = 0x1000005b8
mtime = Mon Mar 09 20:01:44 CST 2020
pZxid = 0x100000584
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 340
numChildren = 0
[zk: localhost:2181(CONNECTED) 108] get /otter/canal/destinations/fanboshi/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"centos-1","port":3306}},"postion":{"gtid":"c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2135328","included":false,"journalName":"mysql-bin.000006","position":178693436,"serverId":23133311,"timestamp":1583756690000}}
cZxid = 0x100000584
ctime = Mon Mar 09 19:10:01 CST 2020
mZxid = 0x1000005b9
mtime = Mon Mar 09 20:24:50 CST 2020
pZxid = 0x100000584
cversion = 0
dataVersion = 4
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 340
numChildren = 0
[zk: localhost:2181(CONNECTED) 109] get /otter/canal/destinations/fanboshi/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"centos-1","port":3306}},"postion":{"gtid":"c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2135871","included":false,"journalName":"mysql-bin.000006","position":179557480,"serverId":23133311,"timestamp":1583756732000}}
cZxid = 0x100000584
ctime = Mon Mar 09 19:10:01 CST 2020
mZxid = 0x1000005ba
mtime = Mon Mar 09 20:25:32 CST 2020
pZxid = 0x100000584
cversion = 0
dataVersion = 5
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 340
numChildren = 0


...




[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/fanboshi/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"bj1-mysql-dba-prod-01","port":3311}},"postion":{"gtid":"c30c6a02-4e32-11ea-84ec-fa163edcd14e:1-2135871","included":false,"journalName":"mysql-bin.000006","position":179557480,"serverId":23133311,"timestamp":1583756732000}}
cZxid = 0x100000584
ctime = Mon Mar 09 19:10:01 CST 2020
mZxid = 0x1000005ba
mtime = Mon Mar 09 20:25:32 CST 2020
pZxid = 0x100000584
cversion = 0
dataVersion = 5
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 340
numChildren = 0
[zk: localhost:2181(CONNECTED) 4] quit
Quitting...
2020-03-09 20:34:34,819 [myid:] - INFO  [main:ZooKeeper@693] - Session: 0x100719607a9000c closed
2020-03-09 20:34:34,820 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@522] - EventThread shut down for session: 0x100719607a9000c

[devops@bj1-mysql-dba-prod-01 20:34:34 /usr/local/kafka]
$date
Mon Mar  9 20:34:38 CST 2020

Fanduzi commented Mar 9, 2020

I found the cause of the cursor issue, but I don't yet know why.
canal.destinations = — it's this parameter. I never used to set it,
because according to the wiki it can be left empty.
(wiki screenshot)
But today I found that if this parameter is not set, the cursor in zookeeper only updates once in a long while, and the interval is irregular.
Only for instances listed in this parameter does the cursor update per canal.zookeeper.flush.period = 1000, i.e. once per second.

@agapple could you take a look? If it works this way, whenever I add an instance I'd have to restart canal, otherwise it can't be added to canal.destinations.

Fanduzi changed the title three times on Mar 9, 2020, finally to: "No cursor in zookeeper, or it only updates after a long interval; the cursor is only updated at the canal.zookeeper.flush.period frequency when canal.destinations is configured"
Hueason commented Apr 19, 2020

Looking at the code, a scheduled task runs every second and syncs the current in-memory cursor. The in-memory cursor is only updated after the client acks, so the cursor only advances on a client ack and is then synced to zk at the configured period.
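The described behaviour can be sketched as follows — a hypothetical illustration with invented names (CursorFlusher, maybeFlush), not canal's actual classes: the scheduled task fires every flush period, but it only writes to ZooKeeper when an ack has advanced the in-memory cursor since the last flush.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the flush behaviour described above.
// If nothing has been acked since the last flush, nothing is written,
// which is why the znode's mtime can lag far behind
// canal.zookeeper.flush.period.
public class CursorFlusher {
    private final AtomicReference<String> memoryCursor = new AtomicReference<>();
    private String lastFlushed;

    // Called when the client (or the MQ producer) acks a batch.
    public void ack(String position) {
        memoryCursor.set(position);
    }

    // Called by the scheduled task once per flush period.
    // Returns true iff a write to ZooKeeper would actually happen.
    public boolean maybeFlush() {
        String current = memoryCursor.get();
        if (current == null || Objects.equals(current, lastFlushed)) {
            return false; // cursor unchanged since last flush: skip the write
        }
        lastFlushed = current; // stand-in for writing the cursor znode
        return true;
    }

    public static void main(String[] args) {
        CursorFlusher f = new CursorFlusher();
        System.out.println(f.maybeFlush()); // false: nothing acked yet
        f.ack("mysql-bin.000006:120492651");
        System.out.println(f.maybeFlush()); // true: cursor advanced
        System.out.println(f.maybeFlush()); // false: no new ack since
    }
}
```

Under this model, an idle instance's znode mtime moves only when acks arrive, which matches the irregular mtime gaps observed above.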

Fanduzi commented Apr 19, 2020

> (quoting @Hueason's explanation above)

I'm running canal -> kafka mode; is there no client ack in that case?

Hueason commented Apr 19, 2020

In MQ mode, a successful send counts as the ack.

Fanduzi commented Apr 19, 2020

> In MQ mode, a successful send counts as the ack.

Then there is a problem. This has been running in production for a year and downstream consumers are fine, but I happened to notice that one canal instance has no cursor file at all, and the other instances' cursor files do not update at the canal.zookeeper.flush.period interval. I think there may still be a bug somewhere.

Hueason commented Apr 19, 2020

In that case you'd have to dig further. Is the instance without a cursor file running in default-instance mode?
As for the others' update frequency fluctuating: could it be fluctuation in database traffic? If no data comes in, there is nothing to update.

Fanduzi commented Apr 19, 2020

> (quoting @Hueason's reply above)

Thanks first.
My canal config is all pasted above.
As for the database: we have a lag-monitoring program that writes one row per second into a monitor table, so binlog is generated every second.

What I see now is that the update frequency is only normal when canal.destinations is configured. Even that isn't entirely accurate:
with canal.destinations = A,B,C, the frequency for B and C may still not match canal.zookeeper.flush.period.

sky-cc commented Apr 27, 2020

I ran into this problem recently while testing canal as well. After testing, the findings are:
1. With canal.instance.gtidon=false, the server writes directly to kafka and the offset updates normally.
2. With canal.instance.gtidon=true, the server writes to kafka but the offset does not update. The reasons:
a. After CanalKafkaProducer sends successfully, it commits the corresponding messages. The ack function of CanalServerWithEmbedded checks whether a PositionRange's ack is empty and filters it out if so. This is exactly where the problem lies: the PositionRange's ack stays empty forever.
b. From PositionRange.setAck() you can trace to MemoryEventStoreWithBuffer: when it detects gtid mode, it only calls setAck upon a CanalEntry.EntryType.TRANSACTIONEND event. In my tests, a TRANSACTIONEND event never arrived.
c. Given b, I suspected some empty transactions were being filtered, so MemoryEventStoreWithBuffer never receives the TRANSACTIONEND event. Tracing back through the parse/sink modules, I located the sinkData function of EntryEventSink, which applies a policy that filters out empty transactions.
The filter logic:
an empty transaction header/footer is only forwarded once the count reaches 8192, or once more than 5s have passed since the last empty header/footer sent downstream.
The problems with it:
1. Usually the first empty entry forwarded is an EntryType.TRANSACTIONBEGIN; by the >=8192 rule, the next one forwarded will again be a TRANSACTIONBEGIN. In other words, this rule never forwards an EntryType.TRANSACTIONEND.
2. Under the 5s rule, if every transaction is small and finishes within 5s, you likewise never get an EntryType.TRANSACTIONEND.

So you can end up never receiving EntryType.TRANSACTIONEND at all, and combined with b, the condition for updating the offset is never satisfied.

Key EntryEventSink source:

    if (filterTransactionEntry
        && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
            || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
        long currentTimestamp = entry.getHeader().getExecuteTime();
        String gtid = entry.getHeader().getGtid();
        // policy: occasionally let empty transaction headers/footers through,
        // so the database position is updated promptly and shows the parser is alive
        if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
            && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval) {
            continue;
        } else {
            lastTransactionCount.set(0L);
            lastTransactionTimestamp = currentTimestamp;
        }
    }

The code below, while keeping the original logic, guarantees that the header and footer of the same transaction are both forwarded. With that, the offset updates normally.
(Note: commitGtid is a new field and must be declared on the class, e.g. private String commitGtid = "";)

    if (filterTransactionEntry
        && (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
            || entry.getEntryType() == EntryType.TRANSACTIONEND)) {
        long currentTimestamp = entry.getHeader().getExecuteTime();
        String gtid = entry.getHeader().getGtid();
        // let empty transaction headers/footers through occasionally, and never
        // drop the footer of a transaction whose header was already forwarded
        if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold
            && Math.abs(currentTimestamp - lastTransactionTimestamp) <= emptyTransactionInterval
            && !commitGtid.equals(gtid)) {
            continue;
        } else {
            lastTransactionCount.set(0L);
            lastTransactionTimestamp = currentTimestamp;
            commitGtid = gtid;
        }
    }
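To make the starvation concrete, here is a small standalone simulation — hypothetical names, not canal code — of the quoted filter policy fed a steady stream of empty transactions, one per second. The matching TRANSACTIONEND always carries the same executeTime as its TRANSACTIONBEGIN, so it always falls inside the 5s window and is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the empty-transaction filter quoted above:
// forward an empty BEGIN/END only when the counter exceeds the threshold
// OR the 5s interval has elapsed; otherwise drop it.
public class EmptyTxFilterSim {
    static final long THRESHOLD = 8192;   // emptyTransctionThresold
    static final long INTERVAL_MS = 5000; // emptyTransactionInterval

    // Feed one empty transaction per second for `seconds` seconds; each
    // contributes a TRANSACTIONBEGIN and a TRANSACTIONEND with the same
    // executeTime. Returns the entries the filter lets through.
    static List<String> simulate(int seconds) {
        List<String> forwarded = new ArrayList<>();
        long count = 0;
        long lastTs = 0;
        for (int sec = 0; sec < seconds; sec++) {
            long ts = sec * 1000L;
            for (String type : new String[] { "TRANSACTIONBEGIN", "TRANSACTIONEND" }) {
                count++;
                // same AND-condition as the quoted sinkData policy
                if (count <= THRESHOLD && Math.abs(ts - lastTs) <= INTERVAL_MS) {
                    continue; // dropped
                }
                forwarded.add(type + "@" + sec + "s");
                count = 0;
                lastTs = ts;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Only TRANSACTIONBEGIN entries ever get through: the matching END
        // has the same timestamp, so it is always inside the 5s window.
        System.out.println(simulate(60));
    }
}
```

With this traffic pattern, every forwarded entry is a TRANSACTIONBEGIN, which is exactly point 2 of the analysis above.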

Fanduzi commented Apr 27, 2020

> (quoting @sky-cc's analysis above)

👍

Fanduzi commented Apr 28, 2020

> (quoting @sky-cc's analysis above)

Mate, please submit an MR — waiting for the author to fix it could take who knows how long.

saigu commented May 19, 2020

Ran into the same problem: with gtid mode the cursor in zk never updates; without gtid mode it works, but after a database master/standby switchover it throws errors. So hard...

sky-cc commented May 19, 2020

> Ran into the same problem: with gtid mode the cursor in zk never updates; without gtid mode it works, but after a master/standby switchover it throws errors.

I ran into it later as well; you can refer to this: https://blog.csdn.net/ZSZSZSxh/article/details/105826492

hbpeng commented May 21, 2020

> Ran into the same problem: with gtid mode the cursor in zk never updates; without gtid mode it works, but after a master/standby switchover it throws errors.

> I ran into it later as well; you can refer to this: https://blog.csdn.net/ZSZSZSxh/article/details/105826492

I pasted your code in as described, but it reports that commitGtid is undefined — is the code you posted complete?

@agapple agapple closed this as completed Aug 21, 2020
@agapple agapple added the bug label Aug 21, 2020
@agapple agapple added this to the v1.1.5 milestone Aug 21, 2020
agapple (Member) commented Aug 21, 2020

@Fanduzi Your root-cause analysis is correct: when an empty transaction was forwarded, only the begin was sent without the paired begin/commit events, so in gtid mode the position update never sees a commit event.

@simake2017

How do you solve the missing position? In admin cluster mode there is no position in zk either.

@simake2017

@agapple could you take a look?

@simake2017

@saigu did you get it solved?

saigu commented May 24, 2022

@simake2017

It looks like the 1.1.5 release has fixed this bug; you could try upgrading.

No branches or pull requests

7 participants