159 lines
5.5 KiB
Python
159 lines
5.5 KiB
Python
import os
|
|
import sys
|
|
import shutil
|
|
import json
|
|
from pathlib import Path
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
# Add project root to path
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from app.services.analysis_system import TimeSeriesAnalysisSystem
|
|
from app.core.config import settings
|
|
|
|
class NpEncoder(json.JSONEncoder):
|
|
"""
|
|
JSON encoder that handles NumPy types
|
|
"""
|
|
def default(self, obj):
|
|
if isinstance(obj, np.integer):
|
|
return int(obj)
|
|
if isinstance(obj, np.floating):
|
|
return float(obj)
|
|
if isinstance(obj, (np.bool_, bool)):
|
|
return bool(obj)
|
|
if isinstance(obj, np.ndarray):
|
|
return obj.tolist()
|
|
if isinstance(obj, pd.Timestamp):
|
|
return str(obj)
|
|
return super(NpEncoder, self).default(obj)
|
|
|
|
def format_details(details):
|
|
"""
|
|
Format detailed results for text output
|
|
"""
|
|
if details is None:
|
|
return ""
|
|
|
|
# Handle pandas Series/DataFrame
|
|
if isinstance(details, (pd.DataFrame, pd.Series)):
|
|
try:
|
|
return details.to_markdown() if hasattr(details, 'to_markdown') else details.to_string()
|
|
except ImportError:
|
|
return details.to_string()
|
|
|
|
# Handle Dict/List (JSON-like)
|
|
if isinstance(details, (dict, list)):
|
|
try:
|
|
return json.dumps(details, cls=NpEncoder, indent=2, ensure_ascii=False)
|
|
except Exception as e:
|
|
return f"JSON Serialization Error: {e}\nRaw: {str(details)}"
|
|
|
|
return str(details)
|
|
|
|
def run_all_analyses():
|
|
# Setup paths
|
|
base_dir = Path(__file__).parent
|
|
test_dir = base_dir / "test"
|
|
csv_filename = "comprehensive_test_data.csv"
|
|
csv_path = test_dir / csv_filename
|
|
|
|
if not csv_path.exists():
|
|
print(f"Error: Test file not found at {csv_path}")
|
|
return
|
|
|
|
output_dir = test_dir / "results"
|
|
output_dir.mkdir(exist_ok=True)
|
|
|
|
print(f"Starting analysis on {csv_path}")
|
|
print(f"Results will be saved to {output_dir}")
|
|
|
|
# Initialize System
|
|
# generate_plots=False allows skipping image generation but still returns full data details
|
|
system = TimeSeriesAnalysisSystem(
|
|
str(csv_path),
|
|
task_description="Test Suite Analysis",
|
|
language="zh",
|
|
generate_plots=False
|
|
)
|
|
|
|
if not system.load_and_preprocess_data():
|
|
print("Failed to load data")
|
|
return
|
|
|
|
# Define methods to run
|
|
methods = [
|
|
('statistical_overview', system.generate_statistical_overview),
|
|
('time_series_analysis', system.generate_time_series_plots),
|
|
('acf_pacf_analysis', system.generate_acf_pacf_plots),
|
|
('stationarity_tests', system.perform_stationarity_tests),
|
|
('normality_tests', system.perform_normality_tests),
|
|
('seasonal_decomposition', system.perform_seasonal_decomposition),
|
|
('spectral_analysis', system.perform_spectral_analysis),
|
|
('correlation_analysis', system.generate_correlation_heatmap),
|
|
('pca_scree_plot', system.generate_pca_scree_plot),
|
|
('pca_analysis', system.perform_pca_analysis),
|
|
('feature_importance', system.analyze_feature_importance),
|
|
('clustering_analysis', system.perform_clustering_analysis),
|
|
('factor_analysis', system.perform_factor_analysis),
|
|
('cointegration_test', system.perform_cointegration_test),
|
|
('var_analysis', system.perform_var_analysis)
|
|
]
|
|
|
|
for name, method in methods:
|
|
print(f"\nrunning {name}...")
|
|
try:
|
|
result = method()
|
|
|
|
img_path = None
|
|
summary = ""
|
|
details = None
|
|
|
|
# Parse result
|
|
if isinstance(result, tuple):
|
|
if len(result) == 3:
|
|
img_path, summary, details = result
|
|
elif len(result) == 2:
|
|
img_path, summary = result
|
|
else:
|
|
summary = str(result)
|
|
|
|
# Save Output
|
|
base_output_name = f"{name}_output"
|
|
|
|
# 1. Save Summary & Details
|
|
txt_path = output_dir / f"{base_output_name}.txt"
|
|
with open(txt_path, "w", encoding="utf-8") as f:
|
|
f.write(f"Method: {name}\n")
|
|
f.write("-" * 50 + "\n")
|
|
f.write("Summary:\n")
|
|
f.write(str(summary))
|
|
f.write("\n\n")
|
|
|
|
if details is not None:
|
|
f.write("Detailed Results:\n")
|
|
f.write("-" * 50 + "\n")
|
|
formatted_details = format_details(details)
|
|
f.write(formatted_details)
|
|
f.write("\n")
|
|
|
|
print(f" Saved full details to {txt_path.name}")
|
|
|
|
# 2. Save Image (if any)
|
|
if img_path and os.path.exists(img_path):
|
|
ext = os.path.splitext(img_path)[1]
|
|
target_img_path = output_dir / f"{base_output_name}{ext}"
|
|
shutil.copy2(img_path, target_img_path)
|
|
print(f" Saved image to {target_img_path.name}")
|
|
else:
|
|
pass # No image expected if generate_plots=False
|
|
|
|
except Exception as e:
|
|
print(f" Error running {name}: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
if __name__ == "__main__":
|
|
run_all_analyses()
|