You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
11 KiB

# -*- coding: utf-8 -*-
"""
@Time : 2020/6/14 16:42
@Author : 杰森·家乐森
@File : sae_diagnosis.py
@Software: PyCharm
"""
import json
import time
import torch
import itertools
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.utils.data as Data
from torch.autograd import Variable
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from matplotlib import pyplot as plt
from warnings import warn
from scipy.stats import f
class Diagnosis(nn.Module):
def __init__(self, model, dir):
super(Diagnosis, self).__init__()
self.model = model
self.dir = torch.from_numpy(dir).float()
self.tile = nn.Parameter(torch.ones(dir.shape[0], 1))
for p in self.parameters():
p.requires_grad = False
self.amp = nn.Parameter(torch.zeros(*dir.shape))
def forward(self, x):
input = self.tile.mm(x) + self.dir * self.amp
output = predict(input, self.model)
return input, output
class RMSELoss(nn.Module):
def __init__(self):
super().__init__()
self.mse = nn.MSELoss(reduce=False, size_average=False)
def forward(self, y_true, y_pred):
mse_loss = self.mse(y_true, y_pred)
return torch.sqrt(mse_loss).data.numpy()[0][0]
def mse_list(y_true, y_pre):
return torch.sqrt(torch.mean(torch.pow(y_true - y_pre, 2), dim=1))
def predict(data, model):
k = (len(model['weights']) // 2) - 1
output = [data]
for i in range(len(model['weights'])):
# if i == k:
# output.append(output[i].mm(model['weights'][i]) + model["bias"][i])
# else:
output.append(torch.sigmoid(output[i].mm(model['weights'][i]) + model["bias"][i]))
return output[-1]
def get_dir_array(data_dim, fault_num):
"""
获取方向矩阵
:param data_dim: 样本维度
:param fault_num: 故障个数
:return: 方向矩阵
"""
cols = np.array(list(itertools.combinations(range(data_dim), fault_num)))
rows = np.array([[i] * fault_num for i in range(cols.shape[0])])
array = np.zeros([cols.shape[0], data_dim])
array[rows, cols] = 1
return array
def bessel_correction(x, y):
n1 = x.shape[0] - 1
if y is None:
n2 = 0
else:
n2 = y.shape[0] - 1
return n1, n2
def pooled_covariance_matrix(x, y, bessel=True):
""" pooled covariance
Compute the pooled covariance matrix
Equation:
The pooled covariance matrix is defined as:
.. math::
S = \\frac{n_xS_x + n_yS_y}{n_x+n_y}
And with bessel correction as:
.. math::
S = \\frac{(n_x-1)S_x + (n_y-1)S_y}{n_x+n_y-2}
Reference
---------
see: https://en.wikipedia.org/wiki/Hotelling%27s_T-squared_distribution#Pooled_covariance_matrix
:param x: array-like, samples of observations
:param y: array-like, samples of observations
:param bessel: bool, apply bessel correction (default)
:return: float, the pooled variance
"""
_, *p = x.shape
p = p[0] if p else 1
if bessel:
n1, n2 = bessel_correction(x, y)
else:
n1 = x.shape[0]
n2 = y.shape[0]
try:
s1 = n1 * x.cov()
except AttributeError:
s1 = n1 * np.cov(x, rowvar=False)
try:
s2 = n2 * y.cov()
except AttributeError:
s2 = n2 * np.cov(y, rowvar=False)
s = (s1 + s2) / (n1 + n2)
return s
def inverse_covariance_matrix(x, y, bessel=True):
"""
:param x: array-like, samples of observations
:param y: array-like, samples of observations
:param bessel: bool, apply bessel correction (default)
:return: float, the pooled variance inverse, the pooled variance
"""
_, *p = x.shape
p = p[0] if p else 1
s = pooled_covariance_matrix(x, y, bessel)
inv = np.linalg.solve(s, np.identity(p))
return inv, s
def hotelling_t2(x, y=None, bessel=True, S=None):
"""
Compute the Hotelling (T2) test statistic.
It is the multivariate extension of the Student's t-test.
Test the null hypothesis that two multivariate samples have the same underlying
probability distribution, when specifying samples for x and y. The number of samples do not have
to be the same, but the number of features does have to be equal.
Equation:
Hotelling's t-squared statistic is defined as:
.. math::
T^2 = n (\\bar{x} - {\mu})^{T} S^{-1} (\\bar{x} - {\mu})
Where S is the pooled covariance matrix and ᵀ represents the transpose.
The two sample t-squared statistic is defined as:
.. math::
T^2 = (\\bar{x} - \\bar{y})^{T} [S(\\frac1 n_x +\\frac 1 n_y)]^{-1} (\\bar{x}̄ - \\bar{y})
References:
- Hotelling, Harold. (1931). The Generalization of Student's Ratio. Ann. Math. Statist. 2, no. 3, 360--378.
doi:10.1214/aoms/1177732979. https://projecteuclid.org/euclid.aoms/1177732979
- Hotelling, Harold. (1955) Les Rapports entre les Methodes Statistiques recentes portant sur des Variables Multiples
et l'Analyse Factorielle. 107-119.
In: L'Analyse Factorielle et ses Applications. Centre National de la Recherche Scientifique, Paris.
- Anderson T.W. (1992) Introduction to Hotelling (1931) The Generalization of Student’s Ratio.
In: Kotz S., Johnson N.L. (eds) Breakthroughs in Statistics.
Springer Series in Statistics (Perspectives in Statistics). Springer, New York, NY
:param x: array-like, samples of observations for one or two sample test (required)
:param y: for two sample test, array-like, samples of observations (optional), for one sample, list of means to test
:param bessel: bool, apply bessel correction (default)
:return:
statistic: float,
the t2 statistic
f_value: float,
the f value
p_value: float,
the p value
s: 2d array,
the pooled variance
"""
try:
nx, p = x.shape
except AttributeError as ex:
if "list" in str(ex):
x = np.asarray(x)
nx, *p = x.shape
p = p[0] if p else 1
y = np.asarray(y)
else:
warn(f"Error: The two samples must be in arrays or dataframes format.")
raise ValueError
# samples observed means
x_bar = x.mean(0)
one_sample = False
if y is None:
# One sample T-squared
one_sample = True
y = np.zeros(p)
ny = None
py = p
diff_bar = x_bar - y
else:
ny, *py = y.shape
if len(py) == 0:
one_sample = True
py = p
diff_bar = x_bar - y
else:
# Two sample T-squared
py = py[0] if py else 1
y_bar = y.mean(0)
# difference of means
diff_bar = x_bar - y_bar
if p != py:
warn(
f"Error: the two samples must have the same number of features ({p} != {py})."
)
raise ValueError
# bessel correction ( -1 )
if bessel:
n1, n2 = bessel_correction(x, y)
else:
n1 = nx
n2 = ny
if one_sample:
n = nx
else:
n = n1 + n2
# calculate the T2 statistics
# Technically, we use diff_bar.T for the transpose, but with Pandas, a 1 dimensional dataframe
# is automatically aligned for @ and is not required
if one_sample:
if S is not None:
cov = S
else:
try:
cov = x.cov()
except AttributeError:
cov = np.cov(x, rowvar=False)
inv_cov = np.linalg.pinv(cov)
t2_stat = n * (diff_bar.T @ inv_cov @ diff_bar)
if S is not None:
return t2_stat
# f statistic
# TODO: use chi square instead of f statistic for large sample
f_value = (n - p) * t2_stat / ((n - 1) * p)
else:
# pooled covariance
inv_s, s = inverse_covariance_matrix(x, y, bessel)
t2_stat = nx * ny / (nx + ny) * (diff_bar.T @ inv_s @ diff_bar)
# f statistic
# TODO: use chi square instead of f statistic for large sample
f_value = (nx + ny - p - 1) * t2_stat / (n * p)
# p-value
p_value = f.sf(f_value, p, n - p) # survival function, 1 - cdf
# return the list of results
return t2_stat, f_value, p_value, cov if one_sample else s
def rb_diagnosis(model, data, spe, dir_array, epochs=8000):
"""
重构诊断
:param model: 建模模型
:param data: 故障数据
:param spe: spe限值
:param dir_array: 方向矩阵
:param epochs: 迭代次数
:return: 故障矩阵
"""
data = np.atleast_2d(data)
diagnosis = Diagnosis(model, dir_array)
optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, diagnosis.parameters()), lr=0.4)
loss_func = nn.MSELoss()
data = torch.from_numpy(data).float()
dataset = Data.TensorDataset(data, data)
train_set = Data.DataLoader(dataset=dataset)
# 训练模型
for epoch in range(epochs):
for step, (x, y) in enumerate(train_set):
b_x = Variable(x).float()
b_y = Variable(x).float()
b_label = Variable(y)
compensate, decoded = diagnosis(b_x)
if step == 0 and epoch % 100 == 0 and (mse_list(decoded, compensate) < spe).nonzero().shape[0] > 0:
break
loss = loss_func(decoded, compensate)
optimizer.zero_grad()
loss.backward()
optimizer.step()
else:
continue
break
diagnosis.eval()
com, pre = diagnosis(data)
return dir_array[mse_list(com, pre) < spe], (com - data).data.numpy()[mse_list(com, pre) < spe]
def diagnosis(model, data, spe):
"""
诊断函数
:param model: 建模模型
:param data: 诊断数据
:param spe: spe限值
:return: 故障矩阵
"""
for i in range(len(model['weights'])):
model['weights'][i] = torch.tensor(model['weights'][i])
model['bias'][i] = torch.tensor(model['bias'][i])
test_data = torch.from_numpy(data).float()
predict_data = predict(test_data, model)
mse = mse_list(predict_data, test_data).data.numpy()
dection_array = np.zeros(data.shape)
amp_array = np.zeros(data.shape)
# plt.hlines(spe, 0, 1000)
# plt.plot(mse)
# plt.show()
it = iter(np.where(mse > spe)[0])
for i in it:
time1 = time.time()
for j in range(data.shape[1] - 1):
dir = get_dir_array(data.shape[1], j + 1)
fault_dir, com_amp = rb_diagnosis(model, data[i, :], spe, dir)
if fault_dir.shape[0] > 0:
dection_array[i, :] = fault_dir[0, :]
amp_array[i, :] = com_amp[0, :]
time2 = time.time()
print("%d轮诊断完成" % i)
print("耗时%f" % (time2 - time1))
print(fault_dir[0, :])
print(com_amp[0, :])
break
return dection_array, amp_array