from __future__ import annotations

import json
import os
import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional
from urllib.parse import parse_qs, urlparse

import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Constants for the Chaoxing endpoints
CARDS_URL = "https://mooc1.chaoxing.com/mooc-ans/knowledge/cards"
# Primary endpoints visible in cards payload / player scripts
RICH_SUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/subtitle"
RICH_ALLSUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/allsubtitle"
# Fallback (may return 500 depending on auth):
EDITOR_SUBTITLE_URL = "https://mooc1.chaoxing.com/ananas/video-editor/sub"

# Regex helpers
# onclick handlers look like getTeacherAjax('<courseid>','<clazzid>','<knowledgeid>');
# only the third ("knowledge") group is consumed below.
_ONCLICK_RE = re.compile(r"getTeacherAjax\('(?P<course>\d+)',\s*'(?P<clazz>\d+)',\s*'(?P<knowledge>\d+)'\)")
_JSON_RE = re.compile(r"mArg\s*=\s*(\{.*?\});", re.DOTALL)
_INVALID_FILENAME_RE = re.compile(r"[\\/:*?\"<>|]")


@dataclass(frozen=True)
class KnowledgeEntry:
    knowledge_id: str
    title: str


def load_cookie_from_env(env_var: str = "CHAOXING_COOKIE") -> Dict[str, str]:
    """Parse a raw cookie string from environment variables into a dict."""
    # Load .env values into the environment first
    try:
        load_dotenv()
    except Exception:
        pass
    raw_cookie = os.getenv(env_var, "").strip()
    cookies: Dict[str, str] = {}
    if not raw_cookie:
        return cookies
    for fragment in raw_cookie.split(";"):
        if not fragment.strip():
            continue
        if "=" not in fragment:
            continue
        name, value = fragment.split("=", 1)
        cookies[name.strip()] = value.strip()
    return cookies


def sanitize_filename(name: str) -> str:
    """Turn a lesson title into a safe filename for Windows."""
    cleaned = _INVALID_FILENAME_RE.sub("_", name)
    return cleaned.strip() or "untitled"


def parse_knowledge_entries(html_path: Path) -> List[KnowledgeEntry]:
    """Scan the saved HTML page and extract all knowledge entries."""
    soup = BeautifulSoup(html_path.read_text(encoding="utf-8"), "html.parser")
    entries: Dict[str, KnowledgeEntry] = {}
    for span in soup.select("span.posCatalog_name"):
        onclick = span.get("onclick", "")
        match = _ONCLICK_RE.search(onclick)
        if not match:
            continue
        knowledge_id = match["knowledge"]
        title_attr = span.get("title") or span.get_text(strip=True)
        title = " ".join(title_attr.split())
        if knowledge_id not in entries:
            entries[knowledge_id] = KnowledgeEntry(knowledge_id=knowledge_id, title=title)
    return list(entries.values())


def extract_marg_json(cards_html: str) -> Optional[dict]:
    """Pull the mArg JSON payload from a cards response."""
    match = _JSON_RE.search(cards_html)
    if not match:
        return None
    json_block = match.group(1)
    # Collapse newlines so the regex-captured block parses as one JSON document
    json_block = json_block.replace("\n", " ")
    try:
        return json.loads(json_block)
    except json.JSONDecodeError:
        # Fallback: naively convert single quotes to double quotes and retry
        sanitized = json_block.replace("'", '"')
        try:
            return json.loads(sanitized)
        except json.JSONDecodeError:
            return None


def load_local_cards_marg(cards_dir: Path, knowledge_id: str) -> Optional[dict]:
    """Try to parse saved cards.html to get mArg when the network call fails."""
    # Saved file path observed: 学生学习页面_files/cards.html
    candidate = cards_dir / "cards.html"
    if not candidate.exists():
        return None
    try:
        text = candidate.read_text(encoding="utf-8", errors="ignore")
        marg = extract_marg_json(text)
        # The saved page may belong to a different lesson; warn but still return it.
        if marg and str(marg.get("defaults", {}).get("knowledgeid")) != str(knowledge_id):
            print(f"[WARN] Local cards.html knowledgeid does not match {knowledge_id}")
        return marg
    except Exception:
        return None
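
# Illustrative shape of the mArg payload the helpers above extract. Only the
# keys this script actually reads are shown; everything else in the real
# payload (and the exact value types) is an assumption:
#
#   mArg = {
#       "defaults": {"knowledgeid": "123456", "subtitleUrl": "..."},
#       "attachments": [
#           {"type": "video", "objectId": "...", "mid": "...",
#            "property": {"objectid": "...", "mid": "..."}},
#       ],
#   }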


def iter_video_attachments(marg: dict) -> Iterable[dict]:
    """Yield the attachment dicts that correspond to videos."""
    attachments = marg.get("attachments") or []
    for attachment in attachments:
        if attachment.get("type") == "video":
            yield attachment


def is_html_error(text: str) -> bool:
    """Detect HTML error/login pages returned in place of subtitle JSON."""
    t = text.strip().lower()
    return t.startswith("<") and ("500" in t or "用户登录" in t)


def fetch_cards(session: requests.Session, knowledge_id: str, base_params: Dict[str, str]) -> Optional[dict]:
    """Fetch the cards page for one lesson and extract its mArg payload."""
    params = dict(base_params)
    params["knowledgeid"] = knowledge_id
    response = session.get(CARDS_URL, params=params, timeout=10)
    if response.status_code != 200:
        print(f"[WARN] Failed to load cards for knowledge {knowledge_id}: HTTP {response.status_code}")
        return None
    marg = extract_marg_json(response.text)
    if marg is None:
        # Fallback: try the locally saved cards page
        cards_dir = Path("学生学习页面_files")
        marg = load_local_cards_marg(cards_dir, knowledge_id)
    if marg is None:
        print(f"[WARN] Could not locate mArg JSON for knowledge {knowledge_id}")
    return marg


def download_subtitle(session: requests.Session, marg: dict, object_id: str, mid: Optional[str], course_id: str, output_path: Path) -> bool:
    # Try richvideo/allsubtitle first (more complete), then richvideo/subtitle, then editor fallback
    tried = []

    def try_get(url: str, params: dict) -> Optional[str]:
        tried.append((url, params))
        r = session.get(url, params=params, timeout=10)
        if r.status_code != 200:
            return None
        if not r.text.strip() or is_html_error(r.text):
            return None
        return r.text

    # Prefer the subtitle URL advertised by the payload, falling back to the known endpoint
    subtitle_url = marg.get("defaults", {}).get("subtitleUrl") or RICH_SUBTITLE_URL

    # 1) allsubtitle
    params_all = {"mid": mid or marg.get("attachments", [{}])[0].get("mid"), "objectid": object_id, "courseId": course_id}
    text = try_get(RICH_ALLSUBTITLE_URL, params_all)
    if not text:
        # 2) subtitle
        params_sub = {"mid": params_all["mid"], "objectid": object_id, "courseId": course_id}
        text = try_get(subtitle_url, params_sub)
    if not text:
        # 3) fallback editor endpoint
        text = try_get(EDITOR_SUBTITLE_URL, {"objectid": object_id})
    if text:
        output_path.write_text(text, encoding="utf-8")
        print(f"[INFO] Saved subtitle to {output_path}")
        return True
    print(f"[WARN] Subtitle download failed for object {object_id}. Tried: {tried}")
    return False
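
# The subtitle endpoints are expected to return a JSON list of track objects.
# main() below relies only on each track carrying a "url" field; any other
# fields (language, track name, etc.) are assumptions about the payload:
#
#   [{"url": "https://.../subtitle.srt", ...}]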
Tried: {tried}") return False def main() -> None: load_dotenv() html_path = Path("学生学习页面.html") course_url = os.getenv("COURSE_URL", "").strip() # Parse URL parameters from .env clazzid, courseid, cpi = "", "", "" if course_url: parsed = urlparse(course_url) qs = parse_qs(parsed.query) clazzid = qs.get("clazzid", [""])[0] courseid = qs.get("courseId", [""])[0] or qs.get("courseid", [""])[0] cpi = qs.get("cpi", [""])[0] print(f"[INFO] Extracted params: courseid={courseid}, clazzid={clazzid}, cpi={cpi}") if not html_path.exists(): if course_url: print(f"[INFO] 本地HTML不存在,自动下载课程页面: {course_url}") cookies = load_cookie_from_env() session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36", "Referer": course_url, }) if cookies: session.cookies.update(cookies) resp = session.get(course_url, timeout=15) if resp.status_code == 200 and resp.text: html_path.write_text(resp.text, encoding="utf-8") print(f"[INFO] 已保存课程页面到 {html_path}") else: raise SystemExit(f"下载课程页面失败: HTTP {resp.status_code}") else: raise SystemExit("学生学习页面.html not found, 且未配置 COURSE_URL") entries = parse_knowledge_entries(html_path) if not entries: raise SystemExit("No knowledge entries found in saved HTML") print(f"[INFO] Found {len(entries)} lessons") session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36", "Referer": "https://mooc1.chaoxing.com/", }) cookies = load_cookie_from_env() if cookies: session.cookies.update(cookies) print(f"[INFO] Loaded {len(cookies)} cookies from environment") else: print("[WARN] No cookies provided. Requests may fail if authentication is required.") base_params = { "clazzid": clazzid, "courseid": courseid, "num": "0", "ut": "s", "cpi": cpi, "mooc2": "1", "isMicroCourse": "", "editorPreview": "", } # 步骤1:下载字幕 JSON output_dir = Path("subtitles") output_dir.mkdir(exist_ok=True) for entry in entries: print(f"[INFO] Processing {entry.title} (knowledge {entry.knowledge_id})") marg = fetch_cards(session, entry.knowledge_id, base_params) if not marg: continue video_found = False for attachment in iter_video_attachments(marg): video_found = True object_id = attachment.get("objectId") or attachment.get("property", {}).get("objectid") mid = attachment.get("mid") or attachment.get("property", {}).get("mid") if not object_id: print(f"[WARN] No objectId for lesson {entry.title}") continue filename = sanitize_filename(entry.title) + ".txt" output_path = output_dir / filename if download_subtitle(session, marg, object_id, mid, base_params["courseid"], output_path): time.sleep(0.5) if not video_found: print(f"[WARN] No video attachments found for {entry.title}") # 步骤2:批量下载 SRT 文件 srt_dir = Path("srt") srt_dir.mkdir(exist_ok=True) def fetch_srt(url: str, session: requests.Session) -> str | None: r = session.get(url, timeout=10) if r.status_code == 200 and r.text.strip(): return r.text return None for txt_file in output_dir.glob("*.txt"): try: content = txt_file.read_text(encoding="utf-8").strip() data = json.loads(content) # data is list of tracks for track in data: url = track.get("url") if not url: continue srt_text = fetch_srt(url, session) if not srt_text: print(f"[WARN] Failed to fetch SRT from {url}") continue name = txt_file.stem + ".srt" out_path = srt_dir / name out_path.write_text(srt_text, encoding="utf-8") print(f"[INFO] Saved {out_path}") except Exception as e: 
print(f"[WARN] {txt_file.name}: {e}") if __name__ == "__main__": main()