Elasticsearch:使用 Python 实现 Web Scraper

本文介绍如何使用Python爬虫抓取网络数据,并利用Elasticsearch进行数据存储、搜索与分析,涵盖Elasticsearch的Python客户端安装、索引创建、数据存储及检索过程。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

在今天的文章里,我们来介绍如何使用 Python 来访问 Elasticsearch。如果大家对 Elasicsearch 的安装及使用还不是很熟的话,建议看我之前的博客文章:如何在Linux,MacOS及Windows上进行安装Elasticsearch,并熟悉Elasticsearch的最基本的使用:开始使用Elasticsearch (1)/(2)/(3)

在今天的文章中,我们来介绍如何使用 Python 来把我们需要的数据存入到一个 Elasticsearch 的索引中,并使用它进行搜索数据及分析数据。

安装 Python 及 Elasticsearch python 包

首先我们需要安装 Python 及 Elasticsearch 相关的 Python 包。我们可以通过如下的方法来安装:

$ pip install elasticsearch

针对 Python3,我们可能需要如下的方法:

$ pip3 install elasticsearch

使用 Python 创建索引及访问索引

使用 Python 创建一个索引及访问其索引非常直接:

from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch()

doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.now(),
}

res = es.index(index="test-index", doc_type='_doc', id=1, body=doc)
print(res['result'])

res = es.get(index="test-index", doc_type='_doc', id=1)
print(res['_source'])

es.indices.refresh(index="test-index")

res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

在这里,首先建立一个连接到 Elasticsearch 的实例 es。然后通过 es 来创建索引,并访问这个新建立的索引。我们运行的结果是:

updated
{'author': 'kimchy', 'text': 'Elasticsearch: cool. bonsai cool.', 'timestamp': '2019-08-27T05:18:12.375857'}
Got 1 Hits:
2019-08-27T05:18:12.375857 kimchy: Elasticsearch: cool. bonsai cool.

这里显示是 “updated”,这是因为我之前已经创建一个 id 为 1 的文档。再次创建时返回 updated,并且它的 version 会自动加 1。

在默认的情况下,它使用默认的地址 localhost:9200。如果我们想为 Elasticsearch 链接定义一个新的地址,我们可以使用如下的办法:

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

在上面,我们可以把我们的 host 及 port信息输入到 Elasticsearch 中,这样我们可以连接到任何我们想要的 Elasticsearch 安装的实例中。

SSL 和身份验证

如果我们的 Elasticsearch 有安全的认证,您可以将客户端配置为使用 SSL 连接到 Elasticsearch 集群,包括证书验证和 HTTP 身份验证:

那么我需要使用如下的方法:

from elasticsearch import Elasticsearch

# you can use RFC-1738 to specify the url
es = Elasticsearch(['https://user:secret@localhost:443'])

# ... or specify common parameters as kwargs

es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
)

# SSL client authentication using client_cert and client_key

from ssl import create_default_context

context = create_default_context(cafile="path/to/cert.pem")
es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
    ssl_context=context,
)

Web scraper 及 Elasticsearch

下面介绍一个简单的使用 Elasticsearch 来实现从网路抓取数据的 Web Scraper。我们的主要目的是从一个在线的 recipe(食谱)抓取数据并存放于 Elasticsearch 中提供搜索并进行分析。这个网站的内容在 https://www.allrecipes.com/recipes/96/salad/。从网站上我们可以看到有很多的菜谱在那里。我们的分析应用从这个网站抓取数据。
 

Scrape数据

首先,我们创建一个叫做 get_recipes.py 的文件。它的内容是:

import json
from time import sleep
import requests
from bs4 import BeautifulSoup
def parse(u):
    title = '-'
    submit_by = '-'
    description = '-'
    calories = 0
    ingredients = []
    rec = {}

    try:
        r = requests.get(u, headers=headers)
        if r.status_code == 200:
            html = r.text
            soup = BeautifulSoup(html, 'lxml')
            # title
            title_section = soup.select('.recipe-summary__h1')
            # submitter
            submitter_section = soup.select('.submitter__name')
            # description
            description_section = soup.select('.submitter__description')
            # ingredients
            ingredients_section = soup.select('.recipe-ingred_txt')
            # calories
            calories_section = soup.select('.calorie-count')

            if calories_section:
                calories = calories_section[0].text.replace('cals', '').strip()

            if ingredients_section:
                for ingredient in ingredients_section:
                    ingredient_text = ingredient.text.strip()
                    if 'Add all ingredients to list' not in ingredient_text and ingredient_text != '':
                        ingredients.append({'step': ingredient.text.strip()})

            if description_section:
                description = description_section[0].text.strip().replace('"', '')

            if submitter_section:
                submit_by = submitter_section[0].text.strip()

            if title_section:
                title = title_section[0].text

            rec = {'title': title, 'submitter': submit_by, 'description': description, 'calories': calories,
                   'ingredients': ingredients}
    except Exception as ex:
        print('Exception while parsing')
        print(str(ex))
    finally:
        return json.dumps(rec)

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Pragma': 'no-cache'
    }
    url = 'https://www.allrecipes.com/recipes/96/salad/'
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        html = r.text
        soup = BeautifulSoup(html, 'lxml')
        links = soup.select('.fixed-recipe-card__h3 a')
        for link in links:
            sleep(2)
            result = parse(link['href'])
            print(result)
            print('=================================')

这是一个一个最基本的 python 应用框架。在主程序里,我们对网址 https://www.allrecipes.com/recipes/96/salad/ 进行访问。如果访问成功,我们 BeautifulSoup 对返回的 html 内容进行分析。我们可以得到所有以 '.fixed-recipe-card__h3 a' 标识的内容。这个非常类似于 jQuery 对 html 进行的查询。这样我们可以的到像如下内容的一个 links:

<a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/">
<span class="fixed-recipe-card__title-link">Jamie's Cranberry Spinach Salad</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/142027/sweet-restaurant-slaw/">
<span class="fixed-recipe-card__title-link">Sweet Restaurant Slaw</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14276/strawberry-spinach-salad-i/">
<span class="fixed-recipe-card__title-link">Strawberry Spinach Salad I</span>
...

上面的内容是一个数组,它里面含有一个叫做href的项。它是一个链接指向另外一个页面描述这个菜的的食谱,比如 https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/

parse 是一个用来解析一个食谱链接的数据。通过 BeautifulSoup 的使用,如法炮制,解析其中的数据项,比如 title_section, submitter_section 等,并最终得到我们所需要的 title, submitter 等数据。最终这个数据以json的形式返回。返回的结果就像如下的数据:

{
  "calories": "253",
  "description": "This is a great salad for a buffet, with interesting textures and southwest flavors combined in one delicious salad.  Leftovers store well refrigerated for several days.",
  "ingredients": [
    {
      "step": "1 cup uncooked couscous"
    },
    {
      "step": "1 1/4 cups chicken broth"
    },
    {
      "step": "3 tablespoons extra virgin olive oil"
    },
    {
      "step": "2 tablespoons fresh lime juice"
    },
    {
      "step": "1 teaspoon red wine vinegar"
    },
    {
      "step": "1/2 teaspoon ground cumin"
    },
    {
      "step": "8 green onions, chopped"
    },
    {
      "step": "1 red bell pepper, seeded and chopped"
    },
    {
      "step": "1/4 cup chopped fresh cilantro"
    },
    {
      "step": "1 cup frozen corn kernels, thawed"
    },
    {
      "step": "2 (15 ounce) cans black beans, drained"
    },
    {
      "step": "salt and pepper to taste"
    }
  ],
  "submitter": "Paula",
  "title": "Black Bean and Couscous Salad"
}

创建索引

我们从上面 parse 的数据最终我们想存储于一个 Elasticsearch 的索引里,并供以后的搜索及分析。为了达到这个目的,我们必须创建一个索引。我们命名这个索引的名字为recipes。我们把 type 的名字叫做 salad。另外我们也必须创建一个 mapping。

为了能够创建一个索引,我们必须连接 Elasticsearch 服务器。

def connect_elasticsearch():
    """

    :rtype: object
    """
    _es = None
    _es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    if _es.ping():
        print('Yay Connected')
    else:
        print('Awww it could not connect!')
    return _es

为了能够是的上面的代码工作,我们必须加入使用 Elasticsearch 库:

from elasticsearch import Elasticsearch

我们可以修改上面的 localhost 来连接到我们自己的 Elasticsearch 服务器。如果连接成功,它将返回 "Yay Connected",并最终返回一个可以被使用的 Elasticsearch 实例。这里的_es.ping() 可以用来 ping 一下服务器。如果连接成功将返回 True。

下面,我们用上面返回的 Elasticsearch 实例来创建一个索引:

def create_index(es_object, index_name):
    created = False
    # index settings
    settings = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "mappings": {
            "salads": {
                "dynamic": "strict",
                "properties": {
                    "title": {
                        "type": "text"
                    },
                    "submitter": {
                        "type": "text"
                    },
                    "description": {
                        "type": "text"
                    },
                    "calories": {
                        "type": "integer"
                    },
                    "ingredients": {
                        "type": "nested",
                        "properties": {
                            "step": {"type": "text"}
                        }
                    },
                }
            }
        }
    }

    try:
        if not es_object.indices.exists(index_name):
            # Ignore 400 means to ignore "Index Already Exist" error.
            es_object.indices.create(index=index_name, ignore=400, body=settings)
            print('Created Index')
        created = True
    except Exception as ex:
        print(str(ex))
    finally:
        return created

这里,我们通过一个 settings 变量把 Elasticsearch 所需要的 settings 及 mappings 一并放入这个字典中,并通过上面通过连接到 Elasticsearch 服务器返回的 es_object 来创建这个索引。如果成功将返回 True,否则返回 False。我们可以看看我们这里定义的数据类型,和我上面显示的返回结果。这里我们定义了 nested 数据类型,这是因为 ingredients 是一个 1 对多的关系。如果大家对这个还不是很熟的话,可以参阅我之前写的文章 “Elasticsearch: nested对象”。

接下来,我确保索引根本不存在然后创建它。 检查后不再需要参数 ignore = 400,但如果不检查是否存在,则可以抑制错误并覆盖现有索引。 但这有风险。 这就像覆盖数据库一样。

我们可以在浏览器中地址栏输入地址:http://localhost:9200/recipes/_mappings?pretty。如果我们看到如下的结果,表名,我们的 mapping 已经创建成功:

{
  "recipes" : {
    "mappings" : {
      "properties" : {
        "calories" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "description" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "ingredients" : {
          "properties" : {
            "step" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "submitter" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "title" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

通过设置 dynamic:strict,我们强制 Elasticsearch 对我们任何新的文档进行严格的检查。注意这里 salads 是我们的文档的 type。在新的 Elasticsearch 中,我们针对一个索引有且只有一个 type。我们也可以通过 _doc 来访问。

存储数据

下一步我们来存储文档

def store_record(elastic_object, index_name, record):
    is_stored = True
    try:
        outcome = elastic_object.index(index=index_name, doc_type='salads', body=record)
        print(outcome)
    except Exception as ex:
        print('Error in indexing data')
        print(str(ex))
        is_stored = False
    finally:
        return is_stored

我们通过传入是的 record 来把我们需要的数据进行存储。为了能够我们能够存储数据,我们可以必须修改我们之前的 __main__ 部分代码:

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Pragma': 'no-cache'
    }
    logging.basicConfig(level=logging.ERROR)

    print("starting ...")

    url = 'https://www.allrecipes.com/recipes/96/salad/'
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        html = r.text
        soup = BeautifulSoup(html, 'lxml')
        # print(soup)
        links = soup.select('.fixed-recipe-card__h3 a')
        # print(links)

        if len(links) > 0:
            es = connect_elasticsearch()

        for link in links:
            # print(link)

            sleep(2)
            result = parse(link['href'])
            # print(result)
            if es is not None:
                if create_index(es, 'recipes'):
                    out = store_record(es, 'recipes', result)
                    print('Data indexed successfully')

搜索数据


现在数据都已经被建立为索引,并存于一个叫做 recipies 的索引里。我们可以 Elasticsearch 来进行搜索,并分析数据。

def search(es_object, index_name, search):
    res = es_object.search(index=index_name, body=search)
    return res

我们可以通过如下的 __main__ 来调用:

if __name__ == '__main__':
    es = connect_elasticsearch()
    if es is not None:
        # search_object = {'query': {'match': {'calories': '102'}}}
        # search_object = {'_source': ['title'], 'query': {'match': {'calories': '102'}}}
        search_object = {'query': {'range': {'calories': {'gte': 20}}}}
        result = search(es, 'recipes', json.dumps(search_object))
        print(result)

你可能看到如下的结果:

{'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 37, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'recipes', '_type': 'salads', '_id' ... }}

为了完成这个应用的运行,我们必须安装如下的 python 包:

beautifulsoup4==4.8.0
bs4==0.0.1
certifi==2019.6.16
chardet==3.0.4
elasticsearch==7.0.4
idna==2.8
lxml==4.4.1
requests==2.22.0
soupsieve==1.9.3
urllib3==1.25.3

至此,我们已经完成了整个应用的构造。你可以找到最终的代码:https://github.com/liu-xiao-guo/recipies

参考:

[1]:  https://elasticsearch-py.readthedocs.io/en/master/

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值