Welcome to my crawler space
I'm a newcomer with only a little Python under my belt, strictly a hobbyist. This crawler was written with AI assistance, guided step by step with keyword prompts. Initial debugging is done and it works; no pain points so far.
Program purpose
The program is an asynchronous downloader for the douyinxs novel site. It searches for novels by keyword, fetches every chapter of the selected novel, and merges them into a single TXT file saved locally, so novels can be downloaded in bulk and read offline.
Core logic
The program follows a three-stage pipeline, "search → fetch chapter list → download content", and uses asynchronous programming to speed things up. The flow is as follows (a condensed sketch of how the stages chain together appears right after this list):

1. Search for novels (synchronous)
- The search_novels function takes the user's keyword and sends a POST request to the target site
- It parses the returned HTML and extracts each novel's title, author, category, status and so on, building a selectable result list
- Network errors (timeouts, connection failures, etc.) are handled with up to 5 retries

2. Fetch the chapter list (asynchronous)
- After the user picks a novel, get_all_chapters_async crawls all chapter entries asynchronously
- It walks the chapter index page by page, recognizing the "下一页" (next page) link and collecting every body chapter (the first 12 non-body entries are skipped)
- A semaphore caps the number of concurrent requests (3 by default) so the server is not hit too frequently

3. Download chapter content (asynchronous)
- download_chapters_async downloads the chapters in bulk with concurrency control (200 concurrent tasks as configured in main_async)
- Per-chapter logic:
  - get_full_chapter_content_async stitches together chapters that are split across several pages
  - The body text is extracted, cleaned up, and kept as separate paragraphs
  - Failed requests are retried automatically (up to 3 times)
- Chapters are written to a TXT file in order; files go into the douyin_novels folder, and titles containing characters that are illegal in filenames are sanitized
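For orientation, the sketch below chains the three stages together using the functions from the complete listing further down. It leaves out the interactive prompts and error handling that main_async adds, and the keyword and selection index are placeholders:

import asyncio

async def run_pipeline(keyword: str, pick: int = 1) -> None:
    # Stage 1: synchronous keyword search (also prints the numbered result list)
    novels = search_novels(keyword)
    if not novels:
        return
    novel = next(n for n in novels if n["index"] == pick)
    # Stage 2: asynchronously walk the paginated chapter index
    chapters = await get_all_chapters_async(novel["url"])
    if not chapters:
        return
    # Stage 3: concurrent download, then an ordered write to douyin_novels/<title>.txt
    await download_chapters_async(novel["title"], chapters, max_concurrent=50)

# asyncio.run(run_pipeline("placeholder keyword"))  # run inside the same module as the full listing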
Technical features
- Async optimization: aiohttp and asyncio handle the network requests asynchronously, which is far faster than a synchronous approach (a minimal, self-contained sketch of the request pattern follows this list)
- Anti-scraping measures:
  - Randomly generated request headers (User-Agent, Accept, etc.) imitate different browsers
  - Random intervals between requests help avoid tripping rate limits
  - SSL certificate verification is skipped to work around HTTPS connection problems
- User experience:
  - A colored progress bar shows download progress in real time
  - Detailed error messages and a retry mechanism
  - The save directory is created automatically, giving a clean file layout
- Robustness:
  - Exceptions are caught at several levels (connection errors, timeouts, parse failures, etc.)
  - Retry-on-failure at every stage (search / chapter list / content)
  - Compatibility with different Python environments (a fallback RemoteDisconnected class is defined when the import is unavailable)
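The request pattern behind these measures is simple: cap concurrency with a semaphore, sleep a random interval before each request, and retry on failure. The sketch below shows it in isolation; the URLs are placeholders, and unlike the full listing it does not inject the randomized headers from get_headers(), which would be passed through the session's headers argument:

import asyncio
import random
import aiohttp

async def polite_get(session: aiohttp.ClientSession, url: str,
                     semaphore: asyncio.Semaphore, retries: int = 3):
    """Semaphore-limited GET with a random delay and a simple retry loop."""
    async with semaphore:
        for _ in range(retries):
            await asyncio.sleep(random.uniform(0.5, 1.5))  # random pause to stay under rate limits
            try:
                async with session.get(url) as resp:
                    if resp.status == 200:
                        return await resp.text()
            except aiohttp.ClientError:
                pass  # swallow the error and try again
        return None

async def demo() -> None:
    semaphore = asyncio.Semaphore(3)  # at most 3 requests in flight at once
    urls = [f"https://example.com/page/{i}" for i in range(1, 6)]  # placeholder URLs
    connector = aiohttp.TCPConnector(ssl=False)  # skip certificate verification, as the program does
    async with aiohttp.ClientSession(connector=connector) as session:
        pages = await asyncio.gather(*(polite_get(session, u, semaphore) for u in urls))
        print([len(p) if p else 0 for p in pages])

# asyncio.run(demo())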
Run flow
1. The user enters a search keyword and the program lists the matching novels
2. The user enters the index of the novel to download
3. The program asynchronously fetches the novel's complete chapter list
4. The chapter count and a preview of the first few chapters are shown
5. All chapters are downloaded concurrently and written to a TXT file in their original order (a small, site-independent sketch of the order-preserving pattern follows this list)
6. The result (number of successes / failures) and the save path are printed
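Order is preserved in step 5 by tagging every download task with its chapter index and sorting the gathered results before writing; asyncio.gather already returns results in submission order, so the sort is a belt-and-suspenders safeguard. A minimal sketch with simulated downloads and no network access:

import asyncio
import random

async def fake_download(index: int, title: str):
    # Simulate a chapter download that completes after a random delay.
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return index, title, f"body of {title}"

async def demo() -> None:
    chapters = [f"Chapter {i}" for i in range(1, 6)]
    results = await asyncio.gather(*(fake_download(i, t) for i, t in enumerate(chapters)))
    for _, title, body in sorted(results, key=lambda r: r[0]):  # restore chapter order by index
        print(title, "->", body)

asyncio.run(demo())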
Screenshots of the run
Complete program code:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
import random
import sys
from colorama import Fore, Style
import urllib3
import os
from urllib.parse import urljoin

# Suppress SSL certificate verification warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Initialize colorama
import colorama
colorama.init(autoreset=True)

# Folder where downloaded novels are saved
SAVE_FOLDER = "douyin_novels"

# RemoteDisconnected lives in http.client; fall back to a stub class if it cannot be imported
try:
    from http.client import RemoteDisconnected
except ImportError:
    class RemoteDisconnected(Exception):
        pass

# Pool of browser User-Agent strings
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.68"
]

# Alternative Accept headers to vary the browser fingerprint
ACCEPT_HEADERS = [
    "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
    "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
]

def get_headers():
    """Build request headers that look like a real browser."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Referer": "https://www.douyinxs/",
        "Accept": random.choice(ACCEPT_HEADERS),
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Cache-Control": f"max-age={random.randint(0, 300)}"
    }

def print_progress(completed, total):
    """Print a custom progress bar."""
    percent = completed / total * 100
    bar_length = 50
    filled_length = int(bar_length * completed // total)
    # Color the filled part of the bar
    bar = f"{Fore.GREEN}{'█' * filled_length}{Style.RESET_ALL}{'-' * (bar_length - filled_length)}"
    sys.stdout.write(f'\r进度: [{bar}] {percent:.1f}% ({completed}/{total})')
    sys.stdout.flush()
    # Move to a new line once finished
    if completed == total:
        print()

async def create_async_session():
    """Create an asynchronous aiohttp session."""
    timeout = aiohttp.ClientTimeout(total=30)
    connector = aiohttp.TCPConnector(ssl=False)  # skip SSL verification
    session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        headers=get_headers()
    )
    return session

def search_novels(keyword):
    """Search for novels and return the result list (synchronous)."""
    import requests
    search_url = "https://www.douyinxs/search/"
    data = {"searchkey": keyword}
    max_retries = 5
    for retry in range(max_retries):
        try:
            time.sleep(random.uniform(1, 2))  # short delay before searching
            response = requests.post(
                search_url,
                data=data,
                headers=get_headers(),
                timeout=15,
                allow_redirects=True,
                verify=False
            )
            if response.status_code != 200:
                print(f"搜索请求失败,状态码:{response.status_code},正在重试...")
                time.sleep(3 * (retry + 1))
                continue
            response.encoding = "utf-8"
            soup = BeautifulSoup(response.text, "html.parser")
            # Locate the search result list
            result_list = soup.select_one("#main > div.novelslist2 > ul")
            if not result_list:
                print("未找到相关小说")
                return []
            # Extract novel metadata
            novels = []
            for idx, item in enumerate(result_list.find_all("li")[1:], 1):
                category = item.select_one(".s1").text.strip()
                title_tag = item.select_one(".s2 a")
                title = title_tag.text.strip() if title_tag else f"未知标题{idx}"
                href = title_tag["href"] if title_tag else ""
                author = item.select_one(".s4").text.strip()
                word_count = item.select_one(".s5").text.strip()
                status = item.select_one(".s7").text.strip()
                # Turn relative links into absolute URLs
                if href and not href.startswith("http"):
                    href = f"https://www.douyinxs{href}"
                novels.append({
                    "index": idx,
                    "title": title,
                    "author": author,
                    "category": category,
                    "word_count": word_count,
                    "status": status,
                    "url": href
                })
                print(f"{idx}. {title} | 作者:{author} | 分类:{category} | 字数:{word_count} | 状态:{status}")
            return novels
        except (requests.exceptions.ConnectionError,
                RemoteDisconnected,
                requests.exceptions.Timeout):
            print(f"{Fore.RED}搜索连接失败,正在重试第{retry + 1}/{max_retries}次...{Style.RESET_ALL}")
            time.sleep(5 * (retry + 1))
        except Exception as e:
            print(f"{Fore.RED}搜索发生错误:{str(e)},正在重试...{Style.RESET_ALL}")
            time.sleep(3 * (retry + 1))
    print(f"{Fore.RED}达到最大重试次数,搜索失败{Style.RESET_ALL}")
    return []

async def fetch_chapter_page(session, url, semaphore, retry=3):
    """Fetch a single page of the chapter index asynchronously."""
    async with semaphore:
        try:
            # Short random delay before requesting the chapter index page
            await asyncio.sleep(random.uniform(0.5, 1.5))
            async with session.get(url) as response:
                if response.status != 200:
                    if retry > 0:
                        await asyncio.sleep(1)
                        return await fetch_chapter_page(session, url, semaphore, retry - 1)
                    print(f"{Fore.YELLOW}章节页 {url} 请求失败,状态码:{response.status}{Style.RESET_ALL}")
                    return None, None
                content = await response.text()
                soup = BeautifulSoup(content, "html.parser")
                # Look for the "next page" link
                next_link = soup.select_one("body > div.index-container > a:nth-child(3)")
                next_url = None
                if next_link and "下一页" in next_link.text:
                    next_url = urljoin("https://www.douyinxs", next_link["href"])
                # Extract chapters (entries from the 13th onwards are body chapters)
                chapters = []
                chapter_dl = soup.select_one("#list > dl")
                if chapter_dl:
                    dd_tags = chapter_dl.find_all("dd")
                    for dd in dd_tags[12:]:  # skip the first 12 non-body entries
                        a_tag = dd.find("a")
                        if a_tag:
                            chapter_title = a_tag.text.strip()
                            chapter_url = urljoin("https://www.douyinxs", a_tag["href"])
                            chapters.append({
                                "title": chapter_title,
                                "url": chapter_url
                            })
                return chapters, next_url
        except (aiohttp.ClientError, RemoteDisconnected) as e:
            if retry > 0:
                await asyncio.sleep(1)
                return await fetch_chapter_page(session, url, semaphore, retry - 1)
            print(f"{Fore.RED}获取章节页 {url} 失败:{str(e)}{Style.RESET_ALL}")
            return None, None

async def get_all_chapters_async(novel_url):
    """Crawl all body chapters of the novel asynchronously."""
    chapters = []
    current_url = novel_url
    page_count = 1
    # Semaphore limiting concurrent chapter-index requests
    semaphore = asyncio.Semaphore(3)
    async with await create_async_session() as session:
        print("开始异步获取章节列表...")
        # Loop over every page of the chapter index
        while current_url:
            print(f"正在获取第 {page_count} 页章节...", end="\r")
            # Fetch the chapters on the current page
            page_chapters, next_url = await fetch_chapter_page(session, current_url, semaphore)
            if page_chapters:
                chapters.extend(page_chapters)
                page_count += 1
            # Move on to the next page
            current_url = next_url
    print(f"\r已获取所有章节页,共 {page_count - 1} 页")
    return chapters

async def fetch_url(session, url, retry=3):
    """Fetch a URL asynchronously and return (content, status)."""
    try:
        await asyncio.sleep(random.uniform(0.3, 1.2))
        async with session.get(url) as response:
            if response.status != 200:
                if retry > 0:
                    await asyncio.sleep(2)
                    return await fetch_url(session, url, retry - 1)
                return None, response.status
            content = await response.text()
            return content, response.status
    except (aiohttp.ClientError, RemoteDisconnected) as e:
        if retry > 0:
            await asyncio.sleep(2)
            return await fetch_url(session, url, retry - 1)
        print(f"{Fore.RED}获取URL失败:{url},错误:{str(e)}{Style.RESET_ALL}")
        return None, 500
    except Exception as e:
        print(f"{Fore.RED}处理URL时出错:{url},错误:{str(e)}{Style.RESET_ALL}")
        return None, 500

async def get_full_chapter_content_async(session, chapter_url, semaphore):
    """Fetch a chapter's complete content asynchronously (concurrency limited by a semaphore)."""
    async with semaphore:
        full_content = []
        current_url = chapter_url
        page_count = 1
        max_page_retries = 3
        while True:
            content, status = await fetch_url(session, current_url, max_page_retries)
            if not content or status != 200:
                print(f"{Fore.YELLOW}章节分页 {page_count} 请求失败,状态码:{status}{Style.RESET_ALL}")
                return "\n\n".join(full_content)
            soup = BeautifulSoup(content, "html.parser")
            # Extract the body of the current page
            content_tag = soup.select_one("#content")
            if not content_tag:
                print(f"{Fore.YELLOW}警告:章节分页 {page_count} 内容提取失败{Style.RESET_ALL}")
                return "\n\n".join(full_content)
            # Clean up the current page's paragraphs
            for p in content_tag.find_all("p"):
                text = p.text.strip()
                if text:
                    full_content.append(text)
            # Look for the "next page" link inside the chapter
            next_link = soup.select_one("#next")
            if next_link and "下一页" in next_link.text:
                next_url = urljoin("https://www.douyinxs", next_link["href"])
                current_url = next_url
                page_count += 1
            else:
                return "\n\n".join(full_content)

async def get_chapter_title_async(session, chapter_url):
    """Fetch a chapter's title asynchronously."""
    try:
        content, status = await fetch_url(session, chapter_url, 2)
        if not content or status != 200:
            return None
        soup = BeautifulSoup(content, "html.parser")
        chapter_title_tag = soup.select_one("div.bookname h1")
        return chapter_title_tag.text.strip() if chapter_title_tag else None
    except Exception as e:
        print(f"{Fore.YELLOW}获取章节标题失败:{str(e)}{Style.RESET_ALL}")
        return None

async def download_chapters_async(novel_title, chapters, max_concurrent=5):
    """Download all chapters asynchronously while preserving chapter order."""
    # Create the save folder if it does not exist yet
    if not os.path.exists(SAVE_FOLDER):
        os.makedirs(SAVE_FOLDER)
        print(f"{Fore.CYAN}已创建保存文件夹:{os.path.abspath(SAVE_FOLDER)}{Style.RESET_ALL}")
    # Build a filesystem-safe file name
    safe_title = "".join([c for c in novel_title if c not in '/\\:*?"<>|'])
    filename = os.path.join(SAVE_FOLDER, f"{safe_title}.txt")
    success_count = 0
    fail_count = 0
    chapter_results = []
    completed_count = 0  # tracks progress
    total_chapters = len(chapters)  # total number of chapters
    # Progress update callback
    def update_progress():
        nonlocal completed_count
        completed_count += 1
        print_progress(completed_count, total_chapters)
    semaphore = asyncio.Semaphore(max_concurrent)
    async with await create_async_session() as session:
        # Initialize the progress bar
        print_progress(0, total_chapters)
        tasks = []
        for index, chap in enumerate(chapters):
            # Pass the progress callback into each download task
            task = asyncio.ensure_future(
                download_single_chapter(
                    session, chap, index, semaphore, update_progress
                )
            )
            tasks.append(task)
        chapter_results = await asyncio.gather(*tasks)
    # Sort by index to guarantee chapter order
    chapter_results.sort(key=lambda x: x[0])
    # Write everything to the file
    try:
        with open(filename, "w", encoding="utf-8") as f:
            for result in chapter_results:
                index, chapter_title, content = result
                if content:
                    f.write(f"【{chapter_title}】\n")
                    f.write(content)
                    f.write("\n\n")
                    success_count += 1
                else:
                    fail_count += 1
                    print(f"{Fore.YELLOW}警告:章节《{chapter_title}》内容为空,已跳过{Style.RESET_ALL}")
        print(f"\n{Fore.GREEN}下载完成!共{len(chapters)}章,成功下载{success_count}章,失败{fail_count}章{Style.RESET_ALL}")
        print(f"{Fore.GREEN}文件已保存至:{os.path.abspath(filename)}{Style.RESET_ALL}")
    except Exception as e:
        print(f"{Fore.RED}文件写入失败:{str(e)}{Style.RESET_ALL}")

async def download_single_chapter(session, chap, index, semaphore, progress_callback):
    """Download a single chapter and return the result tagged with its index."""
    try:
        content = await get_full_chapter_content_async(session, chap["url"], semaphore)
        chapter_title = await get_chapter_title_async(session, chap["url"])
        if not chapter_title:
            chapter_title = chap["title"]
        # Report progress through the callback
        progress_callback()
        return index, chapter_title, content
    except Exception as e:
        print(f"\n{Fore.RED}章节《{chap['title']}》下载失败:{str(e)}{Style.RESET_ALL}")
        # Update progress even on failure
        progress_callback()
        return index, chap["title"], None

# Combine fetching the chapter list and downloading chapters into one async entry point
async def main_async():
    try:
        keyword = input("\n请输入搜索关键字:")
        novels = search_novels(keyword)
        if not novels:
            return
        choice = int(input("\n请输入要下载的小说序号:"))
        selected_novel = next((n for n in novels if n["index"] == choice), None)
        if not selected_novel:
            print("无效的序号")
            return
        print(f"\n正在获取《{selected_novel['title']}》的章节列表...")
        # Fetch the chapter list asynchronously
        chapters = await get_all_chapters_async(selected_novel["url"])
        if not chapters:
            print("未获取到任何章节")
            return
        print(f"\n《{selected_novel['title']}》共找到 {len(chapters)} 个正文章节,开始异步下载...")
        print("并发数:200(可通过修改max_concurrent参数调整)")
        for i, chap in enumerate(chapters[:5], 1):
            print(f"{i}. {chap['title']}")
        if len(chapters) > 5:
            print(f"... 省略 {len(chapters) - 5} 章")
        # Download the chapters asynchronously
        await download_chapters_async(
            selected_novel["title"],
            chapters,
            max_concurrent=200  # adjust the concurrency here if needed
        )
    except ValueError:
        print("请输入有效的数字")
    except Exception as e:
        print(f"操作失败:{str(e)}")

def main():
    # Use a single asyncio.run() call to avoid event-loop warnings
    asyncio.run(main_async())

if __name__ == "__main__":
    main()