"""PCA的重构算法""" import math import numpy as np def min_pos(X): X[X <= 0] = np.max(X) m = np.min(X) re = np.where(X == m) min_i = re[0] min_j = re[1] if m < 0: m = 0 return m, min_i, min_j def Lars(X, Y, D, DIAG, t, limit_line, LockVariable): n, m = X.shape beta = np.zeros((1, m)) beta_temp = np.zeros((m, 1)) A = [] N_Co_added = 0 i = 0 mse = [] limit_m = len(LockVariable) for k in range(m - limit_m - 1): i += 1 ero = np.array(Y - beta[-1, :].T) # c=np.dot(P,DIAG,P.T,ero) c = np.dot(D, DIAG).dot(D.T).dot(ero) # 计算相关性 c[LockVariable] = 0 C = np.max(np.abs(c)) mse.append(np.dot(ero.T, D).dot(DIAG).dot(D.T).dot(ero)) if mse[k] < limit_line: break elif k == 0: addTndex = np.where(abs(c) == C)[-1][0] A.append(addTndex) # 活动集 # 更新正在添加的相应协方差索引的数量 N_Co_added = N_Co_added + 1 A_c = list(set(range(0, m)).difference(set(A))) # 非活动集 s_A = np.diag(np.sign(c[A])) num_Zero_Coeff = len(A_c) ## 计算 X_A, A_A , u_A ,the inner product vecto X_A = np.dot(X[:, A], s_A).reshape(n, -1) G_A = np.dot(X_A.T, X_A) One_A = np.ones((len(A), 1)) s = One_A.copy() if G_A.shape == (): inv_GA = 1 / G_A else: inv_GA = np.linalg.pinv(G_A) # G_a_inv_red_cols = np.sum(inv_GA, 1) A_A = 1 / np.sqrt(np.dot(s.T, inv_GA).dot(s)) w_A = (A_A * inv_GA).dot(s) # w_A: (less then 90%)构成等角的单位向量 u_A = np.dot(X_A, w_A) # .reshape(n) a = X.T.dot(u_A) # inner product vector gamma_Test = np.zeros((num_Zero_Coeff, 2)) # gamma=[] # if k == m - 1 - limit_m: if N_Co_added == m - 1: gamma = C / A_A else: for j in range(num_Zero_Coeff): j_p = A_c[j] first_term = (C - c[j_p]) / (A_A - a[j_p]) second_term = (C + c[j_p]) / (A_A + a[j_p]) gamma_Test[j, :] = np.array([first_term, second_term]).reshape(1, -1) gamma, min_i, min_j = min_pos(gamma_Test) # gamma.append(m_s) try: addTndex = A_c[np.min(min_i)] except: print(123) beta_temp = np.zeros((m, 1)) beta_temp[A] = beta[k, A].reshape(-1, 1) + np.dot(s_A, gamma * w_A) beta = np.vstack((beta, beta_temp.transpose())) # 更新的系数即故障f return beta, mse def recon_fault_diagnosis_r(x, m, limit_line, beta_value, model, spe_recon, m_spe, rbc=None): """基于重构的故障诊断,重构故障>3 """ k_p = int(model["K"]) count = 0 dimension = x.shape[0] if rbc is None: fault_para = sorted(beta_value, key=lambda item: item[1], reverse=True) else: fault_para = sorted(rbc, key=lambda item: item[1], reverse=True) fault_para_index_temp = [item[0] for item in fault_para] fault_para_index = [fault_para_index_temp[: i] for i in range(1, len(beta_value) + 1)][::-1] fault_matrix, fault_index_recon = [], [] for item in fault_para_index: ke_si_matrix = np.zeros(shape=(dimension, dimension)) k_matrix = np.zeros(shape=(dimension, 1)) for i in range(dimension): ke_si = np.zeros(shape=(dimension, 1)) ke_si_i = np.zeros(shape=(dimension, 1)) # 每一行的fai对应的ke_si if i in item: ke_si_i[i] = 1 # 计算每一行的ke_si fai = m @ ke_si_i @ np.linalg.pinv(ke_si_i.T @ m @ ke_si_i) # 计算每一行的fai for j in range(dimension): ke_si_j = ke_si.copy() # 每一行每一列的ke_si if i == j: ke_si_matrix[i, j] = 1 # ke_si矩阵对角线元素全为1 else: if j in item: ke_si_j[j] = 1 ke_si_matrix[i, j] = ke_si_j.T @ fai # 计算矩阵其他元素 k_matrix[i] = x.T @ fai # 计算k 矩阵 if np.linalg.det(ke_si_matrix) > 1e-15 and len(item) <= ke_si_matrix.shape[0] - k_p: f_matrix = np.linalg.pinv(ke_si_matrix) @ k_matrix # 计算f 矩阵 else: continue f_matrix_1 = np.zeros(shape=(dimension, 1)) for k in item: f_matrix_1[k] = f_matrix[k] if ((x - f_matrix_1.T) @ m @ (x - f_matrix_1.T).T)[0][0] < limit_line: count += 1 fault_matrix = f_matrix_1 fault_index_recon = item else: return fault_matrix, np.array(fault_index_recon) return fault_matrix, np.array(fault_index_recon) # 没有找到故障 def recon_fault_diagnosis_r_l(x, m, fault_index: "list"): """基于重构的故障诊断,重构故障<=3,或者是直接用于fai后的spe重构 """ dimension = x.shape[0] item = fault_index.copy() ke_si_matrix = np.zeros(shape=(dimension, dimension)) k_matrix = np.zeros(shape=(dimension, 1)) for i in range(dimension): ke_si = np.zeros(shape=(dimension, 1)) ke_si_i = np.zeros(shape=(dimension, 1)) # 每一行的fai对应的ke si if i in item: ke_si_i[i] = 1 # 计算每一行的ke_si fai = m @ ke_si_i @ np.linalg.pinv(ke_si_i.T @ m @ ke_si_i) # 计算每一行的fai for j in range(dimension): ke_si_j = ke_si.copy() # 每一行每一列的ke si if i == j: ke_si_matrix[i, j] = 1 # ke_si矩阵对角线元素全为1 else: if j in item: ke_si_j[j] = 1 ke_si_matrix[i, j] = ke_si_j.T @ fai # 计算矩阵其他元素 k_matrix[i] = x.T @ fai # 计算k 矩阵 f_matrix = np.linalg.pinv(ke_si_matrix) @ k_matrix # 计算f 矩阵 f_matrix_1 = np.zeros(shape=(dimension, 1)) for k in fault_index: f_matrix_1[k] = f_matrix[k] fault_matrix = f_matrix_1 return fault_matrix, np.array(fault_index) # 没有找到故障 def recon_fault_diagnosis_r_c(x, m, limit_line, beta_value, model, spe_recon, m_spe, lock_variable, selectVec, rbc=None): """基于重构的故障诊断,重构故障>3 """ k_p = int(model["K"]) count = 0 dimension = x.shape[0] if rbc is None: fault_para = sorted(beta_value, key=lambda item: item[1], reverse=True) else: fault_para = sorted(rbc, key=lambda item: item[1], reverse=True) fault_para_index_temp = [item[0] for item in fault_para] fault_para_index = [fault_para_index_temp[: i] for i in range(1, len(beta_value) + 1)][::-1] fault_matrix, fault_index_recon = [], [] for item in fault_para_index: ke_si_matrix = np.zeros(shape=(dimension, dimension)) k_matrix = np.zeros(shape=(dimension, 1)) for i in range(dimension): ke_si = np.zeros(shape=(dimension, 1)) ke_si_i = np.zeros(shape=(dimension, 1)) # 每一行的fai对应的ke_si if i in item: ke_si_i[i] = 1 # 计算每一行的ke_si fai = m @ ke_si_i @ np.linalg.pinv(ke_si_i.T @ m @ ke_si_i) # 计算每一行的fai for j in range(dimension): ke_si_j = ke_si.copy() # 每一行每一列的ke_si if i == j: ke_si_matrix[i, j] = 1 # ke_si矩阵对角线元素全为1 else: if j in item: ke_si_j[j] = 1 ke_si_matrix[i, j] = ke_si_j.T @ fai # 计算矩阵其他元素 k_matrix[i] = x.T @ fai # 计算k 矩阵 if np.linalg.det(ke_si_matrix) > 1e-15 and len(item) <= ke_si_matrix.shape[0] - k_p: f_matrix = np.linalg.pinv(ke_si_matrix) @ k_matrix # 计算f 矩阵 else: # 如果矩阵不可逆则使用贡献图法进行重构 res = contribution_plot(x, model, lock_variable, selectVec) return res, "plot" # 返回故障幅值和故障参数 f_matrix_1 = np.zeros(shape=(dimension, 1)) for k in item: f_matrix_1[k] = f_matrix[k] if ((x - f_matrix_1.T) @ m @ (x - f_matrix_1.T).T)[0][0] < limit_line: count += 1 fault_matrix = f_matrix_1 fault_index_recon = item else: return fault_matrix, np.array(fault_index_recon) return fault_matrix, np.array(fault_index_recon) # 没有找到故障 def contribution_plot(x, model, lock_variable, select_vec): """贡献图法直接计算重构值""" final_data = np.dot(x, select_vec).dot(select_vec.T) final_data[lock_variable] = x[lock_variable] recon_data = np.add(np.multiply(final_data, model["Train_X_std"]), model["Train_X_mean"]) # 重构值 return recon_data # 前后项序列 def sequence_d(test_data, limit, m, f_m, model, ke_si_matrix): """ 采用前后项序列进行诊断 :param ke_si_matrix: 重构矩阵 :param model: 模型信息 :param test_data: 故障数据 :param limit: 限值 :param m: 指标矩阵 :param f_m: 故障方向矩阵 :return: """ f_ds_matrix_a = np.zeros(shape=test_data.shape) # 诊断结果的故障方向矩阵 st = time.time() f_d = np.zeros(shape=test_data.shape) for i in range(test_data.shape[0]): data = test_data[i] if data @ m @ data.T > limit: # 超限 f_index, f_a = sequence(data, limit, m, ke_si_matrix) # 故障方向以及故障幅值 f_d[i] = f_a f_ds_matrix_a[i, f_index] = 1 et = time.time() fd1 = (f_m == 1) & (f_ds_matrix_a == 1) # 诊出率 fa1 = (f_m == 0) & (f_ds_matrix_a == 1) # 误诊率 fdr_variable1 = fd1[fd1].shape[0] / f_m[f_m == 1].shape[0] far_variable1 = fa1[fa1].shape[0] / f_m[f_m == 0].shape[0] recon_data = np.add(np.multiply(test_data - f_d, model["Train_X_std"]), model["Train_X_mean"]) return {"fdr_variable": fdr_variable1, "far_variable": far_variable1, "time": round(et - st, 2), "recon_data": recon_data.tolist()} def sequence(data, limit, m, ke_si_matrix): """ 前后项序列 :param ke_si_matrix: ke_si矩阵 :param data: 样本 :param limit: 限值 :param m: 指标矩阵 :return: """ t_c = [] # 故障的目标集合 c_c = list(range(data.shape[0])) # 候选集合 flag = False # 记录前项序列是否诊断出 # 初始化矩阵 dimension = data.shape[0] k_matrix = np.zeros(shape=(dimension, 1)) f_a = np.zeros(shape=(1, dimension)) # 故障幅值 for i in range(dimension): ke_si = np.zeros(shape=(dimension, 1)) ke_si[i, 0] = 1 fai = m @ ke_si @ np.linalg.inv(ke_si.T @ m @ ke_si) # 计算每一行的fai k_matrix[i] = data.T @ fai # 计算k 矩阵 # 前项序列 while True: if len(t_c) == 0: index_c = list(product(c_c)) # 第一次计算 else: index_c = generate_index(t_c, c_c) # 生成故障参数的排列 c = {} # 每个index的限值 for index in index_c: f_r = ke_si_matrix[list(index)][:, list(index)] f_matrix = k_matrix[index, 0] @ np.linalg.inv(f_r) # 计算f 矩阵 f = np.zeros(shape=(1, dimension)) f[0, index] = f_matrix c[index] = ((data - f) @ m @ (data - f).T)[0][0] # 计算每个index的重构限值 l = min(c.values()) # 计算限值的最小值 l_index = [k for k in c.keys() if c[k] == l][0] # 计算最小值的k即为此次的index t_c = list(l_index) # 更新目标集合 c_c = list(set(c_c) - set(t_c)) # 更新候选集合 if l < limit or len(c_c) == 0: flag = True break if flag and len(t_c) > 1: # 前项序列诊断出来 while True: index_c = list(combinations(t_c, len(t_c) - 1)) # 生成故障参数的集合 c = {} # 每个index的限值 for index in index_c: f_r = ke_si_matrix[list(index)][:, list(index)] f_matrix = k_matrix[index, 0] @ np.linalg.inv(f_r) # 计算f 矩阵 f = np.zeros(shape=(1, dimension)) f[0, index] = f_matrix c[index] = ((data - f) @ m @ (data - f).T)[0][0] # 计算每个index的重构限值 l = min(c.values()) # 计算限值的最小值 l_index = [k for k in c.keys() if c[k] == l][0] # 计算最小值的k即为此次的index if l < limit: # 比限值小 t_c = l_index else: # 比限值大 break # 计算故障幅值 f_r = ke_si_matrix[list(t_c)][:, list(t_c)] f_matrix = k_matrix[t_c, 0] @ np.linalg.inv(f_r) # 计算f 矩阵 f = np.zeros(shape=(1, dimension)) f[0, t_c] = f_matrix return t_c, f