需要下载的东西

ElasticSearch

下载完ElasticSearch后解压,然后点击bin目录下的elasticsearch.bat启动(ES默认端口号为9200,若不能启动请检查端口号是否占用)
在这里插入图片描述

Logstash

将下载好的mysql驱动压缩包和Logstash压缩包解压,将mysql的的jar包放入logstash文件夹内在这里插入图片描述

同步mysql数据库中数据到ES中

在logstash/bin中新建logstash-mysql.conf文件(名字任意,可以不同,只要更改后面运行命令即可)

其中写入

input {
    stdin {
    }
    jdbc {
      # 数据库  数据库名称为ESDB,表名为user
      jdbc_connection_string => "jdbc:mysql://localhost:3306/ESDB"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "root"
      # jar包的位置
      jdbc_driver_library => "D:/ElasticSearch/logstash-7.13.3/mysql-connector-java-8.0.25.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # sql文件位置 没有可以不写
      #statement_filepath => "config-mysql/user.sql"
      statement => "select * from user"
      # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
      schedule => "* * * * *"
	  #索引的类型
      type => "user"
    }
}

output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
		index => "user"
		# 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

然后在bin目录下执行logstash -f logstash-mysql.conf就能自动同步mysql数据了

可能出现的问题

  • 数据库中的时间字段与同步到ES中的对应的数据不符(相差8小时)
    修改logstash-mysql.conf配置文件
# 在jdbc字段中添加
plugin_timezone => "local"
# 添加filter字段
filter {
    # 因为时区问题需要修正时间
    ruby { 
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)" 
    }    
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('update_date', event.get('update_date').time.localtime + 8*60*60)" 
    } 
}


# 完整配置信息
input {
    stdin {
    }
    jdbc {
      # 数据库  数据库名称为elk,表名为book_table
      jdbc_connection_string => "jdbc:mysql://localhost:3306/ESDB"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "root"
      # jar包的位置
      jdbc_driver_library => "D:/ElasticSearch/logstash-7.13.3/mysql-connector-java-8.0.25.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #statement_filepath => "config-mysql/user.sql"
      statement => "select * from user"
      schedule => "* * * * *"
	  #索引的类型
      type => "user"
	  plugin_timezone => "local"
    }
}
 
filter {
    # 因为时区问题需要修正时间
    ruby { 
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)" 
    }    
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('update_date', event.get('update_date').time.localtime + 8*60*60)" 
    } 
}
 
output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
		index => "user"
		# 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐