在开发 Python 应用时,经常会使用到 Jupyter 来完成 Python 应用的开发及调试。简而言之,Jupyter Notebook 是以网页的形式打开,可以在网页页面中直接编写代码和运行代码,代码的运行结果也会直接在代码块下显示。如在编程过程中需要编写说明文档,可在同一个页面中直接编写,便于作及时的说明和解释。在今天的文章中,我将使用  Jupyter 来进行展示。

今天我就一个简单的例子来进行展示如何使用 Python 语言导入一个 CSV 文件 。这个 CSV 的文件很简单,但是我们通过这个文件来展示如何在 Python 中使用相应的 API 来导入数据。

addresses.csv

id,firstname,surname,address,city,state,postcode
1,John,Doe,120 jefferson st.,Riverside,NJ,08075
2,Jack,McGinnis,220 hobo Av.,Phila,PA,09119
3,John Da Man,Repici,120 Jefferson St.,Riverside,NJ,08075
4,Stephen,Tyler,7452 Terrace At the Plaz road,SomeTown,SD,91234
5,Joan the bone,Anne,Jet 9th at Terrace plc,Desert City,CO,00123

我们在自己的电脑上创建一个叫做 py-elasticsearch 的目录,并把 addresses.csv 文件拷贝到这个文件夹中。我们在这个目录中启动 jupyter:

$ pwd
/Users/liuxg/python/py-elasticsearch
$ ls
addresses.csv

安装

我们需要安装如下的部分:

Elastic Stack

你可以参照文章 “Elastic:菜鸟上手指南” 来根据自己的操作系统来安装自己的 Elasticsearch 及 Kibana。你需要启动 Elasticsearch 及 Kibana。

Jupyter

你需要安装 Jupyter 来创建 notebook。请根据自己的操作系统安装相应的软件。

Python

你可以安装最新的 Python 来进行实践。在我的电脑上,我安装的版本是 3.8.5。

$ jupyter notebook

这样就创建了我们的一个 jupyter notebook。我们创建一个叫做 py-elasticsearch 的 notebook:

我们可以在命令前面添加 !来运行 SHELL 指令。上面显示我们的 python 版本信息。

创建 Python 应用

我们接下来需要在自己的电脑上安装相应的模块:

pip3 install elasticsearch
pip3 panda

我们接下来输入如下的代码:

try:
    import os
    import sys
    
    import elasticsearch
    from elasticsearch import Elasticsearch 
    import pandas as pd
    
    print("All Modules Loaded ! ")
except Exception as e:
    print("Some Modules are Missing {}".format(e))

你可以使用 SHIFT + ENTER 来执行代码。上面显示所有的模块都已经被装载了。如果你没有看到上面的消息,你需要安装相应的模块。

我们接下来创建一个函数来连接 Elasticsearch:

def connect_elasticsearch():
    es = None
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    if es.ping():
        print('Yupiee  Connected ')
    else:
        print('Awww it could not connect!')
    return es
es = connect_elasticsearch()
es.ping()

上面代码显示我们已经成功地连接到 Elasticsearch。接下来我们来创建一个叫做 liuxg-test 的 index:

es.indices.create(index="liuxg-test", ignore=400)

我们可以到 Kibana 中查看是否有一个叫做 liuxg-test 的 index 已经被创建:

GET _cat/indices/liuxg-test

我们接下来显示所有的索引:

res = es.indices.get_alias("*")
for name in res:
    print(name)

接下来,我们来删除上面我们已经创建的 liuxg-test 索引:

es.indices.delete(index="liuxg-test", ignore=[400,404])

上面显示已经成功。我们可以去 Kibana,并再次查询 liuxg-test 索引:

GET _cat/indices/liuxg-test

显然这次,我们没有看到 liuxg-test 这个索引。它表明我们的索引已经被删除了。

我们接下来导入两个文档到 Elasticsearch 中去:

e1 = {
    "first_name":"nitin",
    "last_name":"panwar",
    "age": 27,
    "about": "Love to play cricket",
    "interests": ['sports','music'],
}

e2 = {
    "first_name" :  "Jane",
    "last_name" :   "Smith",
    "age" :         32,
    "about" :       "I like to collect rock albums",
    "interests":  [ "music" ]
}
es.indices.create(index='people', ignore=400)
res1 = es.index(index='people', doc_type='_doc', body=e1, id=1)
res2 = es.index(index='people', doc_type='_doc', body=e2, id=2)

在上面,我们创建了一个叫做 people 的索引,并把两个文档 e1 及 e2 导入到 Elasticsearch 中:

我们可以到 Kibana 中查看所以 people 的文档:

GET people/_search

我们可以清楚地看到有两个文档被成功地导入到 Elasticsearch 中。

我们接下来删除一个文档:

res = es.delete(index='people', doc_type='_doc', id=1)

我们到 Kibana  中去查看 id 为1 的文档:

GET people/_doc/1

上面的命令显示该文档不存在。

接下来,我们来搜索所有的文档:


res = es.search(index = 'people', body = {'query': {"match_all": {}} } )

在很多的时候,我们需要定义一个索引的 settings 及 mapping。我们可以按照如下的调用来完成:

settings = {
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
   "mappings": {
    "dynamic": "true",
    "_source": {
      "enabled": "true"
    },
    "properties": {
      "title": {
        "type": "text"
      },
      "name": {
        "type": "text"
      }
    }
  }
}

indexName = 'liuxg-with-mapping'
es.indices.create(index=indexName, ignore=[400,404], body=settings)

我们可以在 Kibana 中进行查看:

GET liuxg-with-mapping

导入 CSV 文件

接下来,我们来进行我们的正题。我们重新创建一个叫做 csv-elasticsearch 的 Notebook:

我们打入如下的命令来装载所有的模块:

try:
    import elasticsearch
    from elasticsearch import Elasticsearch
    
    import pandas as pd
    import json
    from ast import literal_eval
    from tqdm import tqdm
    import datetime
    import os
    import sys
    import numpy as np
    
    from elasticsearch import helpers
    print("Loaded ............")
except Exception as E:
    print("Some Modules are Missing{}".format(e))

如果你看到 Loaded,则表明所有的模块都装载正确。否则,你需要安装相应的模块。接下来,我们确保我们之前的 addresses.csv 位于我们的 Jupyter 启动的目录里:

for name in os.listdir():
    print(name)

我们接下来尝试阅读这个 csv 文件。

df = pd.read_csv("addresses.csv")
df.head(2)

显然 Pandas 很方便地让我们读入我们的数据。上面显示我们有5个文档,有7列。在下面的代码中,我们将把文档中的 id 当做 Elasticsearch 文档中的 id。这个 id 是唯一的。我们来创建连接到 Elasticsearch 的实例:

ENDPOINT = "http://localhost:9200"
es = Elasticsearch(timeout=600, hosts = ENDPOINT)
es.ping()

在上面,我们对数据做了简单的清洗。对于确实任何一个项的文档,我们直接去掉。接下来,我们把 df 中的数据转换为 Elasticsearch 可以理解的格式:

df2 = df.to_dict('records')

我们需要创建一个 generator 来把数据写入到 Elasticsearch:

def generator(df2):
    for c, line in enumerate(df2):
        try:
            yield {
                '_index': "addresses",
                '_type': '_doc',
                '_id': line.get("id", None),
                '_source': {
                    "firstname":line.get("firstname", ""),
                    "surname":line.get("surname", ""),
                    "address":line.get("address", ""),
                    "city":line.get("city", ""),
                    "state":line.get("state", ""),
                    "postcode":line.get("postcode", "")
                }
            }
        except StopIteration:
            return

接下来,我们使用 helper 来导入数据:

try:
    res = helper.bulk(es, generator(df2))
    print("Working")
except Exception as e:
    pass

 我们到 Kibana 中查看 addresses 索引:

GET _cat/indices/addresses?v
health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   addresses eQCjFfWQTIyMR2D4eCyv-g   1   1          5            5     17.4kb         17.4kb

上面显示有四个文档。我们最早的 CSV 文档中有5个文档,这是因为我们在进行 helper.bulk 之前,已经调用过 next(my) 一次。

我们可以使用如下的命令来进行查询:

更多关于如何使用 Python 导入数据到 Elasticsearch 的介绍,可以参考文章 “Elasticsearch:Elasticsearch 开发入门 - Python”。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐