-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHG_AEN.py
488 lines (449 loc) · 27.7 KB
/
HG_AEN.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# coding=utf-8
"""
Author: Michael
Department: SEU Hand Group
Date:2019/5/28
Profile: Integration of semi-supervised deep learning network
脚本完成的任务:
1.根据slidWin_Data和PCA_Matrix生成PCA_Data,按主成分特征进行"统一"绝对最大值归一化,然后划分训练集与测试集,最后存入PCA_Data(归一化数据)
2.自PCA输出的归一化参数存入Scaler_Factors,保存为pca_layer_MAS_scale
3.进行自编码器学习,学习权值存入AEN_Matrix,编码数据先不进行标准归一化,然后存入AEN_Data(未归一化数据)的transform,解码数据(复原数据)
无需进行归一化即存入AEN_Data的restruct
4.生成标签,存放在AGL_Data(AutoGeneratedLabel)
5.回归神经网络拟合。正负标签独立进行最值归一化,输入编码数据进行标准归一化,进行拟合,此时再将编码数据的归一化参数存储在aen_layer_SS
“提前存储经过归一化的肌肉协同数据,该数据不能很好的提取出隐含标签”
Improvements:
1.整合数据采集和处理,写在统一的函数内,方便记录训练集和测试集
2.自生成标签增加了低通滤波,截止频率与训练时长挂钩,轨迹更为光滑,更易找到峰峰值
3.标签生成过程充分与峰峰值挂钩
4.增加网络一键生成
5.增加识别率评估值
6.为了让产生负标签更好,干脆更改训练顺序,将正负标签划分清,不再交替
Tip:首先需要先运行Data_Collect_and_Preprocess采集训练或测试数据,并作PCA预处理
然后运行HG_AEN进行数据集的划分和自编码器学习
学习完成后,直接将包含权值矩阵的用户信息拷贝至树莓派。
离线测试时,进行神经网络的运算时,存储的数据已经进行了归一化处理,在线识别时不包含,这是在线与离线在代码上的差异。
自编码器网络学习关键参数
1.训练集与测试集之比 7:3
2019.7.18
bug: 单次训练的文件有问题,生成的测试集的末尾会全为0,长度为原序列的两倍长,目前简单得解决了一下,即当单次采集时,只取前面的序列。
2019.9.1
目前预处理部分也转移到HG_AEN
"""
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy import signal
import sys
import os
from math import floor
from sklearn.preprocessing import MaxAbsScaler, StandardScaler
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import r2_score
import schedule
from Lib.wblgers.Autoencoder import Autoencoder # github搜索到的库
from Lib.peakdet.peakdet import peakdet
from sklearn.decomposition import PCA
from sklearn.metrics import r2_score
class HG_AEN(object):
def __init__(self):
"""
输入身份信息
"""
while True:
self.usr_name = input("用户名(一卡通号或身份证号后6位):")
if len(self.usr_name) != 6: # 如果未输入6位身份号码
# print("Please enter the valid information, *** Last 6 digits of school ID or personal ID ***")
print("警告!请正确输入用户名!*** 一卡通号或身份证号后6位 ***")
os.system('pause') # 按任意键退出暂停
os.system('cls') # 清屏
continue # 退出循环 重新输入
else:
if os.path.isdir(os.path.join(os.path.abspath('.'),'Data', self.usr_name)) is True: # 如果存在该用户的数据
for root, dirs, files in os.walk(os.path.join(os.path.abspath('.'), 'Data', self.usr_name)):
break # 循环一次退出for循环
for file_name in files:
print(file_name)
self.target_addr = os.path.join(os.path.abspath('.'), 'Data', self.usr_name, file_name)
'''
下面这句话用于预处理,只需要调用一次,不然会重复生成数据
'''
emg_process(self.target_addr) # 数据预处理
self.target_addr = os.path.join(os.path.abspath('.'), 'Data', self.usr_name)
self.report = np.loadtxt(os.path.join(self.target_addr, 'slidWin_Data', 'report.out'))
print(np.array(self.report))
if np.size(self.report) > 1:
self.seg_bound_indx = floor(np.size(self.report) * 0.9) # 这个有用,用于确定训练集和测试集的边界位置,在生成标签数据时有用。
self.seg_boundary = np.sum(self.report[0:self.seg_bound_indx], dtype=np.int32) # 划分训练集
break
else: # 训练次数不满两次
print("警告!该用户仅完成了一轮训练,请增加训练次数!")
self.seg_bound_indx = np.size(self.report) # 这个有用,用于确定训练集和测试集的边界位置,在生成标签数据时有用。
self.seg_boundary = self.report # 划分训练集
break
else: # else表示未录入过该数据
print("警告!未录入过该用户,请检查用户名!")
continue
"""
将数据集进行划分
"""
self.dataset_segment()
"""
自编码器层学习
"""
self.autoencoder()
"""
标签自动生成
"""
self.auto_generated_label()
"""
回归神经网络层学习
"""
self.reg_neuralnetwork()
def dataset_segment(self): # 再写一个文件,存储slidWin的存储信息,比如每次训练集的长度
temp = None
scaler_list = np.zeros((8, 1)) # 存放8通道缩放因子的矩阵
plt.figure()
for i in range(8):
U_T = np.loadtxt(os.path.join(self.target_addr, 'PCA_Martix', 'PCAM_Ch%d.out' % i)) # 加载主成分矩阵 20行3列
X = np.loadtxt(os.path.join(self.target_addr, 'slidWin_Data', 'slidWin_Ch%d.out' % i)) # 加载主成分矩阵 20行3列
pca_data = np.dot(X, U_T)
X_ = np.loadtxt(os.path.join(self.target_addr, 'PCA_Data', 'X_%d.out' % i)) # 加载主成分矩阵 20行3列
# pca_r2 = r2_score(X[6500:8000, 0], X_[6500:8000, 0])
# print("The r2 value of Ch%d is %f" % (i, pca_r2))
# plt.subplot(8, 1, i+1)
# plt.plot(X[:, 0])
# plt.plot(X_[:, 0], '--')
"""
数据归一化,同一通道的所有主元采用第一主元的缩放因子
"""
scaler = MaxAbsScaler() # 数据标准化
scaler.fit(pca_data[:, 0].reshape(-1, 1)) # 仅取第一主元的缩放因子,所有成分都用第一主元的缩放因子
scaler_list[i, 0] = scaler.scale_
pca_data = pca_data / scaler.scale_
if i is 0:
temp = pca_data
else:
temp = np.hstack((temp, pca_data)) # 将数据进行水平合并
plt.plot(X[6500:8000, 0])
plt.plot(X_[6500:8000, 0], '--')
plt.show()
if os.path.isdir(os.path.join(self.target_addr, 'Scaler_Factors')) is False: # 建立归一化参数路径
os.mkdir(r'%s' % os.path.join(self.target_addr, 'Scaler_Factors')) # 目录需要一级一级建立,否则会找不到底层目录
np.savetxt(os.path.join(self.target_addr, 'Scaler_Factors', 'pca_layer_MAS_scale.out'), scaler_list)
'''
划分训练集和测试集
'''
if np.sum(self.report) != temp.shape[0]:
print("警告!数据报告与数据库内容不符!")
if np.size(self.report) == 1:
x_train = temp[:, :]
x_test = x_train
else:
x_train = temp[0:self.seg_boundary, :]
x_test = temp[self.seg_boundary:, :]
'''
存储数据
'''
if os.path.isdir(os.path.join(self.target_addr, 'PCA_Data')) is False: # 路径不存在则新建
os.mkdir(os.path.join(self.target_addr, 'PCA_Data'))
np.savetxt(os.path.join(self.target_addr, 'PCA_Data', 'pca_train.out'), x_train)
np.savetxt(os.path.join(self.target_addr, 'PCA_Data', 'pca_test.out'), x_test)
def autoencoder(self, training_epochs=500, batch_size=128, display_step=2, corruption_level=0,
sparse_reg=0, lr=0.001, n_hidden=6):
x_train = np.loadtxt(os.path.join(self.target_addr, 'PCA_Data', 'pca_train.out'))
x_test = np.loadtxt(os.path.join(self.target_addr, 'PCA_Data', 'pca_test.out'))
'''
输入神经元的可视化
'''
plt.figure()
for i in range(3):
plt.subplot(3, 1, i+1)
plt.plot(x_train[:, i])
plt.show()
n_inputs = x_train.shape[1]
ae = Autoencoder(n_layers=[n_inputs, n_hidden],
transfer_function=tf.nn.relu,
optimizer=tf.train.AdamOptimizer(learning_rate=lr),
ae_para=[corruption_level, sparse_reg])
with tf.Session() as sess:
tf.global_variables_initializer().run() # 初始化参数
for epoch in range(training_epochs):
avg_cost = 0.
# total_batch = int(X_train.shape[0] / batch_size)
minibatches = ae.random_mini_batches(x_train, batch_size) # 训练样本的批处理
# Loop over all batches
for minibatch in minibatches:
with tf.device('/gpu:0'):
cost, opt, hidden = sess.run(ae.partial_fit(),
feed_dict={ae.x: minibatch, ae.keep_prob: ae.in_keep_prob})
# Compute average loss
avg_cost += cost / x_train.shape[0] * batch_size
# Display logs per epoch step
if epoch % display_step == 0:
# sys.stdout.write("\r{0}".format(emg) + " ")
sys.stdout.write("\rEpoch: %d, Cost: %f" % ((epoch + 1), avg_cost))
sys.stdout.flush() # 直接输出
# print("\rEpoch:", '%d,' % (epoch + 1),
# "Cost:", "{:.9f}".format(avg_cost))
if epoch % (display_step * 10) == 0:
print("\rEpoch:", '%d,' % (epoch + 1),
"Cost:", "{:.9f}".format(avg_cost))
X_test_transform = sess.run(ae.transform(), feed_dict={ae.x: x_test, ae.keep_prob: 1.0})
X_test_restruct = sess.run(ae.reconstruct(), feed_dict={ae.x: x_test, ae.keep_prob: 1.0})
X_train_transform = sess.run(ae.transform(), feed_dict={ae.x: x_train, ae.keep_prob: 1.0})
X_train_restruct = sess.run(ae.reconstruct(), feed_dict={ae.x: x_train, ae.keep_prob: 1.0})
w = sess.run(ae.weights['encode'][0]['w'])
b = sess.run(ae.weights['encode'][0]['b'])
if os.path.isdir(os.path.join(self.target_addr, 'AEN_Matrix')) is False: # 路径不存在则新建
os.mkdir(os.path.join(self.target_addr, 'AEN_Matrix'))
np.savetxt(os.path.join(self.target_addr, 'AEN_Matrix', 'w_ae.out'), w)
np.savetxt(os.path.join(self.target_addr, 'AEN_Matrix', 'b_ae.out'), b)
if os.path.isdir(os.path.join(self.target_addr, 'AEN_Data')) is False: # 路径不存在则新建
os.mkdir(os.path.join(self.target_addr, 'AEN_Data'))
np.savetxt(os.path.join(self.target_addr, 'AEN_Data', 'X_train_transform.out'), X_train_transform)
np.savetxt(os.path.join(self.target_addr, 'AEN_Data', 'X_test_transform.out'), X_test_transform)
# 主成分分析的复原信号(解码信号),用于观察
np.savetxt(os.path.join(self.target_addr, 'AEN_Data', 'X_train_restruct.out'), X_train_restruct)
np.savetxt(os.path.join(self.target_addr, 'AEN_Data', 'X_test_restruct.out'), X_test_restruct)
def auto_generated_label(self):
x_train = np.loadtxt(os.path.join(self.target_addr, 'PCA_Data', 'pca_train.out'))
x_train_restruct = np.loadtxt(os.path.join(self.target_addr, 'AEN_Data', 'X_train_restruct.out'))
x_train_transform = np.loadtxt(os.path.join(self.target_addr, 'AEN_Data', 'X_train_transform.out'))
x_test_transform = np.loadtxt(os.path.join(self.target_addr, 'AEN_Data', 'X_test_transform.out'))
"""
自编码器复原信号的可视化
"""
plt.figure()
for i in range(24):
plt.subplot(8, 3, i + 1)
plt.plot(x_train_restruct[:, i], '--')
plt.plot(x_train[:, i])
plt.show()
"""
肌肉协同特征的可视化
"""
plt.figure()
for i in range(6):
plt.subplot(6, 1, i + 1)
plt.plot(x_train_transform[0:-1, i])# + i * 0.5)
plt.show()
"""
自动生成标签.包括训练集和测试集
"""
plt.figure()
series_musyn = np.sum(np.vstack((x_train_transform, x_test_transform)), axis=1)
plt.plot(series_musyn)
b, a = signal.butter(2, (2 * 1.65 / 100), 'lowpass')
series = signal.filtfilt(b, a, x=series_musyn)
plt.plot(series, '--')
# print(len(series))
maxtab, mintab = peakdet(series, delta=0.4) # 这里的代码可以优化,通过检查峰谷的个数,距离,自动纠正步长,在这之前还能再滤波一下
# 增加一个约束条件,假如没有检测到序列的最后一个波谷,即最后一个极值点是以波峰结尾的,则追加序列的末位序列元素为波谷
if maxtab[-1, 0] > mintab[-1, 0]:
print("警告!最后一个波谷可能未录入,已自动修补")
mintab = np.vstack((mintab, [len(series)-1, 0])) # 数组标号从0开始,所以这里要-1
if len(maxtab[:, 0]) != len(mintab[:, 0]):
print("警告!检测到波峰与波谷个数不一致")
plt.scatter(np.array(maxtab)[:, 0], np.array(maxtab)[:, 1], color='blue')
plt.scatter(np.array(mintab)[:, 0], np.array(mintab)[:, 1], color='red')
plt.show()
label = series
auto_generated_label = np.zeros((3, len(label)))
# print(label.shape,mintab.shape)
# 为了让产生负标签更好,干脆更改训练顺序,将正负标签划分清,不再交替
for i in range(np.size(self.report)): # 生成负标签
# print(i,int(mintab[i * 18 + 17, 0]))
label[int(mintab[i * 18 + 8, 0]): int(mintab[i * 18 + 17, 0])] = - np.array(label[int(mintab[i * 18 + 8, 0]):int(mintab[i * 18 + 17, 0])]) \
+ 2 * min([label[int(mintab[i * 18 + 8, 0])], label[int(mintab[i * 18 + 17, 0])]])
label = label - np.mean(label) # 去中心化
# plt.figure()
# plt.plot(label)
# plt.show()
# 对标签序列进行分类
for i in range(np.size(self.report)): # 循环多轮附标签
if i == 0:
auto_generated_label[0, 0:int(mintab[i * 18 + 2, 0])] = label[0:int(mintab[2, 0])] - np.min(label[0:int(mintab[2, 0])])
else:
auto_generated_label[0, int(mintab[i * 18 - 1, 0]):int(mintab[i * 18 + 2, 0])] = label[int(mintab[i * 18 - 1, 0]):int(mintab[i * 18 + 2, 0])]\
- np.min(label[int(mintab[i * 18 - 1, 0]):int(mintab[i * 18 + 2, 0])])
auto_generated_label[1, int(mintab[i * 18 + 2, 0]):int(mintab[i * 18 + 5, 0])] = label[int(mintab[i * 18 + 2, 0]):int(mintab[i * 18 + 5, 0])]\
- np.min(label[int(mintab[i * 18 + 2, 0]):int(mintab[i * 18 + 5, 0])])
auto_generated_label[2, int(mintab[i * 18 + 5, 0]):int(mintab[i * 18 + 8, 0])] = label[int(mintab[i * 18 + 5, 0]):int(mintab[i * 18 + 8, 0])]\
- np.min(label[int(mintab[i * 18 + 5, 0]):int(mintab[i * 18 + 8, 0])])
auto_generated_label[0, int(mintab[i * 18 + 8, 0]):int(mintab[i * 18 + 11, 0])] = label[int(mintab[i * 18 + 8, 0]):int(mintab[i * 18 + 11, 0])]\
- np.max(label[int(mintab[i * 18 + 8, 0]):int(mintab[i * 18 + 11, 0])])
auto_generated_label[1, int(mintab[i * 18 + 11, 0]):int(mintab[i * 18 + 14, 0])] = label[int(mintab[i * 18 + 11, 0]):int(mintab[i * 18 + 14, 0])]\
- np.max(label[int(mintab[i * 18 + 11, 0]):int(mintab[i * 18 + 14, 0])])
auto_generated_label[2, int(mintab[i * 18 + 14, 0]):int(mintab[i * 18 + 17, 0])] = label[int(mintab[i * 18 + 14, 0]):int(mintab[i * 18 + 17, 0])]\
- np.max(label[int(mintab[i * 18 + 14, 0]):int(mintab[i * 18 + 17, 0])])
plt.figure()
for i in range(3):
# plt.subplot(3, 1, i + 1)
plt.plot(auto_generated_label[i, :] + i)
plt.show()
# print(np.shape(auto_generated_label))
if os.path.isdir(os.path.join(self.target_addr, 'AGL_Data')) is False: # 路径不存在则新建
os.mkdir(os.path.join(self.target_addr, 'AGL_Data'))
if np.size(self.report) > 1: # 对于一次训练的优化
np.savetxt(os.path.join(self.target_addr, 'AGL_Data', 'y_train.out'), auto_generated_label[:, 0:self.seg_boundary])
np.savetxt(os.path.join(self.target_addr, 'AGL_Data', 'y_test.out'), auto_generated_label[:, self.seg_boundary:])
else:
np.savetxt(os.path.join(self.target_addr, 'AGL_Data', 'y_train.out'), auto_generated_label)
np.savetxt(os.path.join(self.target_addr, 'AGL_Data', 'y_test.out'), auto_generated_label)
def reg_neuralnetwork(self):
x_train_transform = np.loadtxt(os.path.join(self.target_addr, 'AEN_Data', 'X_train_transform.out'))
x_test_transform = np.loadtxt(os.path.join(self.target_addr, 'AEN_Data', 'X_test_transform.out'))
y_train = np.loadtxt(os.path.join(self.target_addr, 'AGL_Data', 'y_train.out'))
y_test = np.loadtxt(os.path.join(self.target_addr, 'AGL_Data', 'y_test.out'))
print(np.shape(y_train),np.shape(y_test))
# 编码信号的归一化,用于输入下一层网络。归一化参数完全依赖训练集,非测试集
scaler = StandardScaler() # 数据标准化
scaler.fit(x_train_transform) # 用train的归一化参数同时对train和test做归一化处理,restruct用来观察复原情况,无需归一化
x_train_transform = scaler.transform(x_train_transform)
x_test_transform = scaler.transform(x_test_transform)
if os.path.isdir(os.path.join(self.target_addr, 'Scaler_Factors')) is False: # 建立归一化参数路径
os.mkdir(r'%s' % os.path.join(self.target_addr, 'Scaler_Factors')) # 目录需要一级一级建立,否则会找不到底层目录
np.savetxt(os.path.join(self.target_addr, 'Scaler_Factors', 'aen_layer_SS_mean.out'), scaler.mean_)
np.savetxt(os.path.join(self.target_addr, 'Scaler_Factors', 'aen_layer_SS_scale.out'), scaler.scale_)
# 标签数据的归一化
for i in range(3):
y_train[i, np.argwhere(y_train > 0)] = y_train[i, np.argwhere(y_train > 0)] / np.max(y_train[i, :])
y_train[i, np.argwhere(y_train < 0)] = y_train[i, np.argwhere(y_train < 0)] / np.abs(np.min(y_train[i, :]))
y_test[i, np.argwhere(y_test > 0)] = y_test[i, np.argwhere(y_test > 0)] / np.max(y_test[i, :])
y_test[i, np.argwhere(y_test < 0)] = y_test[i, np.argwhere(y_test < 0)] / np.abs(np.min(y_test[i, :]))
# plt.figure()
# for i in range(3):
# # plt.subplot(3, 1, i + 1)
# plt.plot(y_test[i, :] + i)
# plt.show()
# print(np.array(x_train_transform).shape, np.array(y_train).shape)
model = MLPRegressor(solver='adam', alpha=0.001, hidden_layer_sizes=200) # 默认激活函数relu
# print(x_test_transform.shape,x_train_transform.shape,y_test.shape,y_train.shape)
# print(np.vstack((x_train_transform,x_test_transform)).shape,np.vstack((y_train,y_test)).T.shape)
# print(np.shape(x_train_transform),np.shape(y_train))
if np.size(self.report) > 1:
model.fit(x_train_transform, y_train.T)
y_hat_train = model.predict(x_train_transform)
y_hat_test = model.predict(x_test_transform)
else:
model.fit(x_train_transform, y_train.T[0:int(self.report),:])
y_hat_train = model.predict(x_train_transform)
y_hat_test = model.predict(x_test_transform)
# y = model.predict(np.vstack((x_train_transform, x_test_transform)))
# plt.figure()
# for i in range(3):
# # print(np.shape(y_hat_train)[0] / 2)
# bound = int(np.shape(y_hat_train)[0] / 2)
# y1 = y_hat_train[0:bound, i].T
# y_hat = np.where(np.abs(y1) > 0.1, y1, 0)
# y = y_train[i, 0:bound]
# plt.subplot(3, 1, i + 1)
# plt.plot(y_hat, '--')
# plt.plot(y)
# reg_r2 = r2_score(y_hat, y)
# print("The r2 value of Freedom %d is %f" % (i, reg_r2))
"""
训练集拟合曲线
"""
plt.figure()
for i in range(3):
plt.subplot(3, 1, i + 1)
plt.plot(y_hat_train[:, i].T, '--')
plt.plot(y_train[i, :])
# reg_r2 = r2_score(y_hat_train[:, i].T, y_train[i, :])
# print("The training set r2 value of Freedom %d is %f" % (i, reg_r2))
# plt.plot(np.vstack((y_train.T,y_test.T))[:,i])
# plt.plot(np.vstack((y_hat_train.T,y_hat_test.T))[:,i],'--')
"""
测试集拟合曲线
"""
plt.figure()
for i in range(3):
plt.subplot(3, 1, i + 1)
plt.plot(y_hat_test[:, i].T, '--')
plt.plot(y_test[i, :])
# reg_r2 = r2_score(y_hat_test[:, i].T, y_test[i, :])
# print("The testing set r2 value of Freedom %d is %f" % (i, reg_r2))
if os.path.isdir(os.path.join(self.target_addr, 'RegNN_Matrix')) is False: # 建立归一化参数路径
os.mkdir(r'%s' % os.path.join(self.target_addr, 'RegNN_Matrix')) # 目录需要一级一级建立,否则会找不到底层目录
np.savetxt(os.path.join(self.target_addr, 'RegNN_Matrix', 'weight_input_MLPReg.out'), model.coefs_[0])
np.savetxt(os.path.join(self.target_addr, 'RegNN_Matrix', 'weight_output_MLPReg.out'), model.coefs_[1])
np.savetxt(os.path.join(self.target_addr, 'RegNN_Matrix', 'bias_input_MLPReg.out'), model.intercepts_[0])
np.savetxt(os.path.join(self.target_addr, 'RegNN_Matrix', 'bias_output_MLPReg.out'), model.intercepts_[1])
plt.show()
print("完成回归网络层的训练!")
def emg_process(target_addr): # 处理最近一次写入的肌电数据集,主要是转为PCA后的数据,给出一个量化值,还原度多少
addr_split = os.path.split(target_addr)
with open(target_addr, 'r') as f:
x = f.read()
data = x.split(',')
data.pop(-1)
data = [float(numeric_string) for numeric_string in data]
X = np.array(data).reshape(-1, 8)
# 对原始数据进行标准化,去中心化,除以方差 等等
scaler = StandardScaler() # 数据标准化
scaler.fit(X)
X = scaler.transform(X)
if os.path.isdir(os.path.join(addr_split[0], 'Scaler_Factors')) is False: # 建立归一化参数路径
os.mkdir(r'%s' % os.path.join(addr_split[0], 'Scaler_Factors')) # 目录需要一级一级建立,否则会找不到底层目录
np.savetxt(os.path.join(addr_split[0], 'Scaler_Factors', 'raw_layer_SS_mean.out'), scaler.mean_)
np.savetxt(os.path.join(addr_split[0], 'Scaler_Factors', 'raw_layer_SS_scale.out'), scaler.scale_)
input_units = 20 # 滑动窗口长度为200ms
X_with_window = np.zeros((X.shape[0] - input_units, input_units)) # 这是一个通道的样本
X_with_window_num = X_with_window.shape[0]
# 建立一级slidWin_Data的目录
if os.path.isdir(os.path.join(addr_split[0], 'slidWin_Data')) is False: # 如果路径不存在,直接新建路径
os.mkdir(r'%s' % os.path.join(addr_split[0], 'slidWin_Data')) # 目录需要一级一级建立,否则会找不到底层目录
for i in range(8): # 肌电通道数
X_with_window = np.zeros((X.shape[0] - input_units, input_units)) # 初始化滑窗矩阵,序列长度x滑窗长度
for j in range(0, X_with_window_num, 1):
X_with_window[j, :] = X[range(j, j + input_units), i]
# 如果文件路径不存在,直接写入文件(表明是第一次写入)
if os.path.isfile(os.path.join(addr_split[0], 'slidWin_Data', 'slidWin_Ch%d.out' % i)) is False:
np.savetxt(os.path.join(addr_split[0], 'slidWin_Data', 'slidWin_Ch%d.out' % i), X_with_window)
else: # 否则读取历史数据,并和本次数据合并,再写入。即完成追加过程。
temp = np.vstack((np.loadtxt(os.path.join(addr_split[0], 'slidWin_Data', 'slidWin_Ch%d.out' % i)),
X_with_window))
np.savetxt(os.path.join(addr_split[0], 'slidWin_Data', 'slidWin_Ch%d.out' % i), temp)
# 写入一个数据报告,初始版本仅统计每次采集流程样本集的容量
with open(os.path.join(addr_split[0], 'slidWin_Data', 'report.out'), 'a+') as f:
f.write('%d\n' % X_with_window.shape[0])
print('滑动数据生成完成!')
'''
进行主成分分析 并将U_T保存至out,矩阵运算式为:H = X * U_T,最后打印了原始波形和复原波形
'''
# 建立一级PCA_Martix的目录
if os.path.isdir(os.path.join(addr_split[0], 'PCA_Martix')) is False: # 若路径不存在 则新建路径
os.mkdir(r'%s' % os.path.join(addr_split[0], 'PCA_Martix')) # 目录需要一级一级建立,否则会找不到底层目录
# plt.figure()
if os.path.isdir(os.path.join(addr_split[0], 'PCA_Data')) is False: # 路径不存在则新建
os.mkdir(os.path.join(addr_split[0], 'PCA_Data'))
for i in range(8):
X = np.loadtxt(os.path.join(addr_split[0], 'slidWin_Data', 'slidWin_Ch%d.out' % i))
pca = PCA(n_components=3)
H = pca.fit_transform(X) # Reconstruct signals based on orthogonal components
Ureduce = pca.components_
U_T = np.transpose(Ureduce)
X_ = np.dot(H, np.linalg.pinv(U_T)) # 这是复原后的X
pca_r2 = r2_score(X[:, 0], X_[:, 0])
print("The r2 value of Ch%d is %f" % (i, pca_r2))
np.savetxt(os.path.join(addr_split[0], 'PCA_Martix', 'PCAM_Ch%d.out' % i), U_T)
np.savetxt(os.path.join(addr_split[0], 'PCA_Data', 'X_%d.out' % i), X_)
# plt.subplot(8, 1, i+1)
# plt.plot(X[:, 0])
# plt.plot(X_[:, 0], '--')
# plt.show()
# plt.show()
if __name__ == '__main__':
hg_aen = HG_AEN()
# try:
# while True: # time out就是超时的意思!
# coll_and_proc.myo.run() # 这个数据可能要调试。同样是一个方法,理解成一个函数。0会有延迟,2正常,不知道怎么调,不写也ok
#
# except KeyboardInterrupt: # 用户中断执行,如果进入了except,有finally肯定会进入.
# coll_and_proc.myo.disconnect()
# print("\n提示 !数据采集结束,最后进行数据预处理")
# finally:
# coll_and_proc.emg_process() # 直接得到经过PCA后的数据
# print("提示!数据预处理完成")