Note: for academic research only; please do not use it for any other purpose.

Requires the BeautifulSoup (bs4), selenium, and webdriver_manager libraries.
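They can be installed from PyPI (assuming the standard package names):

pip install beautifulsoup4 selenium webdriver-manager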

# -*- coding: UTF-8 -*-
# Imports
import csv
from bs4 import BeautifulSoup
import bs4
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains  # mouse actions
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.service import Service as FirefoxService
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.firefox import GeckoDriverManager
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import os
import traceback
import random

# Generic lookup helper; `param` is a single "key=value" string such as
# "class_=name-cn" (note: this helper is never actually called in the script)
def get_info(soup, _type, element, param=None):
    res = None
    if _type == "find":
        if param is not None:
            params = dict([param.split('=')])
            res = soup.find(element, **params)
        else:
            res = soup.find(element)
        if res is not None and res.string:
            res = res.string.replace(" ", "").replace("\n", "")
        else:
            res = "None"
    elif _type == "find_all":
        if param is not None:
            params = dict([param.split('=')])
            res = soup.find_all(element, **params)
        else:
            res = soup.find_all(element)
    return res
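
# Usage sketch for the "key=value" param format (hypothetical markup, for
# illustration only):
#   demo = BeautifulSoup('<div class="name-cn">清华大学</div>', 'html.parser')
#   get_info(demo, "find", "div", "class_=name-cn")   # -> '清华大学'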

# Fill the university list from one page of HTML
def fillUnivList(html, ulist):
    soup = BeautifulSoup(html, 'html.parser')
    tbody = soup.find('tbody')
    if not tbody:
        print("Table body (tbody) not found; check whether the page structure has changed")
        return

    for tr in tbody.children:
        if isinstance(tr, bs4.element.Tag):  # skip non-Tag nodes such as whitespace strings
            try:
                td_list = tr.find_all('td')
                if len(td_list) < 6:  # ensure the row has enough columns
                    print(f"Warning: row has only {len(td_list)} columns; at least 6 are needed")
                    continue
                
                # Rank
                try:
                    top = td_list[0].text.strip()
                except Exception as e:
                    print(f"Error extracting the rank: {e}")
                    top = "N/A"

                # University info
                logo = "no image"  # default, in case the cell below raises before logo is assigned
                try:
                    # logo
                    img = td_list[1].find('img')
                    logo = img["src"] if img and "src" in img.attrs else "no image"

                    # University name - flexible lookup that tries several
                    # possible class names / structures
                    name_cn_elem = td_list[1].find(class_="name-cn")
                    if not name_cn_elem:
                        # fall back to other plausible selectors
                        name_cn_elem = td_list[1].select_one('.univ-name strong, .university-name strong, strong, h3')

                    ch_name = name_cn_elem.text.strip() if name_cn_elem else "unknown"

                    # equally flexible lookup for the English name
                    name_en_elem = td_list[1].find(class_="name-en")
                    if not name_en_elem:
                        # fall back to other plausible selectors
                        name_en_elem = td_list[1].select_one('.univ-name-en, .university-name-en, em, p:not([class])')

                    en_name = name_en_elem.text.strip() if name_en_elem else "unknown"

                    # debug output
                    print(f"Extracted university: {ch_name} ({en_name})")

                except Exception as e:
                    print(f"Error extracting the university name: {e}")
                    ch_name = "unknown"
                    en_name = "unknown"
                
                # Tags
                try:
                    tag_elem = td_list[1].find('p') or td_list[1].find('div', {'class': 'tags'})
                    if tag_elem:
                        tags = tag_elem.text.strip()
                    else:
                        tags = "no tags"
                except Exception as e:
                    print(f"Error extracting the tags: {e}")
                    tags = "no tags"

                # Region
                try:
                    area = td_list[2].text.strip()
                except Exception as e:
                    print(f"Error extracting the region: {e}")
                    area = "unknown"

                # Category
                try:
                    main = td_list[3].text.strip()
                except Exception as e:
                    print(f"Error extracting the category: {e}")
                    main = "unknown"

                # Overall score
                try:
                    score = td_list[4].text.strip()
                except Exception as e:
                    print(f"Error extracting the score: {e}")
                    score = "unknown"

                # Institutional level
                try:
                    layer = td_list[5].text.strip()
                except Exception as e:
                    print(f"Error extracting the level: {e}")
                    layer = "unknown"

                # append the row
                ulist.append([top, ch_name, en_name, tags, area, main, score, layer, logo])

            except Exception as e:
                print(f"Error processing a table row: {e}")
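
# Minimal sketch of the markup fillUnivList expects (hypothetical, modeled on
# the ranking table; all values are made up):
#   demo_html = ('<table><tbody><tr><td>1</td>'
#                '<td><img src="logo.png"/><a class="name-cn">清华大学</a>'
#                '<a class="name-en">Tsinghua University</a><p>双一流</p></td>'
#                '<td>北京</td><td>综合</td><td>999.9</td><td>一流大学</td></tr>'
#                '</tbody></table>')
#   rows = []
#   fillUnivList(demo_html, rows)   # rows now holds one 9-field record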


# Save the collected rows to a CSV file
def saveToCSV(ulist, filename):
    headers = ['Rank', 'Name (CN)', 'Name (EN)', 'Tags', 'Region', 'Category', 'Score', 'Level', 'Logo']
    os.makedirs(os.path.dirname(filename) or '.', exist_ok=True)  # create the target directory if missing
    # utf-8-sig writes a BOM so Excel opens the file with the right encoding
    with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        writer.writerows(ulist)
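
# Usage sketch (hypothetical row values):
#   saveToCSV([['1', '清华大学', 'Tsinghua University', '双一流', '北京',
#               '综合', '999.9', '一流大学', 'logo.png']], 'demo.csv')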


# Wait for an element, then move the mouse to it and click it
def action_run(driver, actions, info, by=By.ID, time_num=1):
    try:
        # wait until the element is present in the DOM
        element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((by, info))
        )

        if element.is_displayed():
            actions.move_to_element(element).click().perform()
            time.sleep(time_num)
            return True
        else:
            print(f"Element {info} is not visible, waiting...")
            time.sleep(1)
            return False
    except Exception as e:
        print(f"Element {info} not found: {e}")
        return False
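
# Two optional hardening tweaks (assumptions, not part of the original):
#   - wait for clickability rather than mere presence:
#       element = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((by, info)))
#   - on older Selenium versions ActionChains can replay queued actions on the
#     next perform(); calling actions.reset_actions() after the click clears them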


# Main entry point
if __name__ == "__main__":
    url = "https://www.shanghairanking.cn/rankings/bcur/2025"
    start = time.strftime("%H:%M:%S", time.localtime())
    
    # Pool of User-Agent strings, one picked at random per run
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36",
    ]
    
    # Try browsers in order of preference
    try:
        # Firefox first; Firefox takes the User-Agent via a profile
        # preference rather than a command-line argument
        firefox_options = webdriver.FirefoxOptions()
        firefox_options.set_preference("general.useragent.override", random.choice(user_agents))
        driver = webdriver.Firefox(service=FirefoxService(GeckoDriverManager().install()), options=firefox_options)
        print("Using Firefox")
    except Exception as e:
        print(f"Failed to start Firefox: {e}")
        try:
            # fall back to Chrome
            chrome_options = webdriver.ChromeOptions()
            chrome_options.add_argument(f"user-agent={random.choice(user_agents)}")
            driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
            print("Using Chrome")
        except Exception as e:
            print(f"Failed to start Chrome: {e}")
            print("Make sure Chrome or Firefox is installed and that the network allows downloading a WebDriver")
            exit(1)
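
    # Optional (assumption, not in the original script): add a headless flag
    # before creating each driver above so no browser window opens, e.g.
    #   firefox_options.add_argument("-headless")
    #   chrome_options.add_argument("--headless=new")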
            
    driver.maximize_window()
    driver.get(url)
    print(f"Visiting: {url}")
    # wait for the page to load
    time.sleep(5)  # generous wait so the page renders completely

    # simulate mouse interactions
    actions = ActionChains(driver)
    ulist = []
    
    print("Starting the crawl...")
    page_count = 0
    max_pages = 20  # hard cap on the number of pages
    
    try:
        for i in range(max_pages):
            page_count += 1
            print(f"Crawling page {page_count}...")
            
            # wait for the table to finish loading
            try:
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "tbody"))
                )
            except Exception as e:
                print(f"Error while waiting for the table: {e}")
                break
                
            html = driver.page_source
            
            # save the first page's HTML for debugging
            if page_count == 1:
                with open("first_page.html", "w", encoding="utf-8") as f:
                    f.write(html)
                print("Saved the first page's HTML for debugging")

                # inspect the page structure
                soup = BeautifulSoup(html, 'html.parser')
                tbody = soup.find('tbody')
                first_row = tbody.find('tr') if tbody else None
                if first_row:
                    cells = first_row.find_all('td')
                    second_cell = cells[1] if len(cells) > 1 else None
                    if second_cell:
                        print("\nHTML structure of the first university's second cell:")
                        print(second_cell.prettify())
                        # check whether the name-cn / name-en classes exist
                        if second_cell.find(class_="name-cn"):
                            print("Found a class='name-cn' element")
                        else:
                            print("Warning: no class='name-cn' element found")

                        if second_cell.find(class_="name-en"):
                            print("Found a class='name-en' element")
                        else:
                            print("Warning: no class='name-en' element found")
                        print("\n")
                
            # confirm the table exists
            soup = BeautifulSoup(html, 'html.parser')
            if not soup.find('tbody'):
                print("Data table not found; the page structure may have changed")
                # save the current page for debugging
                with open(f"page_{page_count}.html", "w", encoding="utf-8") as f:
                    f.write(html)
                break

            # parse this page into the list
            before_count = len(ulist)
            fillUnivList(html, ulist)
            after_count = len(ulist)

            # check whether any new rows were added
            if after_count > before_count:
                print(f"Got {after_count - before_count} universities from this page, total so far: {after_count}")
            else:
                print("Warning: no new data from this page")
                # save the current page for debugging
                with open(f"page_{page_count}_no_data.html", "w", encoding="utf-8") as f:
                    f.write(html)
            
            # scroll to the bottom of the page
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)  # extra wait after scrolling

            # click "next page" - try several plausible selectors, since the
            # pagination markup may vary ('下一页' is the site's "next page" label)
            next_button_found = False
            selectors = [
                "li[title='下一页']",
                ".ant-pagination-next",
                "a.next",
                "button.next-page",
                ".pagination .next",
                ".pagination-next",
                "//li[contains(text(), '下一页')]",  # XPath
                "//button[contains(text(), '下一页')]",  # XPath
                "//a[contains(text(), '下一页')]"  # XPath
            ]
            
            for selector in selectors:
                try:
                    # XPath expressions start with "//"; everything else is CSS
                    if selector.startswith("//"):
                        next_button_found = action_run(driver, actions, info=selector, by=By.XPATH, time_num=2)
                    else:
                        next_button_found = action_run(driver, actions, info=selector, by=By.CSS_SELECTOR, time_num=2)

                    if next_button_found:
                        print(f"Found the next-page button with selector '{selector}'")
                        break
                except Exception:
                    continue

            if not next_button_found:
                print("Tried every candidate next-page selector without success")
                print("Probably reached the last page; stopping the crawl")
                break

            # random pause so the traffic looks less bot-like
            sleep_time = random.uniform(1.5, 3.5)
            print(f"Sleeping for {sleep_time:.2f} s...")
            time.sleep(sleep_time)
                
    except Exception as e:
        print(f"Error during the crawl: {e}")
        traceback.print_exc()  # full stack trace
    finally:
        end = time.strftime("%H:%M:%S", time.localtime())
        print(f"Started at {start}, finished at {end}")  # timing
        
        # save the results to CSV
        filename = 'D:\\ranking\\university_rankings_2025.csv'
        if ulist:
            print(f"About to save {len(ulist)} records to CSV")
            try:
                saveToCSV(ulist, filename)
                print(f"Data saved to {filename}")
                # verify that the file was created
                if os.path.exists(filename):
                    print(f"Confirmed {filename} exists, size: {os.path.getsize(filename)} bytes")
                else:
                    print(f"Warning: {filename} not found; the save may have failed")
            except Exception as e:
                print(f"Error saving the CSV: {e}")
                traceback.print_exc()
        else:
            print("No data collected; nothing to save")

        # close the browser
        driver.quit()
        print("Browser closed")
