使用jar包配置启动
使用jar形式直接启动同步软件同步至Kafka时,需配置cfg.properties、consumer.properties和producer.properties文件。
配置cfg.properties
配置参数 | 说明 |
---|---|
#type in (db,kafka,file) | - |
source.type=db | 源端类型 |
aim.type=db | 目标端类型 |
#binlog source config | 源端数据库配置信息 |
source.log.db=db | 待同步库名 |
source.db.usr=usr_sod | 源端用户名 |
source.db.pwd=password | 源端用户名密码(通过Encryption.jar加密后的密码) |
source.db.port=12345 | 源端端口号 |
source.log.suber.name=sync2 | 同步使用的订阅名 |
source.log.filter.T=null | 同步过滤表达式,过滤指定表。格式:模式名:表名,多个表之间以&符号分割。例:s1:t1&s2:t2。(均使用英文字符) |
source.log.filter.V=null | 同步过滤表达式,过滤指定视图 |
source.log.filter.P=null | 同步过滤表达式,过滤指定存储过程 |
source.log.filter.F=null | 同步过滤表达式,过滤指定存储函数 |
source.log.forward=true | 与source.log.filter配合使用,若为true,则同步source.log.filter中所配置的表。若为false,则不同步source.log.filter中所配置的表。默认为true |
source.log.partion=8 | 数据库日志分区 |
source.log.fetch.size=20 | 同步每次拉取日志大小 |
source.log.ips=127.0.0.1 | 单事务最大行数 |
source.commit_rows=1000 | 同步过滤表达式,过滤指定存储函数 |
ddl.support=false | 是否进行 DDL 同步 |
redo.offset.server=true | 是否启用数据库端订阅者偏移量 |
reconnect.num=-1 | 数据库重连次数,为-1则为无限重连 |
disable.binlog=false | true目标端binlog不记录同步的数据,false为记录 |
isMax.binlog=false | false从binlog原始位置开始读取,true从最新的文件号的0偏移量开始读取 |
# kafka source config | 源端kafka配置 |
source.kafka.parath=8 | 每个topic的分区数 |
source.topic=test.d2.tclob\\test.u2.t1 | 需要同步的topic名,以”\\”分隔 |
case.sensitive=true | 用于同步至kafka时创建topic的大小写控制。若为true,则创建的topic名大小写按照源端库实际大小写创建。若为false,则创建的topic名全为小写 |
#to kafka | 写入 Kafka 消息中间件配置 |
zk.connect=192.168.2.225:2181 | zookeeper ip |
topic.partition=8 | Kafka 每个 topic 的分区数 |
repeat.pos.kafka=true | 是否依赖 Kafka 数据进行断点续传过滤重复 |
case.sensitive=true | 用于同步至kafka时创建topic的大小写控制。若为true,则创建的topic名大小写按照源端库实际大小写创建。若为false,则创建的topic名全为小写 |
#to db | 写入数据库 |
writer.thd.num=8 | 同步时写入线程数(建议和source.log.partion 相同) |
writer.db=xugu | 目标库类型(xugu,oracle,mysql) |
writer.schema=SYSDBA | 指定目标端模式名(严格区分大小写) |
writer.parathd.num=1 | 入库并发数 |
# to Http | EI信息 |
url.ei= | 针对天境EI,可不填 |
task.id= | 针对天境EI,可不填 |
task.name= | 针对天境EI,可不填 |
说明:
同步至Kafka需要修改cfg.properties中的源端数据库信息和Kafka中间件信息。配置aim.type参数为Kafka。源端数据库信息 (# binlog source config) 配置和源端配置一致,只需按照实际情况修改库名、订阅者、过滤表达式和日志记载节点IP即可。然后,Kafka根据生产环境的配置,将zk.connect修改为zookeeper的IP端口,数据库连接使用订阅者s_2kafka绑定的用户u_2kafka,指定任意一个节点的IP即可。
配置producer.properties
配置参数 | 说明 |
---|---|
bootstrapServers=ip1:port1,ip2:port2,ip3:port3 | Kafka 连接信息按需配置 |
compressionType=none | 建议保持默认 |
lingerMs=10 | 建议保持默认 |
acks=all | 建议保持默认 |
retries=8 | 建议保持默认 |
batchSize=163840 | 建议保持默认 |
bufferMemory=33554432 | 建议保持默认 |
enableIdempotence=true | 建议保持默认 |
key.serializer=org.apache.kafka.common.serialization.StringSerializer | 必须保持默认 |
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer | 必须保持默认 |
partitioner.class=com.sync.util.BinlogPartitioner | 必须保持默认 |
说明:
按照实际部署情况,将bootstrap.servers修改为Kafka集群的IP和端口,其余参数保持默认。
配置consumer.properties
配置参数 | 说明 |
---|---|
bootstrapServers=ip1:port1,ip2:port2,ip3:port3 | Kafka 连接信息按需配置 |
group.id=getendpos | 建议保持默认 |
auto.offset.reset=earliest | 建议保持默认 |
enable.auto.commit=false | 建议保持默认 |
auto.commit.interval.ms=1000 | 建议保持默认 |
session.timeout.ms=30000 | 建议保持默认 |
max.partition.fetch.bytes=10485760 | 建议保持默认 |
max.poll.records=10 | 建议保持默认 |
max.poll.interval.ms=300000 | 建议保持默认 |
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer | 必须保持默认 |
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer | 必须保持默认 |
启动程序
待所有配置修改完成,检查无误后启动即可。
启动命令:
./xugusyn.sh start
查看是否启动同步软件命令:
./xugusyn.sh status
停止同步软件命令:
./xugusyn.sh stop