import os import numpy as np import matplotlib.pyplot as plt from statsmodels.tsa.stattools import adfuller, kpss def perform_stationarity_tests(self): """执行平稳性检验 - ADF, KPSS, PP检验""" try: self._log_step("Performing stationarity tests...") if hasattr(self, 'data') and self.data is not None: numeric_cols = self.data.select_dtypes(include=[np.number]).columns results = {} for col in numeric_cols[:3]: # 只测试前3个变量 series = self.data[col].dropna() col_results = {} # ADF检验 adf_result = adfuller(series) adf_crit = adf_result[4] # type: ignore[index] if isinstance(adf_crit, dict): adf_crit = {str(k): float(v) for k, v in adf_crit.items()} col_results['ADF'] = { 'statistic': float(adf_result[0]), 'p_value': float(adf_result[1]), 'critical_values': adf_crit, 'stationary': bool(adf_result[1] < 0.05), } # KPSS检验 try: kpss_result = kpss(series, regression='c') kpss_crit = kpss_result[3] if isinstance(kpss_crit, dict): kpss_crit = {str(k): float(v) for k, v in kpss_crit.items()} col_results['KPSS'] = { 'statistic': float(kpss_result[0]), 'p_value': float(kpss_result[1]), 'critical_values': kpss_crit, 'stationary': bool(kpss_result[1] > 0.05), } except Exception: col_results['KPSS'] = '检验失败' results[col] = col_results summary = f"平稳性检验完成,测试了 {len(results)} 个变量" if not self.generate_plots: self._log_step("Stationarity tests completed (data only)", "success") return None, summary, results # 创建平稳性检验可视化 fig, axes = plt.subplots(2, 2, figsize=(15, 10)) fig.suptitle('平稳性检验结果', fontsize=16) # 绘制时间序列 for i, col in enumerate(numeric_cols[:2]): axes[0, i].plot(self.data.index, self.data[col]) axes[0, i].set_title(f'{col} - 时间序列') axes[0, i].tick_params(axis='x', rotation=45) axes[0, i].grid(True, alpha=0.3) # 绘制ADF检验结果 test_stats = [results[col]['ADF']['statistic'] for col in list(results.keys())[:2]] p_values = [results[col]['ADF']['p_value'] for col in list(results.keys())[:2]] x_pos = np.arange(len(test_stats)) axes[1, 0].bar(x_pos - 0.2, test_stats, 0.4, label='检验统计量', alpha=0.7) axes[1, 0].bar(x_pos + 0.2, p_values, 0.4, label='p值', alpha=0.7) axes[1, 0].set_title('ADF检验结果') axes[1, 0].set_xticks(x_pos) axes[1, 0].set_xticklabels(list(results.keys())[:2]) axes[1, 0].legend() axes[1, 0].axhline(y=0.05, color='r', linestyle='--', label='显著性水平 (0.05)') # 绘制结论 stationary_status = [ '平稳' if results[col]['ADF']['stationary'] else '非平稳' for col in list(results.keys())[:2] ] colors = ['green' if status == '平稳' else 'red' for status in stationary_status] axes[1, 1].bar(x_pos, [1] * len(stationary_status), color=colors, alpha=0.7) axes[1, 1].set_title('平稳性结论') axes[1, 1].set_xticks(x_pos) axes[1, 1].set_xticklabels(list(results.keys())[:2]) for i, status in enumerate(stationary_status): axes[1, 1].text(i, 0.5, status, ha='center', va='center', fontweight='bold') plt.tight_layout() img_path = os.path.join(self.temp_dir.name, 'stationarity_tests.png') plt.savefig(img_path, dpi=150, bbox_inches='tight') plt.close() self._log_step("Stationarity tests completed", "success") return img_path, summary, results self._log_step("No data available for stationarity tests", "warning") return None, "数据不足,无法进行平稳性检验", None except Exception as e: self._log_step(f"Stationarity tests failed: {e}", "error") return None, f"平稳性检验失败: {e}", None def perform_cointegration_test(self): """执行协整检验""" try: self._log_step("Performing cointegration test...") if not (hasattr(self, 'data') and self.data is not None and len(self.data.columns) > 1): self._log_step("Not enough data for cointegration test", "warning") return None, "数据不足,无法进行协整检验", None from statsmodels.tsa.vector_ar.vecm import coint_johansen numeric_data = self.data.select_dtypes(include=[np.number]) if len(numeric_data.columns) < 2: self._log_step("Not enough numeric columns for cointegration test", "warning") return None, "数值变量不足,无法进行协整检验", None result = coint_johansen(numeric_data, det_order=0, k_ar_diff=1) summary = ( f"协整检验完成,轨迹统计量: {result.trace_stat[0]:.3f}, " f"临界值(95%): {result.trace_stat_crit_vals[0, 1]:.3f}" ) coint_data = { 'trace_stat': result.trace_stat.tolist(), 'trace_stat_crit_vals': result.trace_stat_crit_vals.tolist(), 'eigen_vals': result.eig.tolist(), } if not self.generate_plots: self._log_step("Cointegration test completed (data only)", "success") return None, summary, coint_data plt.figure(figsize=(10, 6)) positions = np.arange(len(result.trace_stat)) plt.bar(positions - 0.2, result.trace_stat, width=0.4, label='Trace Statistic', alpha=0.7) plt.bar( positions + 0.2, result.trace_stat_crit_vals[:, 1], width=0.4, label='Critical Value (95%)', alpha=0.7, ) plt.xlabel('Number of Cointegrating Relations') plt.ylabel('Test Statistic') plt.title('Johansen Cointegration Test Results') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() img_path = os.path.join(self.temp_dir.name, 'cointegration_test.png') plt.savefig(img_path, dpi=150, bbox_inches='tight') plt.close() self._log_step("Cointegration test completed", "success") return img_path, summary, coint_data except Exception as e: self._log_step(f"Cointegration test failed: {e}", "error") return None, f"协整检验失败: {e}", None