Today we'll look at how to wrap aiohttp into a reusable request module, including automatic retries when a request fails.
Getting Started
First, a quick look at basic aiohttp usage. The function below fetches a page of the Douban Top 250 ranking.
import asyncio
import aiohttp
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/535.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/535.36",
"referer": "https://movie.douban.com/"
}
cookies = {
"token": "1234567890",
}
proxy_status = False
proxy_url = "http://127.0.0.1:12123"
async def fetch_data(url):
try:
async with aiohttp.ClientSession(headers=headers, cookies=cookies,
timeout=aiohttp.ClientTimeout(connect=30)) as session:
async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
print('status:', response.status)
print('headers:', response.headers)
print('body:', await response.text())
print('bytes:', await response.read())
                # json() only applies to JSON responses; calling it on this
                # HTML page would raise ContentTypeError, so it's commented out
                # print('json:', await response.json())
return await response.text()
except Exception as e:
print(e)
async def main():
    print(await fetch_data(url='https://movie.douban.com/top250?start=0&filter='))
if __name__ == "__main__":
asyncio.run(main())
Request Retries
Next, let's look at how to handle a failed request.
async def fetch_data(url, retry_delay=5, retry_attempts=5):
try:
async with aiohttp.ClientSession(headers=headers, cookies=cookies,
timeout=aiohttp.ClientTimeout(connect=30)) as session:
async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
# print('status:', response.status)
# print('headers:', response.headers)
# print('body:', await response.text())
# print('bytes:', await response.read())
# print('json:', await response.json())
                # Manually raise an exception to simulate a network error
                raise Exception('network error')
                # (unreachable while the raise above is in place)
                return await response.text()
except Exception as e:
        # Any request error lands here
print(e)
        # Check whether any retries remain
if retry_attempts > 0:
print(
f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
await asyncio.sleep(retry_delay)
return await fetch_data(url=url, retry_delay=retry_delay, retry_attempts=retry_attempts - 1)
else:
print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
return False
In the code above, the exception we raised is caught by the except block, which checks retry_attempts > 0 to see whether any retries remain; once the retries are used up, the function returns False.
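The recursive call is fine for a handful of retries, but the same logic can also be written as a plain loop that doesn't grow the call stack. Here is a minimal sketch of that variant (fetch_data_loop is a name introduced here for illustration; it reuses the headers, cookies, and proxy globals from above):
async def fetch_data_loop(url, retry_delay=5, retry_attempts=5):
    # One initial try plus up to retry_attempts retries
    for attempt in range(retry_attempts + 1):
        try:
            async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                             timeout=aiohttp.ClientTimeout(connect=30)) as session:
                async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                    return await response.text()
        except Exception as e:
            print(e)
            if attempt < retry_attempts:
                print(f"Retrying in {retry_delay} seconds, {retry_attempts - attempt} attempts remaining. {url}")
                await asyncio.sleep(retry_delay)
    print(f"Failed to retrieve data after {retry_attempts + 1} attempts. {url}")
    return False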
Retrying File Downloads
So how do we retry when downloading a file?
First, let's spin up a local HTTP server for testing and move a few images into the directory it shares.
import http.server
import socketserver
# Port the server listens on
PORT = 8000
# Directory to share
DIRECTORY = r"E:\Code\Python\test\img"
class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
        # Point the handler at the shared directory
super().__init__(*args, directory=DIRECTORY, **kwargs)
# Start the server
with socketserver.TCPServer(("", PORT), CustomHTTPRequestHandler) as httpd:
print(f"Serving files from {DIRECTORY} at http://localhost:{PORT}")
httpd.serve_forever()
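With the server running, every file in DIRECTORY is reachable over HTTP; an image named example.jpg in that folder, say, would be served at http://localhost:8000/example.jpg (the filename is just an illustration).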
I use the tqdm_file_path parameter to hold the download path; when it is None, the call is treated as a plain data fetch rather than a file download, so both cases share one function without any extra code. The cost of using tqdm is that the download becomes a streamed download, so the file has to be written to a temporary path first and renamed to the real path only once the download succeeds.
For resumable downloads, see the next section.
import os
from tqdm.asyncio import tqdm_asyncio

async def fetch_data(url, tqdm_file_path=None, retry_delay=5, retry_attempts=5):
try:
async with aiohttp.ClientSession(headers=headers, cookies=cookies,
timeout=aiohttp.ClientTimeout(connect=30)) as session:
async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                # If tqdm_file_path is set, this call is a file download
if tqdm_file_path is not None:
                    # Total file size, taken from the Content-Length header
total_size = int(response.headers.get('Content-Length', 0))
                    # Label shown next to the progress bar
desc_name = url.split('/')[-1] + "/" + os.path.basename(tqdm_file_path)
                    # Temporary download path
temp_file_path = os.path.dirname(tqdm_file_path) + "/temp_" + os.path.basename(
tqdm_file_path)
with open(temp_file_path, 'wb') as f:
with tqdm_asyncio(total=total_size, unit='B', unit_scale=True, desc=desc_name) as pbar:
async for chunk in response.content.iter_chunked(1024):
f.write(chunk)
pbar.update(len(chunk))
                    # Once the download succeeds, move the file from the temporary
                    # path to the final one (os.replace also overwrites a stale target)
                    os.replace(temp_file_path, tqdm_file_path)
return True
                # Otherwise treat the call as a plain data fetch
return await response.read()
except Exception as e:
        # Any request error lands here
print(e)
        # Check whether any retries remain
if retry_attempts > 0:
print(
f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
await asyncio.sleep(retry_delay)
return await fetch_data(url=url, tqdm_file_path=tqdm_file_path, retry_delay=retry_delay, retry_attempts=retry_attempts - 1)
else:
print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
return False
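A quick usage sketch for both modes; demo.jpg and the output path are placeholders for whatever you actually placed in the shared directory (the target folder must already exist, since this version creates no directories):
async def main():
    # Plain data fetch: returns the body bytes (or False once retries run out)
    html = await fetch_data(url='https://movie.douban.com/top250?start=0&filter=')
    print(len(html) if html else html)
    # File download: streamed to a temp file, then renamed into place
    ok = await fetch_data(url='http://localhost:8000/demo.jpg',
                          tqdm_file_path=r'E:\Code\Python\test\img\demo_copy.jpg')
    print('download ok:', ok)

if __name__ == "__main__":
    asyncio.run(main())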
Resumable Downloads
I don't recommend bolting resume support onto the function above as well; it is already doing a lot. Here is a separate version with resume support instead.
import copy

async def fetch_data_stream(url, file_path, stream_range=0, retry_delay=10, retry_attempts=10):
    try:
        # Work on a copy so the Range header doesn't leak into the shared headers dict
        _headers = copy.deepcopy(headers)
        mode = 'wb'
        if stream_range != 0:
            # Resuming: request only the bytes we don't have yet, and append to the file
            _headers.update({'Range': f'bytes={stream_range}-'})
            mode = 'ab'
        async with aiohttp.ClientSession(headers=_headers, cookies=cookies) as session:
async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                # On a 206 (partial) response Content-Length only covers the remaining
                # bytes, so add what is already on disk to get the true total
                with tqdm_asyncio(total=int(response.headers.get("Content-Length", 0)) + stream_range,
initial=stream_range,
unit="B", unit_scale=True) as progress_bar:
folder_path = os.path.dirname(file_path)
os.makedirs(folder_path, exist_ok=True)
with open(file_path, mode) as f:
async for data in response.content.iter_chunked(1024):
f.write(data)
progress_bar.update(len(data))
return True
    except Exception as e:
        # Any request error lands here
        print(e)
        # Check whether any retries remain
        if retry_attempts > 0:
            print(
                f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
            await asyncio.sleep(retry_delay)
            # Resume from however many bytes made it to disk before the failure
            return await fetch_data_stream(url=url, file_path=file_path,
                                           stream_range=os.path.getsize(file_path) if os.path.exists(file_path) else 0,
                                           retry_delay=retry_delay, retry_attempts=retry_attempts - 1)
        else:
            print(f"Failed to retrieve data. All retry attempts exhausted. {url}")
            return False
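Usage has the same shape as before (demo.jpg is again a placeholder). One caveat for testing: http.server's SimpleHTTPRequestHandler ignores Range headers and always sends the whole file, so to see a genuine resume, i.e. a 206 Partial Content response, you need a server or file host that supports Range requests:
async def main():
    # If the transfer dies partway through, the retry resumes from
    # however many bytes already made it to disk
    ok = await fetch_data_stream(url='http://localhost:8000/demo.jpg',
                                 file_path=r'E:\Code\Python\test\out\demo.jpg')
    print('download ok:', ok)

if __name__ == "__main__":
    asyncio.run(main())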
POST Requests
If you also want the function to support POST requests, you can write it like this; the idea is the same.
async def fetch_data(url, json=None, data=None, retry_delay=5, retry_attempts=5):
try:
async with aiohttp.ClientSession(headers=headers, cookies=cookies,
timeout=aiohttp.ClientTimeout(connect=30)) as session:
            # data given: send a form-encoded POST
            if data is not None:
async with session.post(url, data=data,
proxy=proxy_url if proxy_status else None) as response:
return await response.read()
            # json given: send a JSON POST and parse the JSON reply
            elif json is not None:
async with session.post(url, json=json,
proxy=proxy_url if proxy_status else None) as response:
return await response.json(content_type=None)
            # neither given: fall back to a plain GET
            else:
async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
return await response.read()
except Exception as e:
        # Any request error lands here
print(e)
        # Check whether any retries remain
if retry_attempts > 0:
print(
f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
await asyncio.sleep(retry_delay)
return await fetch_data(url=url, json=json, data=data, retry_delay=retry_delay,
retry_attempts=retry_attempts - 1)
else:
print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
return False
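A quick usage sketch for the new branches; httpbin.org is used here purely as an illustrative echo service:
async def main():
    # json= branch: sends a JSON body and parses the JSON reply
    print(await fetch_data(url='https://httpbin.org/post', json={'movie': 'top250'}))
    # data= branch: sends a form-encoded body and returns the raw bytes
    print(await fetch_data(url='https://httpbin.org/post', data={'movie': 'top250'}))

if __name__ == "__main__":
    asyncio.run(main())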
Concurrency
With aiohttp we don't need actual threads: coroutines plus a semaphore give us the same effect. Below we fetch the full Douban Top 250 list concurrently.
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/535.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/535.36",
"referer": "https://movie.douban.com/"
}
cookies = {
"token": "1234567890",
}
proxy_status = False
proxy_url = "http://127.0.0.1:12123"
async def fetch_data(url):
try:
async with aiohttp.ClientSession(headers=headers, cookies=cookies,
timeout=aiohttp.ClientTimeout(connect=30)) as session:
async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
return await response.text()
except Exception as e:
print(e)
async def download(semaphore, url):
async with semaphore:
        # Up to 6 tries per page
        reload_count = 0
        while reload_count < 6:
            reload_count += 1
try:
result = await fetch_data(url)
                # Parse the page with BeautifulSoup
soup = BeautifulSoup(result, 'html.parser')
data = []
                # Iterate over every movie entry on the page
for i in soup.select('ol.grid_view .item'):
_item = {'id': None, 'name': None}
                    # Rank
                    _item['id'] = i.select_one('.pic > em').get_text()
                    # Title
                    _item['name'] = i.select_one('.info span.title').get_text()
data.append(_item)
return data
except Exception as e:
print(e)
continue
return False
async def main():
    semaphore = asyncio.Semaphore(3)  # at most 3 pages in flight at once
task_list = []
for i in range(0, 250, 25):
req = download(semaphore, url=f'https://movie.douban.com/top250?start={i}&filter=')
task_list.append(req)
    results = await tqdm_asyncio.gather(*task_list, desc="Download")
for result in results:
if not result:
print(f"Failed {result}")
return False
print(results)
if __name__ == "__main__":
asyncio.run(main())
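Both asyncio.gather and tqdm_asyncio.gather (which wraps it with a progress bar) return results in the same order as the task list, so each entry lines up with its start= offset, while the semaphore keeps at most three pages in flight so we don't hammer the site.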