Skip to content
Snippets Groups Projects
spill.py 12.9 KiB
Newer Older
Philipp Niedermayer's avatar
Philipp Niedermayer committed
#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" Methos for making spill realted plots

"""

__author__ = "Philipp Niedermayer"
__contact__ = "p.niedermayer@gsi.de"
__date__ = "2022-08-19"



from .common import *




def plot_spill_intensity(ax, spill_data, bin_time=10e-6, *, rate=False, cumulative=False, relative=False, **plot_kwargs):
    """Plot spill intensity (binned particle counts) over extraction time
    
    :param ax: Axis to plot onto
    :param spill_data: Instance of spill data class, e.g. TDCSpill
    :param bin_time: Width of bins for particle counting in sec
    :param rate: If true, plot particle count rate, i.e. counts per s instead of per bin
    :param cumulative: If true, plot cumulative particle number
    :param relative: If true, plot fraction of total particle number
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    assert not (rate and cumulative), 'rate and cumulative plotting are mutually exclusive'
    
    # histogram in time bins
    nbins = int(np.ceil(spill_data.length/bin_time))
    weights = np.ones_like(spill_data.hittimes)
    if rate: weights /= bin_time
    if relative: weights /= spill_data.counts
    hist, edges = np.histogram(spill_data.hittimes, bins=nbins, weights=weights, 
                               range=(spill_data.time_offset, spill_data.time_offset+bin_time*nbins))
    if cumulative: hist = np.cumsum(hist)
    
    # plot
    ax.stairs(hist, edges, **plot_kwargs)
    if relative: ax.yaxis.set_major_formatter(mpl.ticker.PercentFormatter(1))
    ax.set(xlabel='Extraction time / s',
           ylabel='Extracted particle' + \
                   (' fraction' if relative else 's') + \
                   ('' if cumulative else \
                        (' / s' if rate else f'\nper ${(bin_time*SI.s).to_compact():~gL}$ interval')),
          )




def plot_spill_duty_factor(ax, spill_data, counting_dt=10e-6, evaluation_dt=10e-3, *, show_poisson_limit=False, **plot_kwargs):
    """Plot spill duty factor (<x>²/<x²>) over extraction time
    
    :param ax: Axis to plot onto
    :param spill_data: Instance of spill data class, e.g. TDCSpill
    :param counting_dt: Width of bins for particle counting in sec
    :param evaluation_dt: Width of bins for duty factor evaluation (value is rounded to the next multiple of count_bin_time) 
    :param show_poisson_limit: If true, show poison limit of duty factor
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    
    counts, edges = spill_data.hitcounts(counting_dt, evaluation_dt)
    
    # spill duty factor
    F = np.mean(counts, axis=1)**2 / np.mean(counts**2, axis=1)
    s = ax.stairs(F, edges, **plot_kwargs)
    
    if show_poisson_limit:
        Fp = np.mean(counts, axis=1) / ( np.mean(counts, axis=1) + 1 )
        ax.stairs(Fp, edges, label='Poisson limit', color=s.get_edgecolor() or 'gray', alpha=0.5,zorder=-1, lw=1, ls=':')
        
    ax.set(xlabel='Extraction time / s', ylim=(0,1.1),
           ylabel=f'Spill duty factor F\n(${(evaluation_dt*SI.s).to_compact():~gL}$ / ${(counting_dt*SI.s).to_compact():~gL}$ intervals)'
    )




def plot_spill_consecutive_particle_delay(ax, spill_data, limit=5e-6, resolution=10e-9, show_poisson=False, swap_axis=False, log=True,
                                          **plot_kwargs):
    """Plot histogram of consecutive particle separation (particle-interval histograms)
    
    :param ax: Axis to plot onto
    :param spill_data: Instance of TDCSpill
    :param limit: Maximum delay to plot (range of histogram)
    :param resolution: Time resolution (bin width of histogram)
    :param show_poisson: If true, show poison like distribution
    :param log: If true, use logarithmic scaling
    :param swap_axis: If True, swap plot axis
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    nbins = int(np.ceil(limit/resolution))
    delay = spill_data.hitdelay
    hist = c, t = np.histogram(delay, range=(0, resolution*nbins), bins=nbins) #, weights=np.ones_like(delay)/resolution)
    ax.stairs(*hist, orientation='horizontal' if swap_axis else 'vertical', **plot_kwargs)
    ax_set(ax, swap_axis,
           xlabel='Delay between consecutive particles / s', xlim=(resolution, resolution*nbins), xscale='log' if log else None,
           ylabel=f'Particle count / ${(resolution*SI.s).to_compact():~gL}$ delay interval', yscale='log' if log else None)

    if show_poisson:
        # plot poisson statistics (exponential distribution)
        l = spill_data.counts/spill_data.length
        p = np.exp(-l*t[:-1]) - np.exp(-l*t[1:]) # probability of delay in between t[i] and t[i+1]
        c = p*spill_data.counts
        c[c<1] = 0 # supress fractional counts
        ax.stairs(c, t, orientation='horizontal' if swap_axis else 'vertical', color='gray', label='Poisson statistics', zorder=-1, lw=1)




def plot_spill_consecutive_particle_delay_2D(ax, spill_data, nseg, limit=5e-6, resolution=10e-9, log=True, colorbar=False, **plot_kwargs):
    """Plot histogram of consecutive particle separation (particle-interval histograms) over extraction time
    
    :param ax: Axis to plot onto
    :param spill_data: Instance of TDCSpill
    :param nseg: Number of time segments to divide spill into
    :param limit: Maximum delay to plot (range of histogram)
    :param resolution: Time resolution (bin width of histogram)
    :param log: If true, use logarithmic scaling
    :param colorbar: If true, add a colorbar. To control the location of the colorbar, use colorbar=axis
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    
    nbins = int(np.ceil(limit/resolution))
    counts = np.zeros((nseg, nbins))
    times = spill_data.time_offset + np.linspace(0, spill_data.length, nseg+1)
    
    for i in range(nseg):
        delay = spill_data.segment(times[i], times[i+1]).hitdelay
        counts[i,:], edges = np.histogram(delay, range=(0, resolution*nbins), bins=nbins)
    
    if log and 'norm' not in plot_kwargs: plot_kwargs['norm'] = mpl.colors.LogNorm(vmin=1, vmax=None)
    cm = ax.pcolormesh(times, edges, counts.T, **plot_kwargs)
    
    ax.set(ylabel='Delay between consecutive particles / s', ylim=(resolution, resolution*nbins), yscale='log' if log else None,
           xlabel='Extraction time / s')
    if colorbar:
        ax.get_figure().colorbar(cm, label=f'Particle count / ${(resolution*SI.s).to_compact():~gL}$ delay interval',
                                 ax=colorbar if isinstance(colorbar, mpl.axes.Axes) else None)




def plot_spill_frequency_spectrum(ax, spill_data, fmax=100e3, swap_axis=False, log=True, colorbar=False, **plot_kwargs):
Philipp Niedermayer's avatar
Philipp Niedermayer committed
    """Plot frequency spectrum of particle arrival times
    
    :param ax: Axis to plot onto
    :param spill_data: Instance of TDCSpill
    :param fmax: Maximum frequency for analysis in Hz
    :param log: If true, use logarithmic scaling
    :param swap_axis: If True, swap plot axis
Philipp Niedermayer's avatar
Philipp Niedermayer committed
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    
    # re-sample timestamps into equally sampled time series
    dt = 1/2/fmax
    timeseries, _ = spill_data.hitcounts(dt)
    
    # frequency analysis
    freq = np.fft.rfftfreq(len(timeseries), d=dt)[1:]
    mag = np.abs(np.fft.rfft(timeseries))[1:]
    mag *= 2/len(timeseries) # amplitude
    mag = mag**2 # power
    plot = (mag, freq) if swap_axis else (freq, mag)
    ax.plot(*plot, **plot_kwargs)
    
    props = dict(xlabel=f'Frequency / Hz', xlim=(0, fmax), 
                 ylabel='Power spectrum / a.u.', ylim=(0, 1.1*max(mag)))
    if log: props.update(dict(xscale='log', xlim=(10, fmax),
                              yscale='log', ylim=(0.1*np.mean(mag), 1.1*max(mag))))
    ax_set(ax, swap_axis, **props)



def plot_spill_frequency_spectrum_2D(ax, spill_data, nseg, fmax=100e3, log=True, colorbar=False, debug_mode=1, **plot_kwargs):
    """Plot frequency spectrum of particle arrival times over extraction time
    
    :param ax: Axis to plot onto
    :param spill_data: Instance of TDCSpill
    :param nseg: Number of time segments to divide spill into
    :param fmax: Maximum frequency for analysis in Hz
    :param log: If true, use logarithmic scaling
    :param colorbar: If true, add a colorbar. To control the location of the colorbar, use colorbar=axis
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """

    # frequency analysis with STFT
    dt = 1/2/fmax
    timeseries, _ = spill_data.hitcounts(dt)
    freq, times, mags = scipy.signal.stft(timeseries, fs=1/dt, nperseg=timeseries.size/nseg, noverlap=0, window='boxcar', padded=False, scaling='psd')
    mags = np.abs(mags)**2
            
    # plot
    if log and 'norm' not in plot_kwargs: plot_kwargs['norm'] = mpl.colors.LogNorm(vmin=0.1*np.mean(mags), vmax=1.1*np.max(mags))
    cm = ax.pcolormesh(times, freq, mags, **plot_kwargs)
    
    props = dict(xlabel='Extraction time / s',
                 ylabel='Frequency / Hz', ylim=(0, fmax))
    if log: props.update(dict(yscale='log', ylim=(10, fmax)))
    ax.set(**props)
    if colorbar:
        ax.get_figure().colorbar(cm, label=f'Power spectral density / a.u.', ax=colorbar if isinstance(colorbar, mpl.axes.Axes) else None)
        
    
    # # re-sample timestamps into equally sampled time series
    # dt = 1/2/fmax
    # timeseries, _ = spill_data.hitcounts(dt, spill_data.length/nseg)
    # 
    # # frequency analysis (DC supressed)
    # freq = (np.arange(timeseries.shape[1]//2+1)+0.5) / (dt*timeseries.shape[1]) # bin edges (without DC)
    # mags = np.zeros((timeseries.shape[0], freq.size-1))
    # times = spill_data.time_offset + np.linspace(0, spill_data.length, nseg+1) # bin edges
    # for i in range(nseg):
    #     mags[i,:] = np.abs(np.fft.rfft(timeseries[i]))[1:]
    # 
    # # plot
    # if log and 'norm' not in plot_kwargs: plot_kwargs['norm'] = mpl.colors.LogNorm(vmin=0.1*np.mean(mags), vmax=1.1*np.max(mags))
    # cm = ax.pcolormesh(times, freq, mags.T, **plot_kwargs)
    # 
    # props = dict(xlabel='Extraction time / s',
    #              ylabel='Frequency / Hz', ylim=(0, fmax))
    # if log: props.update(dict(yscale='log', ylim=(10, fmax)))
    # ax.set(**props)
    # if colorbar:
    #     ax.get_figure().colorbar(cm, label=f'Magnitude / a.u.', ax=colorbar if isinstance(colorbar, mpl.axes.Axes) else None)
def plot_harmonics(ax, f, df=0, *, n=20, time=False, swap_axis=False, **plot_kwargs):
    """Add vertical lines or spans indicating the location of the frequencies / frequency bands and its harmonics
    :param ax: Axis to plot onto
    :param f: Frequency or list of frequencies
    :param df: Bandwidth or list of bandwidths
    :param n: Number of harmonics to plot
    :param time: If true, plot multiples of period T=1/f
    :param swap_axis: If True, swap plot axis (i.e. plot horizontal spans)
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    if not hasattr(f, '__iter__'): f = [f]
    if not hasattr(df, '__iter__'): df = [df]*len(f)
    kwargs = dict(zorder=-1, color='gray', alpha=0.5, lw=1)
    kwargs.update(plot_kwargs)
    for i in range(1, n+1):
        for j, (fi, dfi) in enumerate(zip(f, df)):
            if dfi == 0:
                method = (ax.axhline if swap_axis else ax.axvline)
                args = [i/fi] if time else [i*fi]
            else:
                method = (ax.axhspan if swap_axis else ax.axvspan)
                args = [i/(fi+dfi/2), i/(fi-dfi/2)] if time else [i*(fi-dfi/2), i*(fi+dfi/2)]
            method(*args, **kwargs)
            kwargs.pop('label', None)

def plot_revolution_harmonics(ax, f0, **kwargs):
    """See plot_harmonics
    """
    plot_harmonics(ax, f0, 0, **dict(dict(label='Revolution harmonics', color='lightgray', alpha=1, zorder=-3, ls=':'), **kwargs))

def plot_tune_harmonics(ax, f0, q, dq=0, *, n=20, time=False, swap_axis=False, **plot_kwargs):
    """Add vertical lines or spans indicating the location of the frequencies / frequency bands and its harmonics
    :param ax: Axis to plot onto
    :param f0: Revolution frequency to which the tune parameters refer
    :param q: Tune or list of tunes
    :param dq: Bandwidth or list of bandwidths in tune units
    :param n: Number of harmonics to plot
    :param time: If true, plot multiples of period T=1/f
    :param swap_axis: If True, swap plot axis (i.e. plot horizontal spans)
    :param plot_kwargs: Keyword arguments to be passed to plotting method
    """
    if not hasattr(q, '__iter__'): q = [q]
    if not hasattr(dq, '__iter__'): dq = [dq]*len(q)
    plot_harmonics(ax, f0*np.array(q), f0*np.array(dq), n=n, time=time, swap_axis=swap_axis, **plot_kwargs)

def plot_excitation_harmonics(ax, f0, q, dq, **kwargs):
    """See plot_tune_harmonics
    """
    plot_tune_harmonics(ax, f0, q, dq, **dict(dict(label='Excitation harmonics'), **kwargs))

def plot_3rd_order_harmonics(ax, f0, **kwargs):
    """See plot_tune_harmonics
    """
    plot_tune_harmonics(ax, f0, 1/3, 0, **dict(dict(label='1/3 resonance harmonics', color='red', zorder=-2), **kwargs))