import numpy as np from dataclasses import dataclass def avg(*data, n=100): return [d[:int(len(d)/n)*n].reshape(-1, n).mean(axis=1) for d in data] @dataclass class Trace: pass # timing ######### CMD_SEQ_START = 257 CMD_BEAM_INJECTION = 283 # lassie spill data #################### from .bdiolib.bdio import bdio @dataclass class LassieSpillData(Trace): """Container for time series data from lassie spill application t - time array in seconds v - value array in unit unit - unit of value array """ t: np.ndarray v: np.ndarray unit: str = 'a.u.' @classmethod def from_file(cls, fname, calibrated=True, from_event=None): """Read in time series data from a *.tdf file saved with lassiespill application :param calibrated: use calibrated or raw data :param from_event: return data from this event, time will also be relative to this event References: https://git.acc.gsi.de/bi/bdiolib.python """ b = bdio.BDIOReader(fname) appname = '' device = '' fs = None t0 = 0 t1 = None unit = 'a.u.' blocks = b.get_directory() for bi in blocks: b.seek(bi.get_pos()) block = b.next_block() #print(bi) #print(block) #if hasattr(block, 'get_rows'): # for row in block.get_rows(): # print(row) ############### if bi.is_header_block(): appname = block.get_app_name() device = block.get_device() elif bi.get_title() == 'TraceInfo': for row in block.get_rows(): if row.get_tag() == 'sampling frequency': assert row.get_unit().lower() == 'hz', 'sampling frequency has unexpected unit {}'.format(row.get_unit()) fs = row.get_value() elif bi.get_title() == 'Integrals': for row in block.get_rows(): print('>'+row.get_tag()+'<') if row.get_tag() == ('Calibrated' if calibrated else 'Raw') + ': beam in integral': unit = row.get_unit() elif bi.get_title() == ('Calibrated' if calibrated else 'Raw'): values = np.array(block.get_array()) elif bi.is_event_table_block(): t0 = block.get_rows()[0].get_time() # TODO: save to asume the first event is the start of data? for row in block.get_rows(): if row.get_event() == from_event: t1 = row.get_time() times = np.linspace(0, len(values)/fs, len(values)) if from_event: assert t1 is not None, 'Requested data from event {}, but event not found in data file'.format(from_event) from_time = (t1 - t0)*1e-9 idx = np.argmin(np.abs(times - from_time)) values = values[idx:] times = times[idx:] - times[idx] return LassieSpillData(times, values, unit=unit) # IPM data ########### ... # TODO # Libera data ############## @dataclass class LiberaBBBData(Trace): """Container for bunch-by-bunch data from libera-ireg dump t - time array in seconds """ t: np.ndarray x: np.ndarray y: np.ndarray s: np.ndarray f: np.ndarray x_unit: str = 'mm' y_unit: str = 'mm' s_unit: str = 'a.u.' f_unit: str = 'Hz' @classmethod def from_file(cls, fname): """Read in bunch-by-bunch data from a *.bin file saved with libera-ireg References: Libera_Hadron_User_Manual_1.04.pdf """ bunch = np.memmap(fname, dtype=np.dtype([ ('S', '<i4'), ('r1', '<i4'), ('X', '<i4'), # in nm ('Y', '<i4'), # in nm ('TS', '<u8'), # in 4ns (ADC clock at 250MS/s) ('t2', '<u2'), ('t1', '<u2'), ('status', '<u4')])) time = (bunch['TS'] + bunch['t1'])*4e-9 # in s freq = 1/np.diff(time) freq = np.concatenate((freq, [freq[-1]])) return LiberaBBBData(time, bunch['X']*1e-6, bunch['Y']*1e-6, bunch['S'], freq)