285 lines
12 KiB
Python
285 lines
12 KiB
Python
import os
|
||
import time
|
||
import requests
|
||
import random
|
||
from urllib.parse import urlparse, unquote, quote
|
||
from dotenv import load_dotenv # <--- 新增导入
|
||
|
||
from selenium import webdriver
|
||
from selenium.webdriver.common.by import By
|
||
from selenium.webdriver.chrome.service import Service
|
||
from selenium.webdriver.support.ui import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from webdriver_manager.chrome import ChromeDriverManager
|
||
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
||
|
||
# --- 从 .env 文件加载环境变量 ---
|
||
load_dotenv()
|
||
|
||
# --- 从环境变量中读取配置 ---
|
||
COURSE_LIST_URL = os.getenv("COURSE_LIST_URL")
|
||
COURSE_BUTTON_LOCATOR_XPATH = os.getenv("COURSE_BUTTON_LOCATOR_XPATH")
|
||
|
||
# 检查环境变量是否成功加载
|
||
if not COURSE_LIST_URL or not COURSE_BUTTON_LOCATOR_XPATH:
|
||
print("错误:请确保 .env 文件存在于脚本相同目录下,")
|
||
print("并且其中包含 COURSE_LIST_URL 和 COURSE_BUTTON_LOCATOR_XPATH 变量。")
|
||
exit()
|
||
|
||
# --- 主程序 ---
|
||
|
||
def setup_driver():
|
||
"""初始化并返回一个Chrome WebDriver实例"""
|
||
print("正在设置浏览器驱动...")
|
||
service = Service(ChromeDriverManager().install())
|
||
driver = webdriver.Chrome(service=service)
|
||
driver.implicitly_wait(5)
|
||
return driver
|
||
|
||
def try_fix_url(url, expected_filename):
|
||
"""尝试根据期望的文件名修复URL中的文件名部分,并正确处理URL编码
|
||
|
||
Args:
|
||
url: 原始URL(可能已被错误解码)
|
||
expected_filename: 期望的文件名
|
||
|
||
Returns:
|
||
修复后的URL(正确编码)
|
||
"""
|
||
if not expected_filename:
|
||
return url
|
||
|
||
# 从URL中提取基础部分和文件名部分
|
||
# 例如: https://app.teachermate.com.cn/9cdLn-1693370741604-Chapter One From C to C .ppt
|
||
# 分解为: https://app.teachermate.com.cn/ + 9cdLn-1693370741604- + Chapter One From C to C .ppt
|
||
|
||
try:
|
||
# 找到最后一个 '-' 之前的部分
|
||
last_dash_idx = url.rfind('-')
|
||
if last_dash_idx > 0:
|
||
# 获取URL的前缀部分(包含ID)
|
||
prefix = url[:last_dash_idx + 1]
|
||
|
||
# 从期望文件名中提取实际文件名(去掉前面的K编号)
|
||
# 例如: K0002-1-1 Chapter One From C to C++.ppt -> Chapter One From C to C++.ppt
|
||
filename_parts = expected_filename.split()
|
||
actual_filename = None
|
||
for i, part in enumerate(filename_parts):
|
||
if not part.startswith('K') or i > 0:
|
||
actual_filename = ' '.join(filename_parts[i:])
|
||
break
|
||
|
||
if actual_filename:
|
||
# 正确编码文件名:
|
||
# 1. 先对整个文件名进行URL编码(+会变成%2B)
|
||
# 2. 然后将%20替换为+(这是URL中空格的标准表示)
|
||
encoded_filename = quote(actual_filename, safe='.')
|
||
# 将空格的编码%20替换为+
|
||
encoded_filename = encoded_filename.replace('%20', '+')
|
||
|
||
# 构建新的URL
|
||
fixed_url = prefix + encoded_filename
|
||
return fixed_url
|
||
except Exception as e:
|
||
print(f" URL修复失败: {e}")
|
||
|
||
return url
|
||
|
||
def download_file(session, url, headers, directory=".", expected_filename=None):
|
||
"""使用requests库下载文件
|
||
|
||
Args:
|
||
session: requests会话对象
|
||
url: 下载链接
|
||
headers: 请求头
|
||
directory: 保存目录
|
||
expected_filename: 期望的文件名(从课件名称中提取),如果提供则优先使用
|
||
"""
|
||
try:
|
||
# 如果提供了期望的文件名,使用它;否则从URL中提取
|
||
if expected_filename:
|
||
meaningful_name = expected_filename
|
||
print(f"准备下载文件: {meaningful_name} (使用课件原始名称)")
|
||
else:
|
||
path = urlparse(url).path
|
||
filename = unquote(os.path.basename(path))
|
||
clean_filename = filename.split('?')[0]
|
||
|
||
try:
|
||
meaningful_name = clean_filename.split('-')[-1]
|
||
except:
|
||
meaningful_name = clean_filename
|
||
|
||
print(f"准备下载文件: {meaningful_name}")
|
||
|
||
response = session.get(url, stream=True, headers=headers)
|
||
response.raise_for_status()
|
||
|
||
filepath = os.path.join(directory, meaningful_name)
|
||
with open(filepath, 'wb') as f:
|
||
for chunk in response.iter_content(chunk_size=8192):
|
||
f.write(chunk)
|
||
print(f"成功下载并保存文件到: {filepath}\n")
|
||
return True
|
||
except requests.exceptions.RequestException as e:
|
||
# 特别处理403和404错误,重新抛出以便上层捕获
|
||
if e.response is not None and e.response.status_code in [403, 404]:
|
||
raise e # 重新抛出异常
|
||
print(f"下载文件时出错: {e}\n")
|
||
return False
|
||
except Exception as e:
|
||
print(f"处理文件时发生未知错误: {e}\n")
|
||
return False
|
||
|
||
def main():
|
||
driver = setup_driver()
|
||
|
||
try:
|
||
print(f"正在访问页面: {COURSE_LIST_URL}")
|
||
driver.get(COURSE_LIST_URL)
|
||
|
||
# --- 使用从 .env 文件读取的定位符 ---
|
||
course_button_locator = (By.XPATH, COURSE_BUTTON_LOCATOR_XPATH)
|
||
|
||
WebDriverWait(driver, 20).until(
|
||
EC.presence_of_element_located(course_button_locator)
|
||
)
|
||
|
||
user_agent = driver.execute_script("return navigator.userAgent;")
|
||
|
||
course_buttons = driver.find_elements(*course_button_locator)
|
||
num_courses = len(course_buttons)
|
||
print(f"检测到 {num_courses} 个课件,准备开始处理...")
|
||
|
||
session = requests.Session()
|
||
for cookie in driver.get_cookies():
|
||
session.cookies.set(cookie['name'], cookie['value'])
|
||
|
||
for i in range(num_courses):
|
||
course_buttons = WebDriverWait(driver, 10).until(
|
||
EC.presence_of_all_elements_located(course_button_locator)
|
||
)
|
||
button = course_buttons[i]
|
||
|
||
course_name = button.text.strip().replace('\n', ' ') if button.text.strip() else f"课件 {i+1}"
|
||
print(f"--- 正在处理第 {i+1}/{num_courses} 个课件: {course_name} ---")
|
||
|
||
# 从课件名称中提取实际文件名(去掉前面的ID和状态标记)
|
||
# 例如:"K0002-1-1 Chapter One From C to C++.ppt 已读 Chapter One From C to C++.ppt"
|
||
# 提取最后一个部分作为文件名
|
||
parts = course_name.split()
|
||
# 找到文件扩展名的位置
|
||
expected_filename = None
|
||
for idx, part in enumerate(parts):
|
||
if part.lower().endswith(('.ppt', '.pptx', '.pdf', '.doc', '.docx', '.xls', '.xlsx')):
|
||
# 从这个位置开始往前找,直到遇到"已读"或"未读"
|
||
start_idx = 0
|
||
for j in range(idx, -1, -1):
|
||
if parts[j] in ['已读', '未读']:
|
||
start_idx = j + 1
|
||
break
|
||
expected_filename = ' '.join(parts[start_idx:idx+1])
|
||
break
|
||
|
||
print(f"提取的文件名: {expected_filename}")
|
||
|
||
try:
|
||
driver.execute_script("arguments[0].click();", button)
|
||
print("已点击课件按钮,等待操作菜单弹出...")
|
||
|
||
preview_button_locator = (By.XPATH, "//span[text()='预览课件']")
|
||
preview_button = WebDriverWait(driver, 10).until(
|
||
EC.element_to_be_clickable(preview_button_locator)
|
||
)
|
||
|
||
driver.execute_script("arguments[0].click();", preview_button)
|
||
print("已点击“预览课件”选项。")
|
||
|
||
print("正在当前页面寻找预览iframe...")
|
||
iframe_locator = (By.XPATH, "//iframe[contains(@src, 'ow365.cn')]")
|
||
iframe_element = WebDriverWait(driver, 20).until(
|
||
EC.presence_of_element_located(iframe_locator)
|
||
)
|
||
|
||
ow365_url = iframe_element.get_attribute('src')
|
||
print("成功找到预览URL。")
|
||
|
||
modified_url = ow365_url.replace("ssl=1", "ssl=0")
|
||
print("修改URL并正在访问...")
|
||
driver.get(modified_url)
|
||
|
||
print("正在从页面文本中提取最终下载链接...")
|
||
url_div_locator = (By.CSS_SELECTOR, "div[style='color:red']")
|
||
url_element = WebDriverWait(driver, 20).until(
|
||
EC.presence_of_element_located(url_div_locator)
|
||
)
|
||
|
||
full_download_link = url_element.text.strip()
|
||
|
||
headers = { 'User-Agent': user_agent, 'Referer': modified_url }
|
||
|
||
# ==================================================================
|
||
# vv 最终决定性测试 vv
|
||
# ==================================================================
|
||
|
||
# 1. 优先尝试直链(删除参数)
|
||
clean_url = full_download_link.split('?')[0]
|
||
print("\n>>> 正在尝试直链(删除参数)下载...")
|
||
print(f" 将要访问的URL: {clean_url}")
|
||
|
||
try:
|
||
# 尝试使用无参数的URL下载,并传入期望的文件名
|
||
download_succeeded = download_file(session, clean_url, headers, expected_filename=expected_filename)
|
||
if not download_succeeded:
|
||
# 如果下载失败但不是403,也打印一下信息
|
||
print(" 您指定的方法下载失败,但错误不是403。")
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
# 如果是因为403错误而失败,证明此路不通
|
||
if e.response is not None and e.response.status_code == 403:
|
||
print(">>> 正在自动切换到备用方案(保留参数)进行重试...")
|
||
|
||
# 2. 备用方案:使用完整的带有签名参数的URL重试
|
||
try:
|
||
download_file(session, full_download_link, headers, expected_filename=expected_filename)
|
||
except Exception as retry_error:
|
||
print(f" 备用方案也失败了: {retry_error}")
|
||
elif e.response is not None and e.response.status_code == 404:
|
||
# 404错误,尝试修复URL
|
||
print(">>> 检测到404错误,尝试修复URL...")
|
||
fixed_url = try_fix_url(clean_url, expected_filename)
|
||
if fixed_url != clean_url:
|
||
print(f" 修复后的URL: {fixed_url}")
|
||
try:
|
||
download_file(session, fixed_url, headers, expected_filename=expected_filename)
|
||
except Exception as fix_error:
|
||
print(f" 修复后的URL仍然无法下载: {fix_error}")
|
||
print(f" 跳过此文件。")
|
||
else:
|
||
print(" 无法修复URL,跳过此文件。")
|
||
else:
|
||
# 如果是其他网络错误
|
||
print(f" 您指定的方法下载时遇到未知网络错误: {e}")
|
||
|
||
|
||
except (TimeoutException, NoSuchElementException) as e:
|
||
print(f"处理课件 '{course_name}' 时出错: {e}")
|
||
print("跳过此课件,继续下一个...")
|
||
|
||
finally:
|
||
print("正在返回课程列表页面...")
|
||
driver.get(COURSE_LIST_URL)
|
||
|
||
sleep_time = random.uniform(1, 2)
|
||
print(f"暂停 {sleep_time:.2f} 秒...")
|
||
time.sleep(sleep_time)
|
||
|
||
finally:
|
||
print("所有任务已完成,即将关闭浏览器。")
|
||
time.sleep(5)
|
||
driver.quit()
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
|