diff --git a/tools/processing.py b/tools/processing.py index db854c3a61724dd002f23d0cb64e2d21451e0b66..b2904a7b0a1737b4df46de7cf4f7bb77aa5398bd 100644 --- a/tools/processing.py +++ b/tools/processing.py @@ -8,6 +8,15 @@ import scipy.signal as scs #%% def autocorrelation (time: np.ndarray, signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Auto-correlates a signal + + Args: + time (np.ndarray): Time axis + signal (np.ndarray): Signal to auto-correlate + + Returns: + Tuple[np.ndarray, np.ndarray]: Lag and correlation axis + """ time_step = time[1] - time[0] output_time = np.arange(-len(signal) + 1, len(signal)) * time_step @@ -19,6 +28,16 @@ def autocorrelation (time: np.ndarray, signal: np.ndarray) -> Tuple[np.ndarray, #%% def correlation (time: np.ndarray, signal: np.ndarray, known: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + """Correlates a signal with a known one + + Args: + time (np.ndarray): Time axis + signal (np.ndarray): Signal to correlate + known (np.ndarray): Knonw signal + + Returns: + Tuple[np.ndarray, np.ndarray]: Lag and correlation axis + """ time_step = time[1] - time[0] output_time = np.arange(-len(signal) + 1, len(signal)) * time_step @@ -29,6 +48,15 @@ def correlation (time: np.ndarray, signal: np.ndarray, known: np.ndarray) -> Tup #%% def Window (data: pd.Series | np.ndarray): + """Applies a hanning window to the signal. The width of the window + is the width of the signal. + + Args: + data (pd.Series | np.ndarray): Signal to window + + Returns: + The windowed signal + """ window = np.hanning(len(data)) return window * data @@ -50,6 +78,16 @@ def moving_window (signal: np.ndarray, iterations: int): # %% def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0): + """FFT of a signal in the Shannon band + + Args: + data (pd.Series | np.ndarray): The signal to analyse + sampling (int): The sampling rate of the signal, in ms + zero_pad (int, optional): The padding added to the length of the data + + Returns: + Tuple[ndarray, ndarray]: The frequency axis and the absolute magnitude axis + """ # Defining the constants Fe = 1000 / sampling @@ -68,12 +106,29 @@ def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0): #%% def fundamental (freqs: np.ndarray, fft: np.ndarray): + """Finds the spike with the greatest amplitude + + Args: + freqs (np.ndarray): Frequency axis + fft (np.ndarray): Magnitude axis + Returns: + The frequency of the spike with the greatest amplitude + """ return freqs[fft.argmax()] #%% def Filter (data: pd.Series | np.ndarray, h: np.ndarray, plot: bool | str = False): - + """Applies a filter to some signal + + Args: + data (pd.Series | np.ndarray): Signal to filter + h (np.ndarray): Filter + + Returns: + The filtered signal + """ + filtered = np.convolve(data, h) len_diff = (len(filtered) - len(data)) // 2 @@ -111,6 +166,12 @@ def Filter (data: pd.Series | np.ndarray, h: np.ndarray, plot: bool | str = Fals #%% def MakeFilter (*filters: List[np.ndarray]): + """Convolves multiple filters to one + + Returns: + The merged filter + """ + h = [ 1 ] for f in filters: h = np.convolve(h, f) @@ -118,6 +179,17 @@ def MakeFilter (*filters: List[np.ndarray]): # %% def high_pass (fc: float, sampling_rate: int, N: int = None): + """Creates a high-pass filter with a cutoff frequency fc + + Args: + fc (float): Cutoff frequency + sampling_rate (int): Sampling rate, in ms + N (int, optional): Number of samples making the filter + + Returns: + FIR high-pass filter, computed using the window method, with a kaiser window + """ + Fe = 1000 / sampling_rate N = N if N is not None else 500 // sampling_rate N += (1 - (N % 2)) # Make N odd @@ -145,6 +217,17 @@ def multi_zero_filter (f: float, sampling_rate: int): #%% def low_pass (fc: float, sampling_rate: int, N: int = None): + """Creates a low-pass filter with a cutoff frequency fc + + Args: + fc (float): Cutoff frequency + sampling_rate (int): Sampling rate, in ms + N (int, optional): Number of samples making the filter + + Returns: + FIR low-pass filter, computed using the window method, with a kaiser window + """ + Fe = 1000 / sampling_rate N = N if N is not None else 500 // sampling_rate N += (1 - (N % 2)) # Make N odd