# -*- coding: utf-8 -*- """ Created on Sun Feb 28 10:04:26 2016 PCA source code @author: liudiwei """ import numpy as np import json import pymssql import requests import config """ 参数: - XMat:传入的是一个numpy的矩阵格式,行表示样本数,列表示特征 - k:表示取前k个特征值对应的特征向量 返回值: - finalData:参数一指的是返回的低维矩阵,对应于输入参数二 - reconData:参数二对应的是移动坐标轴后的矩阵 """ def min_pos(X): X[X <= 0] = np.max(X) m = np.min(X) re = np.where(X == m) min_i = re[0] min_j = re[1] if m < 0: m = 0 return m, min_i, min_j def Lars(X, Y, D, DIAG, t, limit_line, lamuta): n, m = X.shape beta = np.zeros((1, m)) A = [] N_Co_added = 0 i = 0 mse = [] for k in range(m): i += 1 ero = np.array(Y - beta[-1, :].T) # c=np.dot(P,DIAG,P.T,ero) c = np.dot(D, DIAG).dot(D.T).dot(ero) # 计算相关性 C = np.max(np.abs(c)) mse.append(np.dot(ero.T, D).dot(DIAG).dot(D.T).dot(ero)) if mse[k] < limit_line: break elif k == 0: addTndex = np.where(abs(c) == C)[-1][0] A.append(addTndex) # 活动集 # 更新正在添加的相应协方差索引的数量 N_Co_added = N_Co_added + 1 A_c = list(set(range(0, m)).difference(set(A))) # 非活动集 s_A = np.diag(np.sign(c[A])) num_Zero_Coeff = len(A_c) ## 计算 X_A, A_A , u_A ,the inner product vecto X_A = np.dot(X[:, A], s_A).reshape(n, -1) G_A = np.dot(X_A.T, X_A) One_A = np.ones((len(A), 1)) s = One_A.copy() if G_A.shape == (): inv_GA = 1 / G_A else: inv_GA = np.linalg.pinv(G_A) # G_a_inv_red_cols = np.sum(inv_GA, 1) A_A = 1 / np.sqrt(np.dot(s.T, inv_GA).dot(s)) w_A = (A_A * inv_GA).dot(s) # w_A: (less then 90%)构成等角的单位向量 u_A = np.dot(X_A, w_A) # .reshape(n) a = X.T.dot(u_A) # inner product vector gamma_Test = np.zeros((num_Zero_Coeff, 2)) # gamma=[] if N_Co_added == m - 1: gamma = C / A_A else: for j in range(num_Zero_Coeff): j_p = A_c[j] first_term = (C - c[j_p]) / (A_A - a[j_p]) second_term = (C + c[j_p]) / (A_A + a[j_p]) gamma_Test[j, :] = np.array([first_term, second_term]).reshape(1, -1) gamma, min_i, min_j = min_pos(gamma_Test) # gamma.append(m_s) addTndex = A_c[np.min(min_i)] beta_temp = np.zeros((m, 1)) beta_temp[A] = beta[k, A].reshape(-1, 1) + np.dot(s_A, gamma * w_A) beta = np.vstack((beta, beta_temp.transpose())) # 更新的系数即故障f return beta, mse # import sklearn # q=sklearn.linear_model.Lars class MSSQL: def __init__(self,host,user,pwd,database): self.host = host self.user = user self.pwd = pwd self.db = database def __GetConnect(self): """ 得到连接信息 返回: conn.cursor() """ if not self.db: raise(NameError,"没有设置数据库信息") self.conn = pymssql.connect(host=self.host,user=self.user,password=self.pwd,database=self.db,port=config._PORT, charset="utf8") cur = self.conn.cursor() if not cur: raise(NameError,"连接数据库失败") else: return cur def ExecQuery(self,sql): """ 执行查询语句 返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段 """ cur = self.__GetConnect() cur.execute(sql) resList = cur.fetchall() #查询完毕后必须关闭连接 self.conn.close() return resList def ExecNonQuery(self,sql): """ 执行非查询语句 调用示例: cur = self.__GetConnect() cur.execute(sql) self.conn.commit() self.conn.close() """ cur = self.__GetConnect() cur.execute(sql) self.conn.commit() self.conn.close() def get_model_by_ID(model_id, version="v-test"): ms = MSSQL(host=config._SQL_IP, user="root", pwd="123456", database="alert") resList = ms.ExecQuery(f"SELECT Model_info FROM model_cfg where model_id={model_id}") return json.loads(resList[0][0]) def get_model_by_id_and_version(model_id, version): ms = MSSQL(host=config._SQL_IP, user="root", pwd="123456", database="alert") resList = ms.ExecQuery(f"SELECT Model_info FROM model_version where model_id={model_id} and version='{version}'") return json.loads(resList[0][0]) def pca(model, Data_origin): Data = (Data_origin - model["Train_X_mean"]) / model["Train_X_std"] featValue = np.array(model["featValue"]) # 训练数据的特征值 k=(model["K"]) # 主元个数 featVec = np.array(model["featVec"]) # 训练数据的特征向量 selectVec1 = np.array(model["selectVec"]) selectVec=featVec[:, 0:k] index = np.argsort(-np.array(featValue)) # 按照featValue进行从大到小排序 featValue_sort = featValue[index] # 排序后的特征值 ############----------*********-SPE-**************----------######################## numbel_variable = featValue.shape[0] C_ = np.eye(numbel_variable) - np.dot(selectVec, selectVec.T) SPE_list = [] for i in range(Data.shape[0]): Y = Data[i, :] # 测试数据的每一行 #########*********************计算SPE****************************** SPE_line =np.dot(Y, C_).dot(Y.T)###SPE根号 SPE_list.append(SPE_line) paraState = np.zeros([np.array(Data_origin).shape[0], np.array(Data_origin).shape[1]]) finalData = np.dot(Data, selectVec).dot(selectVec.T) reconData = np.add(np.multiply(finalData, model["Train_X_std"]), model["Train_X_mean"]) # 重构值 errorData =Data_origin - reconData # 偏差值 # cos检验值 R = 0 res={} for index in range(0, reconData.shape[1]): vector1 = Data_origin[:, index] vector2 = np.array(reconData)[:, index] R += np.dot(vector1, vector2.T) / (np.sqrt(np.sum(vector1 ** 2)) * np.sqrt(np.sum(vector2 ** 2))) R /= reconData.shape[1] #items = [('reconData', np.around(reconData, decimals=3).tolist()) # , ('errorData', np.around(errorData, decimals=3).tolist()), ('R', R.tolist()), ('SPE', np.array(SPE_list).tolist()), # ('paraState', paraState.tolist())] #res["sampleData"]=np.transpose(Data_origin.tolist()) res["sampleData"]=np.transpose(np.array(Data_origin)).tolist() res["reconData"]=np.around(np.transpose(np.array(reconData)), decimals=3).tolist() res["errorData"]=np.around(np.transpose(np.array(errorData)), decimals=3).tolist() res["R"]=np.around(R, decimals=3).tolist() res["SPE"]=np.around(np.transpose(np.array(SPE_list)), decimals=3).tolist() res["paraState"]=np.transpose(np.array(paraState)).tolist() #result = json.dumps(dict(items)) # json.dumps(result) #return result return res def get_history_value(points,time,interval): #url="http://192.168.1.201:8080/openPlant/getMultiplePointHistorys" url=f"http://{config._EXA_IP}:9000/exawebapi/exatime/GetSamplingValueArrayFloat" headers = {"Content-Type": "application/json;charset=utf-8"}#,"token":get_token() point_array = points.split(",") time_span = time.split(";") value_array = [] for item in point_array: value_group = [] for time_piece in time_span: st = time_piece.split(",")[0] et = time_piece.split(",")[1] para = {"ItemName": item, "StartingTime": st, "TerminalTime": et, "SamplingPeriod": interval} response = requests.get(url, headers=headers, params=para) content = response.text.replace('"[','[').replace(']"',']') value = json.loads(content) if not isinstance(value, list): print("aaa") for row in value: value_group.append(row[1]) value_array.append(value_group) return np.transpose(np.array(value_array)) #return valres def main(info): model_id = info["Model_id"] try: version = info["version"] except KeyError: version = "v-test" if version == "v-test": res = get_model_by_ID(model_id) else: res = get_model_by_id_and_version(model_id, version) Test_Data = info["Test_Data"] points = Test_Data["points"] time1 = Test_Data["time"] interval = Test_Data["interval"] model = res["para"]["Model_info"] Data = get_history_value(points, time1, interval) result = pca(model, Data) index = time1.index(",") result["time"] = time1[:index:] return result # 根据数据集data.txt if __name__ == "__main__": info_str='{"Test_Data":{"time":"2021-01-13 12:52:40,2021-01-14 12:52:40","points":"JL_D2_20SCS02A:MAG10CT311.PNT,JL_D2_20DAS01B:MAG10CE101.PNT,JL_D2_20MCS01A:MAG10AN001ZT.PNT,JL_D2_20SCS02A:MAG10CT312.PNT,JL_D2_20DAS01B:MAG10CE102.PNT,JL_D2_20MCS01A:MAG10AN002ZT.PNT,JL_D2_20SCS02A:MAG10CT313.PNT,JL_D2_20DAS01B:MAG10CE103.PNT,JL_D2_20MCS01A:MAG10AN003ZT.PNT,JL_D2_20SCS02A:MAG10CT314.PNT,JL_D2_20DAS01B:MAG10CE104.PNT,JL_D2_20MCS01A:MAG10AN004ZT.PNT,JL_D2_20SCS02A:MAG10CT315.PNT,JL_D2_20DAS01B:MAG10CE105.PNT,JL_D2_20MCS01A:MAG10AN005ZT.PNT,JL_D2_20SCS02A:MAG10CT316.PNT,JL_D2_20DAS01B:MAG10CE106.PNT,JL_D2_20MCS01A:MAG10AN006ZT.PNT,JL_D2_20SCS02A:MAG10CT317.PNT,JL_D2_20DAS01B:MAG10CE107.PNT,JL_D2_20MCS01A:MAG10AN007ZT.PNT,JL_D2_20DAS01B:MAJ10CT101.PNT,JL_D2_20DAS01B:MAJ10CT102.PNT,JL_D2_20SCS02A:MAG10CT101.PNT,JL_D2_20SCS02A:MAG10CT102.PNT,JL_D2_20DAS01B:MAG03CG101.PNT,JL_D2_20DAS01B:MAG03CS101.PNT","interval":300000},"Model_id":528,"version":"v-test"}' info = json.loads(info_str) print(main(info)) # model_id=info["Model_id"] # Test_Data = info["Test_Data"] # points = Test_Data["points"] # time = Test_Data["time"] # interval = Test_Data["interval"] # Data = get_history_value(points, time, interval) # # workbook = xw.Workbook("pca_test.xlsx") # # worksheet = workbook.add_worksheet() # # for row, item in enumerate(Data.tolist()): # # for col, cell in enumerate(item): # # worksheet.write(row, col, cell) # # workbook.close() # model = PCA_Test_offline.get_model_by_ID(model_id)["para"]["Model_info"] # result = pca(model,Data)#模型参数,训练数据 # aaa=json.dumps(result) # print (result)