diff --git a/handle.cpp b/handle.cpp
new file mode 100644
index 0000000..d1a6f93
--- /dev/null
+++ b/handle.cpp
@@ -0,0 +1,59 @@
+#include <windows.h>
+#include <cstdio>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+#include <locale>
+#include <codecvt>
+using namespace std;
+
+int main(){
+    // Switch the console to UTF-8 so non-ASCII filenames print correctly
+    system("chcp 65001");
+
+    // Enumerate *.srt files in the current directory via the Windows API
+    WIN32_FIND_DATAW findData;
+    HANDLE hFind = FindFirstFileW(L"*.srt", &findData);
+
+    if(hFind == INVALID_HANDLE_VALUE){
+        cout << "No .srt files to process" << endl;
+        return 0;
+    }
+
+    // Converts wide filenames to UTF-8 for console output
+    wstring_convert<codecvt_utf8_utf16<wchar_t>> converter;
+
+    do {
+        wstring wfilename = findData.cFileName;
+        string filename = converter.to_bytes(wfilename);
+
+        // Skip files that were already processed
+        if(filename.find("handled_") == 0){
+            continue;
+        }
+
+        FILE* infile = _wfopen(wfilename.c_str(), L"r");
+        if(!infile){
+            cout << "Failed to open file: " << filename << endl;
+            continue;
+        }
+
+        wstring woutname = L"handled_" + wfilename;
+        FILE* outfile = _wfopen(woutname.c_str(), L"w");
+        if(!outfile){
+            cout << "Failed to create output file: handled_" << filename << endl;
+            fclose(infile);
+            continue;
+        }
+
+        // An SRT block is four lines (index, timestamp, text, blank), so keep
+        // only the third line of each block; assumes single-line subtitle text
+        char line[1024];
+        int line_count = 0;
+        while(fgets(line, sizeof(line), infile)){
+            line_count++;
+            if(line_count % 4 == 3){
+                fputs(line, outfile);
+            }
+        }
+        fclose(infile);
+        fclose(outfile);
+        cout << "Processed: " << filename << endl;
+
+    } while(FindNextFileW(hFind, &findData));
+
+    FindClose(hFind);
+    return 0;
+}
diff --git a/handle.exe b/handle.exe
new file mode 100644
index 0000000..7f8ec70
Binary files /dev/null and b/handle.exe differ
diff --git a/srt_downloader.exe b/srt_downloader.exe
new file mode 100644
index 0000000..5fe748e
Binary files /dev/null and b/srt_downloader.exe differ
diff --git a/srt_downloader.py b/srt_downloader.py
new file mode 100644
index 0000000..6e3c8e0
--- /dev/null
+++ b/srt_downloader.py
@@ -0,0 +1,282 @@
+from __future__ import annotations
+
+import json
+import os
+import re
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Dict, Iterable, List, Optional
+
+import requests
+from bs4 import BeautifulSoup
+from dotenv import load_dotenv
+
+# Constants for the Chaoxing endpoints
+CARDS_URL = "https://mooc1.chaoxing.com/mooc-ans/knowledge/cards"
+# Primary endpoints visible in the cards payload / player scripts
+RICH_SUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/subtitle"
+RICH_ALLSUBTITLE_URL = "https://mooc1.chaoxing.com/mooc-ans/richvideo/allsubtitle"
+# Fallback (may return 500 depending on auth):
+EDITOR_SUBTITLE_URL = "https://mooc1.chaoxing.com/ananas/video-editor/sub"
+
+# Regex helpers; assumes onclick="getTeacherAjax('courseId','clazzId','knowledgeId')"
+_ONCLICK_RE = re.compile(r"getTeacherAjax\('(?P<course>\d+)',\s*'(?P<clazz>\d+)',\s*'(?P<knowledge>\d+)'\)")
+_JSON_RE = re.compile(r"mArg\s*=\s*(\{.*?\});", re.DOTALL)
+_INVALID_FILENAME_RE = re.compile(r"[\\/:*?\"<>|]")
+
+
+@dataclass(frozen=True)
+class KnowledgeEntry:
+    knowledge_id: str
+    title: str
+
+
+def load_cookie_from_env(env_var: str = "CHAOXING_COOKIE") -> Dict[str, str]:
+    """Parse a raw cookie string from environment variables into a dict."""
+    # Load .env values into the environment first
+    try:
+        load_dotenv()
+    except Exception:
+        pass
+    raw_cookie = os.getenv(env_var, "").strip()
+    cookies: Dict[str, str] = {}
+    if not raw_cookie:
+        return cookies
+    for fragment in raw_cookie.split(";"):
+        if not fragment.strip():
+            continue
+        if "=" not in fragment:
+            continue
+        name, value = fragment.split("=", 1)
+        cookies[name.strip()] = value.strip()
+    return cookies
+
+
+def sanitize_filename(name: str) -> str:
+    """Turn a lesson title into a safe filename for Windows."""
+    cleaned = _INVALID_FILENAME_RE.sub("_", name)
+    return cleaned.strip() or "untitled"
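+
+
+# Illustrative sketch of the two helpers above (assumed, made-up values):
+#   CHAOXING_COOKIE="JSESSIONID=abc123; fid=1234" makes load_cookie_from_env()
+#   return {"JSESSIONID": "abc123", "fid": "1234"}, and sanitize_filename
+#   turns a title such as '1.1 视频: What is AI?' into '1.1 视频_ What is AI_'.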
+
+
+def parse_knowledge_entries(html_path: Path) -> List[KnowledgeEntry]:
+    """Scan the saved HTML page and extract all knowledge entries."""
+    soup = BeautifulSoup(html_path.read_text(encoding="utf-8"), "html.parser")
+    entries: Dict[str, KnowledgeEntry] = {}
+    for span in soup.select("span.posCatalog_name"):
+        onclick = span.get("onclick", "")
+        match = _ONCLICK_RE.search(onclick)
+        if not match:
+            continue
+        knowledge_id = match["knowledge"]
+        title_attr = span.get("title") or span.get_text(strip=True)
+        title = " ".join(title_attr.split())
+        if knowledge_id not in entries:
+            entries[knowledge_id] = KnowledgeEntry(knowledge_id=knowledge_id, title=title)
+    return list(entries.values())
+
+
+def extract_marg_json(cards_html: str) -> Optional[dict]:
+    """Pull the mArg JSON payload from a cards response."""
+    match = _JSON_RE.search(cards_html)
+    if not match:
+        return None
+    json_block = match.group(1)
+    # Collapse newlines so the captured block parses as one JSON document
+    json_block = json_block.replace("\n", " ")
+    try:
+        return json.loads(json_block)
+    except json.JSONDecodeError:
+        # Crude fallback: swap JavaScript-style single quotes for double quotes
+        sanitized = json_block.replace("'", '"')
+        try:
+            return json.loads(sanitized)
+        except json.JSONDecodeError:
+            return None
+
+
+def load_local_cards_marg(cards_dir: Path, knowledge_id: str) -> Optional[dict]:
+    """Try to parse a saved cards.html to get mArg when the network call fails."""
+    # Saved file path observed: 学生学习页面_files/cards.html
+    candidate = cards_dir / "cards.html"
+    if not candidate.exists():
+        return None
+    try:
+        text = candidate.read_text(encoding="utf-8", errors="ignore")
+        # Best effort: the saved page may belong to a different lesson, so the
+        # knowledgeid inside the payload is not verified against knowledge_id
+        return extract_marg_json(text)
+    except Exception:
+        return None
+
+
+def iter_video_attachments(marg: dict) -> Iterable[dict]:
+    """Yield the attachment dicts that correspond to videos."""
+    attachments = marg.get("attachments") or []
+    for attachment in attachments:
+        if attachment.get("type") == "video":
+            yield attachment
+
+
+def is_html_error(text: str) -> bool:
+    """Detect the HTML error/login page the endpoints return instead of JSON."""
+    t = text.strip().lower()
+    return t.startswith("<") and ("500" in t or "用户登录" in t)
+
+
+def fetch_cards(session: requests.Session, knowledge_id: str, base_params: Dict[str, str]) -> Optional[dict]:
+    params = dict(base_params)
+    params["knowledgeid"] = knowledge_id
+    response = session.get(CARDS_URL, params=params, timeout=10)
+    if response.status_code != 200:
+        print(f"[WARN] Failed to load cards for knowledge {knowledge_id}: HTTP {response.status_code}")
+        return None
+    marg = extract_marg_json(response.text)
+    if marg is None:
+        # Fallback: try the locally saved cards page
+        cards_dir = Path("学生学习页面_files")
+        marg = load_local_cards_marg(cards_dir, knowledge_id)
+    if marg is None:
+        print(f"[WARN] Could not locate mArg JSON for knowledge {knowledge_id}")
+    return marg
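+
+
+# Illustrative shape of the mArg payload (field names match the accessors
+# used above and below; the values here are made up):
+#   mArg = {"defaults": {"knowledgeid": "123456"},
+#           "attachments": [{"type": "video", "objectId": "...", "mid": "...",
+#                            "property": {"objectid": "...", "mid": "..."}}]};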
+
+
+def download_subtitle(session: requests.Session, marg: dict, object_id: str, mid: Optional[str], course_id: str, output_path: Path) -> bool:
+    """Try richvideo/allsubtitle first (more complete), then richvideo/subtitle, then the editor fallback."""
+    tried = []
+
+    def try_get(url: str, params: dict) -> Optional[str]:
+        tried.append((url, params))
+        r = session.get(url, params=params, timeout=10)
+        if r.status_code != 200:
+            return None
+        if not r.text.strip() or is_html_error(r.text):
+            return None
+        return r.text
+
+    # The cards payload may carry its own subtitle endpoint; kept for reference
+    subtitle_url_base = marg.get("defaults", {}).get("subtitleUrl") or RICH_SUBTITLE_URL
+    # Prefer the objectId and mid of the attachment itself
+    params_all = {"mid": mid or marg.get("attachments", [{}])[0].get("mid"), "objectid": object_id, "courseId": course_id}
+    # 1) allsubtitle
+    text = try_get(RICH_ALLSUBTITLE_URL, params_all)
+    if not text:
+        # 2) subtitle (same parameters)
+        text = try_get(RICH_SUBTITLE_URL, params_all)
+    if not text:
+        # 3) editor endpoint as a last resort
+        text = try_get(EDITOR_SUBTITLE_URL, {"objectid": object_id})
+    if text:
+        output_path.write_text(text, encoding="utf-8")
+        print(f"[INFO] Saved subtitle to {output_path}")
+        return True
+    print(f"[WARN] Subtitle download failed for object {object_id}. Tried: {tried}")
+    return False
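+
+
+# Illustrative shape of a successful subtitle response; step 2 in main() only
+# reads the "url" key of each track, so any other keys are ignored:
+#   [{"url": "https://mooc1.chaoxing.com/.../xxx.srt"}, ...]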
+
+
+def main() -> None:
+    load_dotenv()
+    html_path = Path("学生学习页面.html")
+    course_url = os.getenv("COURSE_URL", "").strip()
+
+    session = requests.Session()
+    session.headers.update({
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0 Safari/537.36",
+        "Referer": "https://mooc1.chaoxing.com/",
+    })
+    cookies = load_cookie_from_env()
+    if cookies:
+        session.cookies.update(cookies)
+        print(f"[INFO] Loaded {len(cookies)} cookies from environment")
+    else:
+        print("[WARN] No cookies provided. Requests may fail if authentication is required.")
+
+    if not html_path.exists():
+        if not course_url:
+            raise SystemExit("学生学习页面.html not found and COURSE_URL is not configured")
+        print(f"[INFO] Local HTML missing; downloading the course page: {course_url}")
+        resp = session.get(course_url, headers={"Referer": course_url}, timeout=15)
+        if resp.status_code == 200 and resp.text:
+            html_path.write_text(resp.text, encoding="utf-8")
+            print(f"[INFO] Saved the course page to {html_path}")
+        else:
+            raise SystemExit(f"Failed to download the course page: HTTP {resp.status_code}")
+
+    entries = parse_knowledge_entries(html_path)
+    if not entries:
+        raise SystemExit("No knowledge entries found in the saved HTML")
+    print(f"[INFO] Found {len(entries)} lessons")
+
+    # NOTE: these ids are specific to one course/class; adjust them as needed
+    base_params = {
+        "clazzid": "129437493",
+        "courseid": "256005147",
+        "num": "0",
+        "ut": "s",
+        "cpi": "441843723",
+        "mooc2": "1",
+        "isMicroCourse": "",
+        "editorPreview": "",
+    }
+
+    # Step 1: download the subtitle JSON for every lesson
+    output_dir = Path("subtitles")
+    output_dir.mkdir(exist_ok=True)
+    for entry in entries:
+        print(f"[INFO] Processing {entry.title} (knowledge {entry.knowledge_id})")
+        marg = fetch_cards(session, entry.knowledge_id, base_params)
+        if not marg:
+            continue
+        video_found = False
+        for attachment in iter_video_attachments(marg):
+            video_found = True
+            object_id = attachment.get("objectId") or attachment.get("property", {}).get("objectid")
+            mid = attachment.get("mid") or attachment.get("property", {}).get("mid")
+            if not object_id:
+                print(f"[WARN] No objectId for lesson {entry.title}")
+                continue
+            filename = sanitize_filename(entry.title) + ".txt"
+            output_path = output_dir / filename
+            if download_subtitle(session, marg, object_id, mid, base_params["courseid"], output_path):
+                time.sleep(0.5)  # small delay between successful downloads
+        if not video_found:
+            print(f"[WARN] No video attachments found for {entry.title}")
+
+    # Step 2: batch-download the SRT files referenced by the saved JSON
+    srt_dir = Path("srt")
+    srt_dir.mkdir(exist_ok=True)
+
+    def fetch_srt(url: str, session: requests.Session) -> str | None:
+        r = session.get(url, timeout=10)
+        if r.status_code == 200 and r.text.strip():
+            return r.text
+        return None
+
+    for txt_file in output_dir.glob("*.txt"):
+        try:
+            content = txt_file.read_text(encoding="utf-8").strip()
+            data = json.loads(content)
+            # data is a list of subtitle tracks
+            for track in data:
+                url = track.get("url")
+                if not url:
+                    continue
+                srt_text = fetch_srt(url, session)
+                if not srt_text:
+                    print(f"[WARN] Failed to fetch SRT from {url}")
+                    continue
+                out_path = srt_dir / (txt_file.stem + ".srt")
+                out_path.write_text(srt_text, encoding="utf-8")
+                print(f"[INFO] Saved {out_path}")
+        except Exception as e:
+            print(f"[WARN] {txt_file.name}: {e}")
+
+
+if __name__ == "__main__":
+    main()
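+
+# Usage sketch (assumes a .env file next to this script):
+#   CHAOXING_COOKIE=JSESSIONID=...; fid=...    copied from a logged-in browser
+#   COURSE_URL=https://mooc1.chaoxing.com/...  optional; fetched when the saved
+#                                              学生学习页面.html is missing
+# Run `python srt_downloader.py`, then compile handle.cpp and run it inside
+# srt/ to produce handled_*.srt files that keep only the dialogue lines.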