Skip to content
Snippets Groups Projects
Commit 09db9506 authored by Philipp Niedermayer's avatar Philipp Niedermayer
Browse files

Add data wrapper for hit spill data

parent 9bef8984
No related branches found
No related tags found
No related merge requests found
......@@ -16,6 +16,7 @@ from .lassie import LassieSpillData
from .libera import LiberaData, LiberaBBBData, LiberaTBTData
from .nwa import NWAData, NWADataAverage
from .tdc import TDCSpill, TDCData
from .hitspill import HITSpill, HITSpillData
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Dataclasses for TDC data
Classes to work with spill data from ScopeViewControl application at HIT Heidelberg
"""
__author__ = "Philipp Niedermayer"
__contact__ = "p.niedermayer@gsi.de"
__date__ = "2022-08-25"
import numpy as np
from dataclasses import dataclass, field
from .common import Trace
@dataclass
class HITSpill:
"""Container for spill data
t - time array in seconds
c - array of counts per time bin
header - dict with metadata
"""
t: np.ndarray
c: np.ndarray
header: dict = field(default_factory=dict)
@property
def detector(self):
"""Detector name"""
return self.header.get('Name')
@property
def length(self):
"""Spill duration in sec"""
return self.t[-1] - self.t[0]
@property
def counts(self):
"""Total particle count"""
return np.sum(self.c)
@property
def dt(self):
"""Time bin of detector in s"""
return 1e-3*self.header['SampleTime[ms]']
def hitcounts(self, dt=None, dt2=None):
"""Particle counts per time bin
Re-samples the timestamps into an equally spaced time series with counts per unit time
:param dt: width in s of time bin for counting, defaults to detector time bin.
:param dt2: width in s of macro time bin. If not None, this function returns a 2D array with time resolution dt2 along the first and dt along the second axis
:returns: (count_array, bin_edges) in units of (1, second)
"""
# re-sample timestamps
if dt is not None and dt != self.dt:
if dt < self.dt: raise ValueError(f'Parameter dt must be >= detector resolution of {self.dt} s')
raise NotImplementedError(f'Currently only dt={self.dt} (detector resolution) is supported')
counts, edges = self.c, np.hstack([self.t, [self.t[-1]+self.dt]])
if dt2 is not None:
if dt2 <= dt: raise ValueError(f'Macro time bin dt2 must be greater than micro time bin dt of {dt} s')
# make 2D array of shape [length/dt2, dt2/dt]
ebins = int(dt2/dt)
counts = counts[:int(len(counts)/ebins)*ebins].reshape((-1, ebins))
edges = edges[:int(len(edges)/ebins+1)*ebins:ebins]
return counts, edges
def cropped(self, *, set_time_zero=False):
"""Return a slice of this spill with leading and trailing zero counts removed"""
i_start = (self.c > 0).argmax() # first index where condition is true
i_stop = self.c.size - 1 - (self.c > 0)[::-1].argmax() # last index where condition is true
t, c = self.t[i_start:i_stop+1], self.c[i_start:i_stop+1]
if set_time_zero: t -= t[0]
return HITSpill(t, c, self.header)
def segment(self, t_start, t_stop, *, set_time_zero=False):
"""Return a slice of this spill in given time interval
:param t_start: Start time in s of segment (including)
:param t_stop: Start time in s of segment (excluding)
:param set_time_zero: If true, set time 0 at start of new segment.
"""
#ts_start, ts_stop = [self.ts[0] + (t - self.time_offset) / self.header['tdcTickSecs'].item() for t in (t_start, t_stop)]
## we do not consider channel 8 which carries time relative to RF instead of absolute timestamp
i_start = (self.t >= t_start).argmax() # first index where condition is true
i_stop = self.t.size - 1 - (self.t < t_stop)[::-1].argmax() # last index where condition is true
t, c = self.t[i_start:i_stop+1], self.c[i_start:i_stop+1]
if set_time_zero: t -= t[0]
return HITSpill(t, c, self.header)
def __repr__(self):
return f'HITSpill({self.detector}, length={self.length:g}s, counts={self.counts})'
@dataclass
class HITSpillData:
spills: list
@classmethod
def from_file(cls, fname, *, time_offset=0, windows='auto', window_crop=None, window_set_time_zero=None, verbose=0):
"""Read Spills data from a *.csv file saved with ScopeViewControl
:param fname: path to filename
:param time_offset: adds a given time offset (in s) to the time axis
:param windows: Time windows in s to cut data into spills in the form ((t_start, t_stop), ...)
If 'auto', then this looks for spills of at least 10 ms length and at least 1 s pause in between
:param window_crop: If true, remove leading and trailing zero counts from spills. The default is true only if windows=='auto'
:param window_set_time_zero: If true, set time to zero on start of every spill. If false, use absolute time of csv file. The default is true only if windows=='auto'
"""
# header
header = {}
with open(r'D:\pniederm\GSI Measurements\2022-08-07_HIT-KO\Data\E108_RF0.0_ExcA.csv') as f:
for i in range(21):
l = f.readline().strip().split('\t')
if len(l) >= 2:
header[l[0]] = l[1]
# data
# TODO: how to speed this up?
t, c = np.genfromtxt(fname, delimiter='\t', encoding='ascii',
skip_header=21, skip_footer=2, usecols=(0,1),
converters={i: lambda x: float(x.replace(',','.')) for i in range(2)}
).T
data = HITSpill(t/1e3, c, header)
# ensure important metadata
if 'SampleTime[ms]' in header:
header['SampleTime[ms]'] = float(header['SampleTime[ms]'].replace(',','.'))
else:
header['SampleTime[ms]'] = 1e3 * (t[1]-t[0])
# find spills in data stream
if windows == 'auto':
if window_set_time_zero is None: window_set_time_zero = True
if window_crop is None: window_crop = True
iszero = np.concatenate(([0], np.equal(data.c, 0).view(np.int8), [0]))
startstop = np.where(np.abs(np.diff(iszero)) == 1)[0].reshape(-1,2) # [(start, stop), ...] ranges of consecutive zeros
# filter out by spill pause, but keep first and last in any case
min_pause = 1 / data.dt
startstop = startstop[np.where(np.logical_or(np.logical_or(np.diff(startstop)[:,0] > min_pause,
startstop[:,0] == 0), startstop[:,1] == len(data.c)))]
# filter out by spill length
min_length = 0.01 / data.dt
stopstart = startstop.reshape(-1)[1:-1].reshape(-1,2)
stopstart = stopstart[np.where(np.diff(stopstart)[:,0] > min_length)]
startstop = np.hstack([startstop[0,0], stopstart.reshape(-1), startstop[-1,1]]).reshape(-1,2)
if window_crop:
windows = startstop.reshape(-1)[1:-1].reshape(-1, 2)
else:
windows = np.mean(startstop, axis=1, dtype=int)
windows = np.hstack([0, windows[1:-1].repeat(2), len(data.c)-1]).reshape(-1, 2)
windows = data.t[windows] # indices to times
# cut data into spills
spills = []
for w in windows:
s = data.segment(*w, set_time_zero=window_set_time_zero)
if window_crop:
s = s.cropped(set_time_zero=window_set_time_zero)
spills.append(HITSpill(s.t+time_offset, s.c, s.header))
return HITSpillData(spills)
def __repr__(self):
return 'HITSpillData(spills='+'\n '.join([repr(s) for s in self.spills])+')'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment