from __future__ import annotations

import json
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional

import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Constants for the Chaoxing endpoints
CARDS_URL = "https://mooc1.chaoxing.com/mooc-ans/knowledge/cards"
# Primary endpoints visible in the cards payload / player scripts
RICH_SUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/subtitle"
RICH_ALLSUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/allsubtitle"
# Fallback (may return 500 depending on auth):
EDITOR_SUBTITLE_URL = "https://mooc1.chaoxing.com/ananas/video-editor/sub"

# Regex helpers. The onclick handler in the saved page looks like
# getTeacherAjax('courseId','clazzId','knowledgeId'); only the third
# group (the knowledge id) is used below.
_ONCLICK_RE = re.compile(
    r"getTeacherAjax\('(?P<course>\d+)',\s*'(?P<clazz>\d+)',\s*'(?P<knowledge>\d+)'\)"
)
_JSON_RE = re.compile(r"mArg\s*=\s*(\{.*?\});", re.DOTALL)
_INVALID_FILENAME_RE = re.compile(r"[\\/:*?\"<>|]")


@dataclass(frozen=True)
class KnowledgeEntry:
    knowledge_id: str
    title: str


def load_cookie_from_env(env_var: str = "CHAOXING_COOKIE") -> Dict[str, str]:
    """Parse a raw cookie string from environment variables into a dict."""
    # Load .env values into the environment first
    try:
        load_dotenv()
    except Exception:
        pass
    raw_cookie = os.getenv(env_var, "").strip()
    cookies: Dict[str, str] = {}
    if not raw_cookie:
        return cookies
    for fragment in raw_cookie.split(";"):
        if not fragment.strip() or "=" not in fragment:
            continue
        name, value = fragment.split("=", 1)
        cookies[name.strip()] = value.strip()
    return cookies


def sanitize_filename(name: str) -> str:
    """Turn a lesson title into a safe filename for Windows."""
    cleaned = _INVALID_FILENAME_RE.sub("_", name)
    return cleaned.strip() or "untitled"


def parse_knowledge_entries(html_path: Path) -> List[KnowledgeEntry]:
    """Scan the saved HTML page and extract all knowledge entries."""
    soup = BeautifulSoup(html_path.read_text(encoding="utf-8"), "html.parser")
    entries: Dict[str, KnowledgeEntry] = {}
    for span in soup.select("span.posCatalog_name"):
        onclick = span.get("onclick", "")
        match = _ONCLICK_RE.search(onclick)
        if not match:
            continue
        knowledge_id = match["knowledge"]
        title_attr = span.get("title") or span.get_text(strip=True)
        title = " ".join(title_attr.split())
        if knowledge_id not in entries:
            entries[knowledge_id] = KnowledgeEntry(knowledge_id=knowledge_id, title=title)
    return list(entries.values())


def extract_marg_json(cards_html: str) -> Optional[dict]:
    """Pull the mArg JSON payload from a cards response."""
    match = _JSON_RE.search(cards_html)
    if not match:
        return None
    json_block = match.group(1).replace("\n", " ")
    try:
        return json.loads(json_block)
    except json.JSONDecodeError:
        # The payload occasionally uses single quotes; try a naive
        # single-to-double-quote replacement before giving up.
        sanitized = json_block.replace("'", '"')
        try:
            return json.loads(sanitized)
        except json.JSONDecodeError:
            return None


def load_local_cards_marg(cards_dir: Path, knowledge_id: str) -> Optional[dict]:
    """Try to parse a saved cards.html to get mArg when the network call fails."""
    # Saved file path observed: 学生学习页面_files/cards.html
    candidate = cards_dir / "cards.html"
    if not candidate.exists():
        return None
    try:
        text = candidate.read_text(encoding="utf-8", errors="ignore")
        marg = extract_marg_json(text)
        # Warn if the saved payload belongs to a different lesson; still return
        # it and let the caller decide whether to use it.
        if marg and str(marg.get("defaults", {}).get("knowledgeid")) != str(knowledge_id):
            print(f"[WARN] Local cards.html knowledgeid does not match {knowledge_id}")
        return marg
    except Exception:
        return None
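
# A minimal .env sketch for the two variables this script reads
# (CHAOXING_COOKIE via load_cookie_from_env() above, COURSE_URL in main()
# below). The values are illustrative placeholders, not working credentials:
#
#   CHAOXING_COOKIE=name1=value1; name2=value2
#   COURSE_URL=https://mooc1.chaoxing.com/...
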
def iter_video_attachments(marg: dict) -> Iterable[dict]:
    """Yield the attachment dicts that correspond to videos."""
    attachments = marg.get("attachments") or []
    for attachment in attachments:
        if attachment.get("type") == "video":
            yield attachment


def is_html_error(text: str) -> bool:
    """Heuristic: detect an HTML error/login page returned instead of subtitle data."""
    t = text.strip().lower()
    return t.startswith("<") and ("500" in t or "用户登录" in t)


def fetch_cards(session: requests.Session, knowledge_id: str, base_params: Dict[str, str]) -> Optional[dict]:
    """Fetch the cards page for one lesson and extract its mArg payload."""
    params = dict(base_params)
    params["knowledgeid"] = knowledge_id
    response = session.get(CARDS_URL, params=params, timeout=10)
    if response.status_code != 200:
        print(f"[WARN] Failed to load cards for knowledge {knowledge_id}: HTTP {response.status_code}")
        return None
    marg = extract_marg_json(response.text)
    if marg is None:
        # Fallback: try the locally saved cards page
        cards_dir = Path("学生学习页面_files")
        marg = load_local_cards_marg(cards_dir, knowledge_id)
    if marg is None:
        print(f"[WARN] Could not locate mArg JSON for knowledge {knowledge_id}")
    return marg


def download_subtitle(
    session: requests.Session,
    marg: dict,
    object_id: str,
    mid: Optional[str],
    course_id: str,
    output_path: Path,
) -> bool:
    """Try richvideo/allsubtitle first (most complete), then richvideo/subtitle,
    then the video-editor fallback."""
    tried = []

    def try_get(url: str, params: dict) -> Optional[str]:
        tried.append((url, params))
        r = session.get(url, params=params, timeout=10)
        if r.status_code != 200:
            return None
        if not r.text.strip() or is_html_error(r.text):
            return None
        return r.text

    # Prefer the subtitle URL advertised in mArg, falling back to the known endpoint
    subtitle_url = marg.get("defaults", {}).get("subtitleUrl") or RICH_SUBTITLE_URL

    # 1) allsubtitle (use the first attachment's mid when none was passed in)
    params_all = {
        "mid": mid or (marg.get("attachments") or [{}])[0].get("mid"),
        "objectid": object_id,
        "courseId": course_id,
    }
    text = try_get(RICH_ALLSUBTITLE_URL, params_all)
    if not text:
        # 2) subtitle
        params_sub = {"mid": params_all["mid"], "objectid": object_id, "courseId": course_id}
        text = try_get(subtitle_url, params_sub)
    if not text:
        # 3) fallback editor endpoint
        text = try_get(EDITOR_SUBTITLE_URL, {"objectid": object_id})
    if text:
        output_path.write_text(text, encoding="utf-8")
        print(f"[INFO] Saved subtitle to {output_path}")
        return True
    print(f"[WARN] Subtitle download failed for object {object_id}. Tried: {tried}")
    return False
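
# For reference: the payload download_subtitle() saves is parsed in main()
# below as a JSON array of subtitle tracks, each carrying a "url" pointing
# at an SRT file. A sketch of the assumed shape (field names other than
# "url" are illustrative):
#
#   [
#       {"url": "https://.../subtitle_zh.srt", ...},
#       ...
#   ]
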
def main() -> None:
    load_dotenv()
    html_path = Path("学生学习页面.html")
    course_url = os.getenv("COURSE_URL", "").strip()
    if not html_path.exists():
        if course_url:
            print(f"[INFO] Local HTML not found; downloading the course page: {course_url}")
            cookies = load_cookie_from_env()
            session = requests.Session()
            session.headers.update({
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36",
                "Referer": course_url,
            })
            if cookies:
                session.cookies.update(cookies)
            resp = session.get(course_url, timeout=15)
            if resp.status_code == 200 and resp.text:
                html_path.write_text(resp.text, encoding="utf-8")
                print(f"[INFO] Saved the course page to {html_path}")
            else:
                raise SystemExit(f"Failed to download the course page: HTTP {resp.status_code}")
        else:
            raise SystemExit("学生学习页面.html not found and COURSE_URL is not configured")

    entries = parse_knowledge_entries(html_path)
    if not entries:
        raise SystemExit("No knowledge entries found in the saved HTML")
    print(f"[INFO] Found {len(entries)} lessons")

    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36",
        "Referer": "https://mooc1.chaoxing.com/",
    })
    cookies = load_cookie_from_env()
    if cookies:
        session.cookies.update(cookies)
        print(f"[INFO] Loaded {len(cookies)} cookies from environment")
    else:
        print("[WARN] No cookies provided. Requests may fail if authentication is required.")

    # Course-specific parameters observed for this course; adjust the ids for your own class.
    base_params = {
        "clazzid": "129437493",
        "courseid": "256005147",
        "num": "0",
        "ut": "s",
        "cpi": "441843723",
        "mooc2": "1",
        "isMicroCourse": "",
        "editorPreview": "",
    }

    # Step 1: download the subtitle track JSON for every lesson
    output_dir = Path("subtitles")
    output_dir.mkdir(exist_ok=True)
    for entry in entries:
        print(f"[INFO] Processing {entry.title} (knowledge {entry.knowledge_id})")
        marg = fetch_cards(session, entry.knowledge_id, base_params)
        if not marg:
            continue
        video_found = False
        for attachment in iter_video_attachments(marg):
            video_found = True
            object_id = attachment.get("objectId") or attachment.get("property", {}).get("objectid")
            mid = attachment.get("mid") or attachment.get("property", {}).get("mid")
            if not object_id:
                print(f"[WARN] No objectId for lesson {entry.title}")
                continue
            filename = sanitize_filename(entry.title) + ".txt"
            output_path = output_dir / filename
            if download_subtitle(session, marg, object_id, mid, base_params["courseid"], output_path):
                time.sleep(0.5)  # be gentle with the server between downloads
        if not video_found:
            print(f"[WARN] No video attachments found for {entry.title}")

    # Step 2: batch-download the SRT files referenced by each saved track list
    srt_dir = Path("srt")
    srt_dir.mkdir(exist_ok=True)

    def fetch_srt(url: str, session: requests.Session) -> str | None:
        r = session.get(url, timeout=10)
        if r.status_code == 200 and r.text.strip():
            return r.text
        return None

    for txt_file in output_dir.glob("*.txt"):
        try:
            content = txt_file.read_text(encoding="utf-8").strip()
            data = json.loads(content)  # a JSON list of subtitle tracks
            for index, track in enumerate(data):
                url = track.get("url")
                if not url:
                    continue
                srt_text = fetch_srt(url, session)
                if not srt_text:
                    print(f"[WARN] Failed to fetch SRT from {url}")
                    continue
                # Suffix extra tracks so multiple tracks per lesson do not overwrite each other
                suffix = f"_{index}" if index else ""
                out_path = srt_dir / f"{txt_file.stem}{suffix}.srt"
                out_path.write_text(srt_text, encoding="utf-8")
                print(f"[INFO] Saved {out_path}")
        except Exception as e:
            print(f"[WARN] {txt_file.name}: {e}")


if __name__ == "__main__":
    main()
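
# Typical usage, assuming a .env as sketched near the top of the file
# (the script filename here is a placeholder):
#
#   python chaoxing_subtitles.py
#
# Output: per-lesson subtitle track JSON in ./subtitles/ and the fetched
# SRT files in ./srt/.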