8. Redis-Based Distributed Crawling
Native Scrapy has no built-in support for distributed crawling, which makes it a poor fit for large-scale projects.
Install a Redis database, then install the companion library with pip install scrapy_redis; together they add distributed crawling on top of Redis.
Introduction to Redis
Redis is an open-source, key-value, log-structured database written in ANSI C. It supports networking, can run purely in memory or persist to disk, and provides APIs for many languages.
Redis is a non-relational (NoSQL) database.
Redis is a lightweight in-memory database with a small set of data types and a concise read/write API. Because reads and writes happen in memory rather than through disk I/O, they are fast, and remote CRUD operations are straightforward. It works both as a cache for temporary, frequently changing data and as a coordination backend for distributed tasks.
Here, install Redis on a Linux machine (the master: yum install redis) and, optionally, the Python client library (pip install redis).
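As a quick sanity check that the master's Redis instance is reachable from Python, a minimal sketch using the redis-py client (the host 'youraddr', the password, and the key names are placeholders, not values from this article):

import redis

# Connect to the Redis instance running on the master machine.
# Host, port, db and password are assumptions -- adjust to your deployment.
r = redis.StrictRedis(host='youraddr', port=6379, db=0,
                      password='yourpassword', decode_responses=True)

# Simple remote CRUD: write, read, and delete a key.
r.set('demo:key', 'hello')
print(r.get('demo:key'))      # -> 'hello'
r.delete('demo:key')

# Lists are the structure scrapy_redis uses for its request and start-url queues.
r.lpush('demo:start_urls', 'https://example.com')
print(r.lrange('demo:start_urls', 0, -1))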
The scrapy_redis library
What scrapy_redis provides:
1. No extra Redis plumbing: configure it in the settings file and, on startup, the spider connects to Redis automatically, reads URL data from it and builds the request queue.
2. A Redis-backed Spider base class designed for distributed crawling.
3. Redis-backed, distribution-ready rewrites of Scrapy's core components: the Scheduler, RFPDupeFilter (deduplication), PriorityQueue (priority queue), and so on.
4. RedisPipeline, which automatically stores scraped and parsed items in Redis (a consumer sketch follows this list).
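Items written by RedisPipeline end up as JSON strings in the '<spider>:items' list (PIPELINE_KEY in the defaults shown next), so a separate process can drain and post-process them. A minimal sketch, assuming a spider named 'myspider' and Redis on localhost:

import json
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# RedisPipeline pushes each serialized item onto the '<spider>:items' list.
# Pop entries off that list and deserialize them here.
while True:
    data = r.lpop('myspider:items')
    if data is None:
        break  # queue drained
    item = json.loads(data)
    print(item)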
Default configuration:
scrapy_redis/defaults.py:
import redis

# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
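These key templates determine where each spider looks in Redis: for a spider named 'myspider' the requests live in 'myspider:requests', fingerprints in 'myspider:dupefilter', items in 'myspider:items', and start URLs in 'myspider:start_urls'. A sketch of seeding that start-URL key from the master (connection parameters are placeholders):

import redis

r = redis.StrictRedis(host='youraddr', port=6379, db=0, password='yourpassword')

# START_URLS_AS_SET is False by default, so the key is a list: use lpush.
# (With REDIS_START_URLS_AS_SET = True it would be a set: use sadd instead.)
r.lpush('myspider:start_urls', 'https://example.com/page1')
r.lpush('myspider:start_urls', 'https://example.com/page2')

The same can be done from the command line with redis-cli and the lpush command.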
Distributed crawler architecture
Designate one master server that runs the Redis database and centrally manages all crawl tasks. Every spider instance is treated as a slave.
Configuration
Modify or add the following entries in the project's settings file:
yourproject/settings.py:
# Redis-backed scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Redis-backed request fingerprint (URL) dedup filter.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# Redis-backed priority queue for scheduled requests.
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
# Keep the request queue and dupefilter in Redis when the spider closes,
# so a restarted crawl can resume where it left off.
SCHEDULER_PERSIST = True

ITEM_PIPELINES = {
    # Store scraped items in Redis.
    'scrapy_redis.pipelines.RedisPipeline': 300,
}

# Redis database address; a password can be included in the URL.
REDIS_URL = 'redis://:yourpassword@youraddr:6379/0'
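The scheduler queue does not have to be a priority queue; scrapy_redis.queue also provides FifoQueue and LifoQueue, so the crawl order can be changed by swapping this one setting. A sketch of the alternatives (pick exactly one in settings.py):

SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'  # default: ordered by request priority
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.FifoQueue'    # first-in-first-out, breadth-first flavour
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.LifoQueue'    # last-in-first-out, depth-first flavour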
RedisSpider vs. RedisCrawlSpider
They mirror the native Spider and CrawlSpider: the former is a general-purpose spider, while the latter additionally supports link-extraction rules.
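A minimal sketch of a RedisSpider subclass (the spider name, redis_key and parsing logic are illustrative only):

from scrapy_redis.spiders import RedisSpider


class MySpider(RedisSpider):
    name = 'myspider'
    # Start URLs are read from this Redis key instead of a start_urls attribute.
    redis_key = 'myspider:start_urls'

    def parse(self, response):
        # Illustrative parsing: yield one item per fetched page.
        yield {'url': response.url, 'title': response.css('title::text').get()}

The same spider code can be started on every slave machine; all instances pull from the shared Redis queue, which is what makes the crawl distributed.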
How RedisSpider starts up
Take RedisSpider as the example: it inherits from both the native Spider class and RedisMixin, a mixin class implemented by scrapy_redis.
scrapy_redis/spiders.py#RedisMixin:
class RedisMixin(object):
    """Mixin class to implement reading urls from a redis queue."""
    redis_key = None
    redis_batch_size = None
    redis_encoding = None

    # Redis client placeholder.
    server = None

    def start_requests(self):
        """Returns a batch of start requests from redis."""
        return self.next_requests()

    def setup_redis(self, crawler=None):
        """Setup redis connection and idle signal.

        This should be called after the spider has set its crawler object.
        """
        if self.server is not None:
            return

        if crawler is None:
            # We allow optional crawler argument to keep backwards
            # compatibility.
            # XXX: Raise a deprecation warning.
            crawler = getattr(self, 'crawler', None)

        if crawler is None:
            raise ValueError("crawler is required")

        settings = crawler.settings

        if self.redis_key is None:
            self.redis_key = settings.get(
                'REDIS_START_URLS_KEY', defaults.START_URLS_KEY,
            )

        self.redis_key = self.redis_key % {'name': self.name}

        if not self.redis_key.strip():
            raise ValueError("redis_key must not be empty")

        if self.redis_batch_size is None:
            # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).
            self.redis_batch_size = settings.getint(
                'REDIS_START_URLS_BATCH_SIZE',
                settings.getint('CONCURRENT_REQUESTS'),
            )

        try:
            self.redis_batch_size = int(self.redis_batch_size)
        except (TypeError, ValueError):
            raise ValueError("redis_batch_size must be an integer")

        if self.redis_encoding is None:
            self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)

        self.logger.info("Reading start URLs from redis key '%(redis_key)s' "
                         "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",
                         self.__dict__)

        self.server = connection.from_settings(crawler.settings)
        # The idle signal is called when the spider has no requests left,
        # that's when we will schedule new requests from redis queue
        crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)

    def next_requests(self):
        """Returns a request to be scheduled or none."""
        use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)
        fetch_one = self.server.spop if use_set else self.server.lpop
        # XXX: Do we need to use a timeout here?
        found = 0
        # TODO: Use redis pipeline execution.
        while found < self.redis_batch_size:
            data = fetch_one(self.redis_key)
            if not data:
                # Queue empty.
                break
            req = self.make_request_from_data(data)
            if req:
                yield req
                found += 1
            else:
                self.logger.debug("Request not made from data: %r", data)

        if found:
            self.logger.debug("Read %s requests from '%s'", found, self.redis_key)

    def make_request_from_data(self, data):
        """Returns a Request instance from data coming from Redis.

        By default, ``data`` is an encoded URL. You can override this method to
        provide your own message decoding.

        Parameters
        ----------
        data : bytes
            Message from redis.

        """
        url = bytes_to_str(data, self.redis_encoding)
        return self.make_requests_from_url(url)

    def schedule_next_requests(self):
        """Schedules a request if available"""
        # TODO: While there is capacity, schedule a batch of redis requests.
        for req in self.next_requests():
            self.crawler.engine.crawl(req, spider=self)

    def spider_idle(self):
        """Schedules a request if available, otherwise waits."""
        # XXX: Handle a sentinel to close the spider.
        self.schedule_next_requests()
        raise DontCloseSpider
After the spider is started with scrapy crawl myspider, setup_redis() runs first: it picks up the crawler and its settings, connects to the Redis database, and resolves the redis_key to use (by default derived from the spider's name).
The engine then calls the spider's start_requests() to obtain the initial requests.
The native Spider class reads the start_urls attribute inside start_requests() to produce its URLs.
RedisSpider, by contrast, has start_requests() delegate to next_requests(), which pops URLs from the Redis database instead of using start_urls.
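As the docstring of make_request_from_data() notes, the data popped from Redis is by default just an encoded URL, and the method can be overridden to decode richer messages. A hedged sketch that assumes each queue entry is a small JSON object with 'url' and optional 'meta' fields (this message format is an assumption, not part of scrapy_redis):

import json

from scrapy import Request
from scrapy_redis.spiders import RedisSpider


class JsonMessageSpider(RedisSpider):
    name = 'jsonspider'
    redis_key = 'jsonspider:start_urls'

    def make_request_from_data(self, data):
        # Assumed message format: b'{"url": "https://example.com", "meta": {...}}'
        message = json.loads(data.decode(self.redis_encoding))
        return Request(url=message['url'], meta=message.get('meta', {}),
                       dont_filter=True)

    def parse(self, response):
        yield {'url': response.url}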