Json-Python-Server/app/api/routes/analysis.py

190 lines
7.3 KiB
Python
Raw Permalink Normal View History

2026-01-29 18:18:32 +08:00
"""
分析路由
"""
import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any, List
from fastapi import APIRouter, HTTPException, status, BackgroundTasks
from pydantic import BaseModel
import psutil
import os
import gc
import shutil
from app.core.config import settings
from app.services.analysis import TimeSeriesAnalysisSystem
# Module-level logger and router shared by every route handler in this file.
logger = logging.getLogger(__name__)
router = APIRouter()
class AnalysisRequest(BaseModel):
    """Request payload for the /analyze endpoint.

    NOTE(review): field semantics inferred from usage in analyze_data below;
    defaults (including the Chinese task description) are part of the API
    contract and must not change.
    """
    # Name of a previously uploaded file, resolved via settings.get_upload_path().
    filename: str
    # Source type; analyze_data only special-cases 'image' (for original_image echo).
    file_type: str = "csv"
    # Free-text description forwarded to the analyzer.
    task_description: str = "时间序列数据分析"
    # Arbitrary metadata forwarded to the analyzer as-is.
    data_background: Dict[str, Any] = {}
    # Echoed back in the response when file_type == 'image'.
    original_image: Optional[str] = None
    # Only 'zh'/'en' are honored; anything else falls back to 'zh'.
    language: str = "zh"
    # Requested plot generation; force-disabled server-side (see analyze_data).
    generate_plots: bool = False
@router.get("/available_methods", summary="获取可用的分析方法")
async def get_available_methods() -> dict:
    """Return the catalogue of supported analysis methods.

    The response maps each method key to its display name and a short
    description (both in Chinese), preserving the canonical ordering.
    """
    catalogue_entries = (
        ('statistical_overview', '统计概览', '生成数据的基本统计信息和分布图表'),
        ('time_series_analysis', '时间序列分析', '分析变量随时间变化的趋势和模式'),
        ('acf_pacf_analysis', '自相关分析', '生成自相关和偏自相关函数图'),
        ('stationarity_tests', '平稳性检验', '执行ADF、KPSS等平稳性检验'),
        ('normality_tests', '正态性检验', '执行Shapiro-Wilk、Jarque-Bera正态性检验'),
        ('seasonal_decomposition', '季节性分解', '分解时间序列的趋势、季节和残差成分'),
        ('spectral_analysis', '频谱分析', '分析时间序列的频域特征'),
        ('correlation_analysis', '相关性分析', '计算变量间的相关性并生成热力图'),
        ('pca_scree_plot', 'PCA碎石图', '显示主成分分析的解释方差'),
        ('pca_analysis', '主成分分析', '降维分析,识别数据的主要变化方向'),
        ('feature_importance', '特征重要性', '分析各变量对目标预测的重要性'),
        ('clustering_analysis', '聚类分析', '将数据点分组为具有相似特征的簇'),
        ('factor_analysis', '因子分析', '识别潜在的因子结构'),
        ('cointegration_test', '协整检验', '检验时间序列变量间的长期均衡关系'),
        ('var_analysis', '向量自回归', '多变量时间序列建模和预测'),
    )
    return {
        "success": True,
        "methods": {
            key: {'name': display_name, 'description': description}
            for key, display_name, description in catalogue_entries
        },
    }
def check_memory() -> float:
    """Log this process's resident memory and GC when over the configured cap.

    Reads the RSS of the current process via psutil, logs it, and — when it
    exceeds ``settings.MAX_MEMORY_MB`` — logs a warning and forces a garbage
    collection pass.

    Returns:
        The resident set size in MiB (new, backward-compatible: previous
        callers ignored the implicit ``None`` return).
    """
    process = psutil.Process(os.getpid())
    memory_mb = process.memory_info().rss / 1024 / 1024
    # Lazy %-style args keep these calls cheap when the level is disabled.
    logger.info("当前内存使用: %.2f MB", memory_mb)
    if memory_mb > settings.MAX_MEMORY_MB:
        logger.warning("内存使用超过阈值 (%s MB),执行垃圾回收", settings.MAX_MEMORY_MB)
        gc.collect()
    return memory_mb
@router.post("/analyze", summary="执行完整分析")
async def analyze_data(request: AnalysisRequest, background_tasks: BackgroundTasks) -> dict:
    """
    Run the full time-series analysis pipeline for an uploaded file.

    Flow:
    1. Load and preprocess the data referenced by ``request.filename``.
    2. Execute the analysis methods via ``TimeSeriesAnalysisSystem``.
    3. The analyzer calls the AI API for deep analysis (see service layer).
    4. Assemble the JSON response (report filenames are placeholders here).

    Raises:
        HTTPException: 404 when the uploaded file is missing; 500 when the
        analyzer returns no results or any unexpected error occurs.
    """
    try:
        logger.info("=" * 60)
        logger.info(f"开始分析: {request.filename}")
        logger.info(f"任务: {request.task_description}")
        logger.info(f"语言: {request.language}")
        logger.info("=" * 60)
        # Opportunistic memory check: logs RSS and GCs when over the cap.
        check_memory()
        # Resolve the uploaded file; reject with 404 if it does not exist.
        file_path = settings.get_upload_path(request.filename)
        if not file_path.exists():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"文件未找到: {request.filename}"
            )
        # Language handling: only zh/en are supported; others fall back to zh.
        lang_key = request.language if request.language in {"zh", "en"} else "zh"
        # Charts mode: image generation is force-disabled even when the
        # client explicitly sends generate_plots=true.
        generate_plots = False
        if request.generate_plots:
            logger.info("generate_plots requested true, forcing false to skip image generation")
        # Build the analyzer for this request's file/task/language.
        logger.info(f"初始化分析器 ({lang_key})...")
        analyzer = TimeSeriesAnalysisSystem(
            str(file_path),
            request.task_description,
            data_background=request.data_background,
            language=lang_key,
            generate_plots=generate_plots
        )
        # Run the (potentially long) analysis synchronously in this handler.
        # NOTE(review): the *_zh names are historical — they hold the results
        # for lang_key, which may be "en"; confirm before renaming.
        logger.info("执行分析...")
        results_zh, log_zh = analyzer.run_analysis()
        if results_zh is None:
            logger.error("中文分析失败")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="分析失败"
            )
        logger.info("中文分析完成")
        # Shape the response payload; report filenames are always None here
        # (report generation is not performed by this route).
        response_data = {
            "success": True,
            "meta": {
                "filename": request.filename,
                "task_description": request.task_description,
                "language": lang_key,
                "generate_plots": generate_plots,
                "created_at": datetime.now().isoformat(),
            },
            "analysis": {
                lang_key: {
                    "pdf_filename": None,
                    "ppt_filename": None,
                    "data_description": results_zh.get("data_description"),
                    "preprocessing_steps": results_zh.get("preprocessing_steps", []),
                    "api_analysis": results_zh.get("api_analysis", {}),
                    "steps": results_zh.get("steps", []),
                    "charts": results_zh.get("charts", {}),
                }
            },
            "images": {},
            # Only the tail of the analyzer log is returned to the client.
            "log": log_zh[-20:] if log_zh else [],
            "original_image": request.original_image if request.file_type == 'image' else None,
        }
        # Legacy-frontend compatibility: always expose analysis.zh (aliases
        # the same dict object when the active language is not zh).
        if lang_key != "zh":
            response_data["analysis"]["zh"] = response_data["analysis"][lang_key]
        analysis_bucket = response_data["analysis"][lang_key]
        # Strip any leftover image_path fields from steps (legacy structure).
        steps = analysis_bucket.get("steps")
        if isinstance(steps, list):
            for step in steps:
                if isinstance(step, dict) and "image_path" in step:
                    step.pop("image_path", None)
        # images stays empty for legacy-frontend compatibility.
        response_data["images"] = {}
        logger.info("分析完成")
        return response_data
    except HTTPException:
        # Re-raise FastAPI HTTP errors (404/500 above) untouched.
        raise
    except Exception as e:
        # Boundary catch: log with traceback and surface as a 500.
        logger.error(f"分析异常: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )