diff --git a/.gitignore b/.gitignore index 8e80082fbc75dacd4a03c45fd7597293546af5c9..275c66ffb9683a321a915c236e6580af045f64c4 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ db db/* __pycache__ __pycache__/* -__*.py +__* +figures/* diff --git a/tools/plots.py b/tools/plots.py new file mode 100644 index 0000000000000000000000000000000000000000..1d957033463340a21fa6cf1d6438c75bd6db0d4b --- /dev/null +++ b/tools/plots.py @@ -0,0 +1,135 @@ +import numpy as np +import matplotlib.pyplot as plt +from typing import List, Any, Dict, Tuple +import re + +from .processing import * + +# %% +def PlotFilter (filter: np.ndarray, sampling_rate: int = 1000): + + f = plt.figure() + + plt.subplot(2,1,1) + plt.title("Impulse response") + plt.xlabel("Sample") + plt.ylabel("Amplitude") + plt.stem(filter) + + plt.subplot(2,1,2) + freqs, fft = FFT(filter, sampling_rate, 9 * len(filter)) + plt.plot(freqs, fft) + plt.title("Frequency Response") + plt.xlabel("Frequency (Hz)") + plt.ylabel("Amplitude") + plt.yscale("log") + plt.grid() + + plt.tight_layout() + + return f + +# %% +def correlation_plot (lag: np.ndarray, correlation: np.ndarray, name: str = None, save: bool = False): + + f = plt.figure() + + plt.plot(lag, correlation) + + plt.grid() + plt.xlabel("Lag") + plt.ylabel("Correlation") + + if type(name) == str: + plt.title(name) + + plt.tight_layout() + + if type(name) == str and save: + folder = Path("figures\\autocorrelation\\") + if not folder.exists(): + folder.mkdir(parents=True) + plt.savefig(folder.joinpath(name + ".png")) + + return f + +# %% +def FFT_plot (freqs: np.ndarray, fft: np.ndarray, log: bool = False, name: str = None, save: bool = False): + + f = plt.figure() + + plt.plot(freqs, fft) + + plt.grid() + plt.xlabel("Frequency (Hz)") + plt.ylabel("Amplitude" if not log else "Amplitude (dB)") + if log: + plt.yscale("log") + + if type(name) == str: + plt.title(name) + + plt.tight_layout() + + if type(name) == str and save: + folder = Path("figures\\FFT\\") + if not folder.exists(): + folder.mkdir(parents=True) + plt.savefig(folder.joinpath(name + ".png")) + + return f + +# %% +def Time_plot (time: np.ndarray, signal: np.ndarray, name: str = None, save: bool = False): + + f = plt.figure() + + plt.plot(time, signal) + + plt.grid() + plt.xlabel("Frequency (Hz)") + plt.ylabel("Amplitude" if name is None else name) + + if type(name) == str: + plt.title(name) + + plt.tight_layout() + + if type(name) == str and save: + folder = Path("figures\\FFT\\") + if not folder.exists(): + folder.mkdir(parents=True) + plt.savefig(folder.joinpath(name + ".png")) + + return f + +#%% +def FFT_by_plot (cls: List[Any], freqs: np.ndarray, ffts: List[np.ndarray], log: bool = False, name: str = None, save: bool = False): + + f = plt.figure() + ax = plt.subplot(1,1,1) + + image = np.float64(ffts) + if log: + image = 10 * np.log10(image) + + extent = (freqs[0], freqs[-1], 0, len(cls)) + im1 = plt.imshow(image, "inferno", origin="lower", aspect="auto", extent=extent) + + plt.colorbar(im1, label="Magnitude" + (" (dB)" if log else "")) + ax.set_xlabel("Fequency (Hz)") + ax.set_yticks(range(len(cls)), labels=cls) + ax.grid(axis="y") + + plt.suptitle(name) + + plt.tight_layout() + + if type(name) == str and save: + escaped = re.sub(r"[^\w\d\(\)\ ]", "_", name) + folder = Path("figures\\FFT_by\\") + if not folder.exists(): + folder.mkdir(parents=True) + plt.savefig(folder.joinpath(escaped + ".png")) + + return f \ No newline at end of file diff --git a/tools/processing.py b/tools/processing.py index 0ddc46a52baeeb50fa70bab7d6dda4d2127d9ada..41ce789c52260ac2dd8a799dcf95207adfc3f440 100644 --- a/tools/processing.py +++ b/tools/processing.py @@ -7,108 +7,13 @@ from pathlib import Path import scipy.signal as scs #%% -def make_periodic (time: np.ndarray, signal: np.ndarray, plot: bool | str = False) -> Tuple[np.ndarray, np.ndarray]: - - # Center around 0 - signal = signal - signal.mean() - - # Find 0-crossings - zero_crossings = [] - zero_slopes = [] - - # From 0 to singal - 1 to prevent out of range exception - for i in range(len(signal) - 1): - sign = signal[i + 1] * signal[i] - - # zero found between signal[i] and signal[i + 1] - if sign < 0: - zero_crossings.append(i) - - slope = signal[i + 1] - signal[i] - zero_slopes.append(slope) - - # Finding the first zero that has the same slope as the maximum we can find - first_zero_index = 0 - first_zero = zero_slopes[first_zero_index] - - max_slope = np.max(np.abs(zero_slopes)) - while abs(first_zero) < 0.6 * max_slope: - first_zero_index += 1 - first_zero = zero_slopes[first_zero_index] - - # Try to find the last zero-crossing with the same slope as the first one - last_zero_index = -1 - current_zero = zero_slopes[last_zero_index] - - min_slope_difference = abs(current_zero - first_zero) - delta_slope = min_slope_difference - - for k in range(2, len(zero_crossings)): - current_zero = zero_slopes[-k] - difference = abs(current_zero - first_zero) - if difference < min_slope_difference: - delta_slope = min_slope_difference - difference - min_slope_difference = difference - last_zero_index = -k - - delta = difference - min_slope_difference - if delta > 0.8 * delta_slope: - break - - # Plot - x = time[zero_crossings[first_zero_index]:zero_crossings[last_zero_index]] - y = signal[zero_crossings[first_zero_index]:zero_crossings[last_zero_index]] - - if plot: - plt.figure() - plt.plot(time, signal, label="Input signal") - plt.plot(x, y, label="Output signal") - plt.scatter([ time[z] for z in zero_crossings ], np.zeros(len(zero_crossings)), c="r", label="Zeros") - plt.grid() - plt.xlabel("Time") - plt.ylabel("Signal") - plt.legend() - if type(plot) == str: - plt.title(plot) - plt.tight_layout() - if type(plot) == str: - folder = Path("figures\\make_periodic\\") - if not folder.exists(): - folder.mkdir(parents=True) - plt.savefig(folder.joinpath(plot + ".png")) - plt.pause(0.001) - - return x, y - -#%% -def autocorrelation (time: np.ndarray, signal: np.ndarray, plot: bool | str = False) -> Tuple[np.ndarray, np.ndarray]: +def autocorrelation (time: np.ndarray, signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: time_step = time[1] - time[0] output_time = np.arange(-len(signal) + 1, len(signal)) * time_step autocorrelation = np.correlate(signal, signal, "full") / len(signal) - if plot: - plt.figure() - - plt.plot(output_time, autocorrelation) - - plt.grid() - plt.xlabel("Lag") - plt.ylabel("Autocorrelation") - - if type(plot) == str: - plt.title(plot) - - plt.tight_layout() - - if type(plot) == str: - folder = Path("figures\\autocorrelation\\") - if not folder.exists(): - folder.mkdir(parents=True) - plt.savefig(folder.joinpath(plot + ".png")) - - plt.pause(0.001) return output_time, autocorrelation @@ -122,14 +27,13 @@ def correlation (time: np.ndarray, signal: np.ndarray, known: np.ndarray) -> Tup return output_time, correlation - #%% def Window (data: pd.Series | np.ndarray): window = np.hanning(len(data)) return window * data #%% -def moving_window (signal: np.ndarray, iterations: int, plot: bool | str = False): +def moving_window (signal: np.ndarray, iterations: int): K = 2 window_number = (iterations - 1) * K + 1 @@ -142,33 +46,10 @@ def moving_window (signal: np.ndarray, iterations: int, plot: bool | str = False end = start + window_size output[start:end] += Window(signal[start:end]) - - if plot: - plt.figure() - - plt.plot(output) - - plt.grid() - plt.xlabel("Sample") - plt.ylabel("Signal") - - if type(plot) == str: - plt.title(plot) - - plt.tight_layout() - - if type(plot) == str: - folder = Path("figures\\moving_window\\") - if not folder.exists(): - folder.mkdir(parents=True) - plt.savefig(folder.joinpath(plot + ".png")) - - plt.pause(0.001) - return output # %% -def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0, plot: bool | str = False, plot_log: bool = True): +def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0): # Defining the constants Fe = 1000 / sampling @@ -183,30 +64,6 @@ def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0, plot: b freqs = freqs[:M//2] fft = fft[:M//2] - if plot: - plt.figure() - - plt.plot(freqs, fft) - - plt.grid() - plt.xlabel("Frequency (Hz)") - plt.ylabel("Amplitude" if not plot_log else "Amplitude (dB)") - if plot_log: - plt.yscale("log") - - if type(plot) == str: - plt.title(plot) - - plt.tight_layout() - - if type(plot) == str: - folder = Path("figures\\FFT\\") - if not folder.exists(): - folder.mkdir(parents=True) - plt.savefig(folder.joinpath(plot + ".png")) - - plt.pause(0.001) - return freqs, fft #%% @@ -251,7 +108,6 @@ def Filter (data: pd.Series | np.ndarray, h: np.ndarray, plot: bool | str = Fals plt.pause(0.001) return filtered - #%% def MakeFilter (*filters: List[np.ndarray]): @@ -259,28 +115,6 @@ def MakeFilter (*filters: List[np.ndarray]): for f in filters: h = np.convolve(h, f) return h -#%% -def PlotFilter (filter: np.ndarray, sampling_rate: int = 1000): - - plt.figure() - - plt.subplot(2,1,1) - plt.title("Impulse response") - plt.xlabel("Sample") - plt.ylabel("Amplitude") - plt.stem(filter) - - plt.subplot(2,1,2) - freqs, fft = FFT(filter, sampling_rate, 9 * len(filter)) - plt.plot(freqs, fft) - plt.title("Frequency Response") - plt.xlabel("Frequency (Hz)") - plt.ylabel("Amplitude") - plt.yscale("log") - plt.grid() - - plt.tight_layout() - plt.pause(0.001) # %% def high_pass (fc: float, sampling_rate: int, N: int = None): @@ -409,3 +243,93 @@ def make_periodic_2 (X: np.ndarray, Y_Current: np.ndarray, Y_Position: np.ndarra return XP, CP, PP, fmov, periods, plateaus +# %% - +def FFT_by ( + data: pd.DataFrame, by: str, + time: str, position: str, current: str + ): + + # Define variables + cls: List[Any] = data[by].unique() + freqs: np.ndarray = None + ffts: List[np.ndarray] = [] + + times = [] + positions = [] + currents = [] + + lengths = [] + + # Making the samples periodic + for c in cls: + subdata = data[data[by] == c] + T = subdata[time].to_numpy() + P = subdata[position].to_numpy() + C = subdata[current].to_numpy() + + TP, CP, PP, fmov, periods, plateaus = make_periodic_2(T, C, P) + + times.append(TP) + positions.append(PP) + currents.append(CP) + lengths.append(len(TP)) + + # Detemining the length of the FFT + N = 8 * np.max(lengths) + + # Creating the FFTs + for i in range(len(times)): + T = times[i] + C = currents[i] + + lag, ac = autocorrelation(T, C) + ac_w = Window(ac) + n = N - len(ac_w) + freq, fft = FFT(ac_w, Ts, n) + + # sig_w = Window(C) + # n = N - len(sig_w) + # freq, fft = FFT(sig_w, Ts, n) + + if freqs is None: + freqs = freq + + ffts.append(fft) + + return cls, freqs, ffts, (times, positions, currents) + +#%% +def make_periodic (time: np.ndarray, position: np.ndarray, currents: List[np.ndarray], W = 5): + + Y2 = np.diff(position - position.mean()) + Y2 /= np.max(Y2) + # Findig all places where the signal is not changing + mask = (np.abs(Y2) > 0.1) * 1 + + start = 0 + plateaus = [] + a,b,w = find_plateau(mask, start, False, W) + while w > 0: + plateaus.append([a,b,w]) + start = b + 1 + a,b,w = find_plateau(mask, start, False, W) + + plateaus = np.int16(plateaus) + + max_w = plateaus[:,2].max() + + stable = plateaus[np.where(np.abs((plateaus[:,2] - max_w) / max_w) < 0.2)] + + zeros = stable[::2] + + start = zeros[0][0] + end = zeros[-1][0] + + periods = len(zeros) - 1 + fmov = periods / (time[end] - time[start]) + + XP = time[start:end].copy() + CP = [ currents[i][start:end].copy() for i in range(len(currents)) ] + PP = position[start:end].copy() + + return XP, CP, PP, fmov, periods, plateaus