Files
GPA_Monitoring/monitor.py
2026-01-29 05:57:54 +08:00

935 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
成绩监控系统
功能:自动登录并监控成绩页面变化
作者GitHub Copilot
日期2026-01-17
"""
import os
import sys
import time
import logging
import hashlib
import smtplib
import ssl
import requests
import argparse
import random
from pathlib import Path
from datetime import datetime
from configparser import ConfigParser
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional, Dict
import signal
# 数据目录(用于日志和运行时文件)
_DEFAULT_DATA_DIR = Path(__file__).parent.resolve()
DATA_DIR = Path(os.environ.get('GPA_DATA_DIR', str(_DEFAULT_DATA_DIR))).expanduser()
DATA_DIR.mkdir(parents=True, exist_ok=True)
# 配置日志
logger = logging.getLogger(__name__)
# 避免重复添加处理器
if not logger.handlers:
logger.setLevel(logging.INFO)
# 设置日志格式
formatter = logging.Formatter(
'[%(levelname)s] [%(asctime)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 文件处理器
file_handler = logging.FileHandler(DATA_DIR / 'monitor.log', encoding='utf-8')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 防止日志传播到根日志记录器
logger.propagate = False
class GradeMonitor:
"""成绩监控类"""
def __init__(self, config_file: str = 'config.ini', test_mode: bool = False):
"""初始化监控器"""
self.config_file = config_file
self.config = self._load_config()
self.test_mode = test_mode
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
})
# 文件路径
self.data_dir = DATA_DIR
self.last_grade_file = self.data_dir / '.last_grade_hash.txt'
self.last_grade_html_file = self.data_dir / '.last_grade_page.html'
self.last_grade_content_file = self.data_dir / '.last_grade_content.txt'
# 运行标志
self.running = True
# 设置信号处理
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""处理终止信号"""
logger.info("收到停止信号,正在退出...")
self.running = False
sys.exit(0)
def _load_config(self) -> Dict:
"""加载配置文件"""
if not os.path.exists(self.config_file):
logger.error(f"配置文件不存在: {self.config_file}")
sys.exit(1)
parser = ConfigParser()
parser.read(self.config_file, encoding='utf-8')
try:
config = {
# 登录配置
'username': parser.get('login', 'USERNAME', fallback=''),
'password': parser.get('login', 'PASSWORD', fallback=''),
'login_url': parser.get('login', 'LOGIN_URL', fallback=''),
'grade_url': parser.get('login', 'GRADE_URL', fallback=''),
# 邮件配置
'sender_email': parser.get('email', 'SENDER_EMAIL', fallback=''),
'sender_password': parser.get('email', 'SENDER_PASSWORD', fallback=''),
'receiver_email': parser.get('email', 'RECEIVER_EMAIL', fallback=''),
'smtp_server': parser.get('email', 'SMTP_SERVER', fallback='smtp.163.com'),
'smtp_port': parser.getint('email', 'SMTP_PORT', fallback=465),
# 监控配置
'check_interval': parser.getint('monitor', 'CHECK_INTERVAL', fallback=60),
'request_delay': parser.getint('monitor', 'REQUEST_DELAY', fallback=5),
'max_retries': parser.getint('monitor', 'MAX_RETRIES', fallback=3),
'retry_delay': parser.getint('monitor', 'RETRY_DELAY', fallback=10),
}
# 验证必要配置
if not config['username'] or not config['password']:
logger.error("请在配置文件中设置 USERNAME 和 PASSWORD")
sys.exit(1)
if not config['sender_email'] or not config['sender_password'] or not config['receiver_email']:
logger.error("请在配置文件中设置邮件相关配置")
sys.exit(1)
logger.info("配置文件加载成功")
return config
except Exception as e:
logger.error(f"配置文件格式错误: {e}")
sys.exit(1)
def login(self) -> bool:
"""登录统一身份认证"""
logger.info("开始登录统一身份认证...")
for attempt in range(self.config['max_retries']):
try:
logger.info(f"登录尝试 {attempt + 1}/{self.config['max_retries']}")
# 第一步:访问成绩页面,会重定向到登录页
logger.info("步骤1访问成绩页面获取登录页面...")
response = self.session.get(
self.config['grade_url'],
timeout=30,
allow_redirects=True
)
# 随机等待5-10秒模拟真实用户
wait_time = random.uniform(5, 10)
logger.info(f"第一次访问完成,等待 {wait_time:.1f} 秒(模拟用户行为)...")
time.sleep(wait_time)
# 检查是否需要登录
if '统一身份认证' in response.text or 'tpass/login' in response.url:
logger.info("需要登录,提取登录表单信息...")
# 提取lt值
import re
lt_match = re.search(r'name="lt"\s+value="([^"]+)"', response.text)
execution_match = re.search(r'name="execution"\s+value="([^"]+)"', response.text)
lt = lt_match.group(1) if lt_match else ''
execution = execution_match.group(1) if execution_match else 'e1s1'
if not lt:
logger.warning("无法提取lt值使用默认值")
lt = '00000000'
logger.info(f"提取到 lt={lt}, execution={execution}")
# 获取登录提交URL
login_submit_url = response.url.split('?')[0]
# 准备登录数据按照login_neu.js的逻辑
username = self.config['username']
password = self.config['password']
login_data = {
'rsa': username + password + lt,
'ul': str(len(username)),
'pl': str(len(password)),
'sl': '0',
'lt': lt,
'execution': execution,
'_eventId': 'submit'
}
logger.info(f"步骤2提交登录表单到 {login_submit_url}")
# 随机等待5-10秒后提交模拟用户填写表单
wait_time = random.uniform(5, 10)
logger.info(f"等待 {wait_time:.1f} 秒后提交登录(模拟填写表单)...")
time.sleep(wait_time)
response = self.session.post(
login_submit_url,
data=login_data,
timeout=30,
allow_redirects=True
)
# 登录后等待5-10秒让服务器处理
wait_time = random.uniform(5, 10)
logger.info(f"登录表单已提交,等待 {wait_time:.1f} 秒处理...")
time.sleep(wait_time)
# 第三步:再次访问成绩页面验证登录
logger.info("步骤3验证登录状态访问成绩页面...")
# 随机等待5-10秒模拟用户操作
wait_time = random.uniform(5, 10)
logger.info(f"等待 {wait_time:.1f} 秒后访问成绩页面(模拟用户操作)...")
time.sleep(wait_time)
response = self.session.get(
self.config['grade_url'],
timeout=30,
allow_redirects=True
)
# 检查是否成功(不再是登录页面,且不是"请不要过快点击"
if '统一身份认证' not in response.text and 'tpass/login' not in response.url and '请不要过快点击' not in response.text:
# 进一步验证:检查是否包含成绩相关内容
if '学年学期' in response.text or '课程名称' in response.text or '成绩' in response.text:
logger.info("登录成功!成功访问成绩页面")
return True
else:
logger.warning(f"页面内容异常,可能不是成绩页面")
logger.info(f"当前URL: {response.url}")
else:
if '请不要过快点击' in response.text:
wait_time = 30 + (attempt * 15) # 大幅递增等待时间30秒、45秒、60秒...
logger.warning(f"⚠️ 请求过快被拦截!等待 {wait_time} 秒后重试...")
logger.info("建议1) 检查 config.ini 中的 CHECK_INTERVAL 是否 >= 120 秒")
logger.info(" 2) 避免频繁手动测试建议间隔至少5分钟")
time.sleep(wait_time)
continue # 直接进入下一次循环不再额外sleep
else:
logger.warning(f"登录失败,尝试重试 {attempt + 1}/{self.config['max_retries']}")
logger.info(f"当前URL: {response.url}")
except requests.RequestException as e:
logger.warning(f"登录请求异常: {e},尝试重试 {attempt + 1}/{self.config['max_retries']}")
if '请不要过快点击' not in response.text:
logger.info(f"等待 {self.config['retry_delay']} 秒后重试...")
time.sleep(self.config['retry_delay'])
logger.error("登录失败,已达到最大重试次数")
logger.info("建议1) 检查用户名密码是否正确")
logger.info(" 2) 手动在浏览器中登录一次,确认账号正常")
logger.info(" 3) 查看 .debug_response.html 了解实际响应内容")
return False
def fetch_grade_page(self) -> Optional[str]:
"""获取成绩页面"""
logger.info("获取成绩页面...")
for attempt in range(self.config['max_retries']):
try:
# 在每次请求前增加延迟,避免触发"请不要过快点击"
if attempt > 0:
wait_time = random.uniform(5, 8)
logger.info(f"等待 {wait_time:.1f} 秒后发送请求...")
time.sleep(wait_time)
response = self.session.get(
self.config['grade_url'],
timeout=30
)
# 检查是否触发"请不要过快点击"
if '请不要过快点击' in response.text:
wait_time = 30 + (attempt * 15)
logger.warning(f"⚠️ 请求过快被拦截!等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
continue
if response.status_code == 200:
return response.text
elif response.status_code in [302, 401]:
logger.warning("会话已过期,重新登录...")
if self.login():
continue
else:
return None
else:
logger.warning(f"获取成绩页面失败 (HTTP {response.status_code}),尝试重试 {attempt + 1}/{self.config['max_retries']}")
except requests.RequestException as e:
logger.warning(f"获取成绩页面异常: {e},尝试重试 {attempt + 1}/{self.config['max_retries']}")
time.sleep(self.config['retry_delay'])
logger.error("获取成绩页面失败,已达到最大重试次数")
return None
def extract_grade_info(self, html: str) -> str:
"""提取成绩信息的关键内容"""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
result = []
# 检查是否是登录页面
if soup.find('input', {'name': 'username'}) or soup.find('input', {'name': 'password'}):
logger.error("检测到登录页面,可能需要重新登录")
return "LOGIN_REQUIRED"
# 提取总平均绩点
gpa_div = soup.find('div', string=lambda x: x and '总平均绩点' in x)
if gpa_div:
gpa_text = gpa_div.get_text(strip=True)
result.append("=" * 60)
result.append(gpa_text)
result.append("=" * 60)
result.append("")
logger.debug(f"找到总平均绩点: {gpa_text}")
# 尝试多种方式查找成绩表格
table = soup.find('table', {'class': 'gridtable'})
if not table:
table = soup.find('table', {'id': 'dataList'})
if not table:
# 尝试查找任何包含成绩相关标题的表格
for tbl in soup.find_all('table'):
headers = tbl.find_all('th')
if headers and any('课程名称' in th.get_text() or '成绩' in th.get_text() for th in headers):
table = tbl
logger.info("通过表头关键词找到成绩表格")
break
if table:
logger.debug("找到成绩表格")
# 提取表头
thead = table.find('thead')
if thead:
headers = [th.get_text(strip=True) for th in thead.find_all('th')]
result.append(' | '.join(headers))
result.append("-" * 120)
logger.debug(f"表头: {headers}")
# 提取每一行成绩
tbody = table.find('tbody')
if tbody:
rows = tbody.find_all('tr')
logger.debug(f"找到 {len(rows)} 行成绩数据")
for row in rows:
cells = row.find_all('td')
if cells:
row_data = []
for cell in cells:
# 提取文本包括sup标签的内容
text = cell.get_text(strip=True, separator=' ')
row_data.append(text)
result.append(' | '.join(row_data))
else:
logger.warning("未找到成绩表格")
if not result:
logger.warning("未能提取到成绩信息,页面可能结构异常")
# 保存HTML以便调试
debug_file = self.data_dir / 'debug_page.html'
debug_file.write_text(html, encoding='utf-8')
logger.info(f"已保存HTML到 {debug_file} 供调试")
# 记录HTML摘要用于诊断
html_preview = html[:500].replace('\n', ' ')
logger.debug(f"HTML前500字符: {html_preview}")
# 检查是否是网络错误或空页面
if len(html) < 100:
logger.warning(f"HTML内容过短({len(html)}字节),可能是网络问题")
return "NETWORK_ERROR"
return self._fallback_extract(html)
logger.info(f"成功提取成绩信息,共 {len(result)}")
return '\n'.join(result)
except ImportError:
logger.warning("未安装 beautifulsoup4使用简单文本提取")
return self._fallback_extract(html)
except Exception as e:
logger.error(f"提取成绩信息时出错: {e}")
return self._fallback_extract(html)
def _fallback_extract(self, html: str) -> str:
"""备用的简单文本提取方法"""
import re
# 移除script和style标签
html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
return html
def parse_courses(self, grade_text: str) -> list:
"""解析成绩文本,提取课程列表"""
# 检查是否需要重新登录
if grade_text == "LOGIN_REQUIRED":
logger.warning("检测到登录失效,需要重新登录")
return []
# 检查是否是网络错误
if grade_text == "NETWORK_ERROR":
logger.warning("检测到网络错误,返回空列表")
return []
courses = []
lines = grade_text.split('\n')
# 检查是否是原始HTML未成功解析的标记
if '<html' in grade_text.lower() or '<body' in grade_text.lower():
logger.warning("检测到未解析的HTML内容成绩提取可能失败")
logger.debug(f"文本前100字符: {grade_text[:100]}")
return []
logger.debug(f"开始解析成绩,共 {len(lines)}")
for line in lines:
# 跳过标题行、分隔行和空行
stripped = line.strip()
# 跳过空行、纯分隔符行、标题行
if not stripped or stripped.startswith('=') or stripped.startswith('-') or '学年学期' in line or '总平均绩点' in line:
continue
# 解析课程行
parts = [p.strip() for p in line.split('|')]
if len(parts) >= 4: # 至少有学期、代码、序号、名称
# 提取关键信息:学期 + 课程代码 + 课程名称
semester = parts[0] if len(parts) > 0 else ''
course_code = parts[1] if len(parts) > 1 else ''
course_name = parts[3] if len(parts) > 3 else ''
if course_code and course_name:
course_key = f"{semester}|{course_code}|{course_name}"
courses.append(course_key)
logger.debug(f"解析到课程: {course_name}")
logger.info(f"共解析到 {len(courses)} 门课程")
return courses
def calculate_hash(self, content: str) -> str:
"""计算内容的哈希值"""
return hashlib.md5(content.encode('utf-8')).hexdigest()
def check_grade_changes(self, current_content: str, current_html: str = None) -> tuple:
"""检查成绩是否有变化
Returns:
tuple: (是否有变化: bool, 新增课程列表: list)
"""
# 保存当前内容(供用户查看)
self.last_grade_content_file.write_text(current_content, encoding='utf-8')
if current_html:
self.last_grade_html_file.write_text(current_html, encoding='utf-8')
# 解析当前课程列表
current_courses = self.parse_courses(current_content)
if not self.last_grade_file.exists():
logger.info("=" * 60)
logger.info("首次运行,保存当前成绩状态")
logger.info(f"当前共有 {len(current_courses)} 门课程")
logger.info(f"原始HTML已保存到: {self.last_grade_html_file}")
logger.info(f"提取的成绩内容已保存到: {self.last_grade_content_file}")
logger.info("请检查这些文件确认内容是否正确!")
logger.info("=" * 60)
# 保存课程列表
courses_file = self.data_dir / '.last_courses.txt'
courses_file.write_text('\n'.join(current_courses), encoding='utf-8')
# 计算哈希
current_hash = self.calculate_hash(current_content)
self.last_grade_file.write_text(current_hash, encoding='utf-8')
return (False, [])
# 读取上次的课程列表
courses_file = self.data_dir / '.last_courses.txt'
if courses_file.exists():
last_courses = courses_file.read_text(encoding='utf-8').strip().split('\n')
else:
last_courses = []
# 比较课程数量和内容,检测新增课程
new_courses = [c for c in current_courses if c not in last_courses]
# 检测课程成绩变化(课程存在但内容不同)
current_hash = self.calculate_hash(current_content)
last_hash = self.last_grade_file.read_text(encoding='utf-8').strip() if self.last_grade_file.exists() else ''
content_changed = current_hash != last_hash
if new_courses:
logger.info("=" * 60)
logger.info(f"✓ 检测到新增课程成绩!共 {len(new_courses)}")
logger.info("-" * 60)
for idx, course in enumerate(new_courses, 1):
parts = course.split('|')
if len(parts) >= 3:
semester = parts[0] if parts[0] else '未知学期'
course_name = parts[2] if len(parts) > 2 else '未知课程'
course_code = parts[1] if len(parts) > 1 else ''
logger.info(f" {idx}. [{semester}] {course_name} ({course_code})")
logger.info("=" * 60)
logger.info(f"详细内容已保存到: {self.last_grade_content_file}")
# 更新保存的课程列表和哈希
courses_file.write_text('\n'.join(current_courses), encoding='utf-8')
self.last_grade_file.write_text(current_hash, encoding='utf-8')
return (True, new_courses)
elif content_changed and current_courses:
# 课程数量没变,但内容变了(可能是成绩更新)
logger.info("=" * 60)
logger.info("✓ 检测到成绩内容发生变化(可能是成绩更新)")
logger.info(f"当前共有 {len(current_courses)} 门课程")
logger.info("=" * 60)
logger.info(f"详细内容已保存到: {self.last_grade_content_file}")
# 更新哈希值
self.last_grade_file.write_text(current_hash, encoding='utf-8')
return (True, []) # 内容变化但没有新增课程
else:
logger.info(f"成绩无变化(共 {len(current_courses)} 门课程)")
return (False, [])
def send_email_notification(self, new_courses: list = None) -> bool:
"""发送邮件通知"""
logger.info("准备发送邮件通知...")
if new_courses:
subject = f"【成绩更新通知】新增 {len(new_courses)} 门课程成绩"
courses_text = "\n".join([f" - {course.split('|')[0]} {course.split('|')[2]}" for course in new_courses if len(course.split('|')) >= 3])
body = f"""尊敬的用户:
您好!系统检测到成绩页面已更新。
新增课程(共{len(new_courses)}门):
{courses_text}
请及时查看详细成绩。
查看地址:{self.config['grade_url']}
检测时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
此邮件由系统自动发送,请勿回复。"""
else:
subject = "【成绩更新通知】成绩页面已更新"
body = f"""尊敬的用户:
您好!系统检测到成绩页面已更新。
请及时查看详细成绩。
查看地址:{self.config['grade_url']}
检测时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
此邮件由系统自动发送,请勿回复。"""
try:
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = self.config['sender_email']
message["To"] = self.config['receiver_email']
part = MIMEText(body, "plain", "utf-8")
message.attach(part)
context = ssl.create_default_context()
with smtplib.SMTP_SSL(
self.config['smtp_server'],
self.config['smtp_port'],
context=context
) as server:
server.login(
self.config['sender_email'],
self.config['sender_password']
)
server.sendmail(
self.config['sender_email'],
self.config['receiver_email'],
message.as_string()
)
logger.info("邮件发送成功")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
return False
def send_error_notification(self, error_msg: str) -> bool:
"""发送错误通知邮件"""
logger.info("准备发送错误通知邮件...")
subject = "【成绩监控系统】程序运行出错"
body = f"""尊敬的用户:
成绩监控系统在运行过程中遇到错误:
错误信息:
{error_msg}
发生时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
请检查系统状态或查看日志文件 monitor.log 了解详情。
此邮件由系统自动发送,请勿回复。"""
try:
message = MIMEMultipart("alternative")
message["Subject"] = subject
message["From"] = self.config['sender_email']
message["To"] = self.config['receiver_email']
part = MIMEText(body, "plain", "utf-8")
message.attach(part)
context = ssl.create_default_context()
with smtplib.SMTP_SSL(
self.config['smtp_server'],
self.config['smtp_port'],
context=context
) as server:
server.login(
self.config['sender_email'],
self.config['sender_password']
)
server.sendmail(
self.config['sender_email'],
self.config['receiver_email'],
message.as_string()
)
logger.info("错误通知邮件发送成功")
return True
except Exception as e:
logger.error(f"错误通知邮件发送失败: {e}")
return False
def test_fetch(self):
"""测试模式:获取一次成绩并显示内容"""
logger.info("=" * 60)
logger.info("测试模式:获取成绩页面")
logger.info("=" * 60)
# 登录
if not self.login():
logger.error("登录失败,无法获取成绩页面")
return
# 获取成绩页面
logger.info("正在获取成绩页面...")
grade_html = self.fetch_grade_page()
if not grade_html:
logger.error("获取成绩页面失败")
return
# 保存原始HTML
self.last_grade_html_file.write_text(grade_html, encoding='utf-8')
logger.info(f"✓ 原始HTML已保存到: {self.last_grade_html_file}")
# 提取成绩信息
grade_info = self.extract_grade_info(grade_html)
self.last_grade_content_file.write_text(grade_info, encoding='utf-8')
logger.info(f"✓ 提取的成绩内容已保存到: {self.last_grade_content_file}")
# 显示前500个字符
logger.info("=" * 60)
logger.info("提取的成绩内容预览前500字符")
logger.info("-" * 60)
print(grade_info[:500])
if len(grade_info) > 500:
logger.info(f"\n... (还有 {len(grade_info) - 500} 个字符)")
logger.info("-" * 60)
logger.info(f"完整内容请查看文件: {self.last_grade_content_file}")
logger.info("=" * 60)
def monitor_loop(self):
"""主监控循环"""
logger.info("=" * 50)
logger.info("成绩监控程序启动")
logger.info(f"检查间隔: {self.config['check_interval']}")
logger.info("按 Ctrl+C 停止监控")
logger.info("=" * 50)
# 首次登录
if not self.login():
logger.error("初始登录失败,程序退出")
self.send_error_notification("初始登录失败,无法访问成绩页面")
return
# 登录成功后等待一段时间再进行首次检查,避免触发"请不要过快点击"
wait_time = random.uniform(5, 10)
logger.info(f"登录成功,等待 {wait_time:.1f} 秒后开始首次检查(避免请求过快)...")
time.sleep(wait_time)
consecutive_errors = 0
max_consecutive_errors = 5
while self.running:
try:
logger.info("-" * 50)
logger.info("开始新一轮检查")
# 获取成绩页面
grade_html = self.fetch_grade_page()
if not grade_html:
consecutive_errors += 1
logger.error(f"获取成绩页面失败,等待下次检查 (连续失败: {consecutive_errors}/{max_consecutive_errors})")
# 连续失败3次尝试重新登录
if consecutive_errors >= 3:
logger.warning("⚠️ 尝试重新登录以解决页面获取问题...")
if self.login():
logger.info("✓ 重新登录成功,重置错误计数")
consecutive_errors = 0 # 重置错误计数
time.sleep(5)
continue
else:
logger.error("✗ 重新登录失败")
if consecutive_errors >= max_consecutive_errors:
error_msg = f"连续 {consecutive_errors} 次获取成绩页面失败"
logger.error(error_msg)
self.send_error_notification(f"{error_msg}\n\n可能需要检查网络或账号状态")
consecutive_errors = 0 # 重置计数,避免重复发送
time.sleep(self.config['check_interval'])
continue
# 提取成绩信息
grade_info = self.extract_grade_info(grade_html)
# 检查是否需要重新登录
if grade_info == "LOGIN_REQUIRED":
consecutive_errors += 1
logger.warning(f"检测到会话过期,尝试重新登录... (连续失败: {consecutive_errors}/{max_consecutive_errors})")
if self.login():
logger.info("重新登录成功,继续监控")
consecutive_errors = 0 # 重置错误计数
else:
logger.error("重新登录失败")
if consecutive_errors >= max_consecutive_errors:
error_msg = f"连续 {consecutive_errors} 次登录失败,会话可能已过期"
logger.error(error_msg)
self.send_error_notification(error_msg)
consecutive_errors = 0
time.sleep(self.config['check_interval'])
continue
# 检查是否是网络错误(临时性问题)
if grade_info == "NETWORK_ERROR":
consecutive_errors += 1
logger.warning(f"检测到网络错误或页面过短,可能是临时问题 (连续失败: {consecutive_errors}/{max_consecutive_errors})")
# 连续失败3次尝试重新登录
if consecutive_errors >= 3:
logger.warning("⚠️ 尝试重新登录以解决网络问题...")
if self.login():
logger.info("✓ 重新登录成功,重置错误计数")
consecutive_errors = 0 # 重置错误计数
time.sleep(5)
continue
else:
logger.error("✗ 重新登录失败")
if consecutive_errors >= max_consecutive_errors:
error_msg = f"连续 {consecutive_errors} 次网络错误"
logger.error(error_msg)
self.send_error_notification(f"{error_msg}\n\n可能需要检查网络连接或服务器状态")
consecutive_errors = 0
else:
# 临时错误,短暂等待后重试
logger.info(f"等待 30 秒后重试...")
time.sleep(30)
continue
# 解析课程列表
current_courses = self.parse_courses(grade_info)
# 如果解析失败0门课程且之前有课程可能是临时问题
if len(current_courses) == 0:
# 检查是否之前有课程记录
courses_file = self.data_dir / '.last_courses.txt'
if courses_file.exists():
try:
previous_courses = courses_file.read_text(encoding='utf-8').strip().split('\n')
previous_courses = [c for c in previous_courses if c]
if len(previous_courses) > 0:
consecutive_errors += 1
logger.warning(f"⚠️ 解析到0门课程但之前有{len(previous_courses)}门课程,可能是临时问题")
logger.warning(f"连续异常: {consecutive_errors}/{max_consecutive_errors}")
if consecutive_errors >= 3: # 连续3次解析失败尝试重新登录
error_msg = f"连续 {consecutive_errors} 次无法解析课程(之前有{len(previous_courses)}门课程)"
logger.error(error_msg)
# 尝试重新登录
logger.warning("⚠️ 尝试重新登录以解决解析问题...")
if self.login():
logger.info("✓ 重新登录成功,重置错误计数")
consecutive_errors = 0 # 重置错误计数
time.sleep(5) # 等待一下再继续
continue
else:
logger.error("✗ 重新登录失败")
if consecutive_errors >= max_consecutive_errors:
self.send_error_notification(f"{error_msg}\n\n重新登录失败,可能需要人工介入")
consecutive_errors = 0
else:
# 短暂等待后重试
logger.info(f"等待 30 秒后重试...")
time.sleep(30)
continue
except Exception as e:
logger.debug(f"检查历史课程时出错: {e}")
# 检查变化
has_changes, new_courses = self.check_grade_changes(grade_info, grade_html)
if has_changes:
if new_courses:
# 有新增课程,发送包含课程列表的通知
logger.info(f"共解析到 {len(current_courses)} 门课程")
self.send_email_notification(new_courses)
else:
# 没有新增课程但内容变化(成绩更新),发送通用通知
self.send_email_notification()
# 成功执行,重置错误计数
consecutive_errors = 0
logger.info(f"等待 {self.config['check_interval']} 秒后进行下次检查...")
time.sleep(self.config['check_interval'])
except Exception as e:
consecutive_errors += 1
error_msg = f"监控循环发生异常: {e}"
logger.error(error_msg)
if consecutive_errors >= max_consecutive_errors:
logger.error(f"连续 {consecutive_errors} 次出现异常")
self.send_error_notification(f"{error_msg}\n\n连续失败次数: {consecutive_errors}")
consecutive_errors = 0
time.sleep(self.config['retry_delay'])
def run(self):
"""运行监控"""
try:
if self.test_mode:
self.test_fetch()
else:
self.monitor_loop()
except KeyboardInterrupt:
logger.info("收到键盘中断信号,程序退出")
except Exception as e:
logger.error(f"程序异常退出: {e}")
sys.exit(1)
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='成绩监控系统 - 自动监控成绩变化并发送邮件通知',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
# 正常监控模式
python3 monitor.py
# 测试模式(获取一次成绩并显示内容)
python3 monitor.py --test
# 调试模式(显示详细日志)
python3 monitor.py --debug
# 查看保存的成绩内容
cat .last_grade_content.txt
# 查看原始HTML
cat .last_grade_page.html
"""
)
parser.add_argument(
'--test',
action='store_true',
help='测试模式:仅获取一次成绩页面并显示内容,不进行监控'
)
parser.add_argument(
'--debug',
action='store_true',
help='调试模式:显示详细的调试信息'
)
parser.add_argument(
'--config',
default='config.ini',
help='配置文件路径默认config.ini'
)
args = parser.parse_args()
# 如果启用调试模式,调整日志级别
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.info("已启用调试模式")
monitor = GradeMonitor(config_file=args.config, test_mode=args.test)
monitor.run()
if __name__ == '__main__':
main()