# -*- coding: utf-8 -*-
"""
Reconstruction-based fault diagnosis for a layered reconstruction model.

A sample whose reconstruction error exceeds the SPE limit is diagnosed by
searching for the smallest set of variables that, once compensated, brings
the reconstruction error back under the limit.

@Time    : 2020/6/14 16:42
@Author  : 杰森·家乐森
@File    : sae_diagnosis.py
@Software: PyCharm
"""
import itertools
import json

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.utils.data as Data
from torch.autograd import Variable
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.preprocessing import MinMaxScaler


class Diagnosis(nn.Module):
    """Learn a per-direction compensation that is added to a single sample.

    Each row of the direction matrix marks the variables that may be
    adjusted for one fault hypothesis. The trainable ``amp`` matrix holds the
    adjustment amplitudes; multiplying it by the direction matrix keeps every
    unmarked variable untouched.
    """

    def __init__(self, model, dir):
        """
        :param model: dict with ``'weights'`` and ``'bias'`` lists of tensors,
            as consumed by :func:`predict`
        :param dir: 2-D numpy direction matrix, one fault hypothesis per row
        """
        super(Diagnosis, self).__init__()
        self.model = model
        self.dir = torch.from_numpy(dir).float()
        # Constant column of ones used to repeat the sample once per
        # direction row; it must never be trained.
        self.tile = nn.Parameter(torch.ones(dir.shape[0], 1),
                                 requires_grad=False)
        # The only trainable parameter: compensation amplitudes.
        self.amp = nn.Parameter(torch.zeros(*dir.shape))

    def forward(self, x):
        """
        :param x: tensor holding exactly one sample (one row), because it is
            left-multiplied by the ``(n_directions, 1)`` tile column
        :return: tuple ``(compensated input, model reconstruction of it)``
        """
        compensated = self.tile.mm(x) + self.dir * self.amp
        reconstructed = predict(compensated, self.model)
        return compensated, reconstructed


class RMSELoss(nn.Module):
    """Square root of the element-wise squared error of the first element.

    NOTE: despite the name, no reduction over elements is performed; the
    returned value is the absolute error of element ``[0][0]`` only.
    """

    def __init__(self):
        super().__init__()
        # 'none' is the modern spelling of the deprecated reduce=False.
        self.mse = nn.MSELoss(reduction='none')

    def forward(self, y_true, y_pred):
        """
        :param y_true: 2-D tensor of reference values
        :param y_pred: 2-D tensor of predicted values
        :return: numpy scalar, ``sqrt((y_true - y_pred) ** 2)`` at ``[0][0]``
        """
        squared_error = self.mse(y_true, y_pred)
        return torch.sqrt(squared_error).detach().numpy()[0][0]


def mse_list(y_true, y_pre):
    """
    Per-row mean squared error.

    :param y_true: 2-D tensor of reference values
    :param y_pre: 2-D tensor of predicted values
    :return: 1-D tensor with one mean squared error per row
    """
    return torch.mean(torch.pow(y_true - y_pre, 2), dim=1)


def predict(data, model):
    """
    Run ``data`` through every layer of ``model``.

    Each layer computes ``x.mm(W.t()) + b``. When the number of layers is
    even, a sigmoid is applied to the output of layer ``n / 2 - 1``; every
    other layer is linear.

    :param data: 2-D input tensor
    :param model: dict with ``'weights'`` and ``'bias'`` lists of tensors
    :return: output tensor of the last layer
    """
    weights = model['weights']
    biases = model["bias"]
    n_layers = len(weights)
    # With an odd layer count no layer index matches, so no sigmoid is used.
    sigmoid_layer = n_layers // 2 - 1 if n_layers % 2 == 0 else None
    out = data
    for i, weight in enumerate(weights):
        out = out.mm(weight.t()) + biases[i]
        if i == sigmoid_layer:
            out = torch.sigmoid(out)
    return out


def get_dir_array(data_dim, fault_num):
    """
    Build the direction matrix.

    Every row is one way of choosing ``fault_num`` variables out of
    ``data_dim``; chosen variables are marked with 1, the rest with 0.

    :param data_dim: sample dimension
    :param fault_num: number of simultaneously faulty variables
    :return: float array of shape ``(n_combinations, data_dim)``; it has zero
        rows when ``fault_num`` is larger than ``data_dim``
    """
    combos = list(itertools.combinations(range(data_dim), fault_num))
    # Integer dtype and an explicit shape keep the fancy indexing below valid
    # even when there are no combinations or no columns to mark.
    cols = np.array(combos, dtype=np.intp).reshape(len(combos), fault_num)
    rows = np.arange(len(combos))[:, np.newaxis]
    array = np.zeros([len(combos), data_dim])
    array[rows, cols] = 1
    return array


def rb_diagnosis(model, data, spe, dir_array, epochs=8000):
    """
    Reconstruction-based diagnosis of a single sample.

    Trains one compensation per direction so that the compensated sample is
    reconstructed well by the model, then reports the directions whose
    reconstruction error falls below the SPE limit.

    :param model: modelling model, dict with ``'weights'`` and ``'bias'``
        lists of tensors
    :param data: fault data, exactly one sample (1-D, or 2-D with one row)
    :param spe: SPE limit compared against the per-row mean squared error
    :param dir_array: direction matrix (numpy array)
    :param epochs: maximum number of optimisation iterations
    :return: rows of ``dir_array`` whose compensated error is below ``spe``
    :raises ValueError: if ``data`` contains more than one sample
    """
    sample = np.atleast_2d(data)
    if sample.shape[0] != 1:
        # The compensation model tiles one sample across all directions, so
        # several rows can never be evaluated; fail before training.
        raise ValueError(
            'rb_diagnosis expects exactly one sample, got %d'
            % sample.shape[0])
    sample = torch.from_numpy(sample).float()

    compensator = Diagnosis(model, dir_array)
    optimizer = torch.optim.Adam(
        [p for p in compensator.parameters() if p.requires_grad])
    loss_func = nn.MSELoss()

    # Train the compensation amplitudes.
    for epoch in range(epochs):
        compensated, decoded = compensator(sample)
        # Every 100 epochs, stop as soon as any direction explains the fault.
        if epoch % 100 == 0 and bool(
                (mse_list(decoded, compensated) < spe).any()):
            break
        loss = loss_func(decoded, compensated)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if epoch % 100 == 0:
            print('Epoch: ', epoch, ' loss: %.4f' % loss.item())

    compensator.eval()
    with torch.no_grad():
        compensated, decoded = compensator(sample)
        # Convert to a numpy mask before indexing the numpy direction matrix.
        below_limit = (mse_list(compensated, decoded) < spe).numpy()
    return dir_array[below_limit]


def diagnosis(model, data, spe):
    """
    Diagnosis function.

    For every sample whose reconstruction error exceeds the SPE limit, try
    1, 2, ... faulty variables in turn and record the first direction that
    brings the error back under the limit.

    :param model: modelling model, dict with ``'weights'`` and ``'bias'``
        lists (nested lists, numpy arrays or tensors); it is not modified
    :param data: diagnosis data, 2-D numpy array with one sample per row
    :param spe: SPE limit
    :return: fault matrix with the shape of ``data``; a row is all zeros when
        the sample is normal or no direction was found
    """
    # Work on a float32 tensor copy so the caller's model is left untouched
    # and the dtypes match the float32 data.
    tensor_model = dict(model)
    tensor_model['weights'] = [
        torch.as_tensor(w, dtype=torch.float32).detach()
        for w in model['weights']
    ]
    tensor_model['bias'] = [
        torch.as_tensor(b, dtype=torch.float32).detach()
        for b in model['bias']
    ]

    test_data = torch.from_numpy(data).float()
    predict_data = predict(test_data, tensor_model)
    mse = mse_list(predict_data, test_data).detach().numpy()

    detection_array = np.zeros(data.shape)
    n_vars = data.shape[1]
    for i in np.where(mse > spe)[0]:
        # Prefer the explanation with the fewest faulty variables.
        for fault_num in range(1, n_vars):
            directions = get_dir_array(n_vars, fault_num)
            fault_dir = rb_diagnosis(tensor_model, data[i, :], spe, directions)
            if fault_dir.shape[0] > 0:
                detection_array[i, :] = fault_dir[0, :]
                break
    return detection_array