Canal dynamicTopic issue
Published: 2019-05-02


A colleague's Canal deployment, which had been running for several months, suddenly threw the errors below. The setup uses dynamicTopic. I have never used dynamicTopic myself, so all I could do was search the issues.

Notes on dynamicTopic and partitionHash

canal.mq.dynamicTopic expression syntax

From canal 1.1.3 onward, the supported formats are schema or schema.table, with multiple entries separated by commas or semicolons.

Example 1: test\\.test matches a single table; its data is sent to a topic named test_test.
Example 2: .*\\..* matches all tables; each table is sent to a topic named after that table.
Example 3: test matches a database; all tables in that database are sent to a topic named after the database.
Example 4: test\\.* is a regular expression; every table it matches is sent to a topic named after that table.
Example 5: test,test1\\.test1 specifies multiple expressions; tables in the test database all go to the test topic, test1.test1 goes to the test1_test1 topic, and all remaining tables go to the default canal.mq.topic.

For greater flexibility, a matching rule may also name the topic it sends to, using the format topicName:schema or topicName:schema.table.

Example 1: test:test\\.test matches a single table and sends it to the topic named test.
Example 2: test:.*\\..* matches all tables; because a topic is specified, every table is sent to the test topic.
Example 3: test:test matches a database; all of its tables are sent to the test topic.
Example 4: testA:test\\.* is a regular expression; every table it matches is sent to the testA topic.
Example 5: test0:test,test1:test1\\.test1 specifies multiple expressions; tables in the test database all go to the test0 topic, test1.test1 goes to the test1 topic, and all remaining tables go to the default canal.mq.topic.

Set the matching rules according to your own business needs, and it is recommended to enable automatic topic creation on the MQ side.
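To make the topic-override form concrete, here is a small sketch in the style of example 5. The property names are real Canal parameters; the schema name orders, the table users.profile, and the topic names are made-up placeholders for illustration only.

# orders schema -> topic ods_orders, users.profile -> topic ods_users_profile,
# everything else falls back to the default topic (names are placeholders)
canal.mq.dynamicTopic=ods_orders:orders,ods_users_profile:users\\.profile
canal.mq.topic=canal_default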

canal.mq.partitionHash expression syntax

From canal 1.1.3 onward, the supported format is schema.table:pk1^pk2, with multiple entries separated by commas.

Example 1: test\\.test:pk1^pk2 matches a single table; the hash fields are pk1 + pk2.
Example 2: .*\\..*:id is a regular expression; the hash field for every matched table is id.
Example 3: .*\\..*:$pk$ is a regular expression; the hash field for every matched table is its primary key (looked up automatically).
Example 4: if no rule is written at all, everything goes to partition 0 by default.
Example 5: .*\\..* is a regular expression without pk information; every matched table is hashed by its table name. Hashing by table means all rows of one table land in the same partition while different tables are spread out (a hot table can make one partition very large).
Example 6: test\\.test:id,.\\..* hashes the test.test table by id and all other tables by table name.

Note: set the matching rules according to your own business needs. Multiple rules are evaluated in order, and the first rule that matches wins.
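A matching partitionHash sketch under the same assumptions (the schema and table names are placeholders I made up, the property names and the $pk$ syntax are from the rules above): hash the hypothetical orders.order_item table by its id column and every other table by its primary key, spread over 3 partitions.

# hash partition config (values are placeholders)
canal.mq.partitionsNum=3
canal.mq.partitionHash=orders\\.order_item:id,.*\\..*:$pk$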

Other detailed parameters can be found in the reference documentation.

canal.log reports INVALID_TOPIC_EXCEPTION

2020-01-10 15:58:31.552 [canal-instance-scan-0] INFO  com.alibaba.otter.canal.deployer.CanalController - auto notify start example successful.
2020-01-10 16:37:26.946 [canal-instance-scan-0] INFO  com.alibaba.otter.canal.deployer.CanalController - auto notify start mgr_fanboshi successful.
2020-01-10 16:38:03.992 [kafka-producer-network-thread | producer-1] WARN  org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.096 [kafka-producer-network-thread | producer-1] WARN  org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.200 [kafka-producer-network-thread | producer-1] WARN  org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.304 [kafka-producer-network-thread | producer-1] WARN  org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 13 : {=INVALID_TOPIC_EXCEPTION}
2020-01-10 16:38:04.409 [kafka-producer-network-thread | producer-1] WARN  org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 14 : {=INVALID_TOPIC_EXCEPTION}

The instance log also reports errors:

2020-01-10 16:37:27.159 [destination = mgr_fanboshi , address = /172.18.8.200:3399 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=0080323399-mysql-bin.000004,position=1065960584,serverId=,gtid=69ce1dcb-1b67-5e4b-9945-7cc64c64a14f:1-566281:1000009-2590159:3005066-3017688,ce947b65-6b70-539b-8e6a-59e9bcde28a0:206083443-206083524:207812793-207825891:207921053-207921085,d58aa5c2-b773-5944-bfe9-0a1142ef87f4:1706527:2447252-2447264,f5d4a39c-7800-52ec-b181-8c751ba2d078:868915-869106:1968590-1968591,timestamp=] cost : 2ms , the next step is binlog dump
2020-01-10 16:39:03.987 [pool-3-thread-2] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na]
	at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1150) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[kafka-clients-1.1.1.jar:na]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[kafka-clients-1.1.1.jar:na]
	at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:199) ~[canal.server-1.1.4.jar:na]
	... 8 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
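A side note on the timeout: the 60000 ms in "Failed to update metadata after 60000 ms" looks like the Kafka producer's default max.block.ms, i.e. the producer blocked for a minute waiting for metadata about a topic the broker kept rejecting as invalid. If one ever needed to shorten that blocking time, I would expect it can be passed through via the canal.mq.extraParams mechanism that already appears in my canal.properties below; this is an untested sketch, and the real fix is of course giving every record a valid topic.

# untested sketch: pass max.block.ms through to the Kafka producer
canal.mq.extraParams.max.block.ms=30000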

I found an issue that gave me some ideas: https://github.com/alibaba/canal/issues/1716

The configuration in instance.properties:

Databases to sync
canal.instance.filter.regex=A\\..*,B\\..*,C\\..*

Message queue configuration

Route schemas and tables with regular expressions
Format: topic:schema.table,topic:schema.table,topic:schema.table
canal.mq.dynamicTopic=topic_A:A,topic_B:B,topic_C:C
It is best to also configure canal.mq.topic as a default topic. In practice we found that some binlog events from the mysql schema (CREATE TABLE statements, GRANT statements, and so on) are read in as well; without this default topic, canal reports an error that no partition can be found and stops writing.
canal.mq.topic=a_default_topic

Partition settings

  1. If each topic has only one partition, configure:
    canal.mq.partition=0
  2. If each topic has multiple partitions and you want to hash by table name, configure:
    # hash partition config
    canal.mq.partitionsNum=3
    canal.mq.partitionHash=.*\\..*

So canal.mq.topic can just be set to any existing topic, and the main configuration is this, right? canal.mq.dynamicTopic=topic_A:A,topic_B:B,topic_C:C

Yes. canal.mq.topic is effectively a default: records for schemas and tables that do not match any rule all go into that default topic. When consuming, you only care about the dynamic topics you configured.

My understanding is that messages our matching rules do not cover have no topic to be written to because no default topic is configured, which is presumably why the topic name before the = in {=INVALID_TOPIC_EXCEPTION} above is empty, and why the INVALID_TOPIC_EXCEPTION is raised.

Although my colleague had configured canal.instance.filter.regex=broker\\..*, statements such as GRANT could probably still slip through and cause this problem.
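So the defensive setup I would sketch is: always configure a default topic so every record has somewhere to land, and, if I understand the switch correctly, optionally tell canal to ignore DCL query statements such as GRANT. The canal_default topic name below is a placeholder I made up; canal.mq.topic and canal.instance.filter.query.dcl are the same parameters that appear in my configs below (where query.dcl is currently false, i.e. not filtered).

# instance.properties: catch-all topic for anything the dynamic rules miss (name is a placeholder)
canal.mq.topic=canal_default
# canal.properties: if I read the switch correctly, true makes canal ignore DCL query statements such as GRANT
canal.instance.filter.query.dcl = true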

Finally, here is my own configuration for 1.1.4.

canal.properties

#################################################
#########       common argument        #########
#################################################
# tcp bind ip
canal.ip = 172.18.8.32
# register ip to zookeeper
canal.register.ip = 172.18.8.32
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = 172.18.12.212:2181,172.18.12.213:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = true
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########        destinations          #########
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

#################################################
#########              MQ              #########
#################################################
canal.mq.servers = 172.18.12.212:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 33554432
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.transaction = false
canal.mq.extraParams.enable.idempotence=true
canal.mq.extraParams.max.in.flight.requests.per.connection=1
#canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
#canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

#################################################
#########     Kafka Kerberos Info      #########
#################################################
canal.mq.kafka.kerberos.enable = false
#canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
#canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=true

# position info
canal.instance.master.address=172.18.8.200:3372
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal_r
canal.instance.dbPassword=canal1234!@#$
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=fanboshi\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=
#canal.mq.topic=cherry_default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.dynamicTopic= cherry:fanboshi
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

