You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
143 lines
4.3 KiB
143 lines
4.3 KiB
2 weeks ago
|
# -*- coding: utf-8 -*-
|
||
|
"""
|
||
|
@Time : 2020/6/14 16:42
|
||
|
@Author : 杰森·家乐森
|
||
|
@File : sae_diagnosis.py
|
||
|
@Software: PyCharm
|
||
|
"""
|
||
|
import json
|
||
|
import torch
|
||
|
import itertools
|
||
|
import numpy as np
|
||
|
import pandas as pd
|
||
|
import torch.nn as nn
|
||
|
import torch.utils.data as Data
|
||
|
from torch.autograd import Variable
|
||
|
from sklearn.metrics import r2_score, mean_squared_error
|
||
|
from sklearn.preprocessing import MinMaxScaler
|
||
|
|
||
|
|
||
|
class Diagnosis(nn.Module):
    """Add a learnable offset to a sample along candidate directions.

    ``forward`` repeats the input once per row of ``dir`` (via ``tile``),
    adds ``dir * amp`` element-wise and feeds the result to ``predict``
    together with ``model``. ``amp`` is the only parameter created with
    gradients enabled.
    """

    def __init__(self, model, dir):
        """
        :param model: object handed unchanged to ``predict`` in ``forward``
        :param dir: NumPy array of candidate directions (one per row)
        """
        super().__init__()
        self.model = model
        direction_count = dir.shape[0]
        self.dir = torch.from_numpy(dir).float()
        self.tile = nn.Parameter(torch.ones(direction_count, 1))
        # Freeze everything registered up to this point; ``amp`` is created
        # afterwards and therefore keeps requires_grad=True.
        for frozen in self.parameters():
            frozen.requires_grad = False
        self.amp = nn.Parameter(torch.zeros(*dir.shape))

    def forward(self, x):
        """
        :param x: tensor that ``tile`` is matrix-multiplied with
        :return: tuple ``(compensated input, predict(compensated input))``
        """
        compensated = torch.mm(self.tile, x) + self.dir * self.amp
        return compensated, predict(compensated, self.model)
|
||
|
|
||
|
|
||
|
class RMSELoss(nn.Module):
    """Square root of the element-wise squared error, reported for one element.

    NOTE(review): despite the name, nothing is averaged. ``forward`` takes
    the square root of every element-wise squared error and returns only the
    entry at ``[0][0]``, so the inputs must be at least 2-D.
    """

    def __init__(self):
        super().__init__()
        # ``reduction='none'`` is the supported spelling of the deprecated
        # ``reduce=False`` and avoids the deprecation warning on construction.
        self.mse = nn.MSELoss(reduction='none')

    def forward(self, y_true, y_pred):
        """
        :param y_true: reference tensor
        :param y_pred: predicted tensor with the same shape as ``y_true``
        :return: NumPy scalar ``sqrt((y_true - y_pred) ** 2)[0][0]``
        """
        squared_error = self.mse(y_true, y_pred)
        return torch.sqrt(squared_error).detach().numpy()[0][0]
|
||
|
|
||
|
|
||
|
def mse_list(y_true, y_pre):
    """
    Mean squared error of each row.

    :param y_true: reference tensor
    :param y_pre: predicted tensor
    :return: tensor holding the mean over ``dim=1`` of the squared differences
    """
    squared_diff = (y_true - y_pre) ** 2
    return squared_diff.mean(dim=1)
|
||
|
|
||
|
|
||
|
def predict(data, model):
    """
    Push ``data`` through the layers stored in ``model``.

    Layer ``i`` computes ``x.mm(weights[i].t()) + bias[i]``. A sigmoid is
    applied only to the layer whose index equals ``len(weights) / 2 - 1``;
    because that is a true division, no layer matches when the number of
    layers is odd.

    :param data: input tensor
    :param model: mapping with the keys ``'weights'`` and ``'bias'``
    :return: output of the last layer (``data`` itself if there are no layers)
    """
    sigmoid_layer = (len(model['weights']) / 2) - 1
    activation = data
    for idx, weight in enumerate(model['weights']):
        activation = activation.mm(weight.t()) + model["bias"][idx]
        if idx == sigmoid_layer:
            activation = torch.sigmoid(activation)
    return activation
|
||
|
|
||
|
|
||
|
def get_dir_array(data_dim, fault_num):
    """
    Build the direction matrix.

    Every row is a 0/1 indicator vector marking one combination of
    ``fault_num`` variables out of ``data_dim``; rows follow the order of
    ``itertools.combinations``.

    :param data_dim: sample dimension
    :param fault_num: number of simultaneously faulty variables
    :return: direction matrix of shape ``(number of combinations, data_dim)``;
        it has zero rows when ``fault_num > data_dim`` and a single all-zero
        row when ``fault_num == 0``
    """
    combos = list(itertools.combinations(range(data_dim), fault_num))
    # Force an integer dtype and an explicit 2-D shape so that degenerate
    # cases (no combinations, or empty combinations) still yield valid index
    # arrays instead of float64 ones that NumPy refuses to index with.
    cols = np.array(combos, dtype=np.intp).reshape(len(combos), fault_num)
    rows = np.arange(len(combos))[:, np.newaxis]
    directions = np.zeros([len(combos), data_dim])
    directions[rows, cols] = 1
    return directions
|
||
|
|
||
|
|
||
|
def rb_diagnosis(model, data, spe, dir_array, epochs=8000):
    """
    Reconstruction-based diagnosis.

    Optimises one offset amplitude per row of ``dir_array`` with Adam so that
    the compensated sample is reproduced by the model, then returns the
    directions whose reconstruction error falls below ``spe``.

    :param model: modelling result (passed on to ``Diagnosis``)
    :param data: fault data
    :param spe: SPE limit
    :param dir_array: direction matrix
    :param epochs: maximum number of training iterations
    :return: fault matrix, i.e. the rows of ``dir_array`` whose per-row MSE
        between compensated input and model output is below ``spe``
    """
    sample = torch.from_numpy(np.atleast_2d(data)).float()
    reconstructor = Diagnosis(model, dir_array)
    trainable = [p for p in reconstructor.parameters() if p.requires_grad]
    optimizer = torch.optim.Adam(trainable)
    loss_func = nn.MSELoss()
    loader = Data.DataLoader(dataset=Data.TensorDataset(sample))

    # Train the compensation amplitudes.
    found = False
    for epoch in range(epochs):
        for step, (b_x,) in enumerate(loader):
            compensate, decoded = reconstructor(b_x)

            # Every 100 epochs, stop as soon as at least one direction
            # already brings the reconstruction error under the limit.
            if step == 0 and epoch % 100 == 0:
                with torch.no_grad():
                    found = bool((mse_list(decoded, compensate) < spe).any())
                if found:
                    break

            loss = loss_func(decoded, compensate)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if step % 100 == 0:
                print('Epoch: ', epoch, ' loss: %.4f' % loss.item())
        if found:
            break

    reconstructor.eval()
    # The final evaluation needs no autograd graph; convert the mask to NumPy
    # explicitly before using it to index the NumPy direction matrix.
    with torch.no_grad():
        com, pre = reconstructor(sample)
        below_limit = (mse_list(com, pre) < spe).numpy()

    return dir_array[below_limit]
|
||
|
|
||
|
|
||
|
def diagnosis(model, data, spe):
    """
    Diagnosis entry point.

    For every sample whose reconstruction error exceeds ``spe``, tries fault
    counts 1, 2, ... ``data.shape[1] - 1`` in turn and records the first
    direction returned by ``rb_diagnosis``.

    Side effect: the entries of ``model['weights']`` and ``model['bias']``
    are converted to float32 tensors in place.

    :param model: modelling result with the keys ``'weights'`` and ``'bias'``
    :param data: data to diagnose (2-D NumPy array, one sample per row)
    :param spe: SPE limit
    :return: fault matrix with the same shape as ``data``; rows of samples
        without a detected fault stay all-zero
    """
    # ``as_tensor`` keeps the conversion idempotent (no copy-construct warning
    # when the same model is diagnosed again) and the fixed dtype matches the
    # float32 data fed to ``mm``.
    for i in range(len(model['weights'])):
        model['weights'][i] = torch.as_tensor(model['weights'][i], dtype=torch.float32).detach()
        model['bias'][i] = torch.as_tensor(model['bias'][i], dtype=torch.float32).detach()

    test_data = torch.from_numpy(data).float()
    with torch.no_grad():
        mse = mse_list(predict(test_data, model), test_data).numpy()

    dection_array = np.zeros(data.shape)
    data_dim = data.shape[1]
    for i in np.where(mse > spe)[0]:
        for fault_num in range(1, data_dim):
            directions = get_dir_array(data_dim, fault_num)
            fault_dir = rb_diagnosis(model, data[i, :], spe, directions)
            if fault_dir.shape[0] > 0:
                # Keep the first matching direction and stop at the smallest
                # fault count that explains the sample.
                dection_array[i, :] = fault_dir[0, :]
                break
    return dection_array
|