# Json-Python-Server/app/services/analysis/modules/time_series.py
#
# Repository viewer metadata: 243 lines, 11 KiB, Python.
# Snapshot timestamp: 2026-01-29 18:18:32 +08:00
import gc
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy.signal import spectrogram, periodogram
def generate_time_series_plots(self):
    """Prepare chart-ready time series data for the leading columns.

    Works in data-only ("charts") mode: no image is rendered, so the first
    element of the returned tuple is always ``None``.

    Returns:
        tuple: ``(None, summary, plot_data)``. ``plot_data`` is a DataFrame
        made of the index (moved into a column by ``reset_index``) and at
        most the first four columns of ``self.data``. When ``self.data`` is
        missing, ``None`` or has no columns, or when an exception occurs,
        the third element is ``None`` and ``summary`` carries the reason.
    """
    try:
        self._log_step("Generating time series plots...")

        data = getattr(self, 'data', None)
        if data is None or len(data.columns) == 0:
            self._log_step("No data available for time series plots", "warning")
            return None, "No data available", None

        # Only the first four columns are exposed to the charts.
        n_plots = min(4, len(data.columns))
        plot_data = data.iloc[:, :n_plots].reset_index()

        # Timestamp objects are not JSON serialisable. Stringify the
        # 'timestamp' column as before, plus any other datetime-typed column
        # (for example a DatetimeIndex that reset_index() exposed under a
        # different name such as 'index' or 'date').
        for name, dtype in plot_data.dtypes.items():
            if name == 'timestamp' or pd.api.types.is_datetime64_any_dtype(dtype):
                plot_data[name] = plot_data[name].astype(str)

        summary = f"Generated {n_plots} time series charts"
        self._log_step("Time series data prepared", "success")
        return None, summary, plot_data
    except Exception as e:
        # Best-effort step: report the failure through the return value
        # instead of propagating it to the caller.
        self._log_step(f"Time series plots failed: {str(e)[:100]}", "error")
        return None, f"Error: {e}", None
def generate_acf_pacf_plots(self):
    """Compute autocorrelation (ACF) and partial autocorrelation (PACF) data.

    Works in data-only mode: no image is rendered, so the first element of
    the returned tuple is always ``None``. At most the first three numeric
    columns of ``self.data`` are analysed, each after dropping NaN rows.

    Returns:
        tuple: ``(None, summary, results)``. ``results`` maps a column name
        to ``{'acf': [...], 'pacf': [...]}``; non-finite coefficients are
        emitted as ``None``. Columns whose computation raised are logged as
        warnings and left out, and ``summary`` counts only the columns that
        actually made it into ``results``. When ``self.data`` is missing or
        ``None``, or when an unexpected exception occurs, the third element
        is ``None`` and ``summary`` carries the reason.
    """
    try:
        self._log_step("Generating ACF and PACF plots...")

        data = getattr(self, 'data', None)
        if data is None:
            self._log_step("No data available for ACF/PACF plots", "warning")
            return None, "数据不足无法生成ACF/PACF图", None

        numeric_cols = data.select_dtypes(include=[np.number]).columns

        acf_pacf_results = {}
        for col in numeric_cols[:3]:
            series = data[col].dropna()
            try:
                # Lag counts scale with the series length, capped at 40 / 20.
                acf_vals = np.asarray(acf(series, nlags=min(40, len(series) // 4)))
                pacf_vals = np.asarray(pacf(series, nlags=min(20, len(series) // 5)))
            except Exception as e:
                self._log_step(f"Error calculating ACF/PACF for {col}: {e}", "warning")
                continue

            # NaN/inf are not valid JSON numbers; emit None for them, in line
            # with how the seasonal decomposition output treats NaN.
            acf_pacf_results[col] = {
                'acf': [float(v) if np.isfinite(v) else None for v in acf_vals],
                'pacf': [float(v) if np.isfinite(v) else None for v in pacf_vals],
            }

        # Count the columns that produced data, not the ones attempted.
        summary = f"生成 {len(acf_pacf_results)} 个变量的ACF和PACF数据"
        self._log_step("ACF and PACF data generated", "success")
        return None, summary, acf_pacf_results
    except Exception as e:
        # Best-effort step: report the failure through the return value.
        self._log_step(f"ACF/PACF plots failed: {e}", "error")
        return None, f"ACF/PACF图生成失败: {e}", None
def perform_seasonal_decomposition(self):
    """Run an additive seasonal decomposition on the first numeric column.

    Works in data-only mode: no image is rendered, so the first element of
    the returned tuple is always ``None``. The column is decomposed after
    dropping NaN rows, with a period of ``min(24, len(series) // 2)``.

    Returns:
        tuple: ``(None, summary, decomposition_data)``. On success
        ``decomposition_data`` is an object-dtype DataFrame with the columns
        ``observed``, ``trend``, ``seasonal`` and ``resid`` in which NaN has
        been replaced by ``None``. The third element is ``None`` (and
        ``summary`` explains why) when ``self.data`` is missing or ``None``,
        when there is no numeric column, when the series is too short to
        yield a period of at least 2, or when an exception occurs.
    """
    try:
        self._log_step("Performing seasonal decomposition...")

        data = getattr(self, 'data', None)
        if data is None:
            self._log_step("No data available for seasonal decomposition", "warning")
            return None, "数据不足,无法进行季节性分解", None

        numeric_cols = data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            self._log_step("No numeric columns for decomposition", "warning")
            return None, "没有数值列可用于季节性分解", None

        # Only the first numeric column is decomposed.
        col = numeric_cols[0]
        series = data[col].dropna()

        # Fewer than 4 observations gives a period of 0 or 1, which is not a
        # meaningful seasonal cycle; report insufficient data up front rather
        # than handing a degenerate period to the library.
        period = min(24, len(series) // 2)
        if period < 2:
            self._log_step("No data available for seasonal decomposition", "warning")
            return None, "数据不足,无法进行季节性分解", None

        result = seasonal_decompose(series, model='additive', period=period)
        decomposition_data = pd.DataFrame({
            'observed': result.observed,
            'trend': result.trend,
            'seasonal': result.seasonal,
            'resid': result.resid,
        })
        # Swap NaN for None so the frame can be JSON serialised.
        decomposition_data = decomposition_data.astype(object).where(
            pd.notnull(decomposition_data),
            None,  # type: ignore[arg-type]
        )

        summary = f"季节性分解完成,变量: {col}"
        self._log_step("Seasonal decomposition completed (data only)", "success")
        return None, summary, decomposition_data
    except Exception as e:
        # Best-effort step: report the failure through the return value.
        self._log_step(f"Seasonal decomposition failed: {e}", "error")
        return None, f"季节性分解失败: {e}", None
def perform_spectral_analysis(self):
    """Compute a compact spectral summary for the first two numeric columns.

    Works in data-only mode: no image is rendered, so the first element of
    the returned tuple is always ``None``. For each analysed column (NaN
    rows dropped, sampling frequency fixed at 1.0) a spectrogram and a
    periodogram are computed, and only a reduced view is returned to keep
    the payload small.

    Returns:
        tuple: ``(None, summary, results)``. ``results`` maps a column name
        to::

            {
                'spectrogram': {'f': [...], 't': [...],
                                'Sxx_log10_mean': float, 'Sxx_shape': tuple},
                'periodogram': {'f': [...], 'Pxx_den': [...]},  # first 20
            }

        Columns whose computation raised are logged as warnings and left
        out. When ``self.data`` is missing or ``None``, or when an
        unexpected exception occurs, the third element is ``None`` and
        ``summary`` carries the reason.
    """
    try:
        self._log_step("Performing spectral analysis...")

        data = getattr(self, 'data', None)
        if data is None:
            self._log_step("No data available for spectral analysis", "warning")
            return None, "数据不足,无法进行频谱分析", None

        numeric_cols = data.select_dtypes(include=[np.number]).columns

        spectral_results = {}
        for col in numeric_cols[:2]:
            try:
                series = data[col].dropna().values
                f, t, Sxx = spectrogram(series, fs=1.0, nperseg=min(256, len(series) // 4))
                f_p, Pxx_den = periodogram(series, fs=1.0)
                # The small offset keeps log10 finite where the power is zero.
                Sxx_log = 10 * np.log10(Sxx + 1e-12)
                spectral_results[col] = {
                    # Only the mean level and the shape of the spectrogram
                    # are returned, not the full matrix.
                    'spectrogram': {
                        'f': f.tolist(),
                        't': t.tolist(),
                        'Sxx_log10_mean': float(np.mean(Sxx_log)),
                        'Sxx_shape': Sxx.shape,
                    },
                    # Slice before converting so only the 20 retained bins
                    # are turned into Python objects.
                    'periodogram': {
                        'f': f_p[:20].tolist(),
                        'Pxx_den': Pxx_den[:20].tolist(),
                    },
                }
            except Exception as e:
                self._log_step(f"Spectral calc failed for {col}: {e}", "warning")

        summary = "Spectral analysis completed"
        self._log_step("Spectral analysis completed (data only)", "success")
        return None, summary, spectral_results
    except Exception as e:
        # Best-effort step: report the failure through the return value.
        self._log_step(f"Spectral analysis failed: {e}", "error")
        return None, f"频谱分析失败: {e}", None