Skip to content
Snippets Groups Projects
hitspill.py 7.52 KiB
Newer Older
#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" Dataclasses for TDC data

Classes to work with spill data from ScopeViewControl application at HIT Heidelberg

"""

__author__ = "Philipp Niedermayer"
__contact__ = "p.niedermayer@gsi.de"
__date__ = "2022-08-25"




import numpy as np
from dataclasses import dataclass, field
from .common import Trace



@dataclass
class HITSpill:
    """Container for spill data
    
    t - time array in seconds
    c - array of counts per time bin
    header - dict with metadata
    """
    t: np.ndarray
    c: np.ndarray
    header: dict = field(default_factory=dict)
    
    @property
    def detector(self):
        """Detector name"""
        return self.header.get('Name')
    
    @property
    def length(self):
        """Spill duration in sec"""
        return self.t[-1] - self.t[0]
    
    @property
    def counts(self):
        """Total particle count"""
        return np.sum(self.c)
    
    @property
    def dt(self):
        """Time bin of detector in s"""
        return 1e-3*self.header['SampleTime[ms]']
    
    def hitcounts(self, dt=None, dt2=None):
        """Particle counts per time bin
        
        Re-samples the timestamps into an equally spaced time series with counts per unit time
        :param dt: width in s of time bin for counting, defaults to detector time bin.
        :param dt2: width in s of macro time bin. If not None, this function returns a 2D array with time resolution dt2 along the first and dt along the second axis
        :returns: (count_array, bin_edges) in units of (1, second)
        """
        
        # re-sample timestamps
        if dt is not None and dt != self.dt:            
            if dt < self.dt: raise ValueError(f'Parameter dt must be >= detector resolution of {self.dt} s')
            raise NotImplementedError(f'Currently only dt={self.dt} (detector resolution) is supported')
            
        counts, edges = self.c, np.hstack([self.t, [self.t[-1]+self.dt]])
        
        if dt2 is not None:
            if dt2 <= dt: raise ValueError(f'Macro time bin dt2 must be greater than micro time bin dt of {dt} s')
            # make 2D array of shape [length/dt2, dt2/dt]
            ebins = int(dt2/dt)
            counts = counts[:int(len(counts)/ebins)*ebins].reshape((-1, ebins))
            edges = edges[:int(len(edges)/ebins+1)*ebins:ebins]
    
        return counts, edges
    
    def cropped(self, *, set_time_zero=False):
        """Return a slice of this spill with leading and trailing zero counts removed"""
        i_start = (self.c > 0).argmax() # first index where condition is true
        i_stop = self.c.size - 1 - (self.c > 0)[::-1].argmax() # last index where condition is true
        t, c = self.t[i_start:i_stop+1], self.c[i_start:i_stop+1]
        if set_time_zero: t -= t[0]
        return HITSpill(t, c, self.header)
    
    def segment(self, t_start, t_stop, *, set_time_zero=False):
        """Return a slice of this spill in given time interval
        
        :param t_start: Start time in s of segment (including)
        :param t_stop: Start time in s of segment (excluding)
        :param set_time_zero: If true, set time 0 at start of new segment.
        """
        #ts_start, ts_stop = [self.ts[0] + (t - self.time_offset) / self.header['tdcTickSecs'].item() for t in (t_start, t_stop)]
        ## we do not consider channel 8 which carries time relative to RF instead of absolute timestamp
        i_start = (self.t >= t_start).argmax() # first index where condition is true
        i_stop = self.t.size - 1 - (self.t < t_stop)[::-1].argmax() # last index where condition is true
        t, c = self.t[i_start:i_stop+1], self.c[i_start:i_stop+1]
        if set_time_zero: t -= t[0]
        return HITSpill(t, c, self.header)
    
    
    def __repr__(self):
        return f'HITSpill({self.detector}, length={self.length:g}s, counts={self.counts})'


@dataclass
class HITSpillData:    
    spills: list
    
    @classmethod
    def from_file(cls, fname, *, time_offset=0, windows='auto', window_crop=None, window_set_time_zero=None, verbose=0):
        """Read Spills data from a *.csv file saved with ScopeViewControl
           
        :param fname: path to filename
        :param time_offset: adds a given time offset (in s) to the time axis
        :param windows: Time windows in s to cut data into spills in the form ((t_start, t_stop), ...)
                        If 'auto', then this looks for spills of at least 10 ms length and at least 1 s pause in between
        :param window_crop: If true, remove leading and trailing zero counts from spills. The default is true only if windows=='auto'
        :param window_set_time_zero: If true, set time to zero on start of every spill. If false, use absolute time of csv file. The default is true only if windows=='auto'
        """
        # header
        header = {}
        with open(r'D:\pniederm\GSI Measurements\2022-08-07_HIT-KO\Data\E108_RF0.0_ExcA.csv') as f:
            for i in range(21):
                l = f.readline().strip().split('\t')
                if len(l) >= 2:
                    header[l[0]] = l[1]
        
        # data
        # TODO: how to speed this up?
        t, c = np.genfromtxt(fname, delimiter='\t', encoding='ascii',
                             skip_header=21, skip_footer=2, usecols=(0,1),
                             converters={i: lambda x: float(x.replace(',','.')) for i in range(2)}
                            ).T
        data = HITSpill(t/1e3, c, header)
        
        # ensure important metadata
        if 'SampleTime[ms]' in header:
            header['SampleTime[ms]'] = float(header['SampleTime[ms]'].replace(',','.'))
        else:
            header['SampleTime[ms]'] = 1e3 * (t[1]-t[0])
        
        # find spills in data stream
        if windows == 'auto':
            if window_set_time_zero is None: window_set_time_zero = True
            if window_crop is None: window_crop = True
            iszero = np.concatenate(([0], np.equal(data.c, 0).view(np.int8), [0]))
            startstop = np.where(np.abs(np.diff(iszero)) == 1)[0].reshape(-1,2) # [(start, stop), ...] ranges of consecutive zeros
            # filter out by spill pause, but keep first and last in any case
            min_pause = 1 / data.dt
            startstop = startstop[np.where(np.logical_or(np.logical_or(np.diff(startstop)[:,0] > min_pause, 
                                           startstop[:,0] == 0), startstop[:,1] == len(data.c)))]
            # filter out by spill length
            min_length = 0.01 / data.dt
            stopstart = startstop.reshape(-1)[1:-1].reshape(-1,2)
            stopstart = stopstart[np.where(np.diff(stopstart)[:,0] > min_length)]
            startstop = np.hstack([startstop[0,0], stopstart.reshape(-1), startstop[-1,1]]).reshape(-1,2)
            
            if window_crop:
                windows = startstop.reshape(-1)[1:-1].reshape(-1, 2)
            else:
                windows = np.mean(startstop, axis=1, dtype=int)
                windows = np.hstack([0, windows[1:-1].repeat(2), len(data.c)-1]).reshape(-1, 2)
            windows = data.t[windows] # indices to times
        
        # cut data into spills
        spills = []
        for w in windows:
            s = data.segment(*w, set_time_zero=window_set_time_zero)
            if window_crop:
                s = s.cropped(set_time_zero=window_set_time_zero)
            spills.append(HITSpill(s.t+time_offset, s.c, s.header))
        
        return HITSpillData(spills)
    
    def __repr__(self):
        return 'HITSpillData(spills='+'\n                    '.join([repr(s) for s in self.spills])+')'