Wrapping your own aiohttp module in Python with automatic retry on errors

Today we will learn how to wrap aiohttp into our own request module and retry the request automatically when it fails.

Getting started

Let's first go over basic aiohttp usage; the function below fetches the Douban Top 250 ranking page.

import asyncio

import aiohttp

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/535.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/535.36",
    "referer": "https://movie.douban.com/"
}
cookies = {
    "token": "1234567890",
}
proxy_status = False
proxy_url = "http://127.0.0.1:12123"


async def fetch_data(url):
    try:
        async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                         timeout=aiohttp.ClientTimeout(connect=30)) as session:
            async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                print('status:', response.status)
                print('headers:', response.headers)
                print('body:', await response.text())
                print('bytes:', await response.read())
                # json() only works when the response body is actually JSON;
                # on this HTML page it would raise, so it is left commented out
                # print('json:', await response.json())

                return await response.text()
    except Exception as e:
        print(e)


async def main():
    print(await fetch_data(url=f'https://movie.douban.com/top250?start={0}&filter='))


if __name__ == "__main__":
    asyncio.run(main())
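Note that aiohttp does not raise on 4xx/5xx status codes by itself, so an error page is still returned normally. If you also want HTTP errors to count as failures (and later be retried), you can call response.raise_for_status(). A minimal sketch reusing the headers, cookies and proxy settings above (fetch_data_strict is just an illustrative name):

async def fetch_data_strict(url):
    try:
        async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                         timeout=aiohttp.ClientTimeout(connect=30)) as session:
            async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                # Turn 4xx/5xx responses into exceptions so they can be handled like network errors
                response.raise_for_status()
                return await response.text()
    except Exception as e:
        print(e)
        return None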

Retrying requests

Next, let's look at what to do when a network request fails with an error.

async def fetch_data(url, retry_delay=5, retry_attempts=5):
    try:
        async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                         timeout=aiohttp.ClientTimeout(connect=30)) as session:
            async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                # print('status:', response.status)
                # print('headers:', response.headers)
                # print('body:', await response.text())
                # print('bytes:', await response.read())
                # print('json:', await response.json())

                # Manually raise an exception to simulate a network error
                raise Exception('network error')

                return await response.text()
    except Exception as e:
        # We land here whenever the request raises an error
        print(e)

        # Check whether any retry attempts remain
        if retry_attempts > 0:
            print(
                f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
            await asyncio.sleep(retry_delay)
            return await fetch_data(url=url, retry_delay=retry_delay, retry_attempts=retry_attempts - 1)
        else:
            print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
            return False

In the code above, the raised exception sends us into the except block, where we check retry_attempts > 0 before retrying; once the retry attempts are used up, we return False.
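A quick usage sketch: with the manual raise left in place every attempt fails, so this prints the failure messages and ends up with False; remove the raise to see the success path.

async def main():
    result = await fetch_data(url='https://movie.douban.com/top250?start=0&filter=',
                              retry_delay=2, retry_attempts=3)
    if result is False:
        print("All retries exhausted, giving up.")
    else:
        print("Got data of length:", len(result))


if __name__ == "__main__":
    asyncio.run(main())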

Retrying file downloads

So how should we retry when downloading a file?

Let's first spin up a local HTTP server for testing and move a few images into the shared directory.

import http.server
import socketserver

# Port the server listens on
PORT = 8000

# Directory to share
DIRECTORY = r"E:\Code\Python\test\img"

class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        # Serve files from the shared directory
        super().__init__(*args, directory=DIRECTORY, **kwargs)


# Start the server
with socketserver.TCPServer(("", PORT), CustomHTTPRequestHandler) as httpd:
    print(f"Serving files from {DIRECTORY} at http://localhost:{PORT}")
    httpd.serve_forever()

I use the tqdm_file_path parameter to hold the path where the downloaded file should be saved; when it is None, the call is treated as a plain data fetch rather than a download, so no extra code path is needed.

The trade-off of using tqdm is that the download becomes a streaming download, so the file has to be written to a temporary path first and only renamed to the real path once the download succeeds.

For resumable downloads (continuing a partially downloaded file), see the next section.

import os

from tqdm.asyncio import tqdm_asyncio


async def fetch_data(url, tqdm_file_path=None, retry_delay=5, retry_attempts=5):
    try:
        async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                         timeout=aiohttp.ClientTimeout(connect=30)) as session:
            async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                # If tqdm_file_path is given, this call is a file download
                if tqdm_file_path is not None:
                    # Total file size, taken from the Content-Length header
                    total_size = int(response.headers.get('Content-Length', 0))
                    # Label shown next to the progress bar
                    desc_name = url.split('/')[-1] + "/" + os.path.basename(tqdm_file_path)
                    # Temporary download path
                    temp_file_path = os.path.dirname(tqdm_file_path) + "/temp_" + os.path.basename(
                        tqdm_file_path)
                    with open(temp_file_path, 'wb') as f:
                        with tqdm_asyncio(total=total_size, unit='B', unit_scale=True, desc=desc_name) as pbar:
                            async for chunk in response.content.iter_chunked(1024):
                                f.write(chunk)
                                pbar.update(len(chunk))
                    # Move the file from the temporary path to the final path once the download finishes
                    os.rename(temp_file_path, tqdm_file_path)
                    return True
                # Otherwise treat the call as a plain data fetch
                return await response.read()
    except Exception as e:
        # We land here whenever the request raises an error
        print(e)

        # Check whether any retry attempts remain
        if retry_attempts > 0:
            print(
                f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
            await asyncio.sleep(retry_delay)
            return await fetch_data(url=url, tqdm_file_path=tqdm_file_path, retry_delay=retry_delay, retry_attempts=retry_attempts - 1)
        else:
            print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
            return False
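A usage sketch against the local test server above (img1.jpg and the save path are placeholders; the target directory is assumed to exist, otherwise the open() call fails and triggers a retry):

async def main():
    # Download: tqdm_file_path is set, so the response is streamed to disk with a progress bar
    ok = await fetch_data(url='http://localhost:8000/img1.jpg',
                          tqdm_file_path=r'E:\Code\Python\test\download\img1.jpg')
    print('download ok:', ok)

    # Plain fetch: tqdm_file_path is None, so the raw bytes are returned instead
    body = await fetch_data(url='http://localhost:8000/')
    print('bytes received:', len(body) if body else body)


if __name__ == "__main__":
    asyncio.run(main())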

Resumable downloads

I don't recommend bolting resumable downloads onto the function above as well, because that module is already bloated enough; here is a separate resumable-download version instead. (Note that the SimpleHTTPRequestHandler test server above does not support Range requests, so test resuming against a server that does.)

import copy
import os

from tqdm.asyncio import tqdm_asyncio


async def fetch_data_stream(url, file_path, stream_range=0, retry_delay=10, retry_attempts=10):
    try:
        # Work on a copy so the global headers dict is not mutated between calls
        _headers = copy.deepcopy(headers)
        mode = 'wb'
        if stream_range != 0:
            # Ask the server to resume from the byte offset we already have on disk
            _headers.update({'Range': f'bytes={stream_range}-'})
            mode = 'ab'
        async with aiohttp.ClientSession(headers=_headers, cookies=cookies) as session:
            async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                with tqdm_asyncio(total=int(response.headers.get("Content-Length", 0)) + stream_range,
                                  initial=stream_range,
                                  unit="B", unit_scale=True) as progress_bar:
                    folder_path = os.path.dirname(file_path)
                    os.makedirs(folder_path, exist_ok=True)
                    with open(file_path, mode) as f:
                        async for data in response.content.iter_chunked(1024):
                            f.write(data)
                            progress_bar.update(len(data))
            return True
    except Exception as e:
        # We land here whenever the request raises an error
        print(e)

        # Check whether any retry attempts remain
        if retry_attempts > 0:
            print(
                f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
            await asyncio.sleep(retry_delay)
            return await fetch_data_stream(url=url, file_path=file_path, stream_range=os.path.getsize(file_path),
                                            retry_delay=retry_delay, retry_attempts=retry_attempts - 1)
        else:
            print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
            return False
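A usage sketch for resuming (paths are placeholders): if a partial file already exists on disk, pass its current size as stream_range so the transfer continues from that byte offset, which mirrors what the retry branch above does automatically.

async def main():
    file_path = r'E:\Code\Python\test\download\img1.jpg'
    # Resume from an existing partial file, otherwise start from byte 0
    start = os.path.getsize(file_path) if os.path.exists(file_path) else 0
    ok = await fetch_data_stream(url='http://localhost:8000/img1.jpg',
                                 file_path=file_path, stream_range=start)
    print('download ok:', ok)


if __name__ == "__main__":
    asyncio.run(main())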

POST requests

If you also want this function to handle POST requests, you can extend it like this; the idea is exactly the same.

async def fetch_data(url, json=None, data=None, retry_delay=5, retry_attempts=5):
    try:
        async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                         timeout=aiohttp.ClientTimeout(connect=30)) as session:
            if data is not None:
                async with session.post(url, data=data,
                                        proxy=proxy_url if proxy_status else None) as response:
                    return await response.read()
            elif json is not None:
                async with session.post(url, json=json,
                                        proxy=proxy_url if proxy_status else None) as response:
                    return await response.json(content_type=None)
            else:
                async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                    return await response.read()
    except Exception as e:
        # We land here whenever the request raises an error
        print(e)

        # Check whether any retry attempts remain
        if retry_attempts > 0:
            print(
                f"Failed to retrieve data. Retrying in {retry_delay} seconds, {retry_attempts - 1} attempts remaining. {url}")
            await asyncio.sleep(retry_delay)
            return await fetch_data(url=url, json=json, data=data, retry_delay=retry_delay,
                                    retry_attempts=retry_attempts - 1)
        else:
            print(f"Failed to retrieve data after {retry_attempts} attempts. {url}")
            return False
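Usage sketch (the httpbin.org endpoint is used purely for illustration): passing data sends a form-encoded body, passing json sends a JSON body and returns the parsed response, and passing neither falls back to a plain GET.

async def main():
    # Form-encoded POST, raw bytes returned
    raw = await fetch_data(url='https://httpbin.org/post', data={'q': 'douban'})
    print(raw)

    # JSON POST, parsed response returned as a Python object
    parsed = await fetch_data(url='https://httpbin.org/post', json={'q': 'douban'})
    print(parsed)

    # Neither json nor data given, so a plain GET is sent
    page = await fetch_data(url='https://movie.douban.com/top250?start=0&filter=')
    print(len(page) if page else page)


if __name__ == "__main__":
    asyncio.run(main())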

Concurrent requests

Since we are already using aiohttp, we might as well fire off multiple requests concurrently; this is done with coroutines and a semaphore rather than real threads.

Below we fetch the Douban Top 250 movies concurrently.

import asyncio

import aiohttp
from bs4 import BeautifulSoup
from tqdm.asyncio import tqdm_asyncio

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/535.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/535.36",
    "referer": "https://movie.douban.com/"
}
cookies = {
    "token": "1234567890",
}
proxy_status = False
proxy_url = "http://127.0.0.1:12123"


async def fetch_data(url):
    try:
        async with aiohttp.ClientSession(headers=headers, cookies=cookies,
                                         timeout=aiohttp.ClientTimeout(connect=30)) as session:
            async with session.get(url, proxy=proxy_url if proxy_status else None) as response:
                return await response.text()
    except Exception as e:
        print(e)


async def download(semaphore, url):
    async with semaphore:
        reload_count = 0
        while reload_count < 6:
            reload_count += 1
            try:
                result = await fetch_data(url)
                # Parse the page
                soup = BeautifulSoup(result, 'html.parser')
                data = []
                # Iterate over every movie item on the page
                for i in soup.select('ol.grid_view .item'):
                    _item = {'id': None, 'name': None}
                    # Rank
                    _item['id'] = i.select_one('.pic > em').get_text()
                    # Title
                    _item['name'] = i.select_one('.info span.title').get_text()
                    data.append(_item)
                return data
            except Exception as e:
                print(e)
                continue
        return False


async def main():
    semaphore = asyncio.Semaphore(3)  # limit to 3 concurrent requests
    task_list = []
    for i in range(0, 250, 25):
        req = download(semaphore, url=f'https://movie.douban.com/top250?start={i}&filter=')
        task_list.append(req)
    results = await tqdm_asyncio.gather(*task_list, desc="Download")
    for result in results:
        if not result:
            print(f"Failed {result}")
            return False
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
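If you also want the retry behaviour from the "Retrying requests" section here, one option (a sketch, not part of the script above) is to let that retrying fetch_data (with the manual raise removed) handle network failures, which keeps download() focused on parsing instead of looping:

async def download(semaphore, url):
    async with semaphore:
        # The retrying fetch_data returns False once its attempts are used up
        result = await fetch_data(url)
        if not result:
            return False
        soup = BeautifulSoup(result, 'html.parser')
        data = []
        for i in soup.select('ol.grid_view .item'):
            data.append({
                'id': i.select_one('.pic > em').get_text(),            # rank
                'name': i.select_one('.info span.title').get_text(),   # title
            })
        return data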

Licensed under CC BY-NC-SA 4.0