import numpy as np
import scipy.signal
from dataclasses import dataclass, field
import tarfile
import re


def avg(*data, n=100):
    """Average every n consecutive samples of each given array (discarding the remainder)"""
    return [d[:int(len(d)/n)*n].reshape(-1, n).mean(axis=1) for d in data]


def irng(array, start, stop):
    """Return a slice indexing the array in the given value range"""
    indices = [np.argmin(np.abs(array - start)), np.argmin(np.abs(array - stop))]
    return slice(min(indices), max(indices))


@dataclass
class Trace:
    pass


# timing
#########

EVT_MB_TRIGGER = 40
EVT_FLATTOP = 45
EVT_EXTR_END = 51
CMD_SEQ_START = 257
CMD_BEAM_INJECTION = 283


# lassie spill data
####################

from .bdiolib.bdio import bdio


@dataclass
class LassieSpillData(Trace):
    """Container for time series data from lassie spill or DCCT application

    t - time array in seconds
    v - value array in units of `unit`
    unit - unit of value array
    events - dict mapping events to time
    """
    t: np.ndarray
    v: np.ndarray
    unit: str = 'particles'
    events: dict = field(default_factory=dict)

    @classmethod
    def from_file(cls, fname, from_event=None, time_offset=0, verbose=0):
        """Read in time series data from a *.tdf file saved with the lassiespill or DCCT application

        :param fname: path to filename
        :param from_event: return data from this event, time will also be relative to this event

        References:
            https://git.acc.gsi.de/bi/bdiolib.python
        """
        b = bdio.BDIOReader(fname)

        appname = ''
        device = ''
        fs = None
        t0 = 0
        unit = 'particles'
        events = {}

        blocks = b.get_directory()
        for bi in blocks:
            b.seek(bi.get_pos())
            block = b.next_block()
            if verbose:
                print(bi.get_title(), block)
            # if hasattr(block, 'get_rows'):
            #     for row in block.get_rows():
            #         print(row)

            if bi.is_header_block():
                appname = block.get_app_name()
                device = block.get_device()
                if verbose:
                    print(device, 'data saved from', appname)

            elif bi.get_title() == 'TraceInfo':
                for row in block.get_rows():
                    if verbose:
                        print(row)
                    if row.get_tag() == ('intensity frequency' if appname == 'DCCT' else 'sampling frequency'):
                        assert row.get_unit().lower() == 'hz', \
                            'sampling frequency has unexpected unit {}'.format(row.get_unit())
                        fs = row.get_value()

            elif bi.get_title() == 'Integrals':
                for row in block.get_rows():
                    if verbose:
                        print(row)
                    if row.get_tag() == 'Calibrated: beam in integral':
                        unit = row.get_unit()

            elif bi.is_double_array_block() and bi.get_title() == ('Intensity' if appname == 'DCCT' else 'Calibrated'):
                values = np.array(block.get_array())

            elif bi.is_event_table_block():
                # assume the first event is the start of data
                # TODO: is there a better way to do it?
                t0 = block.get_rows()[0].get_time()
                for row in block.get_rows():
                    events[row.get_event()] = (row.get_time() - t0)*1e-9

        times = np.linspace(0, len(values)/fs, len(values))

        if from_event:
            if from_event not in events:
                raise ValueError(f'Requested data from event {from_event}, but event not found in data file')
            from_time = events[from_event]
            idx = np.argmin(np.abs(times - from_time))
            values = values[idx:]
            times = times[idx:] - times[idx]
            events = {e: t - from_time for e, t in events.items()}

        return LassieSpillData(times + time_offset, values, unit=unit, events=events)

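
# Example usage (sketch, not executed on import): read a spill dump and slice it
# relative to the flattop event. The file name is hypothetical, and whether
# EVT_FLATTOP actually appears in the saved event table depends on the cycle.
#
#   spill = LassieSpillData.from_file('spill.tdf', from_event=EVT_FLATTOP, verbose=1)
#   print(spill.unit, spill.events)
#   window = irng(spill.t, 0.0, 1.0)   # first second after the flattop event
#   print(spill.v[window].sum())
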

# IPM data
###########

@dataclass
class IPMData(Trace):
    """Container for profile data from IPM

    t - time array in seconds
    w - profile displacement (wire position) array
    w_unit - unit of displacement array values
    x - horizontal profile amplitudes
    y - vertical profile amplitudes
    unit - unit of profile amplitudes
    """
    t: np.ndarray
    w: np.ndarray
    x: np.ndarray
    y: np.ndarray
    unit: str = 'a.u.'
    w_unit: str = 'mm'
    x_rms: np.ndarray = None
    y_rms: np.ndarray = None
    beam_current: np.ndarray = None
    dipole: np.ndarray = None
    hf: np.ndarray = None

    def apply_calibration(self, time_range):
        """Calibrate the profile data by subtracting the average profile measured in the given timespan

        Note that this does not affect rms beam sizes

        :param time_range: tuple of (start, end) of calibration window in seconds
        """
        window = irng(self.t, *time_range)
        self.x -= np.mean(self.x[window], axis=0)
        self.y -= np.mean(self.y[window], axis=0)

    @classmethod
    def from_file(cls, fname, clean=False, from_event=None, time_offset=0, verbose=0):
        """Read in profile data from a *.tgz file saved with the IPM application

        :param fname: path to filename
        :param clean: use adc_clean.dat instead of adc.dat
        :param from_event: return data from this event, time will also be relative to this event

        References:
            Giacomini, Tino <T.Giacomini@gsi.de>:
                One beam profile consists of 64 values. The distance between two wires is
                0.6 mm and the wire diameter is 1.5 mm, so the center-to-center distance
                of the wires is 2.1 mm.
            Handbook RGM_SIS:
                At SIS cycle start, event 32 appears. This event is read by the RGM and the
                measurement cycle starts (see Measure cycle). When an SIS cycle starts, the
                RGM is started as well and acquires data. When a previously defined event
                occurs, the software stores the currently active profile number and the time
                elapsed since spill start in ms (so-called Time Stamp Function). A beam
                profile is recorded with a period of 10 ms.
        """
        tar = tarfile.open(fname, "r:gz")

        # profile data
        f = tar.extractfile('adc_clean.dat' if clean else 'adc.dat')
        dat = np.loadtxt(f, skiprows=1)
        p = 2.1 * (np.arange(64) - 31.5)  # wire positions in mm, centered around zero
        n = dat.shape[0]//64*64
        if dat.shape[0] % 64 != 0:
            print(f'WARNING: loaded incomplete data from {fname}')
        x = dat[:n, 1].reshape((-1, 64))
        y = dat[:n, 2].reshape((-1, 64))
        t = np.arange(x.shape[0]) * 10e-3  # a beam profile is recorded every 10 ms

        # time data
        dat = np.loadtxt(tar.extractfile('bwrms.dat'), skiprows=1)
        _, x_rms, y_rms = dat.T

        # scalar data
        # File scl.dat contains the scaler data:
        #   Col 0 = internal timing
        #   Col 1 = DC (Current)
        #   Col 2 = Dipole signal
        #   Col 3 = HF signal
        #   Col 4 = Ev40 (MB Trigger) / Injection
        #   Col 5 = Ev45 (Flattop) / Accel end
        #   Col 6 = Ev51 (Extraction End)
        #   Col 7 = EvXX see GUI->Parameter->Event
        scl = np.loadtxt(tar.extractfile('scl.dat'), skiprows=1)
        index, beam_current, dipole, hf, event_40, event_45, event_51, event_user = scl.T

        offset = 0
        if from_event is None:
            offset = 0
        elif from_event == 40:
            assert np.max(event_40) == 1, f'Requested data from event {from_event}, but event 40 not found in data'
            offset = np.argmax(event_40)
        elif from_event == 45:
            assert np.max(event_45) == 1, f'Requested data from event {from_event}, but event 45 not found in data'
            offset = np.argmax(event_45)
        elif from_event == 51:
            assert np.max(event_51) == 1, f'Requested data from event {from_event}, but event 51 not found in data'
            offset = np.argmax(event_51)
        else:
            # not a default event, maybe a user event?
            f = tar.extractfile('param.dat')
            ev = None
            for line in f:
                line = line.decode('ascii').strip()  # param.dat is read as bytes, decode before matching
                if verbose:
                    print(line)
                m = re.match(r'ev\s*=\s*(\d+)', line)
                if m:
                    ev = int(m.group(1))
            if from_event == ev:
                assert np.max(event_user) == 1, f'Requested data from event {from_event}, but event {ev} not found in data'
                offset = np.argmax(event_user)
            else:
                raise ValueError(f'Requested data from event {from_event} which is neither a standard event (40, 45, 51) nor the user event ({ev})')

        if verbose:
            print(f'Returning data with offset {offset} as requested by event {from_event}')

        x, y, t = x[offset:, :], y[offset:, :], t[offset:] - t[offset]
        x_rms, y_rms, beam_current, dipole, hf = [_[offset:] for _ in (x_rms, y_rms, beam_current, dipole, hf)]

        return IPMData(t + time_offset, p, x, y,
                       x_rms=x_rms,
                       y_rms=y_rms,
                       beam_current=np.concatenate((beam_current, [np.nan])),
                       dipole=np.concatenate((dipole, [np.nan])),
                       hf=np.concatenate((hf, [np.nan])),
                       )

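
# Example usage (sketch, not executed on import): load an IPM archive, subtract a
# baseline profile taken from a quiet window and look at the rms beam sizes. The
# file name and the calibration window are assumptions.
#
#   ipm = IPMData.from_file('ipm_dump.tgz')
#   ipm.apply_calibration((0.0, 0.1))    # assumed beam-free window at the start of the cycle
#   print(ipm.x_rms.mean(), ipm.y_rms.mean())
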

# Libera data
##############

@dataclass
class LiberaData(Trace):
    """Container for data from libera-ireg dump

    t - time array in seconds
    x - horizontal position array in x_unit
    y - vertical position array in y_unit
    s - sum signal array in s_unit
    """
    t: np.ndarray
    x: np.ndarray
    y: np.ndarray
    s: np.ndarray
    x_unit: str = 'mm'
    y_unit: str = 'mm'
    s_unit: str = 'a.u.'


@dataclass
class LiberaBBBData(LiberaData):
    """Container for bunch-by-bunch data from libera-ireg dump"""

    @classmethod
    def from_file(cls, fname, time_offset=0, verbose=0):
        """Read in bunch-by-bunch data from a *.bin file saved with libera-ireg

        :param fname: path to filename

        References:
            Libera_Hadron_User_Manual_1.04.pdf
        """
        bunch = np.memmap(fname, dtype=np.dtype([
            ('S', '<i4'),
            ('r1', '<i4'),
            ('X', '<i4'),       # in nm
            ('Y', '<i4'),       # in nm
            ('TS', '<u8'),      # in 4 ns (ADC clock at 250 MS/s)
            ('t2', '<u2'),
            ('t1', '<u2'),
            ('status', '<u4')]))
        time = (bunch['TS'] + bunch['t1'])*4e-9  # in s
        return LiberaBBBData(time + time_offset, bunch['X']*1e-6, bunch['Y']*1e-6, bunch['S'])

    def to_tbt_data(self, h, b=None):
        """Returns the turn-by-turn data

        :param h: the harmonic number of the RF system (number of bunches per turn)
        :param b: the bunch index or None for an average
        """
        wrap = lambda a: a[:(len(a)//h)*h].reshape(-1, h).mean(axis=1) if b is None else a[b::h]
        return LiberaTBTData(t=wrap(self.t), x=wrap(self.x), y=wrap(self.y), s=wrap(self.s),
                             x_unit=self.x_unit, y_unit=self.y_unit, s_unit=self.s_unit)


@dataclass
class LiberaTBTData(LiberaData):
    """Container for turn-by-turn data from libera-ireg dump"""
    pass

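
# Example usage (sketch, not executed on import): convert a bunch-by-bunch dump
# to turn-by-turn data. The file name and the harmonic number h=4 are assumptions.
#
#   bbb = LiberaBBBData.from_file('libera_dump.bin')
#   tbt = bbb.to_tbt_data(h=4)           # average over the 4 bunches of each turn
#   tbt_b0 = bbb.to_tbt_data(h=4, b=0)   # or follow the first bunch only
#   print(tbt.x.std(), tbt.y.std())
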

@dataclass
class NWAData(Trace):
    """Container for data from Network Analyser save

    f - frequency in Hz
    m - magnitude
    p - phase
    """
    f: np.ndarray
    m: np.ndarray
    p: np.ndarray
    f_unit: str = 'Hz'
    m_unit: str = 'a.u.'
    p_unit: str = 'a.u.'

    @classmethod
    def from_file(cls, magfile, phasefile=None, *, isdeg=True, unwrap=False, verbose=0):
        """Read in data from CSV file for magnitude and phase

        :param magfile: path to filename with magnitude trace data
        :param phasefile: path to filename with phase trace data.
                          If None, phase data is assumed to be the second column in magfile.
        :param isdeg: if phase data is in degrees (True) or radians (False)
        :param unwrap: if true, the relative phase is unwrapped to an absolute phase centered around zero
        """
        f, m, *other = np.loadtxt(magfile, skiprows=3, delimiter=',').T
        if phasefile is None:
            p = other[0]
        else:
            fp, p, *_ = np.loadtxt(phasefile, skiprows=3, delimiter=',').T
            if np.any(f != fp):
                raise ValueError('Files provided do not have equal frequency components.')
        if unwrap:
            if isdeg:
                p = np.deg2rad(p)
            p = np.unwrap(p)
            p -= np.mean(p)
            if isdeg:
                p = np.rad2deg(p)
        return NWAData(f, m, p, p_unit='deg' if isdeg else 'rad')
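
# Example usage (sketch, not executed on import): load a network analyser trace
# with magnitude and phase stored in separate CSV files. The file names are
# hypothetical.
#
#   nwa = NWAData.from_file('trace_mag.csv', 'trace_phase.csv', unwrap=True)
#   print(nwa.f[np.argmax(nwa.m)], 'Hz at maximum response')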