# -*- coding: utf-8 -*- """ Created on Sun Feb 28 10:04:26 2016 PCA source code @author: liudiwei """ import xlsxwriter as xw import numpy as np import pandas as pd import matplotlib.pyplot as plt from scipy.stats import norm from scipy.stats.distributions import chi2 import json import sys import pymssql import requests import datetime from scipy.stats import norm from scipy.stats import f from scipy.stats import chi2 import jenkspy import xlrd import time # import PCA_Test_offline import config """ 参数: - XMat:传入的是一个numpy的矩阵格式,行表示样本数,列表示特征 - k:表示取前k个特征值对应的特征向量 返回值: - finalData:参数一指的是返回的低维矩阵,对应于输入参数二 - reconData:参数二对应的是移动坐标轴后的矩阵 """ def min_pos(X): X[X <= 0] = np.max(X) m = np.min(X) re = np.where(X == m) min_i = re[0] min_j = re[1] if m < 0: m = 0 return m, min_i, min_j def Lars(X, Y, D, DIAG, t, limit_line, lamuta): n, m = X.shape beta = np.zeros((1, m)) A = [] N_Co_added = 0 i = 0 mse = [] for k in range(m): i += 1 ero = np.array(Y - beta[-1, :].T) # c=np.dot(P,DIAG,P.T,ero) c = np.dot(D, DIAG).dot(D.T).dot(ero) # 计算相关性 C = np.max(np.abs(c)) mse.append(np.dot(ero.T, D).dot(DIAG).dot(D.T).dot(ero)) if mse[k] < limit_line: break elif k == 0: addTndex = np.where(abs(c) == C)[-1][0] A.append(addTndex) # 活动集 # 更新正在添加的相应协方差索引的数量 N_Co_added = N_Co_added + 1 A_c = list(set(range(0, m)).difference(set(A))) # 非活动集 s_A = np.diag(np.sign(c[A])) num_Zero_Coeff = len(A_c) ## 计算 X_A, A_A , u_A ,the inner product vecto X_A = np.dot(X[:, A], s_A).reshape(n, -1) G_A = np.dot(X_A.T, X_A) One_A = np.ones((len(A), 1)) s = One_A.copy() if G_A.shape == (): inv_GA = 1 / G_A else: inv_GA = np.linalg.pinv(G_A) # G_a_inv_red_cols = np.sum(inv_GA, 1) A_A = 1 / np.sqrt(np.dot(s.T, inv_GA).dot(s)) w_A = (A_A * inv_GA).dot(s) # w_A: (less then 90%)构成等角的单位向量 u_A = np.dot(X_A, w_A) # .reshape(n) a = X.T.dot(u_A) # inner product vector gamma_Test = np.zeros((num_Zero_Coeff, 2)) # gamma=[] if N_Co_added == m - 1: gamma = C / A_A else: for j in range(num_Zero_Coeff): j_p = A_c[j] first_term = (C - c[j_p]) / (A_A - a[j_p]) second_term = (C + c[j_p]) / (A_A + a[j_p]) gamma_Test[j, :] = np.array([first_term, second_term]).reshape(1, -1) gamma, min_i, min_j = min_pos(gamma_Test) # gamma.append(m_s) addTndex = A_c[np.min(min_i)] beta_temp = np.zeros((m, 1)) beta_temp[A] = beta[k, A].reshape(-1, 1) + np.dot(s_A, gamma * w_A) beta = np.vstack((beta, beta_temp.transpose())) # 更新的系数即故障f return beta, mse # import sklearn # q=sklearn.linear_model.Lars class MSSQL: def __init__(self,host,user,pwd,database): self.host = host self.user = user self.pwd = pwd self.db = database def __GetConnect(self): """ 得到连接信息 返回: conn.cursor() """ if not self.db: raise(NameError,"没有设置数据库信息") self.conn = pymssql.connect(host=self.host,user=self.user,password=self.pwd,database=self.db,port=config._PORT, charset="utf8") cur = self.conn.cursor() if not cur: raise(NameError,"连接数据库失败") else: return cur def ExecQuery(self,sql): """ 执行查询语句 返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段 """ cur = self.__GetConnect() cur.execute(sql) resList = cur.fetchall() #查询完毕后必须关闭连接 self.conn.close() return resList def ExecNonQuery(self,sql): """ 执行非查询语句 调用示例: cur = self.__GetConnect() cur.execute(sql) self.conn.commit() self.conn.close() """ cur = self.__GetConnect() cur.execute(sql) self.conn.commit() self.conn.close() def get_model_by_ID(model_id, version="v-test"): ms = MSSQL(host=config._SQL_IP, user="root", pwd="123456", database="alert") resList = ms.ExecQuery(f"SELECT Model_info FROM model_cfg where model_id={model_id}") return json.loads(resList[0][0]) def get_model_by_id_and_version(model_id, version): ms = MSSQL(host=config._SQL_IP, user="root", pwd="123456", database="alert") resList = ms.ExecQuery(f"SELECT Model_info FROM model_version where model_id={model_id} and version='{version}'") return json.loads(resList[0][0]) def pca(model, Data_origin): Data = (Data_origin - model["Train_X_mean"]) / model["Train_X_std"] featValue = np.array(model["featValue"]) # 训练数据的特征值 k=(model["K"]) # 主元个数 featVec = np.array(model["featVec"]) # 训练数据的特征向量 selectVec1 = np.array(model["selectVec"]) selectVec=featVec[:, 0:k] index = np.argsort(-np.array(featValue)) # 按照featValue进行从大到小排序 featValue_sort = featValue[index] # 排序后的特征值 ############----------*********-SPE-**************----------######################## numbel_variable = featValue.shape[0] C_ = np.eye(numbel_variable) - np.dot(selectVec, selectVec.T) SPE_list = [] for i in range(Data.shape[0]): Y = Data[i, :] # 测试数据的每一行 #########*********************计算SPE****************************** SPE_line =np.dot(Y, C_).dot(Y.T)###SPE根号 SPE_list.append(SPE_line) paraState = np.zeros([np.array(Data_origin).shape[0], np.array(Data_origin).shape[1]]) finalData = np.dot(Data, selectVec).dot(selectVec.T) reconData = np.add(np.multiply(finalData, model["Train_X_std"]), model["Train_X_mean"]) # 重构值 errorData =Data_origin - reconData # 偏差值 # cos检验值 R = 0 res={} for index in range(0, reconData.shape[1]): vector1 = Data_origin[:, index] vector2 = np.array(reconData)[:, index] R += np.dot(vector1, vector2.T) / (np.sqrt(np.sum(vector1 ** 2)) * np.sqrt(np.sum(vector2 ** 2))) R /= reconData.shape[1] #items = [('reconData', np.around(reconData, decimals=3).tolist()) # , ('errorData', np.around(errorData, decimals=3).tolist()), ('R', R.tolist()), ('SPE', np.array(SPE_list).tolist()), # ('paraState', paraState.tolist())] #res["sampleData"]=np.transpose(Data_origin.tolist()) res["sampleData"]=np.transpose(np.array(Data_origin)).tolist() res["reconData"]=np.around(np.transpose(np.array(reconData)), decimals=3).tolist() res["errorData"]=np.around(np.transpose(np.array(errorData)), decimals=3).tolist() res["R"]=np.around(R, decimals=3).tolist() res["SPE"]=np.around(np.transpose(np.array(SPE_list)), decimals=3).tolist() res["paraState"]=np.transpose(np.array(paraState)).tolist() #result = json.dumps(dict(items)) # json.dumps(result) #return result return res def get_history_value(points,time,interval): #url="http://192.168.1.201:8080/openPlant/getMultiplePointHistorys" url=f"http://{config._EXA_IP}:9000/exawebapi/exatime/GetSamplingValueArrayFloat" headers = {"Content-Type": "application/json;charset=utf-8"}#,"token":get_token() point_array = points.split(",") time_span = time.split(";") value_array = [] for item in point_array: value_group = [] for time_piece in time_span: st = time_piece.split(",")[0] et = time_piece.split(",")[1] para = {"ItemName": item, "StartingTime": st, "TerminalTime": et, "SamplingPeriod": interval} response = requests.get(url, headers=headers, params=para) content = response.text.replace('"[','[').replace(']"',']') value = json.loads(content) if not isinstance(value, list): print("aaa") for row in value: value_group.append(row[1]) value_array.append(value_group) return np.transpose(np.array(value_array)) #return valres def main(info): model_id = info["Model_id"] try: version = info["version"] except KeyError: version = "v-test" if version == "v-test": res = get_model_by_ID(model_id) else: res = get_model_by_id_and_version(model_id, version) Test_Data = info["Test_Data"] points = Test_Data["points"] time1 = Test_Data["time"] interval = Test_Data["interval"] model = res["para"]["Model_info"] Data = get_history_value(points, time1, interval) result = pca(model, Data) index = time1.index(",") result["time"] = time1[:index:] return result # 根据数据集data.txt if __name__ == "__main__": info_str='{"Test_Data":{"time":"2021-01-13 12:52:40,2021-01-14 12:52:40","points":"JL_D2_20SCS02A:MAG10CT311.PNT,JL_D2_20DAS01B:MAG10CE101.PNT,JL_D2_20MCS01A:MAG10AN001ZT.PNT,JL_D2_20SCS02A:MAG10CT312.PNT,JL_D2_20DAS01B:MAG10CE102.PNT,JL_D2_20MCS01A:MAG10AN002ZT.PNT,JL_D2_20SCS02A:MAG10CT313.PNT,JL_D2_20DAS01B:MAG10CE103.PNT,JL_D2_20MCS01A:MAG10AN003ZT.PNT,JL_D2_20SCS02A:MAG10CT314.PNT,JL_D2_20DAS01B:MAG10CE104.PNT,JL_D2_20MCS01A:MAG10AN004ZT.PNT,JL_D2_20SCS02A:MAG10CT315.PNT,JL_D2_20DAS01B:MAG10CE105.PNT,JL_D2_20MCS01A:MAG10AN005ZT.PNT,JL_D2_20SCS02A:MAG10CT316.PNT,JL_D2_20DAS01B:MAG10CE106.PNT,JL_D2_20MCS01A:MAG10AN006ZT.PNT,JL_D2_20SCS02A:MAG10CT317.PNT,JL_D2_20DAS01B:MAG10CE107.PNT,JL_D2_20MCS01A:MAG10AN007ZT.PNT,JL_D2_20DAS01B:MAJ10CT101.PNT,JL_D2_20DAS01B:MAJ10CT102.PNT,JL_D2_20SCS02A:MAG10CT101.PNT,JL_D2_20SCS02A:MAG10CT102.PNT,JL_D2_20DAS01B:MAG03CG101.PNT,JL_D2_20DAS01B:MAG03CS101.PNT","interval":300000},"Model_id":528,"version":"v-test"}' info = json.loads(info_str) print(main(info)) # model_id=info["Model_id"] # Test_Data = info["Test_Data"] # points = Test_Data["points"] # time = Test_Data["time"] # interval = Test_Data["interval"] # Data = get_history_value(points, time, interval) # # workbook = xw.Workbook("pca_test.xlsx") # # worksheet = workbook.add_worksheet() # # for row, item in enumerate(Data.tolist()): # # for col, cell in enumerate(item): # # worksheet.write(row, col, cell) # # workbook.close() # model = PCA_Test_offline.get_model_by_ID(model_id)["para"]["Model_info"] # result = pca(model,Data)#模型参数,训练数据 # aaa=json.dumps(result) # print (result)