Skip to content
Snippets Groups Projects
utils.py 4.4 KiB
Newer Older
import numpy as np
from dataclasses import dataclass


def avg(*data, n=100):
    return [d[:int(len(d)/n)*n].reshape(-1, n).mean(axis=1) for d in data]

@dataclass
class Trace:
    pass
    

# timing
#########

CMD_SEQ_START = 257
CMD_BEAM_INJECTION = 283


# lassie spill data
####################

from .bdiolib.bdio import bdio


@dataclass
class LassieSpillData(Trace):
    """Container for time series data from lassie spill application
    
    t - time array in seconds
    v - value array in unit
    unit - unit of value array
    """
    t: np.ndarray
    v: np.ndarray
    unit: str = 'a.u.'
    
    @classmethod
    def from_file(cls, fname, calibrated=True, from_event=None):
        """Read in time series data from a *.tdf file saved with lassiespill application
           
        :param calibrated: use calibrated or raw data
        :param from_event: return data from this event, time will also be relative to this event                    
        
        References: https://git.acc.gsi.de/bi/bdiolib.python
        """
        
        b = bdio.BDIOReader(fname)

        appname = ''
        device = ''
        fs = None
        t0 = 0
        t1 = None
        unit = 'a.u.'
        
        blocks = b.get_directory()
        for bi in blocks:
            b.seek(bi.get_pos())
            block = b.next_block()
            #print(bi)
            #print(block)
            #if hasattr(block, 'get_rows'):
            #    for row in block.get_rows():
            #        print(row)
            ###############
            if bi.is_header_block():            
                appname = block.get_app_name()
                device = block.get_device()
                
            elif bi.get_title() == 'TraceInfo':
                for row in block.get_rows():
                    if row.get_tag() == 'sampling frequency':
                        assert row.get_unit().lower() == 'hz', 'sampling frequency has unexpected unit {}'.format(row.get_unit())
                        fs = row.get_value()
                        
            elif bi.get_title() == 'Integrals':
                for row in block.get_rows():
                    print('>'+row.get_tag()+'<')
                    if row.get_tag() == ('Calibrated' if calibrated else 'Raw') + ': beam in integral':
                        unit = row.get_unit()
                        
            elif bi.get_title() == ('Calibrated' if calibrated else 'Raw'):
                values = np.array(block.get_array())
                
            elif bi.is_event_table_block():
                t0 = block.get_rows()[0].get_time() # TODO: save to asume the first event is the start of data?
                for row in block.get_rows():
                    if row.get_event() == from_event:
                        t1 = row.get_time()
                

        
        
        times = np.linspace(0, len(values)/fs, len(values))
        
        if from_event:
            assert t1 is not None, 'Requested data from event {}, but event not found in data file'.format(from_event)
            from_time = (t1 - t0)*1e-9    
            idx = np.argmin(np.abs(times - from_time))
            values = values[idx:]
            times = times[idx:] - times[idx]
        
        return LassieSpillData(times, values, unit=unit)



# IPM data
###########

... # TODO



# Libera data
##############


@dataclass
class LiberaBBBData(Trace):
    """Container for bunch-by-bunch data from libera-ireg dump
    
    t - time array in seconds
    """
    t: np.ndarray
    x: np.ndarray
    y: np.ndarray
    s: np.ndarray
    f: np.ndarray
    x_unit: str = 'mm'
    y_unit: str = 'mm'
    s_unit: str = 'a.u.'
    f_unit: str = 'Hz'
    
    @classmethod
    def from_file(cls, fname):
        """Read in bunch-by-bunch data from a *.bin file saved with libera-ireg
        
        References: Libera_Hadron_User_Manual_1.04.pdf
        """
        bunch = np.memmap(fname, dtype=np.dtype([
                ('S', '<i4'),
                ('r1', '<i4'),
                ('X', '<i4'),  # in nm
                ('Y', '<i4'),  # in nm
                ('TS', '<u8'), # in 4ns (ADC clock at 250MS/s)
                ('t2', '<u2'),
                ('t1', '<u2'),
                ('status', '<u4')]))
        time = (bunch['TS'] + bunch['t1'])*4e-9 # in s
        freq = 1/np.diff(time)
        freq = np.concatenate((freq, [freq[-1]]))
        return LiberaBBBData(time, bunch['X']*1e-6, bunch['Y']*1e-6, bunch['S'], freq)