170 lines
7.1 KiB
Python
170 lines
7.1 KiB
Python
import os
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from statsmodels.tsa.stattools import adfuller, kpss
|
|
|
|
|
|
def perform_stationarity_tests(self):
    """Run ADF and KPSS stationarity tests on the leading numeric columns.

    At most the first three numeric columns of ``self.data`` are tested, each
    with its NaN values dropped first.  ADF marks a series as stationary when
    its p-value is below 0.05; KPSS (``regression='c'``) marks it stationary
    when its p-value is above 0.05.  No Phillips-Perron test is performed.

    When ``self.generate_plots`` is true, a 2x2 figure is written to
    ``stationarity_tests.png`` inside ``self.temp_dir.name``: the first two
    numeric columns as time series, the ADF statistic / p-value bars, and a
    colour-coded ADF verdict for the first two tested columns.

    Returns:
        A tuple ``(img_path, summary, results)``.

        * ``img_path`` -- path of the saved figure, or ``None`` when plots
          are disabled, no data is available, or the run failed.
        * ``summary`` -- human-readable status message.
        * ``results`` -- dict mapping column name to
          ``{'ADF': {...}, 'KPSS': {...}}``; the ``'KPSS'`` entry is the
          string ``'检验失败'`` when that test raised.  ``None`` when no
          data is available or the run failed.

    Any exception other than a KPSS failure is caught, logged through
    ``self._log_step`` and reported in ``summary``; nothing is raised.
    """
    try:
        self._log_step("Performing stationarity tests...")

        # Guard clause: nothing to test without a data set.
        if getattr(self, 'data', None) is None:
            self._log_step("No data available for stationarity tests", "warning")
            return None, "数据不足,无法进行平稳性检验", None

        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        results = {}

        for col in numeric_cols[:3]:  # only the first three variables are tested
            series = self.data[col].dropna()
            col_results = {}

            # ADF test: an ADF failure is not handled per column and aborts
            # the whole run through the outer handler.
            adf_result = adfuller(series)
            adf_crit = adf_result[4]  # type: ignore[index]
            if isinstance(adf_crit, dict):
                # Plain str/float so the result is serialisable.
                adf_crit = {str(k): float(v) for k, v in adf_crit.items()}
            col_results['ADF'] = {
                'statistic': float(adf_result[0]),
                'p_value': float(adf_result[1]),
                'critical_values': adf_crit,
                'stationary': bool(adf_result[1] < 0.05),
            }

            # KPSS test: best effort, a failure is recorded per column.
            try:
                kpss_result = kpss(series, regression='c')
                kpss_crit = kpss_result[3]
                if isinstance(kpss_crit, dict):
                    kpss_crit = {str(k): float(v) for k, v in kpss_crit.items()}
                col_results['KPSS'] = {
                    'statistic': float(kpss_result[0]),
                    'p_value': float(kpss_result[1]),
                    'critical_values': kpss_crit,
                    'stationary': bool(kpss_result[1] > 0.05),
                }
            except Exception as kpss_err:
                # Keep the failure marker, but leave a trace in the log
                # instead of swallowing the error silently.
                self._log_step(f"KPSS test failed for {col}: {kpss_err}", "warning")
                col_results['KPSS'] = '检验失败'

            results[col] = col_results

        summary = f"平稳性检验完成,测试了 {len(results)} 个变量"

        if not self.generate_plots:
            self._log_step("Stationarity tests completed (data only)", "success")
            return None, summary, results

        # Build the stationarity visualisation.
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        try:
            fig.suptitle('平稳性检验结果', fontsize=16)

            # Top row: raw time series of the first two numeric columns.
            for i, col in enumerate(numeric_cols[:2]):
                axes[0, i].plot(self.data.index, self.data[col])
                axes[0, i].set_title(f'{col} - 时间序列')
                axes[0, i].tick_params(axis='x', rotation=45)
                axes[0, i].grid(True, alpha=0.3)

            # Bottom row only has room for the first two tested columns.
            plotted = list(results)[:2]
            test_stats = [results[col]['ADF']['statistic'] for col in plotted]
            p_values = [results[col]['ADF']['p_value'] for col in plotted]
            x_pos = np.arange(len(plotted))

            # Bottom-left: ADF statistic and p-value side by side.
            adf_ax = axes[1, 0]
            adf_ax.bar(x_pos - 0.2, test_stats, 0.4, label='检验统计量', alpha=0.7)
            adf_ax.bar(x_pos + 0.2, p_values, 0.4, label='p值', alpha=0.7)
            adf_ax.set_title('ADF检验结果')
            adf_ax.set_xticks(x_pos)
            adf_ax.set_xticklabels(plotted)
            # Draw the threshold line before building the legend; the legend
            # only lists artists that already exist when legend() is called.
            adf_ax.axhline(y=0.05, color='r', linestyle='--', label='显著性水平 (0.05)')
            adf_ax.legend()

            # Bottom-right: colour-coded verdict based on the ADF result.
            stationary_status = [
                '平稳' if results[col]['ADF']['stationary'] else '非平稳'
                for col in plotted
            ]
            colors = ['green' if status == '平稳' else 'red' for status in stationary_status]
            verdict_ax = axes[1, 1]
            verdict_ax.bar(x_pos, [1] * len(stationary_status), color=colors, alpha=0.7)
            verdict_ax.set_title('平稳性结论')
            verdict_ax.set_xticks(x_pos)
            verdict_ax.set_xticklabels(plotted)
            for i, status in enumerate(stationary_status):
                verdict_ax.text(i, 0.5, status, ha='center', va='center', fontweight='bold')

            fig.tight_layout()
            img_path = os.path.join(self.temp_dir.name, 'stationarity_tests.png')
            fig.savefig(img_path, dpi=150, bbox_inches='tight')
        finally:
            # Always release the figure, even when plotting or saving fails.
            plt.close(fig)

        self._log_step("Stationarity tests completed", "success")
        return img_path, summary, results

    except Exception as e:
        self._log_step(f"Stationarity tests failed: {e}", "error")
        return None, f"平稳性检验失败: {e}", None
|
|
|
|
|
|
def perform_cointegration_test(self):
    """Run a Johansen cointegration test on the numeric columns of ``self.data``.

    Rows containing missing values are dropped before the test, matching the
    NaN handling of the stationarity tests.  The test is run with
    ``det_order=0`` and ``k_ar_diff=1``.

    When ``self.generate_plots`` is true, a bar chart comparing each trace
    statistic with the second column of the critical-value table (labelled
    95% in the chart) is written to ``cointegration_test.png`` inside
    ``self.temp_dir.name``.

    Returns:
        A tuple ``(img_path, summary, coint_data)``.

        * ``img_path`` -- path of the saved figure, or ``None`` when plots
          are disabled, the data is insufficient, or the run failed.
        * ``summary`` -- human-readable status message.
        * ``coint_data`` -- dict with list-valued keys ``'trace_stat'``,
          ``'trace_stat_crit_vals'`` and ``'eigen_vals'``; ``None`` when the
          data is insufficient or the run failed.

    Any exception is caught, logged through ``self._log_step`` and reported
    in ``summary``; nothing is raised.
    """
    try:
        self._log_step("Performing cointegration test...")

        data = getattr(self, 'data', None)
        if data is None or len(data.columns) <= 1:
            self._log_step("Not enough data for cointegration test", "warning")
            return None, "数据不足,无法进行协整检验", None

        numeric_data = data.select_dtypes(include=[np.number])
        if len(numeric_data.columns) < 2:
            self._log_step("Not enough numeric columns for cointegration test", "warning")
            return None, "数值变量不足,无法进行协整检验", None

        # Drop rows with missing values rather than passing NaNs to the test.
        numeric_data = numeric_data.dropna()
        if numeric_data.empty:
            self._log_step("No complete observations for cointegration test", "warning")
            return None, "数据不足,无法进行协整检验", None

        # Imported lazily so the cheap validation above runs without statsmodels.
        from statsmodels.tsa.vector_ar.vecm import coint_johansen

        result = coint_johansen(numeric_data, det_order=0, k_ar_diff=1)

        summary = (
            f"协整检验完成,轨迹统计量: {result.trace_stat[0]:.3f}, "
            f"临界值(95%): {result.trace_stat_crit_vals[0, 1]:.3f}"
        )

        # Plain lists so the payload is serialisable.
        coint_data = {
            'trace_stat': result.trace_stat.tolist(),
            'trace_stat_crit_vals': result.trace_stat_crit_vals.tolist(),
            'eigen_vals': result.eig.tolist(),
        }

        if not self.generate_plots:
            self._log_step("Cointegration test completed (data only)", "success")
            return None, summary, coint_data

        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            positions = np.arange(len(result.trace_stat))
            ax.bar(positions - 0.2, result.trace_stat, width=0.4, label='Trace Statistic', alpha=0.7)
            ax.bar(
                positions + 0.2,
                result.trace_stat_crit_vals[:, 1],
                width=0.4,
                label='Critical Value (95%)',
                alpha=0.7,
            )

            ax.set_xlabel('Number of Cointegrating Relations')
            ax.set_ylabel('Test Statistic')
            ax.set_title('Johansen Cointegration Test Results')
            ax.legend()
            ax.grid(True, alpha=0.3)
            fig.tight_layout()

            img_path = os.path.join(self.temp_dir.name, 'cointegration_test.png')
            fig.savefig(img_path, dpi=150, bbox_inches='tight')
        finally:
            # Always release the figure, even when plotting or saving fails.
            plt.close(fig)

        self._log_step("Cointegration test completed", "success")
        return img_path, summary, coint_data

    except Exception as e:
        self._log_step(f"Cointegration test failed: {e}", "error")
        return None, f"协整检验失败: {e}", None
|