1. Introduction to the ahttp library
I have recently been learning asynchronous crawling and came across this post:
https://blog.csdn.net/getcomputerstyle/article/details/103014896
It looked very suitable for beginners, so I tried it out and found that ahttp has not been updated recently and has a few issues (it may also just be that I am using it wrong; I am a novice).
2. My modifications to the ahttp library
I made some changes to ahttp, mainly to fit my own habits and needs. The code is below; the main changes are:
1. Added an html property to the response object. Pages encoded as GB18030 could come back garbled in the original library, so html is built from the raw content rather than from text, avoiding a second decode.
2. Fixed the callback mechanism; in the original library it never actually fired.
3. Fixed max_try, which likewise had no effect. Requests are now retried up to max_try times, and the related errors can be printed.
4. Removed the "sort results" option; results are never sorted. Instead, every response carries an index numbered in task order, so results can be sorted after processing.
5. Uses the fake_useragent library to pick a random User-Agent for each request.
6. Added a log flag to choose whether errors encountered while crawling are printed.
7. Wrapped two convenience functions, ahttpGet and ahttpGetAll, to simplify calls further (a short usage sketch follows the library code below).
The modified library code is as follows:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import json
from functools import partial

import aiohttp
from cchardet import detect
from fake_useragent import UserAgent
from requests_html import HTML, HTMLSession

__all__ = ('run', 'Session', 'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'ahttpGet', 'ahttpGetAll')
class Session:
def __init__(self, *args, **kwargs):
self.session = self
self.headers = HTMLSession().headers
self.cookies = {}
self.request_pool = []
def __getattr__(self, name):
if name in ['get', 'options', 'head', 'post', 'put', 'patch', 'delete']:
new_req = AsyncRequestTask(headers=self.headers, session=self.session)
new_req.__getattr__(name)
self.request_pool.append(new_req)
return new_req.get_params
def __repr__(self):
return f"<Ahttp Session [id:{id(self.session)} client]>"
class AsyncRequestTask:
def __init__(self, *args, session=None, headers=None, **kwargs):
self.session = session
self.headers = headers
self.cookies = None
self.kw = kwargs
self.method = None
def __getattr__(self, name):
if name in ['get', 'options', 'head', 'post', 'put', 'patch', 'delete']:
self.method = name
return self.get_params
def __repr__(self):
return f"<AsyncRequestTask session:[{id(self.session)}] req:[{self.method.upper()}:{self.url}]>"
def get_params(self, *args, **kw):
self.url = args[0]
self.args = args[1:]
if "callback" in kw:
self.callback = kw['callback']
kw.pop("callback")
else:
self.callback = None
if "headers" in kw:
self.headers = kw['headers']
kw.pop("headers")
self.kw = kw
return self
def run(self):
future = asyncio.ensure_future(single_task(self))
loop = asyncio.get_event_loop()
loop.run_until_complete(future)
        new_res = AhttpResponse(id(self.session), self.result, self.content, self)
        if self.callback:
            self.callback(new_res)  # invoke the callback, if one was supplied
        return new_res
def wrap_headers(headers):
ua = UserAgent()
new_headers = {}
for k, v in headers.items():
new_headers[k] = str(v)
new_headers['User-Agent'] = ua.random
return new_headers
async def single_task(self):
    # single task, invoked from task.run()
    # create the client session
async with aiohttp.ClientSession(cookies=self.cookies) as session:
        # send the request
async with session.request(
self.method,
self.url,
*self.args,
verify_ssl=False,
timeout=9,
headers=wrap_headers(self.headers or self.session.headers),
**self.kw) as resp:
            # read the response body
            content = await resp.read()
            # store the response object and its body
            self.result, self.content = resp, content
class AhttpResponse:
    # structured response wrapper
def __init__(self, index, sessReq, content, task, *args, **kwargs):
self.index = index
self.content = content
self.task = task
self.raw = self.clientResponse = sessReq
@property
    def text(self):
        # decode using the encoding detected by cchardet; fall back to utf-8 if detection fails
        code_type = detect(self.content)
        return self.content.decode(code_type['encoding'] or 'utf-8')
@property
def url(self):
return self.clientResponse.url
@property
def cookies(self):
return self.clientResponse.cookies
@property
def headers(self):
return self.clientResponse.headers
def json(self):
return json.loads(self.text)
@property
def status(self):
return self.clientResponse.status
@property
def method(self):
return self.clientResponse.method
@property
    def html(self):
        # an @html.setter could be added here if a setter is ever needed
        # overridden: GB18030-encoded pages could come back garbled in the original library,
        # so the document is built from the raw content rather than from text to avoid a second decode
        html = HTML(html=self.content, url=str(self.raw.url))
        return html
@property
def dom(self):
"""
返回一个requests_html对象,
支持所有requests_html的html对象的操作。例如find, xpath, render(先安装chromium浏览器)
"""
html = HTML(html=self.text)
html.url = self.raw.url
return html
def __repr__(self):
return f"<AhttpResponse status[{self.status}] url=[{self.url}]>"
def run(tasks, pool=20, max_try=5, callback=None, log=True):
    if not isinstance(tasks, list):
        raise TypeError("the tasks argument of run() must be a list")
    conn = aiohttp.TCPConnector(use_dns_cache=True, loop=asyncio.get_event_loop(), ssl=False)
    # limit the number of concurrent requests
    sem = asyncio.Semaphore(pool)
    result = []  # collected results
    loop = asyncio.get_event_loop()
    # execute the tasks
    loop.run_until_complete(multi_req(tasks, conn, sem, callback, log, max_try, result))
    # return the results unsorted
    return result
async def multi_req(tasks, conn, sem, callback, log, max_try, result):
    # create the client sessions: one shared aiohttp.ClientSession per ahttp Session object
    sessions_list = {}
    new_tasks = []
    for index, task in enumerate(tasks):
if id(task.session) not in sessions_list:
sessions_list[id(task.session)] = aiohttp.ClientSession(
connector_owner=False,
connector=conn,
cookies=task.session.cookies
)
new_tasks.append(
asyncio.ensure_future(
control_sem(
sem, index, task, callback, log, max_try, result, sessions_list[id(task.session)])
)
)
await asyncio.wait(new_tasks)
await asyncio.wait([asyncio.ensure_future(v.close()) for k, v in sessions_list.items()])
    await conn.close()  # close the TCP connector
async def control_sem(sem, index, task, callback, log, max_try, result, session):
    # limit concurrency with the semaphore
async with sem:
await fetch(index, task, callback, log, max_try, result, session)
async def fetch(index, task, callback, log, max_try, result, session):
    # task.session is the ahttp Session object, so its headers can be used directly
    headers = wrap_headers(task.headers or task.session.headers)
    Err = ''  # last error marker
while max_try > 0:
try:
async with session.request(task.method, task.url, *task.args, headers=headers, timeout=9, **task.kw) as sessReq:
if sessReq.status != 200:
max_try = max_try - 1
Err = 'status!=200'
if log: print(task.url, 'Error:', Err)
await asyncio.sleep(0.1)
continue
content = await sessReq.read()
new_res = AhttpResponse(index, sessReq, content, task)
result.append(new_res)
if log and Err: print(task.url, 'result get OK')
if callback: callback(new_res) # 有回调则调用
                break  # status == 200: leave the retry loop completely
except Exception as err:
max_try = max_try - 1
Err = repr(err)
if log: print(task.url, 'Error:', Err)
await asyncio.sleep(0.1)
            continue  # skip this attempt and move on to the next retry
def create_session(method, *args, **kw):
sess = Session()
return {"get": sess.get,
"post": sess.post,
"options": sess.options,
"head": sess.head,
"put": sess.put,
"patch": sess.patch,
"delete": sess.delete}[method](*args, **kw)
# use functools.partial to quickly build the verb-specific request helpers
get = partial(create_session, "get")
post = partial(create_session, "post")
options = partial(create_session, "options")
head = partial(create_session, "head")
put = partial(create_session, "put")
patch = partial(create_session, "patch")
delete = partial(create_session, "delete")
def ahttpGet(url, callback=None, params=None, **kwargs):
    # pass the callback through to the task so it actually fires
    task = get(url, params=params, callback=callback, **kwargs)
    res = task.run()
    return res
def ahttpGetAll(urls, pool=20, callback=None, max_try=5, log=True, params=None, **kwargs):
    tasks = [get(url, params=params, **kwargs) for url in urls]
    resps = run(tasks, pool=pool, max_try=max_try, callback=callback, log=log)
    return resps
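For orientation, here is a minimal usage sketch of the two wrappers. Treat it as an illustration of the call signatures rather than a working crawler: the URLs and the on_done callback are placeholders I made up.
# minimal usage sketch of ahttpGet / ahttpGetAll (placeholder URLs)
from ahttp import ahttpGet, ahttpGetAll

res = ahttpGet('https://example.com')  # single request, returns an AhttpResponse
print(res.status, len(res.text))

def on_done(resp):
    # called once for every finished request; resp.index is the position in the url list
    print(resp.index, resp.status, resp.url)

urls = ['https://example.com/page/%d' % i for i in range(1, 6)]
resps = ahttpGetAll(urls, pool=10, max_try=3, log=True, callback=on_done)
resps.sort(key=lambda r: r.index)  # results arrive unordered; sort by index if order matters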
3. An example of asynchronous crawling with ahttp
As a demonstration, here is a crawler for books on https://www.biqukan.com/.
1. The HTML is parsed with lxml. Compared with the requests_html parsing that ahttp bundles (as described in the post above), this was far faster in my own tests; a rough timing sketch follows this list.
2. Most of the code was collected from the internet; I adjusted and optimized it.
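The sketch below is only a rough way to reproduce that comparison on one fetched page; the book URL is the same one used in the example below, the repetition count is arbitrary, and absolute numbers will vary with the machine and the page size:
# rough parse-speed comparison: lxml.etree vs requests_html.HTML
import time
from lxml import etree
from requests_html import HTML
from ahttp import ahttpGet

content = ahttpGet('https://www.biqukan.com/38_38836/').text  # any fetched page works

t0 = time.perf_counter()
for _ in range(100):
    etree.HTML(content).xpath('//h2/text()')
t1 = time.perf_counter()
for _ in range(100):
    HTML(html=content).xpath('//h2/text()')
t2 = time.perf_counter()
print('lxml: %.3f s   requests_html: %.3f s' % (t1 - t0, t2 - t1))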
The code is as follows:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import time
from ahttp import ahttpGet, ahttpGetAll
from lxml import etree
texts = []
def Ex_Re_Sub(oldtext, *args, **kwds):
    '''
    Usage: newtext = Ex_Re_Sub(oldtext, {'\n\n': '\n'})
    The keys are treated literally; no regex escaping is needed.
    '''
adict = dict(*args, **kwds)
rx = re.compile('|'.join(map(re.escape, adict)))
def one_xlat(match):
return adict[match.group(0)]
return rx.sub(one_xlat, oldtext)
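# A tiny illustration of Ex_Re_Sub (made-up strings): every dict key is replaced by its
# value in a single regex pass, and the keys are taken literally, e.g.
#   Ex_Re_Sub('foo\r\nbar  baz', {'\r\n': '\n', '  ': ' '})  ->  'foo\nbar baz'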
def get_stime():
import datetime
time_now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
return time_now
def get_FileSize(filePath):
fsize = os.path.getsize(filePath)
fsize = fsize / float(1024 * 1024)
return round(fsize, 2)
def savefile(_filename, _list_texts, br=''):
    # write the crawled content to a file, recursing through nested lists
    # br is the separator appended after each item, e.g. '\t' after a title
    # handles lists / tuples nested to any depth
    print('[{}] saving started......@{}'.format(_filename, get_stime()))
with open(_filename, 'w', encoding='utf-8') as file:
file.write(_filename + '\n')
def each(data):
for index, value in enumerate(data):
if isinstance(value, list) or isinstance(value, tuple):
each(value)
else:
file.write(str(value) + br)
if index == len(data) - 1:
file.write('\n')
each(_list_texts)
size = "文件大小:%.2f MB" % (get_FileSize(_filename))
print('[{}]保存完成\t文件{}\ttime:{}。'.format(_filename, size, get_stime()))
def get_download_url(target):
    urls = []  # holds the chapter links
resp = ahttpGet(target)
    # response = resp.html
    # use lxml as the parser instead of requests_html
    response = etree.HTML(resp.text)
    _bookname = response.xpath('//h2/text()')[0]
全部章节节点 = response.xpath('//div[@class="listmain"]/dl/dt[2]/following-sibling::dd/a/@href')
for item in 全部章节节点:
_ZJHERF = 'https://www.biqukan.com' + item
urls.append(_ZJHERF)
return _bookname, urls
def 结果处理(resps):
for resp in resps:
index = resp.index
response = etree.HTML(resp.text)
        _name = response.xpath('//h1/text()')[0]
        _showtext = "".join(response.xpath('//*[@id="content"]/text()'))
name = Ex_Re_Sub(_name, {'\'': '', ' ': ' ', '\xa0': ' ', })
text = Ex_Re_Sub(
_showtext,
{
'\'': '',
' ': ' ',
'\xa0': ' ',
                # '\x0a': '\n',  # the old bug: this mapping could make '\n\n' unfindable
'\b;': '\n',
' ': ' ',
'app2();': '',
'笔趣看;': '',
'\u3000': '',
'chaptererror();': '',
'readtype!=2&&(\'vipchapter\n(\';\n\n}': '',
'm.biqukan.com': '',
'wap.biqukan.com': '',
'www.biqukan.com': '',
'www.biqukan.com。': '',
'百度搜索“笔趣看小说网”手机阅读:': '',
'请记住本书首发域名:': '',
'请记住本书首发域名:': '',
'笔趣阁手机版阅读网址:': '',
'笔趣阁手机版阅读网址:': '',
'[]': '',
'\r': '\n',
'\n\n': '\n',
}
)
texts.append([index, name, ' ' + text])
def callback(future):
resp = future
if resp is None: return
index = resp.index
response = etree.HTML(resp.text)
    _name = response.xpath('//h1/text()')[0]
_showtext = "".join(response.xpath('//*[@id="content"]/text()'))
name = Ex_Re_Sub(_name, {'\'': '', ' ': ' ', '\xa0': ' ', })
text = Ex_Re_Sub(
_showtext,
{
'\'': '',
' ': ' ',
'\xa0': ' ',
            # '\x0a': '\n',  # the old bug: this mapping could make '\n\n' unfindable
'\b;': '\n',
' ': ' ',
'app2();': '',
'笔趣看;': '',
'\u3000': '',
'chaptererror();': '',
'readtype!=2&&(\'vipchapter\n(\';\n\n}': '',
'm.biqukan.com': '',
'wap.biqukan.com': '',
'www.biqukan.com': '',
'www.biqukan.com。': '',
'百度搜索“笔趣看小说网”手机阅读:': '',
'请记住本书首发域名:': '',
'请记住本书首发域名:': '',
'笔趣阁手机版阅读网址:': '',
'笔趣阁手机版阅读网址:': '',
'[]': '',
'\r': '\n',
'\n\n': '\n',
}
)
texts.append([index, name, ' ' + text])
def main(url):
    print('Start download: 《{}》\t{}\tfetching chapter links......'.format(url, get_stime()), flush=True)
    bookname, urls = get_download_url(url)
    print('AHTTP, start downloading: 《' + bookname + '》', flush=True)
    # Method 2: no callback; collect all results first, then sort them by index
    resps = ahttpGetAll(urls, pool=200)
    print('Crawling finished, start assembling the data\t time:{}.'.format(get_stime()))
    结果处理(resps)
    print('AHTTP, book 《' + bookname + '》 data assembled, time:{}'.format(get_stime()), flush=True)
    texts.sort(key=lambda x: x[0])  # sort by index
    # reshape the rows, dropping the index column
    aftertexts = [[row[i] for i in range(1, 3)] for row in texts]
    savefile(bookname + '.txt', aftertexts, br='\n')
    print('{} done,\telapsed: {} s.'.format(get_stime(), round(time.time() - _stime, 2)), flush=True)
def mainbycall(url):
    print('Start download: 《{}》\t{}\tfetching chapter links......'.format(url, get_stime()), flush=True)
    bookname, urls = get_download_url(url)
    print('AHTTP, start downloading: 《' + bookname + '》', flush=True)
    # Method 1: process every response in the callback; arrival order is not guaranteed
    ahttpGetAll(urls, pool=100, callback=callback)
    print('AHTTP, book 《' + bookname + '》 download finished')
    texts.sort(key=lambda x: x[0])  # sort by index
    # reshape the rows, dropping the index column
    aftertexts = [[row[i] for i in range(1, 3)] for row in texts]
    savefile(bookname + '.txt', aftertexts, br='\n')
    print('{} done,\telapsed: {} s.'.format(get_stime(), round(time.time() - _stime, 2)))
if __name__ == '__main__':
url = 'https://www.biqukan.com/38_38836/'
_stime = time.time()
    # mainbycall(url)  # use the callback version
    # texts = []
    main(url)  # fetch all results first, then process them
    # '76_76519'   # few chapters, about 4 s
    # '38_38836'   # 2676 KB, about 9 s
    # '0_790'      # 8977 KB, about 13 s
    # "10_10736"   # 34712 KB, about 24 s
    # "2_2714"     # 武炼巅峰, about 36 s