-
Notifications
You must be signed in to change notification settings - Fork 0
/
ChannelCheck.py
159 lines (133 loc) · 4.52 KB
/
ChannelCheck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
import sys
import re
import math
import argparse
import utils
class bcolors:
    """ANSI escape sequences used to colorize terminal output."""

    # bright foreground colors (SGR codes 91-95)
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    # SGR 0: reset every attribute back to the terminal default
    ENDC = '\033[0m'
def print_ok(msg):
    """Print *msg* to stdout, prefixed with a green ``PASS`` tag in brackets."""
    # colorize only the tag text; the brackets and the message stay uncolored
    tag = f"{bcolors.OKGREEN} PASS {bcolors.ENDC}"
    print(f"[{tag}] {msg}")
def print_fail(msg):
    """Print *msg* to stdout, prefixed with a red ``FAIL`` tag in brackets."""
    # colorize only the tag text; the brackets and the message stay uncolored
    tag = f"{bcolors.FAIL} FAIL {bcolors.ENDC}"
    print(f"[{tag}] {msg}")
class wire:
    """A straight wire segment tagged with its TPC, plane and wire identifiers.

    Attributes (all set in the constructor):
        tpc_:   TPC identifier, stored as given.
        plane_: plane identifier, stored as given.
        wire_:  wire identifier, stored as given.
        start_: ``[x0, y0]`` first endpoint.
        end_:   ``[x1, y1]`` second endpoint.
        len_:   Euclidean distance between the two endpoints.
    """

    def __init__(self, tid, pid, wid, x0, y0, x1, y1):
        # identifiers
        self.tpc_ = tid
        self.plane_ = pid
        self.wire_ = wid
        # 2D endpoints; the length is derived from them, so set them first
        self.start_ = [x0, y0]
        self.end_ = [x1, y1]
        self.len_ = self.__length()

    def __length(self):
        """Return the straight-line distance between ``start_`` and ``end_``."""
        (xa, ya), (xb, yb) = self.start_, self.end_
        dx = xb - xa
        dy = yb - ya
        return math.sqrt(dx**2 + dy**2)
#
#
def get_channels_from_dump( fname ):
    """Read a wire dump file and group its wires by channel number.

    Each record returned by ``utils.read_dump_file`` is turned into a ``wire``
    object and appended to the list kept for the record's ``'ch'`` value.

    NOTE(review): only components 1 and 2 of the ``'start'`` / ``'stop'``
    entries are used as the wire's 2D endpoints; component 0 is ignored --
    confirm the coordinate convention against ``utils.read_dump_file``.

    Args:
        fname: path of the dump file, passed straight to ``utils.read_dump_file``.

    Returns:
        dict mapping channel number -> list of ``wire`` objects on that channel.
    """
    by_channel = {}
    for rec in utils.read_dump_file( fname ):
        channel = rec['ch']
        segment = wire( rec['tpc'], rec['plane'], rec['wire'],
                        rec['start'][1], rec['start'][2],
                        rec['stop'][1], rec['stop'][2] )
        # create the per-channel list on first sight, then append
        by_channel.setdefault( channel, [] ).append( segment )
    return by_channel
#
#
def check_totch_count( chan_record, nch_exp ):
    """Check that the number of channels found equals the expected total.

    Prints a PASS or FAIL line showing the expected and the found counts.

    Args:
        chan_record: container of channels; only its ``len()`` is used.
        nch_exp: expected total number of channels.

    Returns:
        True when the counts agree, False otherwise.
    """
    chk_name = 'check_totch_count'
    found = len(chan_record)
    # the same report line is printed for both outcomes; only the tag differs
    report = f"[{chk_name}] Total number of channels expected / found: {nch_exp} / {found}"
    if found == nch_exp:
        print_ok( report )
        return True
    print_fail( report )
    return False
#
#
def check_viewch_count( chan_record, crpch, ncrp ):
    """Check the per-view channel counts of every CRP.

    The check proceeds in four steps, printing a FAIL line and returning
    False at the first one that does not hold:

    1. sorted channel numbers must increase by exactly one;
    2. channels are walked in order, a new CRP being started at every channel
       number that is a multiple of ``crpch``; every channel must map to
       exactly one plane ID out of 0, 1, 2;
    3. the number of CRPs found must equal ``ncrp``;
    4. each view must have the same channel count in every CRP, and the
       per-view counts must add up to ``crpch``.

    Args:
        chan_record: mapping of channel number -> list of objects exposing a
            ``plane_`` attribute (the wires attached to that channel).
        crpch: expected number of channels per CRP (must be positive).
        ncrp: expected number of CRPs.

    Returns:
        True if every check passes (a PASS line is printed), False otherwise.
    """
    chk_name = 'check_viewch_count'

    def fail( msg ):
        # report a failure of this check; always returns False so callers
        # can write ``return fail(...)``
        print_fail( f"[{chk_name}] {msg}" )
        return False

    # a non-positive CRP size would make the modulo below meaningless
    # (and raise ZeroDivisionError for 0)
    if crpch <= 0:
        return fail( f"Invalid number of channels per CRP: {crpch}" )

    # first check that all channels follow each other
    chans = sorted(chan_record)
    if any( nxt - cur != 1 for cur, nxt in zip(chans, chans[1:]) ):
        return fail( "Channel increment is incorrect" )

    # count the channels of each plane, CRP by CRP
    crp_chans = []
    for ch in chans:
        if ch % crpch == 0:
            crp_chans.append({0:0, 1:0, 2:0})
        elif not crp_chans:
            # no CRP has been opened yet, so there is nothing to count into
            return fail( f"First channel {ch} does not start on a CRP boundary" )

        pids = [ w.plane_ for w in chan_record[ch] ]
        if not pids:
            return fail( f"Channel {ch} has no wires attached" )
        # compare the distinct plane IDs directly: averaging them would let
        # a mix such as [0, 2] pass as plane 1
        if len(set(pids)) != 1:
            return fail( f"Multiple wire planes found {pids}" )

        pid = pids[0]
        if pid not in crp_chans[-1]:
            return fail( f"Plane ID {pid} does not appear to be valid" )
        crp_chans[-1][pid] += 1

    if ncrp != len(crp_chans):
        return fail( "Mistmatch in number of CRPs found" )
    if not crp_chans:
        # only reachable with ncrp == 0 and an empty record
        return fail( "No CRPs found in the channel record" )

    # check that all views have the same number of channels
    views = sorted(crp_chans[0])
    viewch = len(views)*[0]
    for view in views:
        lst = [ crp[view] for crp in crp_chans ]
        if any( cnt != lst[0] for cnt in lst ):
            return fail( f"Mistmatch in channel numbers in view {view}: {lst}" )
        viewch[ view ] = lst[0]

    if sum(viewch) != crpch:
        return fail( f"Mistmatch in view channel number sum: {viewch}" )

    print_ok( f"[{chk_name}] Number of channels per each CRP view: {viewch}" )
    return True
if __name__=='__main__':
    # command-line options
    cli = argparse.ArgumentParser()
    cli.add_argument('-f', '--file', help='wire dump file',
                     required = True, type=str )
    cli.add_argument('-c', '--channels', help='number of channels per CRP',
                     default = 3072, type=int)
    cli.add_argument('-n', '--ncrp', help='number of CRPs',
                     default = 1, type=int)
    opts = cli.parse_args()

    fname = opts.file
    ncrp = opts.ncrp
    crpch = opts.channels
    totch = ncrp * crpch

    # echo the expectations the dump file is checked against
    print(f"Analyzing the file {fname}")
    print(f"Expected number of CRPs : {ncrp}")
    print(f"Expected number of channel per CRP : {crpch}")
    print(f"Expected number of total channels : {totch}")

    chan_record = get_channels_from_dump( fname )

    # the per-view check only runs when the total count is right
    # (short-circuit); any failed check ends the script with exit status 1
    all_ok = ( check_totch_count( chan_record, totch )
               and check_viewch_count( chan_record, crpch, ncrp ) )
    if not all_ok:
        sys.exit(1)