宗旨:

我们今天,使用logstash同步一下数据库,mysql-->ES

废话不多说,直接上

一,下载logstash

 下载地址:Logstash 7.17.0 | Elastic

PS:

 es,kibana,logstash 三个版本一定要一致(包括ik分词器)

前面我用的都是 7.17.0的版本。

下载完,解压

二,安装ruby环境

下载地址: Downloads

无脑下一步

三,修改pipelines.yml

位置:

 把注释去掉(保存为 UTF-8格式!!!这里是个坑

四,创建配置文件

 先创建一个文件夹mysqlconfig

大家自己改一下前面的路径。

D:\elasticsearch\logstash-7.17.0\mysqlconfig

然后创建文件   mysql.conf

内容如下:

注意几点:

1 jdbc的userSSL=false   关闭 SSL

2 数据表中,最好有个自动生成的更新时间,ES可以根据这个字段取增量更新(后面我会放上sql建表语句)

3 jdbc_driver_library 是你的mysql驱动,我的是通过maven下载的,你们自己改改

4 last_run_metadata_path 这个路径改一下,文件名不用改

5 statement => "select * from myTest where sysUpdateTime > date_add(:sql_last_value,INTERVAL 8 HOUR)"

这一句大家自己改一下

input {
  stdin {
  }
  jdbc {
  jdbc_connection_string => "jdbc:mysql://localhost:3306/ty_test?userSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC"
  # the user we wish to excute our statement as
  jdbc_user => "root"
  jdbc_password => "123456"
  # the path to our downloaded jdbc driver  
  jdbc_driver_library => "C:\Users\pc\.m2\repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"
  # the name of the driver class for mysql
  jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "50000"
  #要执行的sql文件
  #statement_filepath => "/conf/course.sql"
  statement => "select * from myTest where sysUpdateTime > date_add(:sql_last_value,INTERVAL 8 HOUR)"
  schedule => "* * * * *"
  record_last_run => true
  last_run_metadata_path => "D:/elasticsearch/logstash-7.17.0/config/logstash_metadata"
  }
}
 
 
output {
  elasticsearch {
  hosts => "localhost:9200"
  #hosts => ["localhost:9200","localhost:9202","localhost:9203"]
  #ES索引库名称
  index => "mytest"
  document_id => "%{id}"
  document_type => "doc"
  }
  stdout {
  #日志输出
  codec => json_lines
  }
}

五,创建数据库表

CREATE TABLE `myTest` (
  `id` int(11) NOT NULL,
  `name` varchar(50) DEFAULT NULL,
  `hobby` varchar(50) DEFAULT NULL,
  `amount` decimal(18,2) DEFAULT NULL,
  `sysUpdateTime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- 录入数据
insert into myTest(id,`name`,hobby,amount) 
select 1,'1','1',80 union 
select 2,'2','2',40 union 
select 3,'3','3',40 union 
select 4,'4','4',100 union 
select 5,'5','5',100 union 
select 6,'6','6',50  

六,启动测试

1 启动 ES      bin下的bat

2 启动 kibana    bin下的bat

3 启动 logstash

上面两个不讲了,我们说一下logstash同步命令

打开cmd,进入logstash的bin目录。

执行  logstash -f ../mysqlconfig/mysql.conf

第一次会全量更新数据。

接着每1分钟更新一次。

当你在mysql中录入数据,就会看到如下:(数据没有变更则只是单纯的一个select语句)

最后

可以去kibana查看一下

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐