# -*- coding: utf-8 -*-
import numpy as np
import pymssql
import json
import config
import requests
import random
from PCA_Test import get_history_value
from recon import contribution_plot
import xlsxwriter
import pandas as pd


def get_test_data(info):
    """Fetch cleaned historical data and superimpose random faults on a random subset of rows."""
    samples, amplitudes, fault_index, fault_index_num = (
        info["samples"], info["amplitudes"], info["fault_index"], info["fault_index_num"])
    data = get_clean_data(info)  # get historical data
    n, m = data.shape
    if n < samples:
        random_row = np.random.choice(list(range(n)), samples)
    else:
        random_row = np.array(random.sample(list(range(n)), samples))
    test_data = data[random_row, :]
    error_data = np.zeros(shape=(samples, m))
    f_matrix = np.zeros(shape=(samples, m))  # fault direction matrix
    for i in range(samples):
        f_index = np.random.choice(fault_index, int(fault_index_num), replace=False)
        for j in f_index:
            upper, lower = [float(item) for item in amplitudes[j].split(',')]
            faults = (upper - lower) * np.random.random_sample() + lower
            error_data[i, j] = faults
            f_matrix[i, j] = 1
    return error_data, test_data + error_data, f_matrix


def get_test_data_1(data, samples, amplitudes, fault_index, fault_index_num):
    """Same fault injection as get_test_data(), but on data passed in directly."""
    n, m = data.shape
    if n < samples:
        random_row = np.random.choice(list(range(n)), samples)
    else:
        random_row = np.array(random.sample(list(range(n)), samples))
    test_data = data[random_row, :]
    error_data = np.zeros(shape=(samples, m))
    f_matrix = np.zeros(shape=(samples, m))  # fault direction matrix
    for i in range(samples):
        f_index = np.random.choice(fault_index, int(fault_index_num), replace=False)
        for j in f_index:
            upper, lower = [float(item) for item in amplitudes[j].split(',')]
            faults = (upper - lower) * np.random.random_sample() + lower
            error_data[i, j] = faults
            f_matrix[i, j] = 1
    # write_excel(test_data + error_data, 'test_1.xlsx')
    # write_excel(f_matrix, 'fm_1.xlsx')
    # write_excel(error_data, 'error_1.xlsx')
    return error_data, test_data + error_data, f_matrix


def get_data_from_excel(filename):
    """Read an Excel sheet (no header row) into a numpy array."""
    r = pd.read_excel(filename, header=None)
    return np.array(r)
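

# NOTE: get_clean_data() below calls isnumber() to validate the upper/lower
# limit strings, but no definition appears in this file. The helper here is a
# minimal assumed implementation: it returns True only if every entry in the
# sequence parses as a float.
def isnumber(values):
    try:
        for value in values:
            float(value)
        return True
    except (TypeError, ValueError):
        return False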


def get_clean_data(info):
    """Build the cleaning request (dead-zone and limit constraints) and fetch cleaned data from the cleaning web API."""
    ItemsInfo = []
    SamplingTimePeriods = []
    interval = info["Test_Data"]["interval"]
    Test_Data = info["Test_Data"]
    points = Test_Data["points"].split(",")
    try:
        times = Test_Data["time"].split(";")
        for i in range(len(times)):
            Eachsampletime = {}
            timess = times[i].split(',')
            Eachsampletime["StartingTime"] = timess[0]
            Eachsampletime["TerminalTime"] = timess[1]
            SamplingTimePeriods.append(Eachsampletime)
    except KeyError:
        # "time" is missing: fall back to the training periods. The original code
        # read these from an undefined name `res`; `info` is assumed to be the
        # intended source here.
        SamplingTimePeriods = [{"StartingTime": item["st"], "TerminalTime": item["et"]}
                               for item in info["trainTime"]]
    # Turn a lone "=" into "=="; the follow-up replaces undo the doubling for ">=" and "<=".
    condition = info["condition"].replace("=", "==").replace(">=", ">").replace("<=", "<")
    dead = info["dead"].split(',')
    limit = info["limit"].split(',')
    uplower = info["uplow"].split(';')
    count = 0
    Constraint = ""
    for i in range(len(points)):
        iteminfo = {}
        iteminfo["ItemName"] = points[i]  # add the point (tag) name
        if dead[i] == "1":  # whether this point takes part in dead-zone cleaning
            iteminfo["ClearDeadZone"] = "true"
        else:
            iteminfo["ClearDeadZone"] = "false"
        if limit[i] == "1":  # takes part in upper/lower-limit cleaning
            limits = uplower[i].split(',')
            if isnumber(limits):  # the limits are valid numbers
                count += 1
                Constraint += ("[" + points[i] + "]>" + limits[0] + " and "
                               + "[" + points[i] + "]<" + limits[1] + " and ")
        ItemsInfo.append(iteminfo)
    if count != 0:
        Constraint = Constraint[:-4]  # drop the trailing "and "
    else:
        Constraint = "1==1"  # no upper/lower-limit cleaning at all
    Constraint += " and (" + condition + ")"
    Constraint = Constraint.replace("\n", " ")
    url = ("http://%s/exawebapi/exatime/GetCleaningData?ItemsInfo=%s&SamplingTimePeriods=%s"
           "&Constraint=%s&SamplingPeriod=%s&DCount=6"
           % (config._CLEAN_IP, ItemsInfo, SamplingTimePeriods, Constraint, interval))
    response = requests.get(url)
    content = json.loads(response.text)
    data = np.array([item for item in content["ClearData"]]).T  # test data
    return data


def get_simulation_data(n):
    """Generate n samples from a 3-variable linear latent model with additive Gaussian noise."""
    m = np.array([[1, 0, 0], [-3, 1, 0], [0, 3, -1]])
    # m = np.random.ranf()
    t1 = np.random.normal(0, 1, n)
    t2 = np.random.normal(0, 0.8, n)
    t3 = np.random.normal(0, 0.6, n)
    t = np.vstack((t1, t2, t3))
    noise = np.random.normal(0, 0.2, size=(m.shape[0], t.shape[1]))
    train_data = (m @ t + noise).T
    # write_excel(train_data, 'train_1.xlsx')
    return train_data


def get_m():
    """Generate simulated data sets with 2, 3 and 5 latent components and write results to Excel."""
    n = 1000
    m1 = np.random.ranf(size=(10, 2))  # 10 variables, 2 principal components
    m2 = np.random.ranf(size=(20, 3))  # 20 variables, 3 principal components
    m3 = np.random.ranf(size=(50, 5))  # 50 variables, 5 principal components
    t1 = np.random.normal(0, 1, n)
    t2 = np.random.normal(0, 0.8, n)
    t3 = np.random.normal(0, 0.6, n)
    t4 = np.random.normal(0, 0.4, n)
    t5 = np.random.normal(0, 0.2, n)
    t1_1 = np.vstack((t1, t2))
    t1_2 = np.vstack((t1, t2, t3))
    t1_3 = np.vstack((t1, t2, t3, t4, t5))
    noise_1 = np.random.normal(0, 0.2, size=(m1.shape[0], t1_1.shape[1]))
    noise_2 = np.random.normal(0, 0.2, size=(m2.shape[0], t1_2.shape[1]))
    noise_3 = np.random.normal(0, 0.2, size=(m3.shape[0], t1_3.shape[1]))
    data_1 = (m1 @ t1_1 + noise_1).T
    data_2 = (m2 @ t1_2 + noise_2).T
    data_3 = (m3 @ t1_3 + noise_3).T
    write_excel(data_1, "data_1.xlsx")
    write_excel(data_2, "data_2.xlsx")
    write_excel(m3, "m5.xlsx")  # data_3 is generated but not written; only the 50x5 loading matrix m3 is saved


def write_excel(data, filename):
    """Write a 2-D array to a new .xlsx file, one cell per element."""
    wb = xlsxwriter.Workbook(filename)
    ws = wb.add_worksheet()
    for i in range(data.shape[0]):
        for j in range(data.shape[1]):
            ws.write(i, j, data[i, j])
    wb.close()


if __name__ == "__main__":
    get_m()
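

# ---------------------------------------------------------------------------
# Usage sketch (assumption): the function below is not called anywhere; it only
# illustrates how the simulation and fault-injection helpers above can be
# combined offline. Every concrete value in it (sample counts, the
# "upper,lower" amplitude strings, fault indices, output file name) is
# hypothetical and stands in for a real configuration.
def _demo_fault_injection():
    train_data = get_simulation_data(500)       # 500 x 3 simulated training samples
    amplitudes = ["2,1", "2,1", "3,2"]          # one "upper,lower" amplitude string per variable (assumed format)
    error_data, faulty_data, f_matrix = get_test_data_1(
        train_data, samples=100, amplitudes=amplitudes,
        fault_index=[0, 2], fault_index_num=1)  # inject one fault per sample, in column 0 or 2
    write_excel(faulty_data, "demo_test.xlsx")  # hypothetical output file
    return error_data, faulty_data, f_matrix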