Files
XueXiTongSrtDownloads/srt_downloader.py
ChuXun 1aacbb1d18 1
2026-01-18 18:53:17 +08:00

308 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional
from urllib.parse import parse_qs, urlparse
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
# Constants for the Chaoxing endpoints
CARDS_URL = "https://mooc1.chaoxing.com/mooc-ans/knowledge/cards"
# Primary endpoints visible in cards payload / player scripts
RICH_SUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/subtitle"
RICH_ALLSUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/allsubtitle"
# Fallback (may return 500 depending on auth):
EDITOR_SUBTITLE_URL = "https://mooc1.chaoxing.com/ananas/video-editor/sub"
# Regex helpers
_ONCLICK_RE = re.compile(r"getTeacherAjax\('(?P<course>\d+)',\s*'(?P<class>\d+)',\s*'(?P<knowledge>\d+)'\)")
_JSON_RE = re.compile(r"mArg\s*=\s*(\{.*?\});", re.DOTALL)
_INVALID_FILENAME_RE = re.compile(r"[\\/:*?\"<>|]")
@dataclass(frozen=True)
class KnowledgeEntry:
knowledge_id: str
title: str
def load_cookie_from_env(env_var: str = "CHAOXING_COOKIE") -> Dict[str, str]:
"""Parse a raw cookie string from environment variables into a dict."""
# Load .env values into environment first
try:
load_dotenv()
except Exception:
pass
raw_cookie = os.getenv(env_var, "").strip()
cookies: Dict[str, str] = {}
if not raw_cookie:
return cookies
for fragment in raw_cookie.split(";"):
if not fragment.strip():
continue
if "=" not in fragment:
continue
name, value = fragment.split("=", 1)
cookies[name.strip()] = value.strip()
return cookies
def sanitize_filename(name: str) -> str:
"""Turn a lesson title into a safe filename for Windows."""
cleaned = _INVALID_FILENAME_RE.sub("_", name)
return cleaned.strip() or "untitled"
def parse_knowledge_entries(html_path: Path) -> List[KnowledgeEntry]:
"""Scan the saved HTML page and extract all knowledge entries."""
soup = BeautifulSoup(html_path.read_text(encoding="utf-8"), "html.parser")
entries: Dict[str, KnowledgeEntry] = {}
for div in soup.select("div.posCatalog_select"):
div_id = div.get("id", "")
if not div_id.startswith("cur"):
continue
knowledge_id = div_id[3:]
span = div.select_one("span.posCatalog_name")
if not span:
continue
title_attr = span.get("title")
em = span.select_one("em.posCatalog_sbar")
if title_attr:
if em:
title = f"{em.get_text(strip=True)} {title_attr}"
else:
title = title_attr
else:
title = span.get_text(strip=True)
title = " ".join(title.split())
if knowledge_id not in entries:
entries[knowledge_id] = KnowledgeEntry(knowledge_id=knowledge_id, title=title)
return list(entries.values())
def extract_marg_json(cards_html: str) -> Optional[dict]:
"""Pull the mArg JSON payload from a cards response."""
match = _JSON_RE.search(cards_html)
if not match:
return None
json_block = match.group(1)
# Fix JavaScript style booleans/nulls if necessary
json_block = json_block.replace("\n", " ")
try:
return json.loads(json_block)
except json.JSONDecodeError:
# Try to sanitize single quotes inside keys by using eval-safe replacement
sanitized = json_block
sanitized = sanitized.replace("'", '\"')
try:
return json.loads(sanitized)
except json.JSONDecodeError:
return None
def load_local_cards_marg(cards_dir: Path, knowledge_id: str) -> Optional[dict]:
"""Try to parse saved cards.html to get mArg when network call fails."""
# Saved file path observed: 学生学习页面_files/cards.html
candidate = cards_dir / "cards.html"
if not candidate.exists():
return None
try:
text = candidate.read_text(encoding="utf-8", errors="ignore")
marg = extract_marg_json(text)
# Optionally ensure the knowledgeid matches
if marg and str(marg.get("defaults", {}).get("knowledgeid")) == str(knowledge_id):
return marg
return marg
except Exception:
return None
def iter_video_attachments(marg: dict) -> Iterable[dict]:
"""Yield the attachment dicts that correspond to videos."""
attachments = marg.get("attachments") or []
for attachment in attachments:
if attachment.get("type") == "video":
yield attachment
def is_html_error(text: str) -> bool:
t = text.strip().lower()
return t.startswith("<!doctype html>") and ("500" in t or "用户登录" in t)
def fetch_cards(session: requests.Session, knowledge_id: str, base_params: Dict[str, str]) -> Optional[dict]:
params = dict(base_params)
params["knowledgeid"] = knowledge_id
response = session.get(CARDS_URL, params=params, timeout=10)
if response.status_code != 200:
print(f"[WARN] Failed to load cards for knowledge {knowledge_id}: HTTP {response.status_code}")
return None
marg = extract_marg_json(response.text)
if marg is None:
# Fallback: try local saved cards
cards_dir = Path("学生学习页面_files")
marg = load_local_cards_marg(cards_dir, knowledge_id)
if marg is None:
print(f"[WARN] Could not locate mArg JSON for knowledge {knowledge_id}")
return marg
def download_subtitle(session: requests.Session, marg: dict, object_id: str, mid: Optional[str], course_id: str, output_path: Path) -> bool:
# Try richvideo/allsubtitle first (more complete), then richvideo/subtitle, then editor fallback
tried = []
def try_get(url: str, params: dict) -> Optional[str]:
tried.append((url, params))
r = session.get(url, params=params, timeout=10)
if r.status_code != 200:
return None
if not r.text.strip() or is_html_error(r.text):
return None
return r.text
# Prefer objectId and mid
subtitle_url_base = marg.get("defaults", {}).get("subtitleUrl") or RICH_SUBTITLE_URL
# 1) allsubtitle
params_all = {"mid": mid or marg.get("attachments", [{}])[0].get("mid"), "objectid": object_id, "courseId": course_id}
text = try_get(RICH_ALLSUBTITLE_URL, params_all)
if not text:
# 2) subtitle
params_sub = {"mid": params_all["mid"], "objectid": object_id, "courseId": course_id}
text = try_get(RICH_SUBTITLE_URL, params_sub)
if not text:
# 3) fallback editor endpoint
text = try_get(EDITOR_SUBTITLE_URL, {"objectid": object_id})
if text:
output_path.write_text(text, encoding="utf-8")
print(f"[INFO] Saved subtitle to {output_path}")
return True
print(f"[WARN] Subtitle download failed for object {object_id}. Tried: {tried}")
return False
def main() -> None:
load_dotenv()
html_path = Path("学生学习页面.html")
course_url = os.getenv("COURSE_URL", "").strip()
# Parse URL parameters from .env
clazzid, courseid, cpi = "", "", ""
if course_url:
parsed = urlparse(course_url)
qs = parse_qs(parsed.query)
clazzid = qs.get("clazzid", [""])[0]
courseid = qs.get("courseId", [""])[0] or qs.get("courseid", [""])[0]
cpi = qs.get("cpi", [""])[0]
print(f"[INFO] Extracted params: courseid={courseid}, clazzid={clazzid}, cpi={cpi}")
if not html_path.exists():
if course_url:
print(f"[INFO] 本地HTML不存在自动下载课程页面: {course_url}")
cookies = load_cookie_from_env()
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36",
"Referer": course_url,
})
if cookies:
session.cookies.update(cookies)
resp = session.get(course_url, timeout=15)
if resp.status_code == 200 and resp.text:
html_path.write_text(resp.text, encoding="utf-8")
print(f"[INFO] 已保存课程页面到 {html_path}")
else:
raise SystemExit(f"下载课程页面失败: HTTP {resp.status_code}")
else:
raise SystemExit("学生学习页面.html not found, 且未配置 COURSE_URL")
entries = parse_knowledge_entries(html_path)
if not entries:
raise SystemExit("No knowledge entries found in saved HTML")
print(f"[INFO] Found {len(entries)} lessons")
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36",
"Referer": "https://mooc1.chaoxing.com/",
})
cookies = load_cookie_from_env()
if cookies:
session.cookies.update(cookies)
print(f"[INFO] Loaded {len(cookies)} cookies from environment")
else:
print("[WARN] No cookies provided. Requests may fail if authentication is required.")
base_params = {
"clazzid": clazzid,
"courseid": courseid,
"num": "0",
"ut": "s",
"cpi": cpi,
"mooc2": "1",
"isMicroCourse": "",
"editorPreview": "",
}
# 步骤1下载字幕 JSON
output_dir = Path("subtitles")
output_dir.mkdir(exist_ok=True)
for entry in entries:
print(f"[INFO] Processing {entry.title} (knowledge {entry.knowledge_id})")
marg = fetch_cards(session, entry.knowledge_id, base_params)
if not marg:
continue
video_found = False
for attachment in iter_video_attachments(marg):
video_found = True
object_id = attachment.get("objectId") or attachment.get("property", {}).get("objectid")
mid = attachment.get("mid") or attachment.get("property", {}).get("mid")
if not object_id:
print(f"[WARN] No objectId for lesson {entry.title}")
continue
filename = sanitize_filename(entry.title) + ".txt"
output_path = output_dir / filename
if download_subtitle(session, marg, object_id, mid, base_params["courseid"], output_path):
time.sleep(0.5)
if not video_found:
print(f"[WARN] No video attachments found for {entry.title}")
# 步骤2批量下载 SRT 文件
srt_dir = Path("srt")
srt_dir.mkdir(exist_ok=True)
def fetch_srt(url: str, session: requests.Session) -> str | None:
r = session.get(url, timeout=10)
if r.status_code == 200 and r.text.strip():
return r.text
return None
for txt_file in output_dir.glob("*.txt"):
try:
content = txt_file.read_text(encoding="utf-8").strip()
data = json.loads(content)
# data is list of tracks
for track in data:
url = track.get("url")
if not url:
continue
srt_text = fetch_srt(url, session)
if not srt_text:
print(f"[WARN] Failed to fetch SRT from {url}")
continue
name = txt_file.stem + ".srt"
out_path = srt_dir / name
out_path.write_text(srt_text, encoding="utf-8")
print(f"[INFO] Saved {out_path}")
except Exception as e:
print(f"[WARN] {txt_file.name}: {e}")
if __name__ == "__main__":
main()