达梦到kafka单向同步环境部署

一、安装ZooKeeper

1、 下载安装包地址:

https://www.apache.org/dyn/closer.cgi/zookeeper/

选择此链接:
在这里插入图片描述
点击stable:
在这里插入图片描述
注意一定要选择带bin的那个版本:
在这里插入图片描述
如果选择下边那个会报:找不到或无法加载主类:
原因分析:
即下载的是未编译的 jar 包。
注:zookeeper 好像从 3.5 版本以后,命名就发生了改变,如果是 apache-zookeeper-3.5.5.tar.gz 这般命名的,都是未编译的,而 apache-zookeeper-3.5.5-bin.tar.gz 这般命名的,才是已编译的包。

2、解压并进入ZooKeeper目录,进入D:\kafka\apache-zookeeper-3.6.3-bin\conf
3、将“zoo_sample.cfg”重命名为“zoo.cfg”
4、打开“zoo.cfg”找到并编辑dataDir=D:\Kafka\ apache-zookeeper-3.6.3-bin \tmp

5、 添加系统变量:ZOOKEEPER_HOME=D:\Kafka\ apache-zookeeper-3.6.3-bin
计算机-属性-高级系统设置中配置环境变量,配置完成环境变量,打开cmd,用命令使环境变量生效set path=环境变量,echo %PATH%查看环境变量是否正确
在这里插入图片描述
6、 编辑path系统变量,添加路径:%ZOOKEEPER_HOME%\bin
在这里插入图片描述
7、 在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
8、 打开新的cmd,输入“zkServer“,运行Zookeeper
在这里插入图片描述
9、命令行提示如下:说明本地Zookeeper启动成功
在这里插入图片描述注意:不要关了这个窗口

问题解决:
1、启动时报错:此时不应有 \Java\jdk1.7.0_13\bin。
解决办法:将JAVA_HOME删除

2、报错
解决办法:zkServer.cmd中的call %JAVA%改为call JAVA
在这里插入图片描述

二、安装Kafka

1.下载安装包

http://kafka.apache.org/downloads

选择,注意要下载二进制版本
在这里插入图片描述

选择链接下载:
2.解压并进入Kafka目录,放到:D:\kafka\kafka_2.12-2.8.0
3.进入config目录找到文件server.properties并打开
4.找到并编辑log.dirs=D:\Kafka\ kafka_2.12-2.8.0\kafka-logs
在这里插入图片描述
5.找到并编辑zookeeper.connect=localhost:2181
6. Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181
在这里插入图片描述
7.进入Kafka安装目录D:\Kafka\ kafka_2.12-2.8.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:

.\bin\windows\kafka-server-start.bat .\config\server.properties
在这里插入图片描述
注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行

三、测试

1、 创建主题,进入Kafka安装目录D:\Kafka\ kafka_2.12-2.8.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
在这里插入图片描述
这个界面可以关闭。

查看主题输入:

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
在这里插入图片描述

2、 创建生产者,进入Kafka安装目录D:\Kafka\ kafka_2.12-2.8.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:hello自己输的

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
在这里插入图片描述
注意:这个可以关闭,相当于是输入端,在这里输入内容,回车后可以在消费者端输出。

3、 创建消费者,进入Kafka安装目录D:\Kafka\ kafka_2.12-2.8.0,按下Shift+右键,选择“打开命令窗口”选项,打开命令行,输入:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server
localhost:9092 --topic test --from-beginning
在这里插入图片描述

生产者输入hello,消费者就会打出hello,123同理。
注意:在生产者test窗口进行的输入,查看时也必须是在对应的test消费者界面进行查看;

四、HS同步到kafka

范例环境:linux上的dm8单向同步到windows上的kafka在这里插入图片描述

4.1 配置归档

1)修改达梦端的dm.ini参数,开启归档,重启数据库使参数生效
1、ARCH_INI参数值设置为1
2、RLOG_APPEND_LOGIC参数的值设置为1
3、归档文件例子

[root@dmdsc01 bin]# more /SJ/data/PROD/dmarch.ini
#DaMeng Database Archive Configuration file
#this is comments
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /SJ/data/PROD/arch
ARCH_FILE_SIZE = 256
ARCH_SPACE_LIMIT = 10240

4.2安装软件,配置hs文件

1、安装hs软件
2、管理工具中的执行/opt/dmhs/scripts/ddl_sql_dm7.sql(DDL同步必要操作

3、目的端(kafka)dmhs.hs配置文件E:\dmhs\bin\dmhs.hs

<?xml version="1.0" encoding="GB2312" standalone="no"?>
<base>
    <lang>en</lang>
    <version>2.0</version>
    <mgr_port>5445</mgr_port>
    <chk_interval>3</chk_interval>
    <ckpt_interval>60</ckpt_interval>
    <siteid>2</siteid>
</base>
<exec>
    <recv>
        <data_port>5446</data_port>
    </recv>
    <name></name>
    <enable>1</enable> 		
    <enable_ddl>1</enable_ddl>
    <char_code>PG_GB18030</char_code>
    <level>0</level>
    <exec_thr>16</exec_thr>
    <exec_sql>1024</exec_sql>
    <exec_trx>5000</exec_trx>
    <exec_rows>250</exec_rows>
    <recv_caches>8</recv_caches>
    <trxid_tables>1</trxid_tables>
    <exec_policy>2</exec_policy>
    <is_kafka>1</is_kafka>
    <max_packet_size>16</max_packet_size>
</exec> </dmhs>

<is_kafka>说明:
含义:EXEC 模块是否需要支持 KAFKA,可选项 0/1。
说明:在 KAFKA 模式下,dmhs_server 会作为动态库供 JAVA 程序调用,加载 EXEC
模块以后,EXEC 模块收集到的事务通过专门的 JAVA 服务发送给 KAFKA 服务,由第三方应用进一步的处理。

<enable_ddl>说明:
含义:在 KAFKA 模式下,是否记录源端数据库 DDL 信息,可选项 0/1,默认值 0。
说明:该参数仅在 KAFKA 模式下有效。在 KAFKA 模式下,EXEC 模块会将源端操作解析为 JSON 字符串,将 JSON 字符串发送到 KAFKA 服务。该参数控制是否将源端 DDL
操作解析为 JSON 发送到 KAFKA 服务。DDL 操作 JSON 字符串如下:
{“DatasetTable”:“TEST1”,
“DatasetName”:“DMHS”,
“oType”:“DD”,
“batchTime”:“2019-12-13 16:46:01”,
“scn”:“35634014”,
“content”:{“sql”:"DROP TABLE “DMHS”.“TEST1"”}}

字段含义:
tableName:DDL 操作表名
DatasetName:DDL 操作表所在用户
oType:DDL 类型。
DD:DROP TABLE
DC:CREATE TABLE
DA:ALTER TABLE
DT:TRUNCATE TABLE
DSQL:以源端以 SQL 方式发送过来的 DDL
其中,DA 仅支持 add column,drop column,modify column,rename column,其他操作不支
持。
batchTime:源端 DDL 操作时间
content:DDL 操作 SQL,当 oType 为 DSQL 时,记录源端 DDL SQL,其他为 DMHS
解析日志生成的 DDL SQL。

4、源端(dm8)dmhs.hs配置文件**/opt/dmhs/bin/dmhs.hs**

[root@dmdsc01 bin]# more dmhs.hs
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base>
<lang>en</lang>
<mgr_port>5345</mgr_port>
<chk_interval>3</chk_interval>
<ckpt_interval>60</ckpt_interval>
<siteid>1</siteid>
<version>2.0</version>
</base>
    <cpt>
    <db_type>DM8</db_type>
    <db_server>192.168.3.66</db_server>
    <db_user>SYSDBA</db_user>
    <db_pwd>Dameng123</db_pwd>
    <db_port>5236</db_port>
	<char_code>PG_UTF8</char_code>
	<ddl_mask>OP:TABLE:REC</ddl_mask>
    <arch>
    <clear_interval>600</clear_interval>
    <clear_flag>1</clear_flag>
    </arch>
    <send>
		<ip>192.168.3.2</ip>
		<mgr_port>5445</mgr_port>
		<data_port>5446</data_port>
		<net_pack_size>256</net_pack_size>
		<net_turns>0</net_turns>
		<crc_check>0</crc_check>
		<trigger>0</trigger>
		<identity>0</identity>
    <filter>
    <enable>
    <item>SYSDBA.*</item>
    <item>SYSDBA.DMHS_TRXID_TABLE</item>
    </enable>
    <disable/>
    </filter>
	  <map>
	  </map>
    </send>
    </cpt>
    </dmhs>

4.3启动kafka服务

进入kafka程序bin目录,分别启动zookeeper和kafka server。
Zookepeer:
D:\kafka\apache-zookeeper-3.6.3-bin\bin>zkServer.cmd

Kafka:
D:\kafka\kafka_2.12-2.8.0>.\bin\windows\kafka-server-start.bat .\config\server.p
roperties

4.4启动脚本(新版本使用)

1、首先拿到kafka的版本后要检查版本里是否包含如下3个文件:

dmga-dmhs-kafka-service.jar,fastjson-1.2.21.jar和dmhs_kafka.properties;

2、这三个文件都放在dmhs\bin下即可;
3、修改文件dmhs_kafka.properties里的内容,其中
dmhs.conf.pat修改为实际的hs配置文件路径
bootstrap.servers修改为实际的kafka节点信息
kafka.topic.name修改为实际的主题
其余参数保持不变
一个模板如下:

# DMHS config file path dmhs.conf.path=/opt/sty/dmhs_kafka_0917/bin/dmhs.hs
# kafka broker list,such as ip1:port1,ip2:port2,…
bootstrap.servers=223.254.16.75:9092
# kafka topic name kafka.topic.name=TEST

win版dmhs_kafka.properties

# DMHS config file path dmhs.conf.path=E:\dmhs\bin\dmhs.hs
# kafka broker list,such as ip1:port1,ip2:port2,… bootstrap.servers=127.0.0.1:9092
# kafka topic name kafka.topic.name=test
# whether to enable JSON format check json.format.check=1
# How many messages print cost time print.message.num=1000
# How many messages batch to get dmhs.min.batch.size=20
# kafka serializer class key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka partitioner config partitioner.class=com.dameng.dmhs.dmga.service.impl.OnePartitioner
# kafka request acks config acks=-1 max.request.size=5024000
#batch.size=1048576
#linger.ms=3
#buffer.memory=134217728 retries=3
#enable.idempotence=true compression.type=none max.in.flight.requests.per.connection=1 send.buffer.bytes=1048576
metadata.max.age.ms=300000

配置hs启动脚本
创建文件start_dmhs_kafka.sh,内容如下:

#!/bin/sh
export LANG=zh_CN.GBK
/usr/java/jdk1.8.0_112/bin/java -Djava.ext.dirs="/opt/kafka_2.11-0.11.0.1/libs:." com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService
/opt/sty/dmhs_kafka_0917/bin/dmhs_kafka.properties

其中,要注意:java路径,kafka目录和dmhs_kafka.properties根据实际路径配置

Win版运行命令如下:

E:\dmhs\bin>java
-Djava.ext.dirs=“E:\kafka_2.12-2.8.0\libs;E:\dmhs\bin;.” com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService
E:\dmhs\bin\dmhs_kafka.propertie

java -Djava.ext.dirs=dmga-dmhs-kafka-service.jar路径;kafka_2.11-0.9.0.1_libs路径, -cp kafka_dmhs_dll路径 com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService
dmhs_kafka.propertie

说明:
① 设置java系统属性java.ext.dirs为kafka的libs目录。
②  DMHS实现kafka同步服务的类名为: com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService

总结启动顺序:
数据库服务:

Zookepeer:

E:\apache-zookeeper-3.6.3-bin\bin>zkServer.cmd
在这里插入图片描述

Kafka:

E:\kafka_2.12-2.8.0>.\bin\windows\kafka-server-start.bat
.\config\server.p roperties
在这里插入图片描述

目的端(kafka)Hs服务:

E:\dmhs\bin>java
-Djava.ext.dirs=“E:\kafka_2.12-2.8.0\libs;E:\dmhs\bin;.” com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService
E:\dmhs\bin\dmhs_kafka.propertie
在这里插入图片描述
此处的报错:库文件 libexec_ins_ora.so未找到,该报错不影响dmhs正常运行,可以忽略。

源端(dm8):

[root@dmdsc01 bin]# ./dmhs_server
在这里插入图片描述
此处报错是因为windows上的防火墙导致网络不通;

[root@dmdsc01 ~]# cd /opt/dmhs/bin
[root@dmdsc01 bin]# ./dmhs_console
DMHS> connect 192.168.3.66:5345
DMHS> copy 0 “sch.name=‘SYSDBA’” DICT
DMHS> start cpt
在这里插入图片描述在这里插入图片描述

源端启动cpt后目的端显示如下:
在这里插入图片描述

测试:

1、源端建表:
在这里插入图片描述

2、开启一个消费者界面,观察DMHSTEST的推送情况,打印的消息如下所示

E:\kafka_2.12-2.8.0>.\bin\windows\kafka-console-consumer.bat
–bootstrap-server localhost:-9092 -topic test
在这里插入图片描述

4.5 hs服务启动报错与解决
报错1

在这里插入图片描述解决:把命令中的dmhs.hs路径删除,此处花了有两个小时的时间,一开始怀疑是java版本的问题,尝试了几个版本(1.8即可),还是报这个错,原因是看了几篇前辈的笔记:老版本是需要加上dmhs.hs的,但是新版本在dmhs_kafka.properties文件写的有dmhs.hs的路径。

报错2
无法同步:需在目的端dmhs.hs里面加参数enable_ddl,源端ddl_mask改成OP:TABLE:REC。
<enable_ddl>1</enable_ddl>
<ddl_mask>OP:TABLE:REC</ddl_mask>

报错3
由于本地ip是自由分配,公司是一个ip,家里是一个ip,在kafka的server.properties文件中如果增加了本机的地址listeners = PLAINTEXT://192.168.159.143:9092,到家后,ip发生变换就会发生kafka服务起不来报如下错,只需把ip变成家里的ip即可,还有dmhs.hs的配置文件里的ip和dmhs_kafka.properties里的ip也要改。
org.apache.kafka.common.KafkaException: Socket server failed to bind to 10.12.20
.183:9092: Cannot assign requested address: bind.

报错4:

Kafka启动报:另一个程序正在使用此文件,进程无法访问,手动删除kafka_2.12-1.1.0log文件夹下的所有文件,再重新启动就可以了
在这里插入图片描述

dmhs使用过程中常见问题汇总

问题1:

现象描述:oracle执行端exec后报错:create socket connection failure ,unable to
connect database。 解决方法:ldd libdmhs_exec.so 查看版本是否匹配
Oracle版本对应的是libodbc.so,而dm7版本对应的是libdodbc.so。

问题2

现象描述:dm端执行copy命令后一直报错:cls[error]: initial loader
failure,前台服务显示:LD[error]: OCI initial session failed
问题原因:配置错误导致,dmhs.hs配置文件中dm库的端口实为5236,误写为5336。
解决方法:将dmhs.hs配置文件修改为<db_port>5236</db_port>。

问题3

现象描述: oracle端执行copy时报错CPT[error]: DICT file directory(./DICT)not existed 然后服务退出。
问题原因:使用不当导致,没有权限创建dict目录。 解决办法:查看并更改dmhs目录权限

问题4

现象描述:ddl同步时dm端新建表后oracle端的dmhs_server日志中报错: EXE[ERROR]:unable to find NAME col’s data type CHAR in map
问题原因:使用不当导致,oracle端表dmhs_dtype_map中有没有数据
解决办法:删掉oracle里dmhs_*等五张表,执行exec重新生成这五张表

问题5

现象描述:Unable to extend lob segment TEST.SYS_lob0000121 by 8192 in tablespace TEST_TBS。
问题原因:oracle数据库表空间数据文件配置过少导致,DataFile设定了大小,且设置为自动增长,已经到了32G的文件最大值上限。
解决办法:ALTER TABLESPACE XXX ADD DATAFILE ‘数据文件路径’ SIZE 100M AUTOEXTEND ON NEXT 10M MAXSIZE UNLIMITED;

问题6

现象描述:CPT[ERROR]: DB:创建SOCKET连接失败 (code=3113) CPT[ERROR]: OCI initial session failed
问题原因:dm端数据库未启动
解决办法:启动数据库服务

问题7

现象描述:isql: symbol lookup error:
/usr/lib/oracle/11.2/client/lib/libsqora.so.11.1: undefined symbol:
SQLGetPrivateProfileStringW
问题原因:unixODBC的版本问题,oracle11g必须unixODBC2.3.0
解决办法:安装unixODBC 2.3.0

问题8

现象描述:执行完start exec报错 replicat: symbol lookup error:
/u01/oracle/lib/libsqora.so.11.1: undefined symbol:
SQLGetPrivateProfileStringW
问题原因:环境变量问题
解决办法:配置环境变量export
LD_LIBRARY_PATH=/usr/local/lib:$ORACLE_HOME/lib:/lib64

问题9

现象描述: [oracle@bjcuug bin]$ ./dmhs_server MGR[INFO]: DMHS start up,current version: V3.1.3-Build(2018.11.29-81244trunc)_64 (The beta)
MGR[INFO]: load config file successful,site no:2, manager port :5645,poll interval:3 MGR[INFO]: init unique sign fail
问题原因:dmhs目录权限不正确
解决办法:[root@bjcuug opt]# chown -R oracle:oinstall dmhs

问题10

现象描述:源端执行完copy后报错:
在这里插入图片描述
此时目的端服务中显示:
在这里插入图片描述
REV[ERROR]: site (2): The message of the site version are not compatible
问题原因:源端版本为V3.1.3,目的端版本为V3.1.2,执行端和捕获端dmhs版本不一致
解决办法:更换版本使两端版本保持一致 注:单向同步时源端可以为低版本。

问题11

现象描述: EXE[INFO]: CONNECT: SERVER=dmhs;DRIVER={Oracle in OraDb11g_home1};UID=dmhs;PWD=******; EXE[ERROR]: SITEID:0 SEQID:0
TRXID:0 STATE:HY000 CODE: 12541
ERR:[unixODBC][Oracle][ODBC][Ora]ORA-12541: TNS: Ꮼ
EXE[ERROR]: unable to connect database!
问题原因:oracle端的监听未打开
解决办法:lsnrctl start

问题12

现象描述: MGR[ERROR]: cpt log delay 0 day, server stop, logtime:0 , …PUB[UNKNOW]: log delay
解决办法:在base里面加cpt_check为0

问题13

现象描述:EXE[ERROR]: SITEID:2 SYSDBA.TEST1(DELETE): affect the number of rows (0) and the expected number of rows (1)! SEQID:1340532
TRXID:2533377869612019 OP_ID:2
EXE[ERROR]: SITEID:2 SYSDBA.TEST1(DELETE) sync failed, fill rows 1, SEQID:1340532 TRXID:2533377869612019 OP_ID:2
解决办法:在exec模块中添加affect_row配置为0

问题14

现象描述:MGR[ERROR]: lib dmhs_exec.dll can not foud,error code 193
MGR[ERROR]: log exec start fail 问题原因:oracle为32位而同步工具是64位 解决办法:更换版本

问题15

现象描述:执行copy装载后map映射关系未生效,例如:源端A模式下的表同步到目的端B模式下,用copy装载来实现目的端create表,但是执行完copy后目的端的B模式下没有新建表,而在目的端的A模式下创建了表
问题原因:map映射规则中小写 解决办法:把map里面都改成大写

问题16

现象描述:执行copy后报错,
控制台中显示:CLS[error]: initial loader failuer
服务中显示:在这里插入图片描述
问题原因:数据库登录失败次数超过限制,用户被锁定
解决办法:解锁用户

问题17

现象描述:oracle端执行完start exec报错 EXE[INFO]: CONNECT:SERVER=dmhs;DRIVER=Oracle in OraDb11g_home1;UID=dmhs;PWD=******;
EXE[ERROR]: SITEID:0 SEQID:0 TRXID:0 STATE:IM002 CODE: 0
ERR:[unixODBC][Driver Manager]Data source name not found, and no
default driver specified EXE[ERROR]: unable to connect database!
问题原因:环境变量问题
解决办法:配置环境变量export
LD_LIBRARY_PATH=/usr/local/lib:$ORACLE_HOME/lib:/lib64

问题18

现象描述:源端执行完copy报错
DMHS> copy 0 “sch.name=‘SYSDBA’” DICT
copy mask is : |DICT|REP
CSL[ERROR]: load log analysis module failure
在这里插入图片描述
在这里插入图片描述

问题原因:数据库是dm7,而dmhs是dm8的版本
解决办法:更换版本

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐