【系列】scrapy爬虫开发(8)Redis分布式

本文介绍了如何利用Redis实现Scrapy的分布式爬虫。首先,讲述了Redis数据库的基本概念,强调其作为轻量级内存数据库在分布式任务中的优势。接着,详细介绍了scrapy_redis库,包括自动连接Redis、分布式Spider基础类、调度器和去重等功能。最后,讨论了RedisSpider与RedisCrawlSpider的区别以及启动流程,指出RedisSpider从Redis数据库获取URL而非使用start_urls。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

8.Redis分布式

原生scrapy不支持分布式,不适合大型开发。
安装redis数据库,然后pip install scrapy_redis安装对应的库,可基于redis实现分布式功能。

redis数据库简介

Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
Redis是非关系数据库。
Redis是轻量型内存数据库,有限的数据类型,简洁的读写API,运行后不经过磁盘IO,读写迅速,远程增删改查非常容易。既可用于对临时性、经常变动的数据的缓存,也可用于执行分布式任务。
在这里,需要在linux系统中安装redis数据库(master机器,yum install redis),并安装对应的python库(可选,pip install redis)。

scrapy_redis库

scrapy_redis主要内容:
1.不必额外连接redis数据库,只需在配置文件里进行配置,爬虫启动时自动连接redis数据库并自动读取url数据生成请求队列。
2.实现了基于redis数据库以及适用于分布式的Spider基础类。
3.实现了基于redis数据库以及适用于分布式的Scheduler(调度器)、RFPDupeFilter(去重)、PriorityQueue(优先级队列)等,即重写了scrapy框架的核心类。
4.提供了RedisPipeline,可自动将爬取及解析得到的item存到redis数据库里。

默认配置:
scrapy_redis/defaults.py:

# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False

分布式爬虫架构

需要定义一个Master主服务器,用于运行redis数据库,把所有的爬取任务集中统一管理。所有的爬虫实例均视作Slave。

相关配置

在爬虫工程的配置文件里修改或添加一些项:
yourproject/settings.py:

SCHEDULER = "scrapy_redis.scheduler.Scheduler"#基于redis的调度器
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"#基于redis的url去重类
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'#基于redis的优先级队列
SCHEDULER_PERSIST = True#持久化(关闭后可保存到磁盘,重启重新加载)

ITEM_PIPELINES = {
    #REDIS
    #REDIS任务队列管理相关
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

REDIS_URL = redis://:yourpassword@youraddr:6379/0#redis数据库地址,可设置密码

RedisSpider和RedisCrawlSpider的异同

与原生Spider和CrawlSpider类似,前者为通用爬虫,后者可设置筛选规则。

RedisSpider启动流程

以RedisSpider为例,此类继承于原生Spider类和自己实现的RedisMixin类。
scrapy_redis/spiders.py#RedisMixin:

class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider

执行命令scrapy crawl myspider启动爬虫后,先执行setup_redis()加载crawl和settings,连接redis数据库,定义对应的key(一般根据爬虫名称name自动生成)。
爬虫会通过start_requests事件调用start_requests()。
原生Spider类会在start_requests()方法里读取start_urls变量获取url。
而RedisSpider类会在start_requests()方法里继续调用另一个方法next_requests(),在next_requests()里读取redis数据库,而不再使用start_urls变量。
下一篇

### 解决PyCharm无法加载Conda虚拟环境的方法 #### 配置设置 为了使 PyCharm 能够成功识别并使用 Conda 创建的虚拟环境,需确保 Anaconda 的路径已正确添加至系统的环境变量中[^1]。这一步骤至关重要,因为只有当 Python 解释器及其关联工具被加入 PATH 后,IDE 才能顺利找到它们。 对于 Windows 用户而言,在安装 Anaconda 时,默认情况下会询问是否将它添加到系统路径里;如果当时选择了否,则现在应该手动完成此操作。具体做法是在“高级系统设置”的“环境变量”选项内编辑 `Path` 变量,追加 Anaconda 安装目录下的 Scripts 文件夹位置。 另外,建议每次新建项目前都通过命令行先激活目标 conda env: ```bash conda activate myenvname ``` 接着再启动 IDE 进入工作区,这样有助于减少兼容性方面的问题发生概率。 #### 常见错误及修复方法 ##### 错误一:未发现任何解释器 症状表现为打开 PyCharm 新建工程向导页面找不到由 Conda 构建出来的 interpreter 列表项。此时应前往 Preferences/Settings -> Project:...->Python Interpreter 下方点击齿轮图标选择 Add...按钮来指定自定义的位置。按照提示浏览定位到对应版本 python.exe 的绝对地址即可解决问题。 ##### 错误二:权限不足导致 DLL 加载失败 有时即使指定了正确的解释器路径,仍可能遇到由于缺乏适当的操作系统级许可而引发的功能缺失现象。特别是涉及到调用某些特定类型的动态链接库 (Dynamic Link Library, .dll) 时尤为明显。因此拥有管理员身份执行相关动作显得尤为重要——无论是从终端还是图形界面触发创建新 venv 流程均如此处理能够有效规避此类隐患。 ##### 错误三:网络连接异常引起依赖下载超时 部分开发者反馈过因网速慢或者其他因素造成 pip install 操作中途断开进而影响整个项目的初始化进度条卡住的情况。对此可尝试调整镜像源加速获取速度或是离线模式预先准备好所需资源包后再继续后续步骤。 ---
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值