import gc
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans


def generate_correlation_heatmap(self):
    """Generate a correlation heatmap for the numeric columns of ``self.data``.

    Returns:
        tuple: ``(image_path | None, summary_message, corr_matrix | None)``
        where NaN cells of the matrix are replaced by ``None`` so the result
        is JSON-serializable.
    """
    fig = None
    try:
        self._log_step("Generating correlation heatmap...")
        if not hasattr(self, 'data') or self.data is None or len(self.data.columns) <= 1:
            self._log_step("Not enough data for correlation analysis", "warning")
            return None, "Not enough data", None

        # Compute the correlation matrix over numeric columns only.
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        # FIX: guard on the *numeric* column count — the earlier check counts
        # all columns, so a frame with many non-numeric columns but fewer than
        # two numeric ones previously fell through to a degenerate corr().
        if len(numeric_cols) <= 1:
            self._log_step("Not enough data for correlation analysis", "warning")
            return None, "Not enough data", None
        corr_matrix = self.data[numeric_cols].corr()
        # NaN -> None once, reused by every return path below (JSON safety).
        json_safe_corr = corr_matrix.where(pd.notnull(corr_matrix), None)
        summary = "Correlation matrix calculated"

        if not self.generate_plots:
            self._log_step("Correlation analysis completed (data only)", "success")
            return None, summary, json_safe_corr

        # Render the heatmap.
        fig = plt.figure(figsize=(8, 6), dpi=100)
        sns.heatmap(
            corr_matrix,
            annot=True,
            fmt=".2f",
            cmap='coolwarm',
            center=0,
            square=True,
            cbar_kws={"shrink": 0.8},
        )
        plt.title('Correlation Heatmap')
        plt.tight_layout()

        # Persist the figure; close it in `finally` whether or not the save
        # succeeds, and collect to release the figure's memory promptly.
        img_path = os.path.join(self.temp_dir.name, 'correlation_heatmap.png')
        try:
            plt.savefig(img_path, dpi=100, bbox_inches='tight', format='png')
        except Exception as save_err:
            self._log_step(f"Save error: {save_err}", "error")
            return None, f"Save error: {str(save_err)[:100]}", json_safe_corr
        finally:
            plt.close(fig)
            gc.collect()

        self._log_step("Correlation heatmap generated", "success")
        return img_path, summary, json_safe_corr
    except Exception as e:
        self._log_step(f"Correlation heatmap failed: {str(e)[:100]}", "error")
        if fig is not None:
            try:
                plt.close(fig)
            except Exception:
                pass
        return None, f"Correlation heatmap failed: {str(e)[:100]}", None


def generate_pca_scree_plot(self):
    """Generate a PCA scree plot (per-component and cumulative variance).

    Returns:
        tuple: ``(image_path | None, summary_message, scree_DataFrame | None)``
    """
    fig = None
    try:
        self._log_step("Generating PCA scree plot...")
        if hasattr(self, 'scaled_data') and self.scaled_data is not None:
            pca = PCA()
            pca.fit(self.scaled_data)
            explained_variance = pca.explained_variance_ratio_
            cumulative_variance = np.cumsum(explained_variance)

            # Tabular scree data, returned for both data-only and plot modes.
            scree_data = pd.DataFrame({
                'component': range(1, len(explained_variance) + 1),
                'explained_variance': explained_variance,
                'cumulative_variance': cumulative_variance,
            })
            # min(1, ...) keeps the index valid when only one component exists.
            summary = (
                "PCA碎石图生成完成,前2个主成分解释 "
                f"{cumulative_variance[min(1, len(cumulative_variance) - 1)]:.2%} 方差"
            )

            if not self.generate_plots:
                self._log_step("PCA scree data generated", "success")
                return None, summary, scree_data

            # FIX: keep a handle on the figure and close it in `finally` so an
            # exception during plotting/saving no longer leaks the figure
            # (matches the cleanup style of generate_correlation_heatmap).
            fig = plt.figure(figsize=(10, 6))

            # Left panel: per-component explained variance (scree plot).
            plt.subplot(1, 2, 1)
            plt.plot(range(1, len(explained_variance) + 1), explained_variance, 'bo-')
            plt.title('PCA碎石图')
            plt.xlabel('主成分')
            plt.ylabel('解释方差比例')
            plt.grid(True, alpha=0.3)

            # Right panel: cumulative explained variance with 85% reference.
            plt.subplot(1, 2, 2)
            plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, 'ro-')
            plt.title('累积解释方差')
            plt.xlabel('主成分数量')
            plt.ylabel('累积方差比例')
            plt.axhline(y=0.85, color='g', linestyle='--', label='85% 方差')
            plt.legend()
            plt.grid(True, alpha=0.3)
            plt.tight_layout()

            img_path = os.path.join(self.temp_dir.name, 'pca_scree_plot.png')
            plt.savefig(img_path, dpi=150, bbox_inches='tight')

            self._log_step("PCA scree plot generated", "success")
            return img_path, summary, scree_data

        self._log_step("No scaled data available for PCA scree plot", "warning")
        return None, "没有标准化数据可用于PCA碎石图", None
    except Exception as e:
        self._log_step(f"PCA scree plot failed: {e}", "error")
        return None, f"PCA碎石图生成失败: {e}", None
    finally:
        if fig is not None:
            plt.close(fig)


def perform_pca_analysis(self):
    """Project the scaled data onto its first two principal components.

    Returns:
        tuple: ``(image_path | None, summary_message, pca_DataFrame | None)``
        where the DataFrame holds PC1/PC2 plus a stringified timestamp column
        taken from ``self.data.index``.
    """
    fig = None
    try:
        self._log_step("Performing PCA analysis...")
        if hasattr(self, 'scaled_data') and self.scaled_data is not None and len(self.scaled_data.columns) > 1:
            pca = PCA(n_components=2)
            principal_components = pca.fit_transform(self.scaled_data)
            summary = (
                "PCA analysis completed, explained variance: "
                f"{pca.explained_variance_ratio_[0]:.2%} + {pca.explained_variance_ratio_[1]:.2%}"
            )

            # FIX: build the result frame once instead of duplicating the
            # construction in the data-only and plotting branches.
            pca_df = pd.DataFrame(data=principal_components, columns=['PC1', 'PC2'])
            pca_df['timestamp'] = self.data.index.astype(str)

            if not self.generate_plots:
                self._log_step("PCA analysis completed (data only)", "success")
                return None, summary, pca_df

            # Scatter plot of the two components; FIX: close the figure in
            # `finally` so an exception no longer leaks it.
            fig = plt.figure(figsize=(8, 6))
            plt.scatter(principal_components[:, 0], principal_components[:, 1], alpha=0.7)
            plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
            plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
            plt.title('Principal Component Analysis (PCA)')
            plt.grid(True, alpha=0.3)
            plt.tight_layout()

            img_path = os.path.join(self.temp_dir.name, 'pca_analysis.png')
            plt.savefig(img_path, dpi=150, bbox_inches='tight')

            self._log_step("PCA analysis completed", "success")
            return img_path, summary, pca_df

        self._log_step("Not enough data for PCA analysis", "warning")
        return None, "Not enough data for PCA analysis", None
    except Exception as e:
        self._log_step(f"PCA analysis failed: {e}", "error")
        return None, f"PCA analysis failed: {e}", None
    finally:
        if fig is not None:
            plt.close(fig)


def perform_clustering_analysis(self):
    """Cluster the scaled data with K-means (k=3) and optionally plot it.

    Returns:
        tuple: ``(image_path | None, summary_message, cluster_DataFrame | None)``
    """
    fig = None
    try:
        self._log_step("Performing clustering analysis...")
        if hasattr(self, 'scaled_data') and self.scaled_data is not None and len(self.scaled_data.columns) > 1:
            kmeans = KMeans(n_clusters=3, random_state=42)
            clusters = kmeans.fit_predict(self.scaled_data)
            summary = f"Clustering analysis completed, found {len(np.unique(clusters))} clusters"

            # FIX: build once; previously duplicated per branch.
            cluster_df = pd.DataFrame({'cluster': clusters})
            cluster_df['timestamp'] = self.data.index.astype(str)

            if not self.generate_plots:
                self._log_step("Clustering analysis completed (data only)", "success")
                return None, summary, cluster_df

            if len(self.scaled_data.columns) >= 2:
                # 2D (or wider) data: plot the first two feature axes directly.
                fig = plt.figure(figsize=(8, 6))
                plt.scatter(
                    self.scaled_data.iloc[:, 0],
                    self.scaled_data.iloc[:, 1],
                    c=clusters,
                    cmap='viridis',
                    alpha=0.7,
                )
                plt.xlabel(self.scaled_data.columns[0])
                plt.ylabel(self.scaled_data.columns[1])
                plt.title('Clustering Analysis')
                plt.colorbar(label='Cluster')
                plt.tight_layout()
            else:
                # High-dimensional fallback: PCA-reduce to 2D for display.
                # NOTE(review): currently unreachable — the outer guard already
                # requires > 1 column, which satisfies >= 2. Kept defensively.
                pca = PCA(n_components=2)
                reduced_data = pca.fit_transform(self.scaled_data)
                fig = plt.figure(figsize=(8, 6))
                plt.scatter(reduced_data[:, 0], reduced_data[:, 1], c=clusters, cmap='viridis', alpha=0.7)
                plt.xlabel('PC1')
                plt.ylabel('PC2')
                plt.title('Clustering Analysis (PCA Reduced)')
                plt.colorbar(label='Cluster')
                plt.tight_layout()

            img_path = os.path.join(self.temp_dir.name, 'clustering_analysis.png')
            plt.savefig(img_path, dpi=150, bbox_inches='tight')

            self._log_step("Clustering analysis completed", "success")
            return img_path, summary, cluster_df

        self._log_step("Not enough data for clustering analysis", "warning")
        return None, "Not enough data for clustering analysis", None
    except Exception as e:
        self._log_step(f"Clustering analysis failed: {e}", "error")
        return None, f"Clustering analysis failed: {e}", None
    finally:
        # FIX: close the figure on every exit path (was success-path only).
        if fig is not None:
            plt.close(fig)


def perform_factor_analysis(self):
    """Extract two latent factors from the scaled data via FactorAnalysis.

    Returns:
        tuple: ``(image_path | None, summary_message, factor_DataFrame | None)``
    """
    fig = None
    try:
        self._log_step("Performing factor analysis...")
        if hasattr(self, 'scaled_data') and self.scaled_data is not None and len(self.scaled_data.columns) > 1:
            from sklearn.decomposition import FactorAnalysis

            fa = FactorAnalysis(n_components=2, random_state=42)
            factors = fa.fit_transform(self.scaled_data)
            summary = "因子分析完成,提取了2个主要因子"

            # FIX: build once; previously duplicated per branch.
            factor_df = pd.DataFrame(data=factors, columns=['Factor1', 'Factor2'])
            factor_df['timestamp'] = self.data.index.astype(str)

            if not self.generate_plots:
                self._log_step("Factor analysis completed (data only)", "success")
                return None, summary, factor_df

            fig = plt.figure(figsize=(10, 8))
            plt.scatter(factors[:, 0], factors[:, 1], alpha=0.7)
            plt.xlabel('Factor 1')
            plt.ylabel('Factor 2')
            plt.title('Factor Analysis')
            plt.grid(True, alpha=0.3)

            # Annotate only the first 10 points to keep the plot readable.
            for i, (x, y) in enumerate(factors[:10]):
                plt.annotate(str(i), (x, y), xytext=(5, 5), textcoords='offset points', fontsize=8)
            plt.tight_layout()

            img_path = os.path.join(self.temp_dir.name, 'factor_analysis.png')
            plt.savefig(img_path, dpi=150, bbox_inches='tight')

            self._log_step("Factor analysis completed", "success")
            return img_path, summary, factor_df

        self._log_step("Not enough data for factor analysis", "warning")
        return None, "数据不足,无法进行因子分析", None
    except Exception as e:
        self._log_step(f"Factor analysis failed: {e}", "error")
        return None, f"因子分析失败: {e}", None
    finally:
        # FIX: close the figure on every exit path (was success-path only).
        if fig is not None:
            plt.close(fig)