-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnpz_to_images.py
123 lines (93 loc) · 3.81 KB
/
npz_to_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import numpy as np
import pandas as pd
from math import sqrt
import matplotlib.pyplot as plt
import os
import argparse
def to_image(event, mask, map):
    """Build a two-channel image of one event on the unrolled-tank grid.

    Args:
        event: Per-hit table with ``'id'``, ``'charge'`` and ``'time'``
            columns. A pandas DataFrame works, and so does any mapping of
            equal-length sequences stored under those three keys.
        mask: Array whose ``shape`` fixes the image height and width; only
            the shape is read here.
        map: Lookup from PMT id to its ``(row, column)`` position in the
            image. The name shadows the builtin but is kept because it is
            part of the signature.

    Returns:
        Float array of shape ``mask.shape + (2,)``. Channel 0 holds the
        charge and channel 1 the time of each hit PMT; pixels without a hit
        stay at zero. If a PMT id occurs more than once, the last hit wins.
    """
    # Same size as the mask, plus a trailing axis for (charge, time).
    event_image = np.zeros(mask.shape + (2,))

    # Walk the three columns in lockstep. This avoids DataFrame.iterrows(),
    # which creates a Series per hit and coerces the whole row to one dtype.
    hits = zip(event['id'], event['charge'], event['time'])
    for pmt_id, charge, hit_time in hits:
        # Hits whose PMT is absent from the lookup are skipped (presumably
        # end-cap PMTs, which have no pixel in the barrel image).
        if pmt_id not in map:
            continue
        row, col = map[pmt_id]
        event_image[row, col] = (charge, hit_time)

    return event_image
def npz_to_images(npz, mask, filename, debug):
    """Convert every event of one simulation file to images and save them.

    Args:
        npz: Mapping-like event data (for example the object returned by
            ``np.load`` on a ``.npz`` file) providing the keys ``'pid'``,
            ``'event_id'``, ``'digi_hit_pmt'``, ``'digi_hit_charge'`` and
            ``'digi_hit_time'``.
        mask: 2-D array of PMT ids laid out as the image grid.
        filename: Path of the source file; only its base name (without the
            final extension) is used, to name the output file.
        debug: When truthy, plot the charge channel of the first image.

    Returns:
        Array of shape ``(n_events,) + mask.shape + (2,)`` holding one
        (charge, time) image per event.

    Side effects:
        Prints progress messages and writes ``IMAGES_<stem>.npy`` into the
        current working directory.
    """
    data = npz

    # Announce the particle type of the file, taken from its first event.
    particle_names = {11: 'e-', 13: 'mu-', 22: 'gamma'}
    particle = particle_names.get(data['pid'][0])
    if particle is not None:
        print(f'Processing {particle}')

    # Pre-process the mask into a random-access lookup:
    # PMT id -> (row, column) of that PMT in the mask.
    pmt_position = {
        pmt_id: (row, col) for (row, col), pmt_id in np.ndenumerate(mask)
    }

    # Fetch each array once. An .npz opened with np.load is loaded lazily,
    # so looking a key up inside the loop would re-read it for every event.
    event_ids = data['event_id']
    hit_pmts = data['digi_hit_pmt']
    hit_charges = data['digi_hit_charge']
    hit_times = data['digi_hit_time']

    # NOTE(review): event ids are used as positional indices into the hit
    # arrays, which assumes they run 0..n-1 -- confirm against the producer.
    images = [
        to_image(
            pd.DataFrame({
                'id': hit_pmts[i],
                'charge': hit_charges[i],
                'time': hit_times[i],
            }),
            mask,
            pmt_position,
        )
        for i in event_ids
    ]

    if images:
        images = np.array(images)
    else:
        # Keep the documented 4-D layout even when the file has no events.
        images = np.zeros((0,) + mask.shape + (2,))

    # Nothing to show for an empty file.
    if debug and len(images):
        plt.imshow(images[0, :, :, 0])
        plt.show()

    # splitext drops only the final extension, so dots inside the name
    # (e.g. "e-_E0.5.npz") survive and outputs do not collide.
    stem = os.path.splitext(os.path.basename(filename))[0]
    output = 'IMAGES_' + stem
    print(f'{len(images)} processed in {filename} stored in {output}.npy')
    # Pass the full name so the file written always matches the message.
    np.save(output + '.npy', images)

    return images
def is_valid_file(parser, arg, extension):
    """Validate a command-line path: right suffix, exists, and is a file.

    Intended for use as an argparse ``type`` callable (wrapped in a lambda
    that supplies ``parser`` and ``extension``).

    Args:
        parser: Object exposing ``error(message)``; it is called with a
            description of the first failed check.
        arg: Path given on the command line.
        extension: Suffix the path must end with. The comparison is a plain
            ``str.endswith``, so ``'npy'`` and ``'.npy'`` both work.

    Returns:
        ``arg`` unchanged.
    """
    if not arg.endswith(extension):
        parser.error(f"The file provided is not {extension}")
    if not os.path.exists(arg):
        parser.error(f"The file {arg} does not exist!")
    # A directory named like "data.npy" exists and has the right suffix but
    # cannot be loaded; reject it here rather than failing later in np.load.
    if not os.path.isfile(arg):
        parser.error(f"The path {arg} is not a file!")
    return arg
def dir_path(string):
    """Return ``string`` unchanged if it names an existing directory.

    Raises:
        NotADirectoryError: If ``string`` is not an existing directory; the
            offending path is the exception's only argument.

    NOTE(review): when this is used as an argparse ``type`` callable, the
    error is an OSError, which argparse presumably does not convert into a
    usage message -- confirm whether a traceback is acceptable here.
    """
    if not os.path.isdir(string):
        raise NotADirectoryError(string)
    return string
def main():
    """Command-line entry point: turn ``.npz`` event files into image arrays.

    Parses the arguments, loads the ``.npy`` mask, then converts either the
    single file given with ``--file`` or every ``*.npz`` inside the directory
    given with ``--dir``. ``--file`` takes precedence when both are supplied.
    If neither is supplied the parser reports an error.
    """
    parser = argparse.ArgumentParser(
        description='Convert .npz event or events to images of the barrel using the same .npy mask',
    )
    parser.add_argument(
        '-m', '--mask',
        type=lambda x: is_valid_file(parser, x, 'npy'),
        required=True,
        help='Path of .npy mask file',
        metavar="FILE",
    )
    parser.add_argument(
        '-f', '--file',
        type=lambda x: is_valid_file(parser, x, 'npz'),
        help='Path to single .npz file',
    )
    parser.add_argument(
        '-d', '--dir',
        type=dir_path,
        help='Path to directory with multiple .npz files',
    )
    parser.add_argument(
        '-D', '--debug',
        type=str,
        help='set True for plotting an image after processing a simulation',
    )
    args = parser.parse_args()

    # The flag is a string option; only the exact value 'True' enables it.
    debug = args.debug == 'True'

    # Load the mask produced by the companion script; it gives the position
    # of each PMT in the image.
    mask = np.load(args.mask)

    # SECURITY: allow_pickle=True lets np.load unpickle object arrays, which
    # can execute arbitrary code. Only run this on trusted .npz files.
    # (Presumably needed because the per-event hit arrays are stored as
    # object arrays -- confirm before removing.)
    if args.file is not None:
        # Single .npz file.
        print(f'PROCESSING {args.file}')
        # The context manager closes the archive once it has been converted.
        with np.load(args.file, allow_pickle=True) as data:
            npz_to_images(data, mask, args.file, debug)
    elif args.dir is not None:
        # Several .npz files in one directory.
        print(f'PROCESSING ALL FILES INSIDE {args.dir}')
        for filename in os.listdir(args.dir):
            if not filename.endswith(".npz"):
                continue
            path = os.path.join(args.dir, filename)
            # Close each archive before opening the next one so handles do
            # not accumulate across a large directory.
            with np.load(path, allow_pickle=True) as data:
                npz_to_images(data, mask, path, debug)
            print("")
    else:
        parser.error('Neither single file or directory provided.')
# Run the conversion only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()