网上关于此方面的内容较少,而且比较零散,正好项目中需要对ES7做预研,所以整理出了这篇文章。

ElasticSearch版本为7.13.2

分如下几个主题:

1.Datax的es导入插件elasticsearchwriter

1.1.从datax的github地址(https://github.com/alibaba/DataX)下载源码工程(Datax-master)

工程内容很多,如果在idea中构建,需要的时间比较长,需要耐心等待。如果不需要构建,那直接在文件夹中打开文件,进行第2步的操作。

1.2.修改父工程的pom.xml,配置modules模块,按需保留elasticsearchwriter模块

    <modules>
        <module>common</module>
        <module>core</module>
        <module>transformer</module>
        <module>elasticsearchwriter</module>
        <module>plugin-rdbms-util</module>
        <module>plugin-unstructured-storage-util</module>
        <module>hbase20xsqlreader</module>
        <module>hbase20xsqlwriter</module>
        <module>kuduwriter</module>
    </modules>

1.3.执行打包命令,idea或者命令行均可

mvn clean install -Dmaven.test.skip=true

1.4.将编译后的插件安装到 datax中

找到编译后的插件,目录为:

DataX-master/elasticsearchwriter/target/datax/plugin/writer/elasticsearchwriter

这个插件我会提供下载,你可以直接用我的,也可以自己打包。

下载地址:https://download.csdn.net/download/m0_37609579/20390001

 将这个文件夹拷贝到datax的plugin的writer目录下

2.Datax配置es导入job

任务类型选择Datax任务

3.Job的JSON文件编写

要配置导入导出数据源

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "username": "oracle数据库账号",
            "password": "oracle数据库密码",
            "connection": [
              {
                "querySql": [
                  "select id,name from 数据库表名"
                ],
                "jdbcUrl": [
                  "jdbc:oracle:thin:@//127.0.0.1:1521/orcl"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://127.0.0.1:9200",
            "accessId": "elastic",
            "accessKey": "XXXXXX",
            "index": "test_index",
            "type": "_doc",
            "cleanup": false,
            "settings": {
              "index": {
                "number_of_shards": 1,
                "number_of_replicas": 0
              }
            },
            "discovery": false,
            "batchSize": 10000,
            "splitter": ",",
            "column": [
              {
                "name": "id",
                "type": "long"
              },
              {
                "name": "name",
                "type": "text"
              }
            ]
          }
        }
      }
    ]
  }
}

4.需要注意的地方

4.1.querySql查询的字段要与column一一对应,不能多也不能少,顺序最好也一样

 

4.2. endpoint地址是es的http访问地址,端口为9200,不要配置成9300

4.3.accessId和accessKey必须要配,如果es没有密码,那随便填,但必须要有

4.4.其他注意事项

4.4.1.ES日期字段创建需指定格式 yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis

{
     "name": "CREATE_DATE",
     "type": "date",
     "format": "yyyy-MM-dd HH:mm:ss"
}

4.4.2.日期数据导入时,text写入为日期格式,long写入为时间戳

4.4.3.注意时区问题 写入时指定时区 或 对UTC时间戳进行转换

  1. 指定:“2019-03-12T12:12:12.123+0800”
  2. 转换:东八区时间戳 = 3600000*8 + UTC时间戳

4.5.json文件格式不对,数组越界错误

sql和column字段匹配不上就会数组越界

4.6.使用数据库id作为es中记录的_id

{"name": "pk", "type": "id"},

name指定为id,type也指定为id,这样就会把数据库的id作为es中的id了。

不需要再指定name为id的字段了,不然会报错 

 5.参数描述

endpoint 描述:ElasticSearch的连接地址 必选:是 默认值:无
accessId 描述:http auth中的user 必选:否 默认值:空
accessKey 描述:http auth中的password 必选:否 默认值:空
index 描述:elasticsearch中的index名 必选:是 默认值:无
type 描述:elasticsearch中index的type名 必选:否 默认值:index名
cleanup 描述:是否删除原表 必选:否 默认值:false
batchSize 描述:每次批量数据的条数 必选:否 默认值:1000
trySize 描述:失败后重试的次数 必选:否 默认值:30
timeout 描述:客户端超时时间 必选:否 默认值:600000
discovery 描述:启用节点发现将(轮询)并定期更新客户机中的服务器列表。 必选:否 默认值:false
compression 描述:http请求,开启压缩 必选:否 默认值:true
multiThread 描述:http请求,是否有多线程 必选:否 默认值:true
ignoreWriteError 描述:忽略写入错误,不重试,继续写入 必选:否 默认值:false
ignoreParseError 描述:忽略解析数据格式错误,继续写入 必选:否 默认值:true
alias 描述:数据导入完成后写入别名 必选:否 默认值:无
aliasMode 描述:数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个) 必选:否 默认值:append
settings 描述:创建index时候的settings, 与elasticsearch官方相同 必选:否 默认值:无
splitter 描述:如果插入数据是array,就使用指定分隔符 必选:否 默认值:-,-
column 描述:elasticsearch所支持的字段类型,样例中包含了全部 必选:是 
dynamic 描述: 不使用datax的mappings,使用es自己的自动mappings 必选: 否 默认值: false

六、使用dbeaver 配置 jdbc 连接 es

报错 current license is non-compliant for [jdbc]

修改成30天试用版,https://www.elastic.co/guide/en/elasticsearch/reference/master/start-trial.html

POST "localhost:9200/_license/start_trial?acknowledge=true&pretty

查看服务器es的license信息

GET http://localhost:9200/_license

 可以愉快使用了

七、使用es的动态模板

datax从mysql同步数据到elasticsearch(使用es的动态模板)

        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://xxx:9200",
            "index": "myindex",
            "type": "data",
            "cleanup": true, #true表示插入前清空,即覆盖同步;false则追加同步
            "dynamic": true,  #这里一定要指定为true,否则使用的是datax的模板(就是下面定义的字段类型),而不会使用es的模板
            "settings": {"index" :{"number_of_shards": 2, "number_of_replicas": 1}},            
            "batchSize": 10000,
            "splitter": ",",
            "column": [
              {"name": "pk", "type": "id"},#指定第一个字段为rowkey 
              {"name": "province", "type": "text"},
              { "name": "city", "type": "text"},
              { "name": "area", "type": "text"},
              { "name": "longitude","type":"double" },
              { "name": "latitude","type": "double" },
              { "name": "location","type": "geo_point" }                   
            ]
          }
        }

参考文档:

  1. https://www.jianshu.com/p/7d08f91fc0be
  2. https://www.cnblogs.com/zzpblogs/p/13212617.html
  3. https://github.com/alibaba/DataX
  4. https://www.jianshu.com/p/7d08f91fc0be
  5. http://t.zoukankan.com/zzpblogs-p-13212617.html
  6. https://blog.csdn.net/ctypyb2002/article/details/106115691
  7. https://blog.csdn.net/ASN_forever/article/details/106340961
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐