1062 lines
44 KiB
Python
1062 lines
44 KiB
Python
import os
|
||
import tempfile
|
||
import pandas as pd
|
||
import numpy as np
|
||
import matplotlib
|
||
import time
|
||
import re
|
||
import requests
|
||
from datetime import datetime
|
||
import warnings
|
||
import gc
|
||
import math
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Tuple
|
||
from openai import OpenAI
|
||
|
||
matplotlib.use('Agg')
|
||
import matplotlib.pyplot as plt
|
||
from sklearn.preprocessing import StandardScaler
|
||
from app.services.font_manager import FontManager
|
||
|
||
from app.services.analysis.modules.basic import (
|
||
generate_statistical_overview as _generate_statistical_overview,
|
||
perform_normality_tests as _perform_normality_tests,
|
||
)
|
||
from app.services.analysis.modules.modeling import (
|
||
analyze_feature_importance as _analyze_feature_importance,
|
||
perform_var_analysis as _perform_var_analysis,
|
||
)
|
||
from app.services.analysis.modules.multivariate import (
|
||
generate_correlation_heatmap as _generate_correlation_heatmap,
|
||
generate_pca_scree_plot as _generate_pca_scree_plot,
|
||
perform_clustering_analysis as _perform_clustering_analysis,
|
||
perform_factor_analysis as _perform_factor_analysis,
|
||
perform_pca_analysis as _perform_pca_analysis,
|
||
)
|
||
from app.services.analysis.modules.stationarity import (
|
||
perform_cointegration_test as _perform_cointegration_test,
|
||
perform_stationarity_tests as _perform_stationarity_tests,
|
||
)
|
||
from app.services.analysis.modules.time_series import (
|
||
generate_acf_pacf_plots as _generate_acf_pacf_plots,
|
||
generate_time_series_plots as _generate_time_series_plots,
|
||
perform_seasonal_decomposition as _perform_seasonal_decomposition,
|
||
perform_spectral_analysis as _perform_spectral_analysis,
|
||
)
|
||
|
||
|
||
class TimeSeriesAnalysisSystem:
    """End-to-end time-series analysis pipeline.

    Loads a CSV, preprocesses it, runs a battery of statistical,
    time-series and multivariate analyses (delegated to the
    ``app.services.analysis.modules.*`` helpers), and asks an
    OpenAI-compatible Qwen API for a narrative interpretation of each
    step. Results are assembled into ECharts-friendly payloads.
    """

    def __init__(self, csv_path, task_description, data_background=None, language='en', generate_plots=False):
        # Input CSV path and the user's free-text analysis request.
        self.csv_path = csv_path
        self.task_description = task_description
        # Optional user-supplied context (keys: source/method/purpose/domain).
        self.data_background = data_background or {}
        self.language = language
        self.generate_plots = generate_plots
        self.data = None
        # Scratch directory for generated artifacts; cleaned up automatically.
        self.temp_dir = tempfile.TemporaryDirectory()

        # Unified font setup (configures matplotlib; returns a default font name).
        self.chinese_font = self.setup_fonts(language)

        # Initialize the LLM API client (falls back to simulation mode on failure).
        self.client_config = self.create_qwen_client()
        if not self.client_config:
            print("警告: 无法初始化API客户端,将使用模拟分析")

        # Running logs of analysis and preprocessing steps.
        self.analysis_log = []
        self.preprocessing_steps = []
||
    # ---- Generic sanitization helpers ----
    def to_echarts_safe(self, obj: Any, _seen: Tuple[int, ...] = ()):
        """Recursively convert *obj* to a JSON/ECharts-serializable structure.

        Handles NaN/Inf (-> None), numpy scalars (-> Python scalars),
        Decimal (-> float), Timestamp/datetime (-> ISO strings), and
        ndarray/Series/DataFrame (-> nested lists).

        ``_seen`` carries the ids of ancestor containers on the current
        recursion path; a revisited object is replaced by None to break
        reference cycles.
        """
        obj_id = id(obj)
        if obj_id in _seen:
            # Cycle detected on this path.
            return None
        seen = _seen + (obj_id,)

        # pandas NA (guarded: identity check can raise for exotic objects)
        try:
            if obj is pd.NA:  # type: ignore[comparison-overlap]
                return None
        except Exception:
            pass

        # None / bool / int / str pass through unchanged.
        if obj is None or isinstance(obj, (bool, int, str)):
            return obj

        # float / numpy float: NaN and +/-Inf are invalid JSON -> None.
        if isinstance(obj, (float, np.floating)):
            value = float(obj)
            if math.isnan(value) or math.isinf(value):
                return None
            return value

        # numpy integer -> int
        if isinstance(obj, np.integer):
            return int(obj)

        # numpy bool -> bool
        if isinstance(obj, np.bool_):
            return bool(obj)

        # Decimal -> float (precision loss accepted for charting)
        if isinstance(obj, Decimal):
            return float(obj)

        # pandas Timestamp -> ISO-8601 string
        if isinstance(obj, pd.Timestamp):
            return obj.isoformat()

        # datetime -> ISO-8601 string
        if isinstance(obj, datetime):
            return obj.isoformat()

        # numpy array -> nested lists, each element sanitized
        if isinstance(obj, np.ndarray):
            return [self.to_echarts_safe(v, seen) for v in obj.tolist()]

        # pandas DataFrame -> [header row] + data rows; Series -> list
        if isinstance(obj, pd.DataFrame):
            return [obj.columns.tolist()] + [self.to_echarts_safe(row, seen) for row in obj.values.tolist()]
        if isinstance(obj, pd.Series):
            return self.to_echarts_safe(obj.tolist(), seen)

        # mapping: keys coerced to str so the result is valid JSON
        if isinstance(obj, dict):
            return {str(k): self.to_echarts_safe(v, seen) for k, v in obj.items()}

        # list/tuple -> list of sanitized elements
        if isinstance(obj, (list, tuple)):
            return [self.to_echarts_safe(v, seen) for v in obj]

        # Fallback: returned as-is (may still be unserializable — caller beware).
        return obj
||
def _log_step(self, message, status="info"):
|
||
"""记录分析步骤"""
|
||
log_entry = {
|
||
"timestamp": datetime.now().strftime("%H:%M:%S"),
|
||
"message": message,
|
||
"status": status
|
||
}
|
||
self.analysis_log.append(log_entry)
|
||
print(f"[{log_entry['timestamp']}] {message}")
|
||
|
||
def _log_preprocessing_step(self, step_name, description, status="completed"):
|
||
"""记录预处理步骤"""
|
||
step_entry = {
|
||
"name": step_name,
|
||
"description": description,
|
||
"status": status
|
||
}
|
||
self.preprocessing_steps.append(step_entry)
|
||
|
||
def setup_fonts(self, language='zh'):
|
||
"""统一设置字体 - 合并中英文设置"""
|
||
try:
|
||
# 设置 matplotlib 字体
|
||
self.setup_matplotlib_font(language)
|
||
return 'Helvetica' # 返回默认字体,因为没有PDF生成了
|
||
except Exception as e:
|
||
print(f"字体设置失败: {e}")
|
||
return 'Helvetica'
|
||
|
||
def setup_matplotlib_font(self, language='zh'):
|
||
"""设置 matplotlib 字体"""
|
||
try:
|
||
font_manager = FontManager()
|
||
font_manager.setup_matplotlib_font(language)
|
||
return True
|
||
except Exception as e:
|
||
print(f"Matplotlib字体设置警告: {e}")
|
||
plt.rcParams['font.family'] = ['DejaVu Sans', 'Arial Unicode MS', 'Arial']
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
return True
|
||
|
||
def create_qwen_client(self):
|
||
"""创建阿里云千问API客户端配置"""
|
||
try:
|
||
# 阿里云千问API配置
|
||
api_key = os.environ.get("MY_API_KEY", "sk-f1ef83c90dcf4c839efae2a7e63dcb3d")
|
||
base_url = os.environ.get("MY_API_BASE", "https://dashscope.aliyuncs.com/compatible-mode/v1")
|
||
model_name = os.environ.get("MY_MODEL", "qwen-turbo")
|
||
|
||
# 初始化OpenAI客户端(兼容模式)
|
||
self.openai_client = OpenAI(
|
||
api_key=api_key,
|
||
base_url=base_url
|
||
)
|
||
|
||
print("✓ 阿里云千问API客户端配置完成")
|
||
return {
|
||
'api_key': api_key,
|
||
'base_url': base_url,
|
||
'model': model_name,
|
||
'client': self.openai_client
|
||
}
|
||
except Exception as e:
|
||
print(f"API客户端配置失败: {e}")
|
||
# 返回模拟配置
|
||
return {
|
||
'api_key': "simulation-mode",
|
||
'base_url': "simulation",
|
||
'model': "simulation-model",
|
||
'client': None
|
||
}
|
||
|
||
    def call_api(self, prompt, language='zh', max_retries=2):
        """Send *prompt* to the Qwen chat API and return the reply text.

        Retries up to *max_retries* attempts with a 2s pause, aborts early
        on auth errors (401/403), and degrades to a canned simulation
        response when the client is missing or every attempt fails.
        """
        # Simulation mode: short-circuit to a canned response.
        if self.client_config.get('api_key') == 'simulation-mode':
            return self._get_simulation_response(prompt, language)

        for attempt in range(max_retries):
            try:
                self._log_step(f"调用API (尝试 {attempt + 1})..." if language == 'zh'
                               else f"Calling API (attempt {attempt + 1})...")

                client = self.client_config.get('client')
                if not client:
                    self._log_step("API客户端未初始化" if language == 'zh'
                                   else "API client not initialized", "error")
                    break

                # System prompt fixes the assistant persona and output language.
                if language == 'zh':
                    system_content = "你是一个专业的数据分析师,擅长分析时间序列数据和统计图表。请用中文回答,提供深入、专业的分析见解。"
                else:
                    system_content = "You are a professional data analyst, skilled in analyzing time series data and statistical charts. Please answer in English, providing in-depth, professional analysis insights."

                messages = [
                    {"role": "system", "content": system_content},
                    {"role": "user", "content": prompt}
                ]

                # Non-streaming chat-completion call.
                response = client.chat.completions.create(
                    model=self.client_config.get('model', 'qwen-turbo'),
                    messages=messages,
                    max_tokens=2048,
                    temperature=0.7,
                    stream=False
                )

                # Extract the first choice's text, if any.
                if response and response.choices and response.choices[0].message:
                    content = response.choices[0].message.content.strip()
                    if content:
                        self._log_step(f"API调用成功,返回内容长度: {len(content)}" if language == 'zh'
                                       else f"API call successful, content length: {len(content)}", "success")
                        return content

                self._log_step("API响应内容为空" if language == 'zh'
                               else "API response content empty", "warning")

            except Exception as e:
                error_msg = str(e)
                self._log_step(f"API调用失败 (尝试 {attempt + 1}): {error_msg}" if language == 'zh'
                               else f"API call failed (attempt {attempt + 1}): {error_msg}", "warning")

                # Auth failures will not recover on retry — bail out.
                if "401" in error_msg or "403" in error_msg or "authentication" in error_msg.lower():
                    break

                if attempt < max_retries - 1:
                    time.sleep(2)

        # Every attempt failed (or broke out early): canned fallback.
        self._log_step("所有API调用尝试都失败,使用模拟分析" if language == 'zh'
                       else "All API call attempts failed, using simulation analysis", "warning")
        return self._get_simulation_response(prompt, language)
||
def _get_simulation_response(self, prompt, language='zh'):
|
||
"""统一的模拟API响应"""
|
||
if language == 'zh':
|
||
simulation_responses = {
|
||
"statistical_overview": "统计概览分析显示数据具有良好的统计特性,各变量分布较为均匀。",
|
||
"time_series_analysis": "时间序列分析揭示了数据的趋势性和周期性特征。",
|
||
"correlation_analysis": "相关性分析表明变量间存在显著的线性关系。",
|
||
"pca_analysis": "主成分分析成功降低了数据维度,保留了主要信息。",
|
||
"feature_importance": "特征重要性分析识别了对预测目标最重要的变量。",
|
||
"clustering_analysis": "聚类分析发现了数据中的自然分组结构。"
|
||
}
|
||
default_response = "基于数据的专业分析已完成,结果显示数据具有良好的统计特性和分析价值。"
|
||
else:
|
||
simulation_responses = {
|
||
"statistical_overview": "Statistical overview analysis shows good statistical characteristics with uniform variable distributions.",
|
||
"time_series_analysis": "Time series analysis reveals trend and periodic characteristics in the data.",
|
||
"correlation_analysis": "Correlation analysis indicates significant linear relationships between variables.",
|
||
"pca_analysis": "Principal component analysis successfully reduced data dimensionality while preserving key information.",
|
||
"feature_importance": "Feature importance analysis identified the most important variables for prediction.",
|
||
"clustering_analysis": "Clustering analysis discovered natural grouping structures in the data."
|
||
}
|
||
default_response = "Professional data analysis completed. The results show good statistical characteristics and analytical value."
|
||
|
||
for key, response in simulation_responses.items():
|
||
if key in prompt.lower():
|
||
return response
|
||
|
||
return default_response
|
||
|
||
def test_api_connection(self):
|
||
"""测试阿里云千问API连接"""
|
||
try:
|
||
self._log_step("测试阿里云千问API连接...")
|
||
|
||
# 如果是模拟模式,直接返回成功
|
||
if self.client_config.get('api_key') == 'simulation-mode':
|
||
self._log_step("模拟模式,跳过API连接测试", "info")
|
||
return True
|
||
|
||
# 简单的API测试
|
||
test_prompt = "请回复'连接测试成功',不要添加其他内容。"
|
||
response = self.call_api(test_prompt, 'zh')
|
||
|
||
if response and "连接测试成功" in response:
|
||
self._log_step("阿里云千问API连接测试成功", "success")
|
||
return True
|
||
else:
|
||
self._log_step(f"API连接测试失败,响应: {response}", "warning")
|
||
return False
|
||
except Exception as e:
|
||
self._log_step(f"API连接测试异常: {e}", "warning")
|
||
return False
|
||
|
||
def _format_analysis_text(self, text):
|
||
"""格式化分析文本"""
|
||
if not text:
|
||
return "暂无分析内容"
|
||
|
||
# 简单的文本格式化
|
||
formatted = text.replace("\n", "<br/>")
|
||
return formatted
|
||
|
||
def _create_sample_data(self):
|
||
"""创建示例数据"""
|
||
self._log_step("Creating sample data...")
|
||
|
||
# 生成示例时间序列数据
|
||
# pandas 3.x 要求频率字符串小写
|
||
dates = pd.date_range(start='2023-01-01', periods=100, freq='h')
|
||
|
||
sample_data = {
|
||
'timestamp': dates,
|
||
'temperature': np.random.normal(25, 5, 100),
|
||
'humidity': np.random.normal(60, 10, 100),
|
||
'pressure': np.random.normal(1013, 5, 100)
|
||
}
|
||
|
||
df = pd.DataFrame(sample_data)
|
||
df.set_index('timestamp', inplace=True)
|
||
|
||
self._log_step("Sample data created", "success")
|
||
return df
|
||
|
||
def query_api_for_data_description(self, data_summary, language='zh'):
|
||
"""统一的数据描述查询方法"""
|
||
try:
|
||
if language == 'zh':
|
||
prompt = f"""
|
||
作为专业数据分析师,请分析以下数据集:
|
||
|
||
{data_summary}
|
||
|
||
请提供:
|
||
1. 数据的基本特征和结构描述
|
||
2. 潜在的数据质量问题识别
|
||
3. 适合的分析方法和技术建议
|
||
4. 预期的分析价值和业务意义
|
||
|
||
请用中文回答,确保分析全面且专业。
|
||
"""
|
||
else:
|
||
prompt = f"""
|
||
As a professional data analyst, please analyze the following dataset:
|
||
|
||
{data_summary}
|
||
|
||
Please provide in English:
|
||
1. Basic characteristics and structure description of the data
|
||
2. Identification of potential data quality issues
|
||
3. Recommended analysis methods and technical approaches
|
||
4. Expected analytical value and business significance
|
||
|
||
Ensure the analysis is comprehensive and professional.
|
||
"""
|
||
|
||
response = self.call_api(prompt, language)
|
||
return response or "数据描述分析暂不可用" if language == 'zh' else "Data description analysis is temporarily unavailable"
|
||
except Exception as e:
|
||
self._log_step(f"数据描述查询失败: {e}" if language == 'zh'
|
||
else f"Data description query failed: {e}", "warning")
|
||
return "数据描述分析暂不可用" if language == 'zh' else "Data description analysis is temporarily unavailable"
|
||
|
||
def query_api_with_text(self, chart_description, data_summary, language='zh'):
|
||
"""使用文本查询API"""
|
||
try:
|
||
if language == 'zh':
|
||
prompt = f"""
|
||
数据摘要:
|
||
{data_summary}
|
||
|
||
图表分析:
|
||
{chart_description}
|
||
|
||
请基于以上信息提供专业的分析见解。
|
||
"""
|
||
else:
|
||
prompt = f"""
|
||
Data Summary:
|
||
{data_summary}
|
||
|
||
Chart Analysis:
|
||
{chart_description}
|
||
|
||
Please provide professional analysis insights based on the above information.
|
||
"""
|
||
|
||
response = self.call_api(prompt, language)
|
||
return response or "无法获取分析结果" if language == 'zh' else "Unable to get analysis results"
|
||
except Exception as e:
|
||
self._log_step(f"API query failed: {e}", "warning")
|
||
return "分析结果暂不可用" if language == 'zh' else "Analysis results temporarily unavailable"
|
||
|
||
def load_and_preprocess_data(self):
|
||
"""加载和预处理数据"""
|
||
try:
|
||
self._log_step("Loading data from CSV file...")
|
||
self._log_preprocessing_step("数据加载", "开始读取CSV文件")
|
||
|
||
# 尝试多种编码方式读取CSV
|
||
encodings = ['utf-8-sig', 'utf-8', 'gbk', 'latin-1']
|
||
self.data = None
|
||
|
||
for encoding in encodings:
|
||
try:
|
||
self.data = pd.read_csv(self.csv_path, encoding=encoding)
|
||
if not self.data.empty:
|
||
self._log_preprocessing_step("数据加载", f"使用 {encoding} 编码成功读取CSV文件")
|
||
print(f"✓ 使用 {encoding} 编码成功读取CSV文件")
|
||
break
|
||
except Exception as e:
|
||
print(f"使用 {encoding} 编码读取失败: {e}")
|
||
continue
|
||
|
||
if self.data is None or self.data.empty:
|
||
# 创建示例数据作为最后手段
|
||
self._log_step("无法解析数据,创建示例数据...")
|
||
self._log_preprocessing_step("数据加载", "无法解析原始数据,创建示例数据", "warning")
|
||
self.data = self._create_sample_data()
|
||
return True
|
||
|
||
print(f"原始数据形状: {self.data.shape}")
|
||
print(f"列名: {list(self.data.columns)}")
|
||
|
||
# 处理缺失值
|
||
missing_values = self.data.isnull().sum().sum()
|
||
if missing_values > 0:
|
||
self.data.fillna(method='ffill', inplace=True)
|
||
self.data.fillna(method='bfill', inplace=True)
|
||
self._log_preprocessing_step("缺失值处理",
|
||
f"检测到 {missing_values} 个缺失值,使用前向填充和后向填充处理")
|
||
else:
|
||
self._log_preprocessing_step("缺失值处理", "未检测到缺失值")
|
||
|
||
# 添加时间索引
|
||
if 'timestamp' not in self.data.columns:
|
||
# pandas 3.x 频率字符串需小写
|
||
self.data['timestamp'] = pd.date_range(start='2023-01-01', periods=len(self.data), freq='s')
|
||
self._log_preprocessing_step("时间索引", "添加默认时间戳列")
|
||
else:
|
||
# 尝试转换时间戳列
|
||
try:
|
||
self.data['timestamp'] = pd.to_datetime(self.data['timestamp'])
|
||
self._log_preprocessing_step("时间索引", "成功转换时间戳列")
|
||
except:
|
||
self.data['timestamp'] = pd.date_range(start='2023-01-01', periods=len(self.data), freq='s')
|
||
self._log_preprocessing_step("时间索引", "时间戳转换失败,使用默认时间戳", "warning")
|
||
|
||
self.data.set_index('timestamp', inplace=True)
|
||
|
||
# 只对数值列进行标准化
|
||
numeric_columns = self.data.select_dtypes(include=[np.number]).columns
|
||
print(f"数值列: {list(numeric_columns)}")
|
||
|
||
if len(numeric_columns) == 0:
|
||
self._log_step("没有找到数值列,创建示例数据", "warning")
|
||
self._log_preprocessing_step("数据标准化", "没有找到数值列,创建示例数据", "warning")
|
||
self.data = self._create_sample_data()
|
||
numeric_columns = self.data.select_dtypes(include=[np.number]).columns
|
||
|
||
# 只标准化数值列
|
||
self.scaler = StandardScaler()
|
||
numeric_data = self.data[numeric_columns]
|
||
scaled_numeric = self.scaler.fit_transform(numeric_data)
|
||
|
||
# 创建完整的scaled_data(包含所有列)
|
||
self.scaled_data = self.data.copy()
|
||
self.scaled_data[numeric_columns] = scaled_numeric
|
||
|
||
self._log_preprocessing_step("数据标准化", f"对 {len(numeric_columns)} 个数值列进行标准化")
|
||
|
||
self._log_step(f"✓ Data loaded: {self.data.shape[0]} rows, {self.data.shape[1]} columns", "success")
|
||
return True
|
||
except Exception as e:
|
||
self._log_step(f"✗ Data loading error: {e}", "error")
|
||
import traceback
|
||
print(f"详细错误信息: {traceback.format_exc()}")
|
||
|
||
# 创建示例数据作为备选
|
||
self.data = self._create_sample_data()
|
||
return True
|
||
|
||
def _generate_data_summary(self):
|
||
"""生成数据摘要"""
|
||
if not hasattr(self, 'data') or self.data is None:
|
||
return "No data available"
|
||
|
||
summary = f"""
|
||
数据集摘要:
|
||
- 记录数量: {len(self.data)}
|
||
- 变量数量: {len(self.data.columns)}
|
||
- 时间范围: {self.data.index.min()} 到 {self.data.index.max()}
|
||
- 变量列表: {', '.join(self.data.columns.tolist())}
|
||
"""
|
||
|
||
# 添加基本统计信息
|
||
if len(self.data.columns) > 0:
|
||
numeric_cols = self.data.select_dtypes(include=[np.number]).columns
|
||
if len(numeric_cols) > 0:
|
||
summary += f"\n数值变量统计:"
|
||
for col in numeric_cols[:3]: # 只显示前3个变量
|
||
stats = self.data[col].describe()
|
||
summary += f"\n {col}: 均值={stats['mean']:.2f}, 标准差={stats['std']:.2f}, 范围=[{stats['min']:.2f}, {stats['max']:.2f}]"
|
||
|
||
return summary
|
||
|
||
def _create_analysis_prompt(self, analysis_type, title, chart_summary, data_summary, language='zh'):
|
||
"""统一的分析提示创建方法"""
|
||
background = self.data_background
|
||
|
||
if language == 'zh':
|
||
prompt_template = f"""
|
||
你是一个专业的数据科学家,请分析以下时间序列数据图表并提供深入见解。
|
||
|
||
【分析任务】
|
||
{title} ({analysis_type})
|
||
|
||
【图表信息】
|
||
{chart_summary}
|
||
|
||
【数据集背景】
|
||
{data_summary}
|
||
|
||
【用户提供的背景信息】
|
||
- 数据来源: {background.get('source', '未提供')}
|
||
- 采集方法: {background.get('method', '未提供')}
|
||
- 数据用途: {background.get('purpose', '未提供')}
|
||
- 领域知识: {background.get('domain', '未提供')}
|
||
|
||
【具体分析要求】
|
||
请从以下角度提供专业分析:
|
||
1. 图表的主要发现和关键洞察
|
||
2. 数据中表现出的模式、趋势和异常
|
||
3. 各变量之间的关系和相互影响
|
||
4. 结合用户提供的背景信息,分析数据的业务或研究意义
|
||
5. 进一步分析的潜在方向和价值
|
||
|
||
请用中文回答,确保分析专业、深入且具有可操作性。
|
||
"""
|
||
else:
|
||
prompt_template = f"""
|
||
You are a professional data scientist, please analyze the following time series data chart and provide in-depth insights.
|
||
|
||
【Analysis Task】
|
||
{title} ({analysis_type})
|
||
|
||
【Chart Information】
|
||
{chart_summary}
|
||
|
||
【Dataset Background】
|
||
{data_summary}
|
||
|
||
【User-Provided Background Information】
|
||
- Data Source: {background.get('source', 'Not provided')}
|
||
- Collection Method: {background.get('method', 'Not provided')}
|
||
- Data Purpose: {background.get('purpose', 'Not provided')}
|
||
- Domain Knowledge: {background.get('domain', 'Not provided')}
|
||
|
||
【Specific Analysis Requirements】
|
||
Please provide professional analysis from the following perspectives:
|
||
1. Main findings and key insights from the chart
|
||
2. Patterns, trends, and anomalies exhibited in the data
|
||
3. Relationships and interactions between variables
|
||
4. Business or research significance considering the background
|
||
5. Potential directions and value for further analysis
|
||
|
||
Please answer in English, ensuring the analysis is professional and actionable.
|
||
"""
|
||
return prompt_template
|
||
|
||
def _create_fallback_analysis_for_step(self, step, language='zh'):
|
||
"""为分析步骤创建备选分析"""
|
||
if language == 'zh':
|
||
fallback_analyses = {
|
||
'Statistical Overview': "统计概览分析: 数据展示了基本的统计特征和分布情况",
|
||
'Time Series Analysis': "时间序列分析: 数据展示了随时间变化的趋势和模式",
|
||
'Correlation Analysis': "相关性分析: 揭示了变量之间的线性关系强度",
|
||
'PCA Analysis': "主成分分析: 展示了数据在降维后的主要变化方向",
|
||
'Feature Importance': "特征重要性分析: 识别了对目标变量预测最重要的特征",
|
||
'Clustering Analysis': "聚类分析: 将数据点分组为具有相似特征的簇"
|
||
}
|
||
default_response = f"{step.get('zh_title', step['title'])}分析完成"
|
||
else:
|
||
fallback_analyses = {
|
||
'Statistical Overview': "Statistical Overview Analysis: Data shows good statistical characteristics",
|
||
'Time Series Analysis': "Time Series Analysis: Clear temporal patterns identified",
|
||
'Correlation Analysis': "Correlation Analysis: Linear relationships between variables detected",
|
||
'PCA Analysis': "Principal Component Analysis: Dimensionality reduction successful",
|
||
'Feature Importance': "Feature Importance Analysis: Key predictive features identified",
|
||
'Clustering Analysis': "Clustering Analysis: Natural grouping structure discovered"
|
||
}
|
||
default_response = f"{step.get('en_title', step['title'])} analysis completed."
|
||
|
||
return fallback_analyses.get(step['title'], default_response)
|
||
|
||
def _create_fallback_chart(self, chart_key):
|
||
"""创建备选图表"""
|
||
try:
|
||
# 创建简单的占位图表
|
||
plt.figure(figsize=(8, 6))
|
||
plt.text(0.5, 0.5, f'{chart_key} Chart\n(Placeholder)',
|
||
ha='center', va='center', fontsize=14)
|
||
plt.axis('off')
|
||
|
||
img_path = os.path.join(self.temp_dir.name, f'{chart_key}_fallback.png')
|
||
plt.savefig(img_path, dpi=150, bbox_inches='tight')
|
||
plt.close()
|
||
|
||
return img_path
|
||
except:
|
||
return None
|
||
|
||
    # ====== Basic analyses (delegated to app.services.analysis.modules.*) ======

    def generate_statistical_overview(self):
        # Descriptive-statistics table; see modules.basic.
        return _generate_statistical_overview(self)

    def generate_time_series_plots(self):
        # Raw series line plots; see modules.time_series.
        return _generate_time_series_plots(self)

    def generate_correlation_heatmap(self):
        # Pairwise correlation matrix; see modules.multivariate.
        return _generate_correlation_heatmap(self)

    # ====== Time-series characteristics ======

    def generate_acf_pacf_plots(self):
        # Autocorrelation / partial autocorrelation; see modules.time_series.
        return _generate_acf_pacf_plots(self)

    def perform_stationarity_tests(self):
        # ADF/KPSS-style stationarity tests; see modules.stationarity.
        return _perform_stationarity_tests(self)

    def perform_normality_tests(self):
        # Distribution normality tests; see modules.basic.
        return _perform_normality_tests(self)

    def perform_seasonal_decomposition(self):
        # Trend/seasonal/residual decomposition; see modules.time_series.
        return _perform_seasonal_decomposition(self)

    def perform_spectral_analysis(self):
        # Frequency-domain analysis; see modules.time_series.
        return _perform_spectral_analysis(self)

    # ====== Multivariate statistics ======

    def generate_pca_scree_plot(self):
        # Explained-variance scree plot; see modules.multivariate.
        return _generate_pca_scree_plot(self)

    def perform_pca_analysis(self):
        # Principal component projection; see modules.multivariate.
        return _perform_pca_analysis(self)

    def analyze_feature_importance(self):
        # Feature-importance ranking; see modules.modeling.
        return _analyze_feature_importance(self)

    def perform_clustering_analysis(self):
        # Cluster assignment of observations; see modules.multivariate.
        return _perform_clustering_analysis(self)

    def perform_factor_analysis(self):
        # Latent factor extraction; see modules.multivariate.
        return _perform_factor_analysis(self)

    def perform_cointegration_test(self):
        # Cointegration between series pairs; see modules.stationarity.
        return _perform_cointegration_test(self)

    def perform_var_analysis(self):
        # Vector-autoregression modeling/forecast; see modules.modeling.
        return _perform_var_analysis(self)
||
def run_analysis(self):
|
||
"""执行完整分析流程"""
|
||
self._log_step("Starting analysis pipeline...")
|
||
|
||
# charts 模式下强制不生成图片
|
||
self.generate_plots = False
|
||
|
||
# 测试API连接
|
||
try:
|
||
api_connected = self.test_api_connection()
|
||
if not api_connected:
|
||
self._log_step("API connection test failed, but will continue with local analysis", "warning")
|
||
except Exception as e:
|
||
self._log_step(f"API connection test exception: {e}, will continue with local analysis", "warning")
|
||
|
||
if not self.load_and_preprocess_data():
|
||
return None, self.analysis_log
|
||
|
||
# 生成数据描述 - 根据语言选择
|
||
data_summary = self._generate_data_summary()
|
||
data_description = self.query_api_for_data_description(data_summary, self.language)
|
||
|
||
# 初始化结果字典
|
||
results = {
|
||
'api_analysis': {},
|
||
'data_description': data_description,
|
||
'preprocessing_steps': self.preprocessing_steps,
|
||
# steps 用于描述顺序与摘要
|
||
'steps': [],
|
||
'charts': {},
|
||
}
|
||
|
||
# 定义扩展的分析步骤(key 用于兼容旧命名,chart_key 用于 charts 映射)
|
||
analysis_steps = [
|
||
{
|
||
'key': 'stats_img',
|
||
'chart_key': 'stats',
|
||
'method': self.generate_statistical_overview,
|
||
'title': 'Statistical Overview',
|
||
'zh_title': '统计概览',
|
||
'en_title': 'Statistical Overview'
|
||
},
|
||
{
|
||
'key': 'ts_img',
|
||
'chart_key': 'ts',
|
||
'method': self.generate_time_series_plots,
|
||
'title': 'Time Series Analysis',
|
||
'zh_title': '时间序列分析',
|
||
'en_title': 'Time Series Analysis'
|
||
},
|
||
{
|
||
'key': 'acf_pacf_img',
|
||
'chart_key': 'acf_pacf',
|
||
'method': self.generate_acf_pacf_plots,
|
||
'title': 'ACF PACF Analysis',
|
||
'zh_title': '自相关和偏自相关分析',
|
||
'en_title': 'Autocorrelation Analysis'
|
||
},
|
||
{
|
||
'key': 'stationarity_img',
|
||
'chart_key': 'stationarity',
|
||
'method': self.perform_stationarity_tests,
|
||
'title': 'Stationarity Tests',
|
||
'zh_title': '平稳性检验',
|
||
'en_title': 'Stationarity Tests'
|
||
},
|
||
{
|
||
'key': 'normality_img',
|
||
'chart_key': 'normality',
|
||
'method': self.perform_normality_tests,
|
||
'title': 'Normality Tests',
|
||
'zh_title': '正态性检验',
|
||
'en_title': 'Normality Tests'
|
||
},
|
||
{
|
||
'key': 'seasonal_img',
|
||
'chart_key': 'seasonal',
|
||
'method': self.perform_seasonal_decomposition,
|
||
'title': 'Seasonal Decomposition',
|
||
'zh_title': '季节性分解',
|
||
'en_title': 'Seasonal Decomposition'
|
||
},
|
||
{
|
||
'key': 'spectral_img',
|
||
'chart_key': 'spectral',
|
||
'method': self.perform_spectral_analysis,
|
||
'title': 'Spectral Analysis',
|
||
'zh_title': '频谱分析',
|
||
'en_title': 'Spectral Analysis'
|
||
},
|
||
{
|
||
'key': 'heatmap_img',
|
||
'chart_key': 'heatmap',
|
||
'method': self.generate_correlation_heatmap,
|
||
'title': 'Correlation Analysis',
|
||
'zh_title': '相关性分析',
|
||
'en_title': 'Correlation Analysis'
|
||
},
|
||
{
|
||
'key': 'pca_scree_img',
|
||
'chart_key': 'pca_scree',
|
||
'method': self.generate_pca_scree_plot,
|
||
'title': 'PCA Scree Plot',
|
||
'zh_title': 'PCA碎石图',
|
||
'en_title': 'PCA Scree Plot'
|
||
},
|
||
{
|
||
'key': 'pca_img',
|
||
'chart_key': 'pca_scatter',
|
||
'method': self.perform_pca_analysis,
|
||
'title': 'PCA Analysis',
|
||
'zh_title': '主成分分析',
|
||
'en_title': 'Principal Component Analysis'
|
||
},
|
||
{
|
||
'key': 'fi_img',
|
||
'chart_key': 'feature_importance',
|
||
'method': self.analyze_feature_importance,
|
||
'title': 'Feature Importance',
|
||
'zh_title': '特征重要性分析',
|
||
'en_title': 'Feature Importance Analysis'
|
||
},
|
||
{
|
||
'key': 'cluster_img',
|
||
'chart_key': 'cluster',
|
||
'method': self.perform_clustering_analysis,
|
||
'title': 'Clustering Analysis',
|
||
'zh_title': '聚类分析',
|
||
'en_title': 'Clustering Analysis'
|
||
},
|
||
{
|
||
'key': 'factor_img',
|
||
'chart_key': 'factor',
|
||
'method': self.perform_factor_analysis,
|
||
'title': 'Factor Analysis',
|
||
'zh_title': '因子分析',
|
||
'en_title': 'Factor Analysis'
|
||
},
|
||
{
|
||
'key': 'cointegration_img',
|
||
'chart_key': 'cointegration',
|
||
'method': self.perform_cointegration_test,
|
||
'title': 'Cointegration Test',
|
||
'zh_title': '协整检验',
|
||
'en_title': 'Cointegration Test'
|
||
},
|
||
{
|
||
'key': 'var_img',
|
||
'chart_key': 'var_forecast',
|
||
'method': self.perform_var_analysis,
|
||
'title': 'VAR Analysis',
|
||
'zh_title': '向量自回归分析',
|
||
'en_title': 'Vector Autoregression Analysis'
|
||
}
|
||
]
|
||
|
||
# 执行每个分析步骤
|
||
for step in analysis_steps:
|
||
step_entry = {
|
||
'key': step['key'],
|
||
'title': step['title'],
|
||
'zh_title': step.get('zh_title'),
|
||
'en_title': step.get('en_title'),
|
||
}
|
||
try:
|
||
self._log_step(f"Generating {step['title']}...")
|
||
|
||
# 执行方法
|
||
step_result = step['method']()
|
||
|
||
summary = ""
|
||
step_data = None
|
||
|
||
if isinstance(step_result, tuple):
|
||
if len(step_result) == 3:
|
||
_img_unused, summary, step_data = step_result
|
||
elif len(step_result) == 2:
|
||
_img_unused, summary = step_result
|
||
|
||
step_entry['summary'] = summary
|
||
|
||
# 转换数据并构建 charts
|
||
if step_data is not None:
|
||
chart_payload = self._build_chart_payload(step['chart_key'], step_data)
|
||
if chart_payload is not None:
|
||
results['charts'][step['chart_key']] = chart_payload
|
||
step_entry['chart'] = step['chart_key']
|
||
step_entry['data_preview'] = chart_payload.get('preview')
|
||
|
||
# success 判断:有 charts 或 summary 即可
|
||
success = step['chart_key'] in results['charts'] or bool(summary)
|
||
|
||
if success:
|
||
# 调用API分析图表 - 根据语言选择,添加方法名称
|
||
self._log_step(f"Calling API for {step['title']} analysis...")
|
||
|
||
title = step['zh_title'] if self.language == 'zh' else step['en_title']
|
||
analysis_prompt = self._create_analysis_prompt(
|
||
step['title'], title, summary, data_summary, self.language
|
||
)
|
||
api_analysis = self.call_api(analysis_prompt, self.language)
|
||
|
||
if api_analysis:
|
||
results['api_analysis'][step['title']] = api_analysis
|
||
step_entry['api_analysis'] = api_analysis
|
||
self._log_step(f"✓ {step['title']} analysis completed with API", "success")
|
||
else:
|
||
fallback_analysis = self._create_fallback_analysis_for_step(step, self.language)
|
||
results['api_analysis'][step['title']] = fallback_analysis
|
||
step_entry['api_analysis'] = fallback_analysis
|
||
self._log_step(f"✓ {step['title']} analysis completed (fallback)", "warning")
|
||
|
||
else:
|
||
self._log_step(f"✗ {step['title']} data generation failed", "error")
|
||
step_entry['error'] = 'data_generation_failed'
|
||
|
||
except Exception as e:
|
||
self._log_step(f"✗ {step['title']} analysis failed: {e}", "error")
|
||
step_entry['error'] = str(e)
|
||
if step['title'] not in results['api_analysis']:
|
||
results['api_analysis'][step['title']] = self._create_fallback_analysis_for_step(step,
|
||
self.language)
|
||
|
||
# 无论成功与否都记录 step entry,便于前端展示与调试
|
||
results['steps'].append(step_entry)
|
||
|
||
self._log_step("✓ Analysis pipeline completed", "success")
|
||
return results, self.analysis_log
|
||
|
||
    def _build_chart_payload(self, chart_key: str, data: Any) -> Dict[str, Any]:
        """Assemble a step's raw data into an ECharts-friendly payload.

        Dispatches on *chart_key* and the data's type. NOTE: despite the
        annotation, returns None when the (chart_key, data-type) pair is not
        recognized or when conversion raises.
        """
        try:
            # Descriptive-statistics table: header row + data rows.
            if chart_key == 'stats' and isinstance(data, pd.DataFrame):
                dataset = [data.columns.tolist()] + data.values.tolist()
                return {
                    'type': 'table',
                    'dataset': self.to_echarts_safe(dataset),
                    'meta': {'rows': len(data), 'cols': len(data.columns)}
                }

            # Time-series line chart: same header-row dataset layout.
            if chart_key == 'ts' and isinstance(data, pd.DataFrame):
                dataset = [data.columns.tolist()] + data.values.tolist()
                return {
                    'type': 'line',
                    'dataset': self.to_echarts_safe(dataset),
                    'meta': {'columns': data.columns.tolist()}
                }

            # ACF/PACF: per-column lists turned into {lag, value} point series.
            if chart_key == 'acf_pacf' and isinstance(data, dict):
                series_payload = []
                for col, vals in data.items():
                    acf_vals = vals.get('acf') or []
                    pacf_vals = vals.get('pacf') or []
                    acf_points = [{'lag': idx, 'value': v} for idx, v in enumerate(acf_vals)]
                    pacf_points = [{'lag': idx, 'value': v} for idx, v in enumerate(pacf_vals)]
                    series_payload.append({'name': col, 'acf': acf_points, 'pacf': pacf_points})

                return {
                    'type': 'bar',
                    'series': self.to_echarts_safe(series_payload),
                    'meta': {'columns': list(data.keys())}
                }

            # Stationarity test results: one record per column.
            if chart_key == 'stationarity' and isinstance(data, dict):
                return {
                    'type': 'table',
                    'records': self.to_echarts_safe([
                        {'column': col, **vals}
                        for col, vals in data.items()
                    ]),
                }

            # Normality test results: same record-per-column layout.
            if chart_key == 'normality' and isinstance(data, dict):
                return {
                    'type': 'table',
                    'records': self.to_echarts_safe([
                        {'column': col, **vals}
                        for col, vals in data.items()
                    ]),
                }

            # Seasonal decomposition components as a line chart.
            if chart_key == 'seasonal' and isinstance(data, pd.DataFrame):
                dataset = [data.columns.tolist()] + data.values.tolist()
                return {
                    'type': 'line',
                    'dataset': self.to_echarts_safe(dataset),
                }

            # Spectral analysis: dict of series passed through sanitized.
            if chart_key == 'spectral' and isinstance(data, dict):
                return {
                    'type': 'spectral',
                    'series': self.to_echarts_safe(data),
                }

            if chart_key == 'heatmap':
                # Expected: a DataFrame correlation matrix, flattened to
                # [row_idx, col_idx, value] triples as ECharts heatmaps want.
                if isinstance(data, pd.DataFrame):
                    labels = data.columns.tolist()
                    flattened: List[List[Any]] = []
                    for i, row_label in enumerate(labels):
                        for j, col_label in enumerate(labels):
                            flattened.append([i, j, data.iloc[i, j]])
                    return {
                        'type': 'heatmap',
                        'data': self.to_echarts_safe(flattened),
                        'xLabels': labels,
                        'yLabels': labels,
                    }

            # PCA scree plot (explained variance) as a bar chart.
            if chart_key == 'pca_scree' and isinstance(data, pd.DataFrame):
                dataset = [data.columns.tolist()] + data.values.tolist()
                return {
                    'type': 'bar',
                    'dataset': self.to_echarts_safe(dataset),
                }

            # PCA projection scatter: row records.
            if chart_key == 'pca_scatter' and isinstance(data, pd.DataFrame):
                return {
                    'type': 'scatter',
                    'records': self.to_echarts_safe(data.to_dict(orient='records')),
                }

            # Feature-importance bars: row records.
            if chart_key == 'feature_importance' and isinstance(data, pd.DataFrame):
                return {
                    'type': 'bar',
                    'records': self.to_echarts_safe(data.to_dict(orient='records')),
                }

            # Cluster-assignment scatter: row records.
            if chart_key == 'cluster' and isinstance(data, pd.DataFrame):
                return {
                    'type': 'scatter',
                    'records': self.to_echarts_safe(data.to_dict(orient='records')),
                }

            # Factor-analysis scatter: row records.
            if chart_key == 'factor' and isinstance(data, pd.DataFrame):
                return {
                    'type': 'scatter',
                    'records': self.to_echarts_safe(data.to_dict(orient='records')),
                }

            # Cointegration results: metadata table.
            if chart_key == 'cointegration' and isinstance(data, dict):
                return {
                    'type': 'table',
                    'meta': self.to_echarts_safe(data),
                }

            if chart_key == 'var_forecast' and isinstance(data, pd.DataFrame):
                # Prepend a 1-based 'step' counter so the frontend can encode the x-axis.
                data_with_step = data.copy()
                data_with_step.insert(0, 'step', range(1, len(data_with_step) + 1))
                dataset = [data_with_step.columns.tolist()] + data_with_step.values.tolist()
                return {
                    'type': 'line',
                    'dataset': self.to_echarts_safe(dataset),
                }

            # Unrecognized chart_key / data-type combination.
            return None
        except Exception as e:
            self._log_step(f"build chart payload failed for {chart_key}: {e}", "warning")
            return None
||
|
||
    # ====== Report-generation methods (removed) ======