Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Dataclasses for TDC data
Classes to work with spill data from ScopeViewControl application at HIT Heidelberg
"""
__author__ = "Philipp Niedermayer"
__contact__ = "p.niedermayer@gsi.de"
__date__ = "2022-08-25"
import numpy as np
from dataclasses import dataclass, field
from .common import Trace
@dataclass
class HITSpill:
"""Container for spill data
t - time array in seconds
c - array of counts per time bin
header - dict with metadata
"""
t: np.ndarray
c: np.ndarray
header: dict = field(default_factory=dict)
@property
def detector(self):
"""Detector name"""
return self.header.get('Name')
@property
def length(self):
"""Spill duration in sec"""
return self.t[-1] - self.t[0]
@property
def counts(self):
"""Total particle count"""
return np.sum(self.c)
@property
def dt(self):
"""Time bin of detector in s"""
return 1e-3*self.header['SampleTime[ms]']
def hitcounts(self, dt=None, dt2=None):
"""Particle counts per time bin
Re-samples the timestamps into an equally spaced time series with counts per unit time
:param dt: width in s of time bin for counting, defaults to detector time bin.
:param dt2: width in s of macro time bin. If not None, this function returns a 2D array with time resolution dt2 along the first and dt along the second axis
:returns: (count_array, bin_edges) in units of (1, second)
"""
# re-sample timestamps
if dt is not None and dt != self.dt:
if dt < self.dt: raise ValueError(f'Parameter dt must be >= detector resolution of {self.dt} s')
raise NotImplementedError(f'Currently only dt={self.dt} (detector resolution) is supported')
counts, edges = self.c, np.hstack([self.t, [self.t[-1]+self.dt]])
if dt2 is not None:
if dt2 <= dt: raise ValueError(f'Macro time bin dt2 must be greater than micro time bin dt of {dt} s')
# make 2D array of shape [length/dt2, dt2/dt]
ebins = int(dt2/dt)
counts = counts[:int(len(counts)/ebins)*ebins].reshape((-1, ebins))
edges = edges[:int(len(edges)/ebins+1)*ebins:ebins]
return counts, edges
def cropped(self, *, set_time_zero=False):
"""Return a slice of this spill with leading and trailing zero counts removed"""
i_start = (self.c > 0).argmax() # first index where condition is true
i_stop = self.c.size - 1 - (self.c > 0)[::-1].argmax() # last index where condition is true
t, c = self.t[i_start:i_stop+1], self.c[i_start:i_stop+1]
if set_time_zero: t -= t[0]
return HITSpill(t, c, self.header)
def segment(self, t_start, t_stop, *, set_time_zero=False):
"""Return a slice of this spill in given time interval
:param t_start: Start time in s of segment (including)
:param t_stop: Start time in s of segment (excluding)
:param set_time_zero: If true, set time 0 at start of new segment.
"""
#ts_start, ts_stop = [self.ts[0] + (t - self.time_offset) / self.header['tdcTickSecs'].item() for t in (t_start, t_stop)]
## we do not consider channel 8 which carries time relative to RF instead of absolute timestamp
i_start = (self.t >= t_start).argmax() # first index where condition is true
i_stop = self.t.size - 1 - (self.t < t_stop)[::-1].argmax() # last index where condition is true
t, c = self.t[i_start:i_stop+1], self.c[i_start:i_stop+1]
if set_time_zero: t -= t[0]
return HITSpill(t, c, self.header)
def __repr__(self):
return f'HITSpill({self.detector}, length={self.length:g}s, counts={self.counts})'
@dataclass
class HITSpillData:
spills: list
@classmethod
def from_file(cls, fname, *, time_offset=0, windows='auto', window_crop=None, window_set_time_zero=None, verbose=0):
"""Read Spills data from a *.csv file saved with ScopeViewControl
:param fname: path to filename
:param time_offset: adds a given time offset (in s) to the time axis
:param windows: Time windows in s to cut data into spills in the form ((t_start, t_stop), ...)
If 'auto', then this looks for spills of at least 10 ms length and at least 1 s pause in between
:param window_crop: If true, remove leading and trailing zero counts from spills. The default is true only if windows=='auto'
:param window_set_time_zero: If true, set time to zero on start of every spill. If false, use absolute time of csv file. The default is true only if windows=='auto'
"""
# header
header = {}
with open(r'D:\pniederm\GSI Measurements\2022-08-07_HIT-KO\Data\E108_RF0.0_ExcA.csv') as f:
for i in range(21):
l = f.readline().strip().split('\t')
if len(l) >= 2:
header[l[0]] = l[1]
# data
# TODO: how to speed this up?
t, c = np.genfromtxt(fname, delimiter='\t', encoding='ascii',
skip_header=21, skip_footer=2, usecols=(0,1),
converters={i: lambda x: float(x.replace(',','.')) for i in range(2)}
).T
data = HITSpill(t/1e3, c, header)
# ensure important metadata
if 'SampleTime[ms]' in header:
header['SampleTime[ms]'] = float(header['SampleTime[ms]'].replace(',','.'))
else:
header['SampleTime[ms]'] = 1e3 * (t[1]-t[0])
# find spills in data stream
if windows == 'auto':
if window_set_time_zero is None: window_set_time_zero = True
if window_crop is None: window_crop = True
iszero = np.concatenate(([0], np.equal(data.c, 0).view(np.int8), [0]))
startstop = np.where(np.abs(np.diff(iszero)) == 1)[0].reshape(-1,2) # [(start, stop), ...] ranges of consecutive zeros
# filter out by spill pause, but keep first and last in any case
min_pause = 1 / data.dt
startstop = startstop[np.where(np.logical_or(np.logical_or(np.diff(startstop)[:,0] > min_pause,
startstop[:,0] == 0), startstop[:,1] == len(data.c)))]
# filter out by spill length
min_length = 0.01 / data.dt
stopstart = startstop.reshape(-1)[1:-1].reshape(-1,2)
stopstart = stopstart[np.where(np.diff(stopstart)[:,0] > min_length)]
startstop = np.hstack([startstop[0,0], stopstart.reshape(-1), startstop[-1,1]]).reshape(-1,2)
if window_crop:
windows = startstop.reshape(-1)[1:-1].reshape(-1, 2)
else:
windows = np.mean(startstop, axis=1, dtype=int)
windows = np.hstack([0, windows[1:-1].repeat(2), len(data.c)-1]).reshape(-1, 2)
windows = data.t[windows] # indices to times
# cut data into spills
spills = []
for w in windows:
s = data.segment(*w, set_time_zero=window_set_time_zero)
if window_crop:
s = s.cropped(set_time_zero=window_set_time_zero)
spills.append(HITSpill(s.t+time_offset, s.c, s.header))
return HITSpillData(spills)
def __repr__(self):
return 'HITSpillData(spills='+'\n '.join([repr(s) for s in self.spills])+')'