You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
311 lines
14 KiB
311 lines
14 KiB
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import traceback
|
|
import datetime
|
|
from itertools import repeat, combinations
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import xlsxwriter
|
|
from PCA_Test import get_model_by_ID, get_model_by_id_and_version
|
|
from base import get_test_data, get_simulation_data, get_test_data_1, get_data_from_excel
|
|
from recon import Lars
|
|
import pca_train_k
|
|
|
|
|
|
def get_pca_lars_rb_test(test_data, f_matrix, model, lock, limit, al_type):
|
|
# error_data, test_data, f_matrix = get_test_data(info)
|
|
data = (test_data - model["Train_X_mean"]) / model["Train_X_std"]
|
|
featValue = np.array(model["featValue"]) + 0.00001 # 训练数据的特征值
|
|
featVec = np.array(model["featVec"]) # 训练数据的特征向量
|
|
numbel_variable = featValue.shape[0]
|
|
k = model["K"] # 主元个数
|
|
selectVec = featVec[:, 0:k]
|
|
featValue_sort = featValue # [index] # 排序后的特征值
|
|
C_ = np.eye(numbel_variable) - np.dot(selectVec, selectVec.T)
|
|
X_SPE = C_.T
|
|
D_SPE = C_
|
|
DIAG_SPE = np.eye(numbel_variable)
|
|
DIAG_T2 = np.linalg.inv(np.diag(featValue_sort[:int(model["K"])]))
|
|
D_T2 = selectVec.copy()
|
|
X_T2 = np.dot(D_T2, np.linalg.cholesky(DIAG_T2)).T
|
|
II = featValue_sort.copy()
|
|
II[:int(model["K"])] = II[:int(model["K"])] * model["T2CUL_99"]
|
|
II[int(model["K"]):] = model["QCUL_99"]
|
|
DIAG_Fai = np.linalg.inv(np.diag(II))
|
|
D_Fai = featVec.copy()
|
|
X_Fai = np.dot(D_Fai, np.linalg.cholesky(DIAG_Fai)).T
|
|
f_ds_matrix_a = np.zeros(shape=data.shape)
|
|
st = time.time()
|
|
for index in range(data.shape[0]):
|
|
if al_type == "FAI":
|
|
beta, _ = Lars(X_Fai, data[index], D_Fai, DIAG_Fai, 50000, limit, lock)
|
|
elif al_type == "SPE":
|
|
beta, _ = Lars(X_SPE, data[index], D_SPE, DIAG_SPE, 50000, limit, lock)
|
|
else:
|
|
beta, _ = Lars(X_T2, data[index], D_T2, DIAG_T2, 50000, limit, lock)
|
|
beta_end = abs(beta[-1, :])
|
|
f_index = np.where(beta_end > 0)[0] # 故障方向
|
|
f_ds_matrix_a[index, f_index] = 1 # 更新
|
|
et = time.time()
|
|
fd1 = (f_matrix == 1) & (f_ds_matrix_a == 1)
|
|
fa1 = (f_matrix == 0) & (f_ds_matrix_a == 1)
|
|
fdr_variable1 = fd1[fd1].shape[0] / f_matrix[f_matrix == 1].shape[0]
|
|
far_variable1 = fa1[fa1].shape[0] / f_matrix[f_matrix == 0].shape[0]
|
|
t = (et - st) / data.shape[0] * 1000
|
|
return dict([("fdr_variable", round(fdr_variable1, 4)),
|
|
("far_variable", round(far_variable1, 4)),
|
|
("time", round(t, 2))])
|
|
|
|
|
|
def get_rb_pca(data, model, limit, al_type, f_matrix):
|
|
featValue = np.array(model["featValue"]) + 0.00001 # 训练数据的特征值
|
|
featVec = np.array(model["featVec"]) # 训练数据的特征向量
|
|
numbel_variable = featValue.shape[0] #
|
|
k = model["K"] # 主元个数
|
|
selectVec = featVec[:, 0:k]
|
|
featValue_sort = featValue # [index] # 排序后的特征值
|
|
C_ = np.eye(numbel_variable) - np.dot(selectVec, selectVec.T)
|
|
X_SPE = C_.T
|
|
II = featValue_sort.copy()
|
|
II[:int(model["K"])] = II[:int(model["K"])] * model["T2CUL_99"]
|
|
II[int(model["K"]):] = model["QCUL_99"]
|
|
DIAG_Fai = np.linalg.inv(np.diag(II))
|
|
D_Fai = featVec.copy()
|
|
DIAG_T2 = np.linalg.inv(np.diag(featValue_sort[:int(model["K"])]))
|
|
D_T2 = selectVec.copy()
|
|
if al_type == "SPE":
|
|
m = X_SPE @ X_SPE.T
|
|
elif al_type == "FAI":
|
|
m = D_Fai @ DIAG_Fai @ D_Fai.T
|
|
else:
|
|
m = D_T2 @ DIAG_T2 @ D_T2.T
|
|
f_ds_matrix_a = np.zeros(shape=data.shape)
|
|
m_c = sqrtm(m) # 1/2 M
|
|
st = time.time()
|
|
for index in range(data.shape[0]):
|
|
line = data[index] @ m @ data[index].T # 控制线
|
|
if line > limit:
|
|
c = [] # index
|
|
for j in range(data.shape[1]):
|
|
f = np.zeros(shape=(data.shape[1], 1)) # 故障方向
|
|
f[j] = 1
|
|
t = np.square(f.T @ m_c @ data[index])
|
|
c.append(t.tolist()[0])
|
|
f_index = [k for k, v in enumerate(c) if v > limit]
|
|
f_ds_matrix_a[index, f_index] = 1
|
|
et = time.time()
|
|
fd1 = (f_matrix == 1) & (f_ds_matrix_a == 1)
|
|
fa1 = (f_matrix == 0) & (f_ds_matrix_a == 1)
|
|
fdr_variable1 = fd1[fd1].shape[0] / f_matrix[f_matrix == 1].shape[0]
|
|
far_variable1 = fa1[fa1].shape[0] / f_matrix[f_matrix == 0].shape[0]
|
|
t = (et - st) / data.shape[0] * 1000
|
|
return dict([("fdr_variable", round(fdr_variable1, 4)),
|
|
("far_variable", round(far_variable1, 4)),
|
|
("time", round(t, 2))])
|
|
|
|
|
|
def pca_cp_mtcl_test(info, model, lock, limit, al_type):
|
|
error_data, test_data, f_matrix = get_test_data(info)
|
|
data = (test_data - model["Train_X_mean"]) / model["Train_X_std"]
|
|
featValue = np.array(model["featValue"]) + 0.00001 # 训练数据的特征值
|
|
featVec = np.array(model["featVec"]) # 训练数据的特征向量
|
|
numbel_variable = featValue.shape[0] #
|
|
k = model["K"] # 主元个数
|
|
selectVec = featVec[:, 0:k]
|
|
featValue_sort = featValue # [index] # 排序后的特征值
|
|
C_ = np.eye(numbel_variable) - np.dot(selectVec, selectVec.T)
|
|
X_SPE = C_.T
|
|
II = featValue_sort.copy()
|
|
II[:int(model["K"])] = II[:int(model["K"])] * model["T2CUL_99"]
|
|
II[int(model["K"]):] = model["QCUL_99"]
|
|
DIAG_Fai = np.linalg.inv(np.diag(II))
|
|
D_Fai = featVec.copy()
|
|
DIAG_T2 = np.linalg.inv(np.diag(featValue_sort[:int(model["K"])]))
|
|
D_T2 = selectVec.copy()
|
|
if al_type == "SPE":
|
|
m = X_SPE @ X_SPE.T
|
|
elif al_type == "FAI":
|
|
m = D_Fai @ DIAG_Fai @ D_Fai.T
|
|
else:
|
|
m = D_T2 @ DIAG_T2 @ D_T2.T
|
|
f_ds_matrix_a = np.zeros(shape=data.shape)
|
|
m_c = sqrtm(m) # 1/2 M
|
|
st = time.time()
|
|
for index in range(data.shape[0]):
|
|
line = data[index] @ m @ data[index].T # 控制线
|
|
if line > limit:
|
|
c = [] # index
|
|
for j in range(data.shape[1]):
|
|
f = np.zeros(shape=(data.shape[1], 1)) # 故障方向
|
|
f[j] = 1
|
|
t = np.square(f.T @ m_c @ data[index])
|
|
c.append(t.tolist()[0])
|
|
f_index = [k for k, v in enumerate(c) if v > limit]
|
|
f_ds_matrix_a[index, f_index] = 1
|
|
et = time.time()
|
|
fd1 = (f_matrix == 1) & (f_ds_matrix_a == 1)
|
|
fa1 = (f_matrix == 0) & (f_ds_matrix_a == 1)
|
|
fdr_variable1 = fd1[fd1].shape[0] / f_matrix[f_matrix == 1].shape[0]
|
|
far_variable1 = fa1[fa1].shape[0] / f_matrix[f_matrix == 0].shape[0]
|
|
t = (et - st) / data.shape[0] * 1000
|
|
return dict([("fdr_variable", round(fdr_variable1, 4)),
|
|
("far_variable", round(far_variable1, 4)),
|
|
("time", round(t, 2))])
|
|
|
|
|
|
# 计算m的1/2次方
|
|
def sqrtm(m):
|
|
featValue, featVec = np.linalg.eig(m) # 求解协方差矩阵的特征值和特征向量
|
|
diag = np.diag(featValue)
|
|
return featVec @ np.sqrt(diag) @ np.linalg.inv(featVec)
|
|
|
|
|
|
def cp_main(info):
|
|
model_id = info["Model_id"]
|
|
version = info["version"]
|
|
if version == 'v-test':
|
|
model = get_model_by_ID(model_id)
|
|
else:
|
|
model = get_model_by_id_and_version(model_id, version)
|
|
lock = []
|
|
point_info = model["pointInfo"]
|
|
for i in range(len(point_info)):
|
|
try:
|
|
if point_info[i]["lock"]:
|
|
lock.append(i)
|
|
except:
|
|
continue
|
|
al_type = info["Test_Type"]
|
|
limit = info["Limit_Value"]
|
|
fault_index = info["fault_index"]
|
|
if info["algorithm"] == "Cont":
|
|
res = pca_cp_mtcl_test(info, model["para"]["Model_info"], lock, limit, al_type)
|
|
else:
|
|
res = get_pca_lars_rb_test(info, model["para"]["Model_info"], lock, limit, al_type)
|
|
return json.dumps(res)
|
|
|
|
|
|
def rm_enum_test(data, limit, m, f_m, de_num):
|
|
f_ds_matrix_a = np.zeros(shape=data.shape) # 故障方向矩阵
|
|
dimension = data.shape[1]
|
|
ke_si_matrix = np.zeros(shape=(dimension,dimension))
|
|
for i in range(dimension):
|
|
for j in range(dimension):
|
|
ke_si_matrix[i, j] = m[i, j] / m[i, i]
|
|
for index in range(data.shape[0]):
|
|
line = data[index] @ m @ data[index].T
|
|
if line > limit:
|
|
k_matrix = np.zeros(shape=(dimension, 1))
|
|
for i in range(dimension):
|
|
ke_si = np.zeros(shape=(dimension, 1))
|
|
ke_si[i, 0] = 1
|
|
fai = m @ ke_si @ np.linalg.inv(ke_si.T @ m @ ke_si) # 计算每一行的fai
|
|
k_matrix[i] = data[index].T @ fai # 计算k 矩阵
|
|
for num in range(1, data.shape[1] + 1):
|
|
rb_indexs = []
|
|
fault_para_index = list(combinations(range(data.shape[1]), num))
|
|
for item in fault_para_index:
|
|
f_r = ke_si_matrix[list(item)][:, list(item)]
|
|
f_matrix = k_matrix[item, 0] @ np.linalg.inv(f_r) # 计算f 矩阵
|
|
f = np.zeros(shape=(1, dimension))
|
|
f[0, item] = f_matrix
|
|
rb_indexs.append(((data[index] - f) @ m @ (data[index] - f).T)[0][0])
|
|
min_value_index = np.argmin(rb_indexs)
|
|
min_value = np.min(rb_indexs)
|
|
if min_value < limit:
|
|
f_ds_matrix_a[index, fault_para_index[min_value_index]] = 1
|
|
break
|
|
fd1 = (f_m == 1) & (f_ds_matrix_a == 1)
|
|
fa1 = (f_m == 0) & (f_ds_matrix_a == 1)
|
|
fdr_variable1 = fd1[fd1].shape[0] / f_m[f_m == 1].shape[0]
|
|
far_variable1 = fa1[fa1].shape[0] / f_m[f_m == 0].shape[0]
|
|
return fdr_variable1, far_variable1
|
|
|
|
|
|
def simulation_test(al_type):
|
|
# 训练
|
|
train_data = get_simulation_data(1000)
|
|
# train_data = get_data_from_excel("data_5.xlsx")
|
|
samples = 2000
|
|
amplitudes = list(repeat("0,5", train_data.shape[1]))
|
|
fault_index = list(range(train_data.shape[1]))
|
|
# fault_index_num = 2
|
|
# _, test_data, f_m = get_test_data_1(train_data, samples, amplitudes, fault_index, fault_index_num)
|
|
result = []
|
|
# w = xlsxwriter.Workbook("result.xlsx")
|
|
# calc limits
|
|
try:
|
|
for i in range(1, 3):
|
|
k = i
|
|
model = pca_train_k.pca(train_data, k)
|
|
if al_type == "SPE":
|
|
limit = model["QCUL_95"]
|
|
elif al_type == "FAI":
|
|
limit = model["Kesi_95"]
|
|
else:
|
|
limit = model["T2CUL_95"]
|
|
_, test_data, f_m = get_test_data_1(train_data, samples, amplitudes, fault_index, 1)
|
|
data = (test_data - model["Train_X_mean"]) / model["Train_X_std"]
|
|
r = get_rb_pca(data, model, limit, al_type, f_m)
|
|
r["k"] = i
|
|
result.append(r)
|
|
except Exception as e:
|
|
with open('log.log', "a") as f:
|
|
f.write(f"{str(datetime.datetime.now())}{traceback.format_exc()}")
|
|
|
|
# 画线 k =1
|
|
model = pca_train_k.pca(train_data, 1)
|
|
k = model["K"] # 主元个数
|
|
train_data = (train_data - model["Train_X_mean"]) / model["Train_X_std"]
|
|
featValue = np.array(model["featValue"]) + 0.00001 # 训练数据的特征值
|
|
featVec = np.array(model["featVec"]) # 训练数据的特征向量
|
|
selectVec = featVec[:, 0:k]
|
|
featValue_sort = featValue # [index] # 排序后的特征值
|
|
numbel_variable = featValue.shape[0] #
|
|
C_ = np.eye(numbel_variable) - np.dot(selectVec, selectVec.T)
|
|
X_SPE = C_.T
|
|
II = featValue_sort.copy()
|
|
II[:int(model["K"])] = II[:int(model["K"])] * model["T2CUL_99"]
|
|
II[int(model["K"]):] = model["QCUL_99"]
|
|
DIAG_Fai = np.linalg.inv(np.diag(II))
|
|
D_Fai = featVec.copy()
|
|
DIAG_T2 = np.linalg.inv(np.diag(featValue_sort[:int(model["K"])]))
|
|
D_T2 = selectVec.copy()
|
|
count = 0
|
|
if al_type == "SPE":
|
|
limit = model["QCUL_95"]
|
|
elif al_type == "FAI":
|
|
limit = model["Kesi_95"]
|
|
else:
|
|
limit = model["T2CUL_95"]
|
|
if al_type == "SPE":
|
|
m = X_SPE @ X_SPE.T
|
|
elif al_type == "FAI":
|
|
m = D_Fai @ DIAG_Fai @ D_Fai.T
|
|
else:
|
|
m = D_T2 @ DIAG_T2 @ D_T2.T
|
|
lines = []
|
|
for index in range(train_data.shape[0]):
|
|
line = train_data[index] @ m @ train_data[index].T
|
|
if line < limit:
|
|
count += 1
|
|
lines.append(line)
|
|
print(count / train_data.shape[0])
|
|
x = list(range(train_data.shape[0]))
|
|
limits_line = list(repeat(limit, train_data.shape[0]))
|
|
plt.plot(x, lines)
|
|
plt.plot(x, limits_line)
|
|
plt.title(f'k={1},limit={limit}')
|
|
plt.show()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# info = {"Test_Data":{"time":"2020-09-03 13:58:53,2020-09-07 11:57:15","points":"JL_D2_20SCS02A:MAG10CT311.PNT,JL_D2_20DAS01B:MAG10CE101.PNT,JL_D2_20MCS01A:MAG10AN001ZT.PNT,JL_D2_20SCS02A:MAG10CT312.PNT,JL_D2_20DAS01B:MAG10CE102.PNT,JL_D2_20MCS01A:MAG10AN002ZT.PNT,JL_D2_20SCS02A:MAG10CT313.PNT,JL_D2_20DAS01B:MAG10CE103.PNT,JL_D2_20MCS01A:MAG10AN003ZT.PNT,JL_D2_20SCS02A:MAG10CT314.PNT,JL_D2_20DAS01B:MAG10CE104.PNT,JL_D2_20MCS01A:MAG10AN004ZT.PNT,JL_D2_20SCS02A:MAG10CT315.PNT,JL_D2_20DAS01B:MAG10CE105.PNT,JL_D2_20MCS01A:MAG10AN005ZT.PNT,JL_D2_20SCS02A:MAG10CT316.PNT,JL_D2_20DAS01B:MAG10CE106.PNT,JL_D2_20MCS01A:MAG10AN006ZT.PNT,JL_D2_20SCS02A:MAG10CT317.PNT,JL_D2_20DAS01B:MAG10CE107.PNT,JL_D2_20MCS01A:MAG10AN007ZT.PNT,JL_D2_20DAS01B:MAJ10CT101.PNT,JL_D2_20DAS01B:MAJ10CT102.PNT,JL_D2_20SCS02A:MAG10CT101.PNT,JL_D2_20SCS02A:MAG10CT102.PNT,JL_D2_20DAS01B:MAG03CG101.PNT,JL_D2_20DAS01B:MAG03CS101.PNT","interval":300000},"Model_id":"528","samples":1000,"number_fault_variable":0,"dead":"1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1","limit":"1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0","amplitudes":["0,3.4989999999999997","0,12.893","0,64.614","0,2.683","0,13.307","0,64.51700000000001","0,2.879","0,13.494","0,65.378","0,2.9619999999999997"],"fault_index":[0,1,2,3,4,5,6,7,8,9],"condition":"1=1","Test_Type":"FAI","Limit_Value":1.42,"uplow":",;,;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null;null,null","number":"500","fault_index_num":"1","k":3,"version":"v-test","algorithm":"Cont"}
|
|
# r = cp_main(info)
|
|
# print(r)
|
|
simulation_test("SPE")
|
|
|
|
|