Skip to content
Snippets Groups Projects
Commit 546fb882 authored by VulcanixFR's avatar VulcanixFR
Browse files

Dissociate processing and plotting

parent d3e380c3
No related merge requests found
......@@ -2,4 +2,5 @@ db
db/*
__pycache__
__pycache__/*
__*.py
__*
figures/*
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Any, Dict, Tuple
import re
from .processing import *
# %%
def PlotFilter (filter: np.ndarray, sampling_rate: int = 1000):
f = plt.figure()
plt.subplot(2,1,1)
plt.title("Impulse response")
plt.xlabel("Sample")
plt.ylabel("Amplitude")
plt.stem(filter)
plt.subplot(2,1,2)
freqs, fft = FFT(filter, sampling_rate, 9 * len(filter))
plt.plot(freqs, fft)
plt.title("Frequency Response")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.yscale("log")
plt.grid()
plt.tight_layout()
return f
# %%
def correlation_plot (lag: np.ndarray, correlation: np.ndarray, name: str = None, save: bool = False):
f = plt.figure()
plt.plot(lag, correlation)
plt.grid()
plt.xlabel("Lag")
plt.ylabel("Correlation")
if type(name) == str:
plt.title(name)
plt.tight_layout()
if type(name) == str and save:
folder = Path("figures\\autocorrelation\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(name + ".png"))
return f
# %%
def FFT_plot (freqs: np.ndarray, fft: np.ndarray, log: bool = False, name: str = None, save: bool = False):
f = plt.figure()
plt.plot(freqs, fft)
plt.grid()
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude" if not log else "Amplitude (dB)")
if log:
plt.yscale("log")
if type(name) == str:
plt.title(name)
plt.tight_layout()
if type(name) == str and save:
folder = Path("figures\\FFT\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(name + ".png"))
return f
# %%
def Time_plot (time: np.ndarray, signal: np.ndarray, name: str = None, save: bool = False):
f = plt.figure()
plt.plot(time, signal)
plt.grid()
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude" if name is None else name)
if type(name) == str:
plt.title(name)
plt.tight_layout()
if type(name) == str and save:
folder = Path("figures\\FFT\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(name + ".png"))
return f
#%%
def FFT_by_plot (cls: List[Any], freqs: np.ndarray, ffts: List[np.ndarray], log: bool = False, name: str = None, save: bool = False):
f = plt.figure()
ax = plt.subplot(1,1,1)
image = np.float64(ffts)
if log:
image = 10 * np.log10(image)
extent = (freqs[0], freqs[-1], 0, len(cls))
im1 = plt.imshow(image, "inferno", origin="lower", aspect="auto", extent=extent)
plt.colorbar(im1, label="Magnitude" + (" (dB)" if log else ""))
ax.set_xlabel("Fequency (Hz)")
ax.set_yticks(range(len(cls)), labels=cls)
ax.grid(axis="y")
plt.suptitle(name)
plt.tight_layout()
if type(name) == str and save:
escaped = re.sub(r"[^\w\d\(\)\ ]", "_", name)
folder = Path("figures\\FFT_by\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(escaped + ".png"))
return f
\ No newline at end of file
......@@ -7,108 +7,13 @@ from pathlib import Path
import scipy.signal as scs
#%%
def make_periodic (time: np.ndarray, signal: np.ndarray, plot: bool | str = False) -> Tuple[np.ndarray, np.ndarray]:
# Center around 0
signal = signal - signal.mean()
# Find 0-crossings
zero_crossings = []
zero_slopes = []
# From 0 to singal - 1 to prevent out of range exception
for i in range(len(signal) - 1):
sign = signal[i + 1] * signal[i]
# zero found between signal[i] and signal[i + 1]
if sign < 0:
zero_crossings.append(i)
slope = signal[i + 1] - signal[i]
zero_slopes.append(slope)
# Finding the first zero that has the same slope as the maximum we can find
first_zero_index = 0
first_zero = zero_slopes[first_zero_index]
max_slope = np.max(np.abs(zero_slopes))
while abs(first_zero) < 0.6 * max_slope:
first_zero_index += 1
first_zero = zero_slopes[first_zero_index]
# Try to find the last zero-crossing with the same slope as the first one
last_zero_index = -1
current_zero = zero_slopes[last_zero_index]
min_slope_difference = abs(current_zero - first_zero)
delta_slope = min_slope_difference
for k in range(2, len(zero_crossings)):
current_zero = zero_slopes[-k]
difference = abs(current_zero - first_zero)
if difference < min_slope_difference:
delta_slope = min_slope_difference - difference
min_slope_difference = difference
last_zero_index = -k
delta = difference - min_slope_difference
if delta > 0.8 * delta_slope:
break
# Plot
x = time[zero_crossings[first_zero_index]:zero_crossings[last_zero_index]]
y = signal[zero_crossings[first_zero_index]:zero_crossings[last_zero_index]]
if plot:
plt.figure()
plt.plot(time, signal, label="Input signal")
plt.plot(x, y, label="Output signal")
plt.scatter([ time[z] for z in zero_crossings ], np.zeros(len(zero_crossings)), c="r", label="Zeros")
plt.grid()
plt.xlabel("Time")
plt.ylabel("Signal")
plt.legend()
if type(plot) == str:
plt.title(plot)
plt.tight_layout()
if type(plot) == str:
folder = Path("figures\\make_periodic\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(plot + ".png"))
plt.pause(0.001)
return x, y
#%%
def autocorrelation (time: np.ndarray, signal: np.ndarray, plot: bool | str = False) -> Tuple[np.ndarray, np.ndarray]:
def autocorrelation (time: np.ndarray, signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
time_step = time[1] - time[0]
output_time = np.arange(-len(signal) + 1, len(signal)) * time_step
autocorrelation = np.correlate(signal, signal, "full") / len(signal)
if plot:
plt.figure()
plt.plot(output_time, autocorrelation)
plt.grid()
plt.xlabel("Lag")
plt.ylabel("Autocorrelation")
if type(plot) == str:
plt.title(plot)
plt.tight_layout()
if type(plot) == str:
folder = Path("figures\\autocorrelation\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(plot + ".png"))
plt.pause(0.001)
return output_time, autocorrelation
......@@ -122,14 +27,13 @@ def correlation (time: np.ndarray, signal: np.ndarray, known: np.ndarray) -> Tup
return output_time, correlation
#%%
def Window (data: pd.Series | np.ndarray):
window = np.hanning(len(data))
return window * data
#%%
def moving_window (signal: np.ndarray, iterations: int, plot: bool | str = False):
def moving_window (signal: np.ndarray, iterations: int):
K = 2
window_number = (iterations - 1) * K + 1
......@@ -142,33 +46,10 @@ def moving_window (signal: np.ndarray, iterations: int, plot: bool | str = False
end = start + window_size
output[start:end] += Window(signal[start:end])
if plot:
plt.figure()
plt.plot(output)
plt.grid()
plt.xlabel("Sample")
plt.ylabel("Signal")
if type(plot) == str:
plt.title(plot)
plt.tight_layout()
if type(plot) == str:
folder = Path("figures\\moving_window\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(plot + ".png"))
plt.pause(0.001)
return output
# %%
def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0, plot: bool | str = False, plot_log: bool = True):
def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0):
# Defining the constants
Fe = 1000 / sampling
......@@ -183,30 +64,6 @@ def FFT (data: pd.Series | np.ndarray, sampling: int, zero_pad: int = 0, plot: b
freqs = freqs[:M//2]
fft = fft[:M//2]
if plot:
plt.figure()
plt.plot(freqs, fft)
plt.grid()
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude" if not plot_log else "Amplitude (dB)")
if plot_log:
plt.yscale("log")
if type(plot) == str:
plt.title(plot)
plt.tight_layout()
if type(plot) == str:
folder = Path("figures\\FFT\\")
if not folder.exists():
folder.mkdir(parents=True)
plt.savefig(folder.joinpath(plot + ".png"))
plt.pause(0.001)
return freqs, fft
#%%
......@@ -251,7 +108,6 @@ def Filter (data: pd.Series | np.ndarray, h: np.ndarray, plot: bool | str = Fals
plt.pause(0.001)
return filtered
#%%
def MakeFilter (*filters: List[np.ndarray]):
......@@ -259,28 +115,6 @@ def MakeFilter (*filters: List[np.ndarray]):
for f in filters:
h = np.convolve(h, f)
return h
#%%
def PlotFilter (filter: np.ndarray, sampling_rate: int = 1000):
plt.figure()
plt.subplot(2,1,1)
plt.title("Impulse response")
plt.xlabel("Sample")
plt.ylabel("Amplitude")
plt.stem(filter)
plt.subplot(2,1,2)
freqs, fft = FFT(filter, sampling_rate, 9 * len(filter))
plt.plot(freqs, fft)
plt.title("Frequency Response")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.yscale("log")
plt.grid()
plt.tight_layout()
plt.pause(0.001)
# %%
def high_pass (fc: float, sampling_rate: int, N: int = None):
......@@ -409,3 +243,93 @@ def make_periodic_2 (X: np.ndarray, Y_Current: np.ndarray, Y_Position: np.ndarra
return XP, CP, PP, fmov, periods, plateaus
# %% -
def FFT_by (
data: pd.DataFrame, by: str,
time: str, position: str, current: str
):
# Define variables
cls: List[Any] = data[by].unique()
freqs: np.ndarray = None
ffts: List[np.ndarray] = []
times = []
positions = []
currents = []
lengths = []
# Making the samples periodic
for c in cls:
subdata = data[data[by] == c]
T = subdata[time].to_numpy()
P = subdata[position].to_numpy()
C = subdata[current].to_numpy()
TP, CP, PP, fmov, periods, plateaus = make_periodic_2(T, C, P)
times.append(TP)
positions.append(PP)
currents.append(CP)
lengths.append(len(TP))
# Detemining the length of the FFT
N = 8 * np.max(lengths)
# Creating the FFTs
for i in range(len(times)):
T = times[i]
C = currents[i]
lag, ac = autocorrelation(T, C)
ac_w = Window(ac)
n = N - len(ac_w)
freq, fft = FFT(ac_w, Ts, n)
# sig_w = Window(C)
# n = N - len(sig_w)
# freq, fft = FFT(sig_w, Ts, n)
if freqs is None:
freqs = freq
ffts.append(fft)
return cls, freqs, ffts, (times, positions, currents)
#%%
def make_periodic (time: np.ndarray, position: np.ndarray, currents: List[np.ndarray], W = 5):
Y2 = np.diff(position - position.mean())
Y2 /= np.max(Y2)
# Findig all places where the signal is not changing
mask = (np.abs(Y2) > 0.1) * 1
start = 0
plateaus = []
a,b,w = find_plateau(mask, start, False, W)
while w > 0:
plateaus.append([a,b,w])
start = b + 1
a,b,w = find_plateau(mask, start, False, W)
plateaus = np.int16(plateaus)
max_w = plateaus[:,2].max()
stable = plateaus[np.where(np.abs((plateaus[:,2] - max_w) / max_w) < 0.2)]
zeros = stable[::2]
start = zeros[0][0]
end = zeros[-1][0]
periods = len(zeros) - 1
fmov = periods / (time[end] - time[start])
XP = time[start:end].copy()
CP = [ currents[i][start:end].copy() for i in range(len(currents)) ]
PP = position[start:end].copy()
return XP, CP, PP, fmov, periods, plateaus
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment