Skip to content
Snippets Groups Projects
utils.py 6.32 KiB
Newer Older
import numpy as np
import scipy.signal
from dataclasses import dataclass
import tarfile
import re


def avg(*data, n=100):
    return [d[:int(len(d)/n)*n].reshape(-1, n).mean(axis=1) for d in data]

@dataclass
class Trace:
    pass
    

# timing
#########

CMD_SEQ_START = 257
CMD_BEAM_INJECTION = 283


# lassie spill data
####################

from .bdiolib.bdio import bdio


@dataclass
class LassieSpillData(Trace):
    """Container for time series data from lassie spill application
    
    t - time array in seconds
    v - value array in unit
    unit - unit of value array
    """
    t: np.ndarray
    v: np.ndarray
    unit: str = 'a.u.'
    
    @classmethod
    def from_file(cls, fname, calibrated=True, from_event=None, verbose=1):
        """Read in time series data from a *.tdf file saved with lassiespill application
           
        :param fname: path to filename
        :param calibrated: use calibrated or raw data
        :param from_event: return data from this event, time will also be relative to this event                    
        
        References: https://git.acc.gsi.de/bi/bdiolib.python
        """
        
        b = bdio.BDIOReader(fname)

        appname = ''
        device = ''
        fs = None
        t0 = 0
        t1 = None
        unit = 'a.u.'
        
        blocks = b.get_directory()
        for bi in blocks:
            b.seek(bi.get_pos())
            block = b.next_block()
            #print(bi)
            #print(block)
            #if hasattr(block, 'get_rows'):
            #    for row in block.get_rows():
            #        print(row)
            ###############
            if bi.is_header_block():            
                appname = block.get_app_name()
                device = block.get_device()
                
            elif bi.get_title() == 'TraceInfo':
                for row in block.get_rows():
                    if verbose: print(row)
                    if row.get_tag() == 'sampling frequency':
                        assert row.get_unit().lower() == 'hz', 'sampling frequency has unexpected unit {}'.format(row.get_unit())
                        fs = row.get_value()
                        
            elif bi.get_title() == 'Integrals':
                for row in block.get_rows():
                    if verbose: print(row)
                    if row.get_tag() == ('Calibrated' if calibrated else 'Raw') + ': beam in integral':
                        unit = row.get_unit()
                        
            elif bi.get_title() == ('Calibrated' if calibrated else 'Raw'):
                values = np.array(block.get_array())
                
            elif bi.is_event_table_block():
                t0 = block.get_rows()[0].get_time() # TODO: save to asume the first event is the start of data?
                for row in block.get_rows():
                    if row.get_event() == from_event:
                        t1 = row.get_time()
                

        
        
        times = np.linspace(0, len(values)/fs, len(values))
        
        if from_event:
            assert t1 is not None, 'Requested data from event {}, but event not found in data file'.format(from_event)
            from_time = (t1 - t0)*1e-9    
            idx = np.argmin(np.abs(times - from_time))
            values = values[idx:]
            times = times[idx:] - times[idx]
        
        return LassieSpillData(times, values, unit=unit)



# IPM data
###########

@dataclass
class IPMData(Trace):
    """Container for profile data from IPM
    
    t - time array in seconds
    p - profile position array
    p_unit - unit of displacement array values
    x - horizontal profile amplitudes
    y - vertical profile amplitudes
    unit - unit of profile amplitudes
    """
    t: np.ndarray
    p: np.ndarray
    x: np.ndarray
    y: np.ndarray
    unit: str = 'a.u.'
    p_unit: str = 'mm'
    
    @classmethod
    def from_file(cls, fname, clean=False, verbose=1):
        """Read in profile data from a *.tgz file saved with IPM application
           
        :param fname: path to filename
        :param clean: use adc_clean.dat instead of adc.dat
        :param from_event: return data from this event, time will also be relative to this event
        
        References: Giacomini, Tino <T.Giacomini@gsi.de>
                One beam profile consist of 64 values.
                The distance between two wires is 0.6 mm. The wires diameter is 1.5 mm. So the center to center distance of the wires is 2.1 mm.
        
        """
        tar = tarfile.open(fname, "r:gz")
        
        # profile data
        f = tar.extractfile('adc_clean.dat' if clean else 'adc.dat')
        dat = np.loadtxt(f, skiprows=1)
        p = 2.1 * (np.arange(64)-31.5)
        x = dat[:,1].reshape((-1, 64))
        y = dat[:,2].reshape((-1, 64))
        
        # settings
        dt = 0
        f = tar.extractfile('param.dat')
        for line in f:
            if verbose: print(line.decode('ascii').strip())
            m = re.match(r'intt\s*=\s*(\d+)', 'intt =  0')
            if m:
                interval = int(m.group(1))
                if interval == 0: dt = 5e-3 # 5 ms
                #if interval == 1: dt = 5e-4 # 0.5 ms
        assert dt, 'Could not determine time interval of profiles'
        t = dt * np.arange(x.shape[0])         
        
        return IPMData(t, p, x, y)



# Libera data
##############


@dataclass
class LiberaBBBData(Trace):
    """Container for bunch-by-bunch data from libera-ireg dump
    
    t - time array in seconds
    """
    t: np.ndarray
    x: np.ndarray
    y: np.ndarray
    s: np.ndarray
    x_unit: str = 'mm'
    y_unit: str = 'mm'
    s_unit: str = 'a.u.'
    
    @classmethod
    def from_file(cls, fname):
        """Read in bunch-by-bunch data from a *.bin file saved with libera-ireg
        
        :param fname: path to filename
        References: Libera_Hadron_User_Manual_1.04.pdf
        """
        bunch = np.memmap(fname, dtype=np.dtype([
                ('S', '<i4'),
                ('r1', '<i4'),
                ('X', '<i4'),  # in nm
                ('Y', '<i4'),  # in nm
                ('TS', '<u8'), # in 4ns (ADC clock at 250MS/s)
                ('t2', '<u2'),
                ('t1', '<u2'),
                ('status', '<u4')]))
        time = (bunch['TS'] + bunch['t1'])*4e-9 # in s
        return LiberaBBBData(time, bunch['X']*1e-6, bunch['Y']*1e-6, bunch['S'])