#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 成绩监控系统 功能:自动登录并监控成绩页面变化 作者:GitHub Copilot 日期:2026-01-17 """ import os import sys import time import logging import hashlib import smtplib import ssl import requests import argparse import random from pathlib import Path from datetime import datetime from configparser import ConfigParser from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from typing import Optional, Dict import signal # 配置日志 logging.basicConfig( level=logging.INFO, format='[%(levelname)s] [%(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.FileHandler('monitor.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class GradeMonitor: """成绩监控类""" def __init__(self, config_file: str = 'config.ini', test_mode: bool = False): """初始化监控器""" self.config_file = config_file self.config = self._load_config() self.test_mode = test_mode self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' }) # 文件路径 self.script_dir = Path(__file__).parent.absolute() self.last_grade_file = self.script_dir / '.last_grade_hash.txt' self.last_grade_html_file = self.script_dir / '.last_grade_page.html' self.last_grade_content_file = self.script_dir / '.last_grade_content.txt' # 运行标志 self.running = True # 设置信号处理 signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _signal_handler(self, signum, frame): """处理终止信号""" logger.info("收到停止信号,正在退出...") self.running = False sys.exit(0) def _load_config(self) -> Dict: """加载配置文件""" if not os.path.exists(self.config_file): logger.error(f"配置文件不存在: {self.config_file}") sys.exit(1) parser = ConfigParser() parser.read(self.config_file, encoding='utf-8') try: config = { # 登录配置 'username': parser.get('login', 'USERNAME', fallback=''), 'password': parser.get('login', 'PASSWORD', fallback=''), 'login_url': parser.get('login', 'LOGIN_URL', fallback=''), 'grade_url': parser.get('login', 'GRADE_URL', fallback=''), # 邮件配置 'sender_email': parser.get('email', 'SENDER_EMAIL', fallback=''), 'sender_password': parser.get('email', 'SENDER_PASSWORD', fallback=''), 'receiver_email': parser.get('email', 'RECEIVER_EMAIL', fallback=''), 'smtp_server': parser.get('email', 'SMTP_SERVER', fallback='smtp.163.com'), 'smtp_port': parser.getint('email', 'SMTP_PORT', fallback=465), # 监控配置 'check_interval': parser.getint('monitor', 'CHECK_INTERVAL', fallback=60), 'request_delay': parser.getint('monitor', 'REQUEST_DELAY', fallback=5), 'max_retries': parser.getint('monitor', 'MAX_RETRIES', fallback=3), 'retry_delay': parser.getint('monitor', 'RETRY_DELAY', fallback=10), } # 验证必要配置 if not config['username'] or not config['password']: logger.error("请在配置文件中设置 USERNAME 和 PASSWORD") sys.exit(1) if not config['sender_email'] or not config['sender_password'] or not config['receiver_email']: logger.error("请在配置文件中设置邮件相关配置") sys.exit(1) logger.info("配置文件加载成功") return config except Exception as e: logger.error(f"配置文件格式错误: {e}") sys.exit(1) def login(self) -> bool: """登录统一身份认证""" logger.info("开始登录统一身份认证...") for attempt in range(self.config['max_retries']): try: logger.info(f"登录尝试 {attempt + 1}/{self.config['max_retries']}") # 第一步:访问成绩页面,会重定向到登录页 logger.info("步骤1:访问成绩页面,获取登录页面...") response = self.session.get( self.config['grade_url'], timeout=30, allow_redirects=True ) # 随机等待5-10秒,模拟真实用户 wait_time = random.uniform(5, 10) logger.info(f"第一次访问完成,等待 {wait_time:.1f} 秒(模拟用户行为)...") time.sleep(wait_time) # 检查是否需要登录 if '统一身份认证' in response.text or 'tpass/login' in response.url: logger.info("需要登录,提取登录表单信息...") # 提取lt值 import re lt_match = re.search(r'name="lt"\s+value="([^"]+)"', response.text) execution_match = re.search(r'name="execution"\s+value="([^"]+)"', response.text) lt = lt_match.group(1) if lt_match else '' execution = execution_match.group(1) if execution_match else 'e1s1' if not lt: logger.warning("无法提取lt值,使用默认值") lt = '00000000' logger.info(f"提取到 lt={lt}, execution={execution}") # 获取登录提交URL login_submit_url = response.url.split('?')[0] # 准备登录数据(按照login_neu.js的逻辑) username = self.config['username'] password = self.config['password'] login_data = { 'rsa': username + password + lt, 'ul': str(len(username)), 'pl': str(len(password)), 'sl': '0', 'lt': lt, 'execution': execution, '_eventId': 'submit' } logger.info(f"步骤2:提交登录表单到 {login_submit_url}") # 随机等待5-10秒后提交,模拟用户填写表单 wait_time = random.uniform(5, 10) logger.info(f"等待 {wait_time:.1f} 秒后提交登录(模拟填写表单)...") time.sleep(wait_time) response = self.session.post( login_submit_url, data=login_data, timeout=30, allow_redirects=True ) # 登录后等待5-10秒,让服务器处理 wait_time = random.uniform(5, 10) logger.info(f"登录表单已提交,等待 {wait_time:.1f} 秒处理...") time.sleep(wait_time) # 第三步:再次访问成绩页面验证登录 logger.info("步骤3:验证登录状态,访问成绩页面...") # 随机等待5-10秒,模拟用户操作 wait_time = random.uniform(5, 10) logger.info(f"等待 {wait_time:.1f} 秒后访问成绩页面(模拟用户操作)...") time.sleep(wait_time) response = self.session.get( self.config['grade_url'], timeout=30, allow_redirects=True ) # 检查是否成功(不再是登录页面,且不是"请不要过快点击") if '统一身份认证' not in response.text and 'tpass/login' not in response.url and '请不要过快点击' not in response.text: # 进一步验证:检查是否包含成绩相关内容 if '学年学期' in response.text or '课程名称' in response.text or '成绩' in response.text: logger.info("登录成功!成功访问成绩页面") return True else: logger.warning(f"页面内容异常,可能不是成绩页面") logger.info(f"当前URL: {response.url}") else: if '请不要过快点击' in response.text: wait_time = 30 + (attempt * 15) # 大幅递增等待时间:30秒、45秒、60秒... logger.warning(f"⚠️ 请求过快被拦截!等待 {wait_time} 秒后重试...") logger.info("建议:1) 检查 config.ini 中的 CHECK_INTERVAL 是否 >= 120 秒") logger.info(" 2) 避免频繁手动测试,建议间隔至少5分钟") time.sleep(wait_time) continue # 直接进入下一次循环,不再额外sleep else: logger.warning(f"登录失败,尝试重试 {attempt + 1}/{self.config['max_retries']}") logger.info(f"当前URL: {response.url}") except requests.RequestException as e: logger.warning(f"登录请求异常: {e},尝试重试 {attempt + 1}/{self.config['max_retries']}") if '请不要过快点击' not in response.text: logger.info(f"等待 {self.config['retry_delay']} 秒后重试...") time.sleep(self.config['retry_delay']) logger.error("登录失败,已达到最大重试次数") logger.info("建议:1) 检查用户名密码是否正确") logger.info(" 2) 手动在浏览器中登录一次,确认账号正常") logger.info(" 3) 查看 .debug_response.html 了解实际响应内容") return False def fetch_grade_page(self) -> Optional[str]: """获取成绩页面""" logger.info("获取成绩页面...") for attempt in range(self.config['max_retries']): try: # 在每次请求前增加延迟,避免触发"请不要过快点击" if attempt > 0: wait_time = random.uniform(5, 8) logger.info(f"等待 {wait_time:.1f} 秒后发送请求...") time.sleep(wait_time) response = self.session.get( self.config['grade_url'], timeout=30 ) # 检查是否触发"请不要过快点击" if '请不要过快点击' in response.text: wait_time = 30 + (attempt * 15) logger.warning(f"⚠️ 请求过快被拦截!等待 {wait_time} 秒后重试...") time.sleep(wait_time) continue if response.status_code == 200: return response.text elif response.status_code in [302, 401]: logger.warning("会话已过期,重新登录...") if self.login(): continue else: return None else: logger.warning(f"获取成绩页面失败 (HTTP {response.status_code}),尝试重试 {attempt + 1}/{self.config['max_retries']}") except requests.RequestException as e: logger.warning(f"获取成绩页面异常: {e},尝试重试 {attempt + 1}/{self.config['max_retries']}") time.sleep(self.config['retry_delay']) logger.error("获取成绩页面失败,已达到最大重试次数") return None def extract_grade_info(self, html: str) -> str: """提取成绩信息的关键内容""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'html.parser') result = [] # 提取总平均绩点 gpa_div = soup.find('div', string=lambda x: x and '总平均绩点' in x) if gpa_div: gpa_text = gpa_div.get_text(strip=True) result.append("=" * 60) result.append(gpa_text) result.append("=" * 60) result.append("") # 提取成绩表格 table = soup.find('table', {'class': 'gridtable'}) if table: # 提取表头 thead = table.find('thead') if thead: headers = [th.get_text(strip=True) for th in thead.find_all('th')] result.append(' | '.join(headers)) result.append("-" * 120) # 提取每一行成绩 tbody = table.find('tbody') if tbody: for row in tbody.find_all('tr'): cells = row.find_all('td') if cells: row_data = [] for cell in cells: # 提取文本,包括sup标签的内容 text = cell.get_text(strip=True, separator=' ') row_data.append(text) result.append(' | '.join(row_data)) if not result: logger.warning("未能提取到成绩信息,返回原始文本") return self._fallback_extract(html) return '\n'.join(result) except ImportError: logger.warning("未安装 beautifulsoup4,使用简单文本提取") return self._fallback_extract(html) except Exception as e: logger.error(f"提取成绩信息时出错: {e}") return self._fallback_extract(html) def _fallback_extract(self, html: str) -> str: """备用的简单文本提取方法""" import re # 移除script和style标签 html = re.sub(r']*>.*?', '', html, flags=re.DOTALL) html = re.sub(r']*>.*?', '', html, flags=re.DOTALL) return html def parse_courses(self, grade_text: str) -> list: """解析成绩文本,提取课程列表""" courses = [] lines = grade_text.split('\n') for line in lines: # 跳过标题行、分隔行和空行 stripped = line.strip() # 跳过空行、纯分隔符行、标题行 if not stripped or stripped.startswith('=') or stripped.startswith('-') or '学年学期' in line or '总平均绩点' in line: continue # 解析课程行 parts = [p.strip() for p in line.split('|')] if len(parts) >= 4: # 至少有学期、代码、序号、名称 # 提取关键信息:学期 + 课程代码 + 课程名称 semester = parts[0] if len(parts) > 0 else '' course_code = parts[1] if len(parts) > 1 else '' course_name = parts[3] if len(parts) > 3 else '' if course_code and course_name: course_key = f"{semester}|{course_code}|{course_name}" courses.append(course_key) return courses def calculate_hash(self, content: str) -> str: """计算内容的哈希值""" return hashlib.md5(content.encode('utf-8')).hexdigest() def check_grade_changes(self, current_content: str, current_html: str = None) -> bool: """检查成绩是否有变化""" # 保存当前内容(供用户查看) self.last_grade_content_file.write_text(current_content, encoding='utf-8') if current_html: self.last_grade_html_file.write_text(current_html, encoding='utf-8') # 解析当前课程列表 current_courses = self.parse_courses(current_content) if not self.last_grade_file.exists(): logger.info("=" * 60) logger.info("首次运行,保存当前成绩状态") logger.info(f"当前共有 {len(current_courses)} 门课程") logger.info(f"原始HTML已保存到: {self.last_grade_html_file}") logger.info(f"提取的成绩内容已保存到: {self.last_grade_content_file}") logger.info("请检查这些文件确认内容是否正确!") logger.info("=" * 60) # 保存课程列表 courses_file = self.script_dir / '.last_courses.txt' courses_file.write_text('\n'.join(current_courses), encoding='utf-8') # 计算哈希 current_hash = self.calculate_hash(current_content) self.last_grade_file.write_text(current_hash, encoding='utf-8') return False # 读取上次的课程列表 courses_file = self.script_dir / '.last_courses.txt' if courses_file.exists(): last_courses = courses_file.read_text(encoding='utf-8').strip().split('\n') else: last_courses = [] # 比较课程数量和内容,检测新增课程 new_courses = [c for c in current_courses if c not in last_courses] # 检测课程成绩变化(课程存在但内容不同) current_hash = self.calculate_hash(current_content) last_hash = self.last_grade_file.read_text(encoding='utf-8').strip() if self.last_grade_file.exists() else '' content_changed = current_hash != last_hash if new_courses: logger.info("=" * 60) logger.info(f"✓ 检测到新增课程成绩!共 {len(new_courses)} 门") logger.info("-" * 60) for idx, course in enumerate(new_courses, 1): parts = course.split('|') if len(parts) >= 3: semester = parts[0] if parts[0] else '未知学期' course_name = parts[2] if len(parts) > 2 else '未知课程' course_code = parts[1] if len(parts) > 1 else '' logger.info(f" {idx}. [{semester}] {course_name} ({course_code})") logger.info("=" * 60) logger.info(f"详细内容已保存到: {self.last_grade_content_file}") # 更新保存的课程列表和哈希 courses_file.write_text('\n'.join(current_courses), encoding='utf-8') self.last_grade_file.write_text(current_hash, encoding='utf-8') return True elif content_changed and current_courses: # 课程数量没变,但内容变了(可能是成绩更新) logger.info("=" * 60) logger.info("✓ 检测到成绩内容发生变化(可能是成绩更新)") logger.info(f"当前共有 {len(current_courses)} 门课程") logger.info("=" * 60) logger.info(f"详细内容已保存到: {self.last_grade_content_file}") # 更新哈希值 self.last_grade_file.write_text(current_hash, encoding='utf-8') return True else: logger.info(f"成绩无变化(共 {len(current_courses)} 门课程)") return False def send_email_notification(self, new_courses: list = None) -> bool: """发送邮件通知""" logger.info("准备发送邮件通知...") if new_courses: subject = f"【成绩更新通知】新增 {len(new_courses)} 门课程成绩" courses_text = "\n".join([f" - {course.split('|')[0]} {course.split('|')[2]}" for course in new_courses if len(course.split('|')) >= 3]) body = f"""尊敬的用户: 您好!系统检测到成绩页面已更新。 新增课程(共{len(new_courses)}门): {courses_text} 请及时查看详细成绩。 查看地址:{self.config['grade_url']} 检测时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 此邮件由系统自动发送,请勿回复。""" else: subject = "【成绩更新通知】成绩页面已更新" body = f"""尊敬的用户: 您好!系统检测到成绩页面已更新。 请及时查看详细成绩。 查看地址:{self.config['grade_url']} 检测时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 此邮件由系统自动发送,请勿回复。""" try: message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = self.config['sender_email'] message["To"] = self.config['receiver_email'] part = MIMEText(body, "plain", "utf-8") message.attach(part) context = ssl.create_default_context() with smtplib.SMTP_SSL( self.config['smtp_server'], self.config['smtp_port'], context=context ) as server: server.login( self.config['sender_email'], self.config['sender_password'] ) server.sendmail( self.config['sender_email'], self.config['receiver_email'], message.as_string() ) logger.info("邮件发送成功") return True except Exception as e: logger.error(f"邮件发送失败: {e}") return False def send_error_notification(self, error_msg: str) -> bool: """发送错误通知邮件""" logger.info("准备发送错误通知邮件...") subject = "【成绩监控系统】程序运行出错" body = f"""尊敬的用户: 成绩监控系统在运行过程中遇到错误: 错误信息: {error_msg} 发生时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 请检查系统状态或查看日志文件 monitor.log 了解详情。 此邮件由系统自动发送,请勿回复。""" try: message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = self.config['sender_email'] message["To"] = self.config['receiver_email'] part = MIMEText(body, "plain", "utf-8") message.attach(part) context = ssl.create_default_context() with smtplib.SMTP_SSL( self.config['smtp_server'], self.config['smtp_port'], context=context ) as server: server.login( self.config['sender_email'], self.config['sender_password'] ) server.sendmail( self.config['sender_email'], self.config['receiver_email'], message.as_string() ) logger.info("错误通知邮件发送成功") return True except Exception as e: logger.error(f"错误通知邮件发送失败: {e}") return False def test_fetch(self): """测试模式:获取一次成绩并显示内容""" logger.info("=" * 60) logger.info("测试模式:获取成绩页面") logger.info("=" * 60) # 登录 if not self.login(): logger.error("登录失败,无法获取成绩页面") return # 获取成绩页面 logger.info("正在获取成绩页面...") grade_html = self.fetch_grade_page() if not grade_html: logger.error("获取成绩页面失败") return # 保存原始HTML self.last_grade_html_file.write_text(grade_html, encoding='utf-8') logger.info(f"✓ 原始HTML已保存到: {self.last_grade_html_file}") # 提取成绩信息 grade_info = self.extract_grade_info(grade_html) self.last_grade_content_file.write_text(grade_info, encoding='utf-8') logger.info(f"✓ 提取的成绩内容已保存到: {self.last_grade_content_file}") # 显示前500个字符 logger.info("=" * 60) logger.info("提取的成绩内容预览(前500字符):") logger.info("-" * 60) print(grade_info[:500]) if len(grade_info) > 500: logger.info(f"\n... (还有 {len(grade_info) - 500} 个字符)") logger.info("-" * 60) logger.info(f"完整内容请查看文件: {self.last_grade_content_file}") logger.info("=" * 60) def monitor_loop(self): """主监控循环""" logger.info("=" * 50) logger.info("成绩监控程序启动") logger.info(f"检查间隔: {self.config['check_interval']} 秒") logger.info("按 Ctrl+C 停止监控") logger.info("=" * 50) # 首次登录 if not self.login(): logger.error("初始登录失败,程序退出") self.send_error_notification("初始登录失败,无法访问成绩页面") return # 登录成功后等待一段时间再进行首次检查,避免触发"请不要过快点击" wait_time = random.uniform(5, 10) logger.info(f"登录成功,等待 {wait_time:.1f} 秒后开始首次检查(避免请求过快)...") time.sleep(wait_time) consecutive_errors = 0 max_consecutive_errors = 5 while self.running: try: logger.info("-" * 50) logger.info("开始新一轮检查") # 获取成绩页面 grade_html = self.fetch_grade_page() if not grade_html: consecutive_errors += 1 logger.error(f"获取成绩页面失败,等待下次检查 (连续失败: {consecutive_errors}/{max_consecutive_errors})") if consecutive_errors >= max_consecutive_errors: error_msg = f"连续 {consecutive_errors} 次获取成绩页面失败" logger.error(error_msg) self.send_error_notification(error_msg) consecutive_errors = 0 # 重置计数,避免重复发送 time.sleep(self.config['check_interval']) continue # 提取成绩信息 grade_info = self.extract_grade_info(grade_html) # 检查变化 if self.check_grade_changes(grade_info, grade_html): # 获取新增课程列表 courses_file = self.script_dir / '.last_courses.txt' if courses_file.exists(): current_courses = self.parse_courses(grade_info) # 简单通知有新课程 self.send_email_notification(current_courses[:3]) # 只显示前3门 else: self.send_email_notification() # 成功执行,重置错误计数 consecutive_errors = 0 logger.info(f"等待 {self.config['check_interval']} 秒后进行下次检查...") time.sleep(self.config['check_interval']) except Exception as e: consecutive_errors += 1 error_msg = f"监控循环发生异常: {e}" logger.error(error_msg) if consecutive_errors >= max_consecutive_errors: logger.error(f"连续 {consecutive_errors} 次出现异常") self.send_error_notification(f"{error_msg}\n\n连续失败次数: {consecutive_errors}") consecutive_errors = 0 time.sleep(self.config['retry_delay']) def run(self): """运行监控""" try: if self.test_mode: self.test_fetch() else: self.monitor_loop() except KeyboardInterrupt: logger.info("收到键盘中断信号,程序退出") except Exception as e: logger.error(f"程序异常退出: {e}") sys.exit(1) def main(): """主函数""" parser = argparse.ArgumentParser( description='成绩监控系统 - 自动监控成绩变化并发送邮件通知', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 正常监控模式 python3 monitor.py # 测试模式(获取一次成绩并显示内容) python3 monitor.py --test # 查看保存的成绩内容 cat .last_grade_content.txt # 查看原始HTML cat .last_grade_page.html """ ) parser.add_argument( '--test', action='store_true', help='测试模式:仅获取一次成绩页面并显示内容,不进行监控' ) parser.add_argument( '--config', default='config.ini', help='配置文件路径(默认:config.ini)' ) args = parser.parse_args() monitor = GradeMonitor(config_file=args.config, test_mode=args.test) monitor.run() if __name__ == '__main__': main()