Skip to content

使用jar包配置启动

使用jar形式直接启动同步软件同步至Kafka时,需配置cfg.properties、consumer.properties和producer.properties文件。

配置cfg.properties

配置参数说明
#type in (db,kafka,file)-
source.type=db源端类型
aim.type=db目标端类型
#binlog source config源端数据库配置信息
source.log.db=db待同步库名
source.db.usr=usr_sod源端用户名
source.db.pwd=password源端用户名密码(通过Encryption.jar加密后的密码)
source.db.port=12345源端端口号
source.log.suber.name=sync2同步使用的订阅名
source.log.filter.T=null同步过滤表达式,过滤指定表。格式:模式名:表名,多个表之间以&符号分割。例:s1:t1&s2:t2。(均使用英文字符)
source.log.filter.V=null同步过滤表达式,过滤指定视图
source.log.filter.P=null同步过滤表达式,过滤指定存储过程
source.log.filter.F=null同步过滤表达式,过滤指定存储函数
source.log.forward=true与source.log.filter配合使用,若为true,则同步source.log.filter中所配置的表。若为false,则不同步source.log.filter中所配置的表。默认为true
source.log.partion=8数据库日志分区
source.log.fetch.size=20同步每次拉取日志大小
source.log.ips=127.0.0.1单事务最大行数
source.commit_rows=1000同步过滤表达式,过滤指定存储函数
ddl.support=false是否进行 DDL 同步
redo.offset.server=true是否启用数据库端订阅者偏移量
reconnect.num=-1数据库重连次数,为-1则为无限重连
disable.binlog=falsetrue目标端binlog不记录同步的数据,false为记录
isMax.binlog=falsefalse从binlog原始位置开始读取,true从最新的文件号的0偏移量开始读取
# kafka source config源端kafka配置
source.kafka.parath=8每个topic的分区数
source.topic=test.d2.tclob\\test.u2.t1需要同步的topic名,以”\\”分隔
case.sensitive=true用于同步至kafka时创建topic的大小写控制。若为true,则创建的topic名大小写按照源端库实际大小写创建。若为false,则创建的topic名全为小写
#to kafka写入 Kafka 消息中间件配置
zk.connect=192.168.2.225:2181zookeeper ip
topic.partition=8Kafka 每个 topic 的分区数
repeat.pos.kafka=true是否依赖 Kafka 数据进行断点续传过滤重复
case.sensitive=true用于同步至kafka时创建topic的大小写控制。若为true,则创建的topic名大小写按照源端库实际大小写创建。若为false,则创建的topic名全为小写
#to db写入数据库
writer.thd.num=8同步时写入线程数(建议和source.log.partion 相同)
writer.db=xugu目标库类型(xugu,oracle,mysql)
writer.schema=SYSDBA指定目标端模式名(严格区分大小写)
writer.parathd.num=1入库并发数
# to HttpEI信息
url.ei=针对天境EI,可不填
task.id=针对天境EI,可不填
task.name=针对天境EI,可不填

说明:

同步至Kafka需要修改cfg.properties中的源端数据库信息和Kafka中间件信息。配置aim.type参数为Kafka。源端数据库信息 (# binlog source config) 配置和源端配置一致,只需按照实际情况修改库名、订阅者、过滤表达式和日志记载节点IP即可。然后,Kafka根据生产环境的配置,将zk.connect修改为zookeeper的IP端口,数据库连接使用订阅者s_2kafka绑定的用户u_2kafka,指定任意一个节点的IP即可。

配置producer.properties

配置参数说明
bootstrapServers=ip1:port1,ip2:port2,ip3:port3Kafka 连接信息按需配置
compressionType=none建议保持默认
lingerMs=10建议保持默认
acks=all建议保持默认
retries=8建议保持默认
batchSize=163840建议保持默认
bufferMemory=33554432建议保持默认
enableIdempotence=true建议保持默认
key.serializer=org.apache.kafka.common.serialization.StringSerializer必须保持默认
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer必须保持默认
partitioner.class=com.sync.util.BinlogPartitioner必须保持默认

说明:

按照实际部署情况,将bootstrap.servers修改为Kafka集群的IP和端口,其余参数保持默认。

配置consumer.properties

配置参数说明
bootstrapServers=ip1:port1,ip2:port2,ip3:port3Kafka 连接信息按需配置
group.id=getendpos建议保持默认
auto.offset.reset=earliest建议保持默认
enable.auto.commit=false建议保持默认
auto.commit.interval.ms=1000建议保持默认
session.timeout.ms=30000建议保持默认
max.partition.fetch.bytes=10485760建议保持默认
max.poll.records=10建议保持默认
max.poll.interval.ms=300000建议保持默认
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer必须保持默认
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer必须保持默认

启动程序

待所有配置修改完成,检查无误后启动即可。

  • 启动命令:

    ./xugusyn.sh start
  • 查看是否启动同步软件命令:

    ./xugusyn.sh status
  • 停止同步软件命令:

    ./xugusyn.sh stop