This commit is contained in:
ChuXun
2025-12-29 03:11:16 +08:00
parent c882a7a216
commit 8d4419b1a0
5 changed files with 702 additions and 65 deletions

View File

@@ -1,77 +1,96 @@
import pandas as pd
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel, ConstantKernel as C
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
import re
try:
# 1. 加载数据
df0 = pd.read_csv('input_file_0.csv')
df1 = pd.read_csv('input_file_1.csv')
df = pd.concat([df0, df1], axis=0, ignore_index=True)
# 清洗列名
df.columns = [c.strip() for c in df.columns]
def clean_male_fetus_data(file_path):
"""
读取并清洗您的 '男胎数据.csv',提取关键特征用于建模。
"""
try:
# 1. 读取数据
df = pd.read_csv(file_path)
print(f"原始数据行数: {len(df)}")
# 筛选男胎
male_df = df[df['Y染色体浓度'].notna() & (df['Y染色体浓度'] > 0)].copy()
# 2. 列名映射 (根据您文件中的实际表头)
col_map = {
'孕妇代码': 'ID',
'年龄': 'Age',
'身高': 'Height',
'体重': 'Weight',
'检测孕周': 'GA_Raw', # 原始的 "11w+6" 格式
'孕妇BMI': 'BMI',
'Y染色体浓度': 'Y_Conc', # 关键列
'检测日期': 'TestTime'
}
# 重命名列,未在字典中的列保持原名
df_clean = df.rename(columns=col_map)
# 解析孕周
def parse_ga(s):
try:
# 3. 清洗孕周 (解析 "11w+6" 为 11.857 周)
def parse_ga(s):
if pd.isna(s): return np.nan
s = str(s).lower().replace('w', ' ').replace('+', ' ').split()
return float(s[0]) + (float(s[1])/7.0 if len(s) > 1 else 0)
except:
return np.nan
s = str(s).strip().lower()
try:
weeks = 0
days = 0
# 如果是纯数字 (如 "12.5")
if re.match(r'^\d+(\.\d+)?$', s):
return float(s)
# 提取周数 (如 "11w")
if 'w' in s:
parts = s.split('w')
weeks = float(parts[0])
# 提取天数 (如 "+6" 或 "6d")
if len(parts) > 1 and parts[1]:
d_part = parts[1]
# 提取其中的数字
d_match = re.search(r'(\d+)', d_part)
if d_match:
days = float(d_match.group(1))
if weeks == 0 and days == 0:
return np.nan
return weeks + days / 7.0
except:
return np.nan
male_df['GA_numeric'] = male_df['检测孕周'].apply(parse_ga)
# 剔除缺失值
male_df = male_df.dropna(subset=['Y染色体浓度', 'GA_numeric', '孕妇BMI'])
df_clean['GA'] = df_clean['GA_Raw'].apply(parse_ga)
# 2. 准备数据
X = male_df[['GA_numeric', '孕妇BMI']].values
y = male_df['Y染色体浓度'].values
# 4. 数值转换
numeric_cols = ['Age', 'Height', 'Weight', 'BMI', 'GA', 'Y_Conc']
for col in numeric_cols:
if col in df_clean.columns:
df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')
# 数据集划分
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 5. 过滤有效数据
# 必须包含: 孕周, BMI, Y浓度
# 过滤掉物理上不可能的数值 (异常值处理)
df_male = df_clean.dropna(subset=['GA', 'BMI', 'Y_Conc']).copy()
df_male = df_male[
(df_male['BMI'] > 10) & (df_male['BMI'] < 60) & # 合理BMI范围
(df_male['GA'] > 0) & (df_male['GA'] < 45) & # 合理孕周范围
(df_male['Y_Conc'] > 0) # Y浓度必须存在
]
# 3. 建立 GPR 模型
# 关键:数据标准化对 GPR 至关重要
scaler_X = StandardScaler()
X_train_scaled = scaler_X.fit_transform(X_train)
X_test_scaled = scaler_X.transform(X_test)
print(f"清洗完成,有效样本数: {len(df_male)}")
return df_male
# Kernel设计: RBF(趋势) + WhiteKernel(噪声)
# 初始 length_scale 设为 1.0 (因为数据已标准化)
kernel = C(1.0) * RBF(length_scale=1.0) + WhiteKernel(noise_level=0.1)
except Exception as e:
print(f"数据清洗发生错误: {e}")
return None
# 训练 (n_restarts_optimizer=0 以节省时间,实际建模建议设为 5-10)
gpr = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=0, random_state=42)
gpr.fit(X_train_scaled, y_train)
# ==========================================
# 运行清洗
# ==========================================
file_name = '男胎数据.csv' # 您的文件名
cleaned_data = clean_male_fetus_data(file_name)
# 4. 建立对比模型 (交互项回归)
poly = make_pipeline(
PolynomialFeatures(degree=2, interaction_only=True, include_bias=False),
LinearRegression()
)
poly.fit(X_train, y_train)
# 5. 评估
y_pred_gpr, y_std = gpr.predict(X_test_scaled, return_std=True)
y_pred_poly = poly.predict(X_test)
print("=== 模型性能大比拼 (Test Set) ===")
print(f"GPR RMSE: {np.sqrt(mean_squared_error(y_test, y_pred_gpr)):.6f}")
print(f"Poly RMSE: {np.sqrt(mean_squared_error(y_test, y_pred_poly)):.6f}")
print(f"GPR R2: {r2_score(y_test, y_pred_gpr):.4f}")
print(f"Poly R2: {r2_score(y_test, y_pred_poly):.4f}")
print(f"\nGPR Learned Kernel Params: {gpr.kernel_}")
except Exception as e:
print(f"Error: {e}")
if cleaned_data is not None:
# 显示前5行检查结果
print(cleaned_data[['ID', 'Age', 'BMI', 'GA_Raw', 'GA', 'Y_Conc']].head())
# 保存结果
# cleaned_data.to_csv('cleaned_male_data.csv', index=False)