Welcome to my crawler space

        I'm a newcomer with a bit of Python background, strictly an amateur. This crawler was written with AI assistance, guided by keyword prompts; it has passed initial debugging and has no known pain points so far.

Program purpose

This program is an asynchronous downloader for the Douyin Novels site (douyinxs). It searches for novels by keyword, fetches every chapter of a selected novel, and merges them into a single TXT file saved locally, enabling batch download and offline reading.

Core logic

The program follows a three-stage "search - fetch chapters - download content" flow and uses asynchronous programming to improve efficiency. The details are as follows:

  1. Search for novels (synchronous)

    • The search_novels function takes the user's keyword and sends a POST request to the target site
    • The returned HTML page is parsed to extract each novel's title, author, category, status, and so on, producing a selectable list
    • Network errors (timeouts, connection failures, etc.) are handled with up to 5 retries
  2. Fetch the chapter list (asynchronous)

    • After the user selects a novel, get_all_chapters_async crawls all chapter entries asynchronously
    • It follows "next page" links page by page, collecting every body chapter (the first 12 non-body entries are skipped)
    • A semaphore limits concurrent requests (3 by default) to avoid hitting the server too frequently
  3. Download chapter content (asynchronous)

    • download_chapters_async downloads chapters in bulk with concurrency control (200 concurrent tasks in this script); a simplified sketch of this concurrency pattern follows the list
    • Per-chapter download logic:
      • get_full_chapter_content_async stitches together paginated chapter content (in-chapter pagination is supported)
      • Body text is extracted and cleaned automatically, preserving paragraph structure
      • Failed requests are retried automatically (up to 3 times)
    • Chapters are written to a TXT file in order, saved under the douyin_novels folder; titles containing special characters are sanitized before being used as filenames
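A minimal, simplified sketch of the semaphore-plus-gather concurrency pattern described in item 3 above; fetch_chapter, download_all, and the example URLs are hypothetical placeholders, not the actual functions in the full code below.

import asyncio
import aiohttp

async def fetch_chapter(session, url, semaphore):
    # The semaphore caps how many downloads run at the same time
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.text()

async def download_all(urls, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_chapter(session, u, semaphore) for u in urls]
        # gather() returns results in the same order the tasks were created
        return await asyncio.gather(*tasks)

# Example usage: asyncio.run(download_all(["https://example.com/1", "https://example.com/2"]))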

Technical highlights

  1. Async optimization: aiohttp and asyncio handle network requests asynchronously, giving a large speedup over a synchronous approach
  2. Anti-crawling measures
    • Request headers (User-Agent, Accept, etc.) are randomized to mimic different browsers
    • Random intervals between requests avoid triggering rate limits
    • SSL certificate verification is skipped to work around HTTPS connection issues
  3. User experience
    • A colored progress bar shows download progress in real time
    • Detailed error messages and a retry mechanism
    • The save directory is created automatically, producing a clean file layout
  4. Robustness
    • Multi-level exception handling (connection errors, timeouts, parse failures, etc.)
    • Retry on failure at every stage (search / chapter list / content); a minimal sketch of the retry idea follows this list
    • Compatibility across Python environments (a fallback RemoteDisconnected class is defined when the import is unavailable)
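A minimal sketch of the retry-with-increasing-delay idea used at each stage, under the simplifying assumption that any exception just triggers another attempt; fetch_once and the delay schedule are illustrative placeholders, not the exact code shown below.

import time

def fetch_with_retries(fetch_once, max_retries=5):
    # Call fetch_once up to max_retries times, waiting a little longer after each failure
    for attempt in range(max_retries):
        try:
            return fetch_once()
        except Exception as exc:
            print(f"Attempt {attempt + 1}/{max_retries} failed: {exc}")
            time.sleep(3 * (attempt + 1))  # back off: 3s, 6s, 9s, ...
    return None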

Run flow

  1. The user enters a search keyword and the program lists matching novels
  2. The user selects a novel by its number
  3. The program asynchronously fetches the novel's complete chapter list
  4. The chapter count and a short preview of the chapters are displayed
  5. All chapter content is downloaded concurrently and written to a TXT file in order
  6. The result (success / failure counts) and the save path are printed

Run result screenshot

Complete program code:
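The script relies on a few third-party packages, as the imports below show. Assuming a standard pip setup, installing them would look roughly like this:

pip install aiohttp beautifulsoup4 requests colorama urllib3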

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
import random
import sys
from colorama import Fore, Style
import urllib3
import os
from urllib.parse import urljoin

# Suppress SSL certificate verification warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Initialize colorama
import colorama

colorama.init(autoreset=True)

# Folder where downloaded novels are saved
SAVE_FOLDER = "douyin_novels"

# Try to import RemoteDisconnected (it lives in http.client); fall back to a compatible dummy class if unavailable
try:
    from http.client import RemoteDisconnected
except ImportError:
    class RemoteDisconnected(Exception):
        pass

# Pool of browser User-Agent strings to rotate through
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.68"
]

# Accept headers to vary the simulated browser fingerprint
ACCEPT_HEADERS = [
    "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
]


def get_headers():
    """动态生成更接近真实浏览器的请求头"""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Referer": "https://www.douyinxs/",
        "Accept": random.choice(ACCEPT_HEADERS),
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Cache-Control": f"max-age={random.randint(0, 300)}"
    }


def print_progress(completed, total):
    """打印自定义进度条"""
    percent = completed / total * 100
    bar_length = 50
    filled_length = int(bar_length * completed // total)
    # Colorize the progress bar
    bar = f"{Fore.GREEN}{'█' * filled_length}{Style.RESET_ALL}{'-' * (bar_length - filled_length)}"
    sys.stdout.write(f'\rProgress: [{bar}] {percent:.1f}% ({completed}/{total})')
    sys.stdout.flush()
    # Print a newline once complete
    if completed == total:
        print()


async def create_async_session():
    """创建异步会话"""
    timeout = aiohttp.ClientTimeout(total=30)
    connector = aiohttp.TCPConnector(ssl=False)  # 跳过SSL验证
    session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        headers=get_headers()
    )
    return session


def search_novels(keyword):
    """搜索小说并返回结果列表(同步操作)"""
    import requests

    search_url = "https://www.douyinxs/search/"
    data = {"searchkey": keyword}
    max_retries = 5

    for retry in range(max_retries):
        try:
            time.sleep(random.uniform(1, 2))  # short delay before searching

            response = requests.post(
                search_url,
                data=data,
                headers=get_headers(),
                timeout=15,
                allow_redirects=True,
                verify=False
            )

            if response.status_code != 200:
                print(f"搜索请求失败,状态码:{response.status_code},正在重试...")
                time.sleep(3 * (retry + 1))
                continue

            response.encoding = "utf-8"
            soup = BeautifulSoup(response.text, "html.parser")

            # Locate the search results list
            result_list = soup.select_one("#main > div.novelslist2 > ul")
            if not result_list:
                print("No matching novels found")
                return []

            # Extract novel info
            novels = []
            for idx, item in enumerate(result_list.find_all("li")[1:], 1):
                category = item.select_one(".s1").text.strip()
                title_tag = item.select_one(".s2 a")
                title = title_tag.text.strip() if title_tag else f"Untitled {idx}"
                href = title_tag["href"] if title_tag else ""
                author = item.select_one(".s4").text.strip()
                word_count = item.select_one(".s5").text.strip()
                status = item.select_one(".s7").text.strip()

                # Convert relative links to absolute URLs
                if href and not href.startswith("http"):
                    href = f"https://www.douyinxs{href}"

                novels.append({
                    "index": idx,
                    "title": title,
                    "author": author,
                    "category": category,
                    "word_count": word_count,
                    "status": status,
                    "url": href
                })

                print(f"{idx}. {title} | 作者:{author} | 分类:{category} | 字数:{word_count} | 状态:{status}")

            return novels

        except (requests.exceptions.ConnectionError,
                RemoteDisconnected,
                requests.exceptions.Timeout):
            print(f"{Fore.RED}搜索连接失败,正在重试第{retry + 1}/{max_retries}次...{Style.RESET_ALL}")
            time.sleep(5 * (retry + 1))
        except Exception as e:
            print(f"{Fore.RED}搜索发生错误:{str(e)},正在重试...{Style.RESET_ALL}")
            time.sleep(3 * (retry + 1))

    print(f"{Fore.RED}达到最大重试次数,搜索失败{Style.RESET_ALL}")
    return []


async def fetch_chapter_page(session, url, semaphore, retry=3):
    """异步获取单页章节列表"""
    async with semaphore:
        try:
            # 缩短章节列表请求延迟
            await asyncio.sleep(random.uniform(0.5, 1.5))

            async with session.get(url) as response:
                if response.status != 200:
                    if retry > 0:
                        await asyncio.sleep(1)
                        return await fetch_chapter_page(session, url, semaphore, retry - 1)
                    print(f"{Fore.YELLOW}章节页 {url} 请求失败,状态码:{response.status}{Style.RESET_ALL}")
                    return None, None

                content = await response.text()
                soup = BeautifulSoup(content, "html.parser")

                # Look for the "next page" link
                next_link = soup.select_one("body > div.index-container > a:nth-child(3)")
                next_url = None
                if next_link and "下一页" in next_link.text:  # "下一页" is the site's Chinese "next page" label
                    next_url = urljoin("https://www.douyinxs", next_link["href"])

                # Extract chapters (body chapters start from the 13th entry)
                chapters = []
                chapter_dl = soup.select_one("#list > dl")
                if chapter_dl:
                    dd_tags = chapter_dl.find_all("dd")
                    for dd in dd_tags[12:]:  # skip the first 12 non-body entries
                        a_tag = dd.find("a")
                        if a_tag:
                            chapter_title = a_tag.text.strip()
                            chapter_url = urljoin("https://www.douyinxs", a_tag["href"])
                            chapters.append({
                                "title": chapter_title,
                                "url": chapter_url
                            })

                return chapters, next_url

        except (aiohttp.ClientError, RemoteDisconnected) as e:
            if retry > 0:
                await asyncio.sleep(1)
                return await fetch_chapter_page(session, url, semaphore, retry - 1)
            print(f"{Fore.RED}获取章节页 {url} 失败:{str(e)}{Style.RESET_ALL}")
            return None, None


async def get_all_chapters_async(novel_url):
    """异步爬取小说所有正文章节,大幅提升速度"""
    chapters = []
    current_url = novel_url
    page_count = 1

    # Semaphore limiting concurrent chapter-list requests
    semaphore = asyncio.Semaphore(3)

    async with await create_async_session() as session:
        print("开始异步获取章节列表...")

        # 循环获取所有章节页
        while current_url:
            print(f"正在获取第 {page_count} 页章节...", end="\r")

            # 异步获取当前页章节
            page_chapters, next_url = await fetch_chapter_page(session, current_url, semaphore)

            if page_chapters:
                chapters.extend(page_chapters)
                page_count += 1

            # Move on to the next page
            current_url = next_url

    print(f"\r已获取所有章节页,共 {page_count - 1} 页")
    return chapters


async def fetch_url(session, url, retry=3):
    """异步获取URL内容"""
    try:
        await asyncio.sleep(random.uniform(0.3, 1.2))

        async with session.get(url) as response:
            if response.status != 200:
                if retry > 0:
                    await asyncio.sleep(2)
                    return await fetch_url(session, url, retry - 1)
                return None, response.status

            content = await response.text()
            return content, response.status
    except (aiohttp.ClientError, RemoteDisconnected) as e:
        if retry > 0:
            await asyncio.sleep(2)
            return await fetch_url(session, url, retry - 1)
        print(f"{Fore.RED}获取URL失败:{url},错误:{str(e)}{Style.RESET_ALL}")
        return None, 500
    except Exception as e:
        print(f"{Fore.RED}处理URL时出错:{url},错误:{str(e)}{Style.RESET_ALL}")
        return None, 500


async def get_full_chapter_content_async(session, chapter_url, semaphore):
    """异步获取完整的章节内容(带信号量控制并发)"""
    async with semaphore:
        full_content = []
        current_url = chapter_url
        page_count = 1
        max_page_retries = 3

        while True:
            content, status = await fetch_url(session, current_url, max_page_retries)

            if not content or status != 200:
                print(f"{Fore.YELLOW}章节分页 {page_count} 请求失败,状态码:{status}{Style.RESET_ALL}")
                return "\n\n".join(full_content)

            soup = BeautifulSoup(content, "html.parser")

            # Extract the content of the current sub-page
            content_tag = soup.select_one("#content")
            if not content_tag:
                print(f"{Fore.YELLOW}Warning: failed to extract content from chapter sub-page {page_count}{Style.RESET_ALL}")
                return "\n\n".join(full_content)

            # Clean up the current sub-page's paragraphs
            for p in content_tag.find_all("p"):
                text = p.text.strip()
                if text:
                    full_content.append(text)

            # Look for the in-chapter "next page" link
            next_link = soup.select_one("#next")
            if next_link and "下一页" in next_link.text:  # the site's Chinese "next page" label
                next_url = urljoin("https://www.douyinxs", next_link["href"])
                current_url = next_url
                page_count += 1
            else:
                return "\n\n".join(full_content)


async def get_chapter_title_async(session, chapter_url):
    """异步获取章节标题"""
    try:
        content, status = await fetch_url(session, chapter_url, 2)
        if not content or status != 200:
            return None

        soup = BeautifulSoup(content, "html.parser")
        chapter_title_tag = soup.select_one("div.bookname h1")
        return chapter_title_tag.text.strip() if chapter_title_tag else None
    except Exception as e:
        print(f"{Fore.YELLOW}获取章节标题失败:{str(e)}{Style.RESET_ALL}")
        return None


async def download_chapters_async(novel_title, chapters, max_concurrent=5):
    """异步下载所有章节内容,保持章节顺序"""
    # 创建保存文件夹(如果不存在)
    if not os.path.exists(SAVE_FOLDER):
        os.makedirs(SAVE_FOLDER)
        print(f"{Fore.CYAN}已创建保存文件夹:{os.path.abspath(SAVE_FOLDER)}{Style.RESET_ALL}")

    # 创建安全的文件名
    safe_title = "".join([c for c in novel_title if c not in '/\\:*?"<>|'])
    filename = os.path.join(SAVE_FOLDER, f"{safe_title}.txt")

    success_count = 0
    fail_count = 0
    chapter_results = []
    completed_count = 0  # progress counter
    total_chapters = len(chapters)  # total number of chapters

    # Progress update callback
    def update_progress():
        nonlocal completed_count
        completed_count += 1
        print_progress(completed_count, total_chapters)

    semaphore = asyncio.Semaphore(max_concurrent)

    async with await create_async_session() as session:
        # Initialize the progress bar
        print_progress(0, total_chapters)

        tasks = []
        for index, chap in enumerate(chapters):
            # Pass the progress callback into each download task
            task = asyncio.ensure_future(
                download_single_chapter(
                    session, chap, index, semaphore, update_progress
                )
            )
            tasks.append(task)

        chapter_results = await asyncio.gather(*tasks)

    # Sort by index to restore chapter order
    chapter_results.sort(key=lambda x: x[0])

    # Write results to the output file
    try:
        with open(filename, "w", encoding="utf-8") as f:
            for result in chapter_results:
                index, chapter_title, content = result
                if content:
                    f.write(f"【{chapter_title}】\n")
                    f.write(content)
                    f.write("\n\n")
                    success_count += 1
                else:
                    fail_count += 1
                    print(f"{Fore.YELLOW}警告:章节《{chapter_title}》内容为空,已跳过{Style.RESET_ALL}")

        print(
            f"\n{Fore.GREEN}Download complete! {len(chapters)} chapters in total, {success_count} succeeded, {fail_count} failed{Style.RESET_ALL}")
        print(f"{Fore.GREEN}File saved to: {os.path.abspath(filename)}{Style.RESET_ALL}")

    except Exception as e:
        print(f"{Fore.RED}文件写入失败:{str(e)}{Style.RESET_ALL}")


async def download_single_chapter(session, chap, index, semaphore, progress_callback):
    """下载单个章节并返回带索引的结果"""
    try:
        content = await get_full_chapter_content_async(session, chap["url"], semaphore)
        chapter_title = await get_chapter_title_async(session, chap["url"])
        if not chapter_title:
            chapter_title = chap["title"]

        # Report progress via the callback
        progress_callback()
        return index, chapter_title, content

    except Exception as e:
        print(f"\n{Fore.RED}章节《{chap['title']}》下载失败:{str(e)}{Style.RESET_ALL}")
        # 即使失败也更新进度
        progress_callback()
        return index, chap["title"], None


# Combine fetching the chapter list and downloading chapters into one async entry point
async def main_async():
    try:
        keyword = input("\nEnter a search keyword: ")
        novels = search_novels(keyword)
        if not novels:
            return

        choice = int(input("\nEnter the number of the novel to download: "))
        selected_novel = next((n for n in novels if n["index"] == choice), None)

        if not selected_novel:
            print("无效的序号")
            return

        print(f"\n正在获取《{selected_novel['title']}》的章节列表...")
        # 异步获取章节列表
        chapters = await get_all_chapters_async(selected_novel["url"])

        if not chapters:
            print("未获取到任何章节")
            return

        print(f"\n《{selected_novel['title']}》共找到 {len(chapters)} 个正文章节,开始异步下载...")
        print(f"并发数:200(可通过修改max_concurrent参数调整)")
        for i, chap in enumerate(chapters[:5], 1):
            print(f"{i}. {chap['title']}")
        if len(chapters) > 5:
            print(f"... 省略 {len(chapters) - 5} 章")

        # Download chapters asynchronously
        await download_chapters_async(
            selected_novel["title"],
            chapters,
            max_concurrent=200  # adjust the concurrency level as needed
        )

    except ValueError:
        print("请输入有效的数字")
    except Exception as e:
        print(f"操作失败:{str(e)}")


def main():
    # Use a single asyncio.run() call to avoid event-loop warnings
    asyncio.run(main_async())


if __name__ == "__main__":
    main()
