Json-Python-Server/app/services/analysis/modules/time_series.py
2026-01-29 18:18:32 +08:00

243 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gc
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.tsa.seasonal import seasonal_decompose
from scipy.signal import spectrogram, periodogram
def generate_time_series_plots(self):
    """Prepare the data behind the time series charts.

    Runs in "charts" mode: no image is rendered, only data is returned.
    Up to the first four columns of ``self.data`` are kept, the index is
    moved into a regular column with ``reset_index()``, and a ``timestamp``
    column, when present, is cast to ``str`` so it can be JSON-serialized.

    Returns:
        tuple: ``(None, summary, plot_data)`` on success, where ``plot_data``
        is the prepared DataFrame; ``(None, message, None)`` when there is
        no data or an exception was raised.
    """
    try:
        self._log_step("Generating time series plots...")

        frame = getattr(self, 'data', None)
        if frame is None or len(frame.columns) == 0:
            self._log_step("No data available for time series plots", "warning")
            return None, "No data available", None

        # At most four series are charted.
        chart_count = min(4, len(frame.columns))
        chart_frame = frame.iloc[:, :chart_count].reset_index()

        # Stringify the timestamp column so the frame is JSON-serializable.
        if 'timestamp' in chart_frame.columns:
            chart_frame['timestamp'] = chart_frame['timestamp'].astype(str)

        # NOTE: the earlier plotting variant drew each column on a 2x2
        # matplotlib subplot grid and saved it as 'time_series.png' under
        # self.temp_dir. In charts mode the image path is always None.
        self._log_step("Time series data prepared", "success")
        return None, f"Generated {chart_count} time series charts", chart_frame
    except Exception as exc:
        # The logged message is truncated to 100 characters; the returned
        # summary carries the full exception text.
        self._log_step(f"Time series plots failed: {str(exc)[:100]}", "error")
        return None, f"Error: {exc}", None
def generate_acf_pacf_plots(self):
    """Compute autocorrelation (ACF) and partial autocorrelation (PACF) data.

    Only data is produced; no figure is rendered. At most the first three
    numeric columns of ``self.data`` are processed, each with its NaN values
    dropped. A column whose computation raises is logged as a warning and
    left out of the result.

    Returns:
        tuple: ``(None, summary, results)`` where ``results`` maps each
        processed column name to ``{'acf': [...], 'pacf': [...]}``;
        ``(None, message, None)`` when ``self.data`` is missing or ``None``,
        or when an unexpected exception was raised.
    """
    try:
        self._log_step("Generating ACF and PACF plots...")

        frame = getattr(self, 'data', None)
        if frame is None:
            self._log_step("No data available for ACF/PACF plots", "warning")
            return None, "数据不足无法生成ACF/PACF图", None

        numeric_names = frame.select_dtypes(include=[np.number]).columns
        column_count = min(3, len(numeric_names))

        correlations = {}
        for name in numeric_names[:column_count]:
            values = frame[name].dropna()
            # Lag counts are capped at 40 (ACF) and 20 (PACF), and at a
            # quarter / a fifth of the series length respectively.
            acf_lags = min(40, len(values) // 4)
            pacf_lags = min(20, len(values) // 5)
            try:
                acf_list = np.asarray(acf(values, nlags=acf_lags)).tolist()
                pacf_list = np.asarray(pacf(values, nlags=pacf_lags)).tolist()
            except Exception as exc:
                self._log_step(f"Error calculating ACF/PACF for {name}: {exc}", "warning")
            else:
                correlations[name] = {
                    'acf': acf_list,
                    'pacf': pacf_list,
                }

        # NOTE: the earlier plotting variant drew plot_acf / plot_pacf side
        # by side per column and saved 'acf_pacf_plots.png' under
        # self.temp_dir. Only the data is returned now.
        self._log_step("ACF and PACF data generated", "success")
        return None, f"生成 {column_count} 个变量的ACF和PACF数据", correlations
    except Exception as exc:
        self._log_step(f"ACF/PACF plots failed: {exc}", "error")
        return None, f"ACF/PACF图生成失败: {exc}", None
def perform_seasonal_decomposition(self):
    """Run an additive seasonal decomposition on the first numeric column.

    Only data is produced; no figure is rendered. The first numeric column
    of ``self.data`` is taken with its NaN values dropped and decomposed
    with a period of ``min(24, len(series) // 2)``. NaN entries in the
    resulting components are replaced by ``None`` so the frame can be
    JSON-serialized.

    Returns:
        tuple: ``(None, summary, decomposition)`` where ``decomposition`` is
        a DataFrame with the columns ``observed``, ``trend``, ``seasonal``
        and ``resid``; ``(None, message, None)`` when ``self.data`` is
        missing or ``None``, has no numeric column, or an exception was
        raised.
    """
    try:
        self._log_step("Performing seasonal decomposition...")

        frame = getattr(self, 'data', None)
        if frame is None:
            self._log_step("No data available for seasonal decomposition", "warning")
            return None, "数据不足,无法进行季节性分解", None

        numeric_names = frame.select_dtypes(include=[np.number]).columns
        if len(numeric_names) == 0:
            self._log_step("No numeric columns for decomposition", "warning")
            return None, "没有数值列可用于季节性分解", None

        # Only the first numeric column is decomposed.
        target = numeric_names[0]
        values = frame[target].dropna()
        cycle = min(24, len(values) // 2)
        parts = seasonal_decompose(values, model='additive', period=cycle)

        components = pd.DataFrame({
            'observed': parts.observed,
            'trend': parts.trend,
            'seasonal': parts.seasonal,
            'resid': parts.resid,
        })
        # Swap NaN for None; the mask is taken before the cast to object.
        present = pd.notnull(components)
        components = components.astype(object).where(
            present,
            None,  # type: ignore[arg-type]
        )

        # NOTE: the earlier plotting variant drew the four components on
        # stacked matplotlib axes and saved 'seasonal_decomposition.png'
        # under self.temp_dir. Only the data is returned now.
        self._log_step("Seasonal decomposition completed (data only)", "success")
        return None, f"季节性分解完成,变量: {target}", components
    except Exception as exc:
        self._log_step(f"Seasonal decomposition failed: {exc}", "error")
        return None, f"季节性分解失败: {exc}", None
def perform_spectral_analysis(self):
    """Compute a compact spectral summary for the first two numeric columns.

    Only data is produced; no figure is rendered. For each column (NaN
    values dropped) a spectrogram and a periodogram are computed with
    ``fs=1.0``. To keep the payload small, the spectrogram matrix itself is
    not returned (only its shape and the mean of its dB values), and the
    periodogram is cut to its first 20 points. A column whose computation
    raises is logged as a warning and left out of the result.

    Returns:
        tuple: ``(None, summary, results)`` where ``results`` maps each
        processed column name to a dict with the keys ``spectrogram``
        (``f``, ``t``, ``Sxx_log10_mean``, ``Sxx_shape``) and
        ``periodogram`` (``f``, ``Pxx_den``); ``(None, message, None)`` when
        ``self.data`` is missing or ``None``, or an unexpected exception was
        raised.
    """
    try:
        self._log_step("Performing spectral analysis...")

        frame = getattr(self, 'data', None)
        if frame is None:
            self._log_step("No data available for spectral analysis", "warning")
            return None, "数据不足,无法进行频谱分析", None

        numeric_names = frame.select_dtypes(include=[np.number]).columns

        spectra = {}
        for name in numeric_names[:2]:
            try:
                samples = frame[name].dropna().values
                # Segment length: a quarter of the series, capped at 256.
                segment_len = min(256, len(samples) // 4)
                freqs, times, power = spectrogram(samples, fs=1.0, nperseg=segment_len)
                pg_freqs, pg_density = periodogram(samples, fs=1.0)

                # The small offset keeps log10 finite where the power is zero.
                power_db = 10 * np.log10(power + 1e-12)

                spectrogram_info = {
                    'f': freqs.tolist(),
                    't': times.tolist(),
                    'Sxx_log10_mean': float(np.mean(power_db)),
                    'Sxx_shape': power.shape,
                }
                periodogram_info = {
                    'f': pg_freqs[:20].tolist(),
                    'Pxx_den': pg_density[:20].tolist(),
                }
                spectra[name] = {
                    'spectrogram': spectrogram_info,
                    'periodogram': periodogram_info,
                }
            except Exception as exc:
                self._log_step(f"Spectral calc failed for {name}: {exc}", "warning")

        # NOTE: the earlier plotting variant drew a pcolormesh spectrogram
        # and a semilogy periodogram per column and saved
        # 'spectral_analysis.png' under self.temp_dir. Only the data is
        # returned now.
        self._log_step("Spectral analysis completed (data only)", "success")
        return None, "Spectral analysis completed", spectra
    except Exception as exc:
        self._log_step(f"Spectral analysis failed: {exc}", "error")
        return None, f"频谱分析失败: {exc}", None