From 36042633d451fdfe6cb9b07115811e10b98369c7 Mon Sep 17 00:00:00 2001 From: VulcanixFR <vulcanix.gamingfr@gmail.com> Date: Wed, 31 Jul 2024 15:18:02 +0200 Subject: [PATCH] Add functions to processing --- tools/database.py | 11 ++- tools/extractor.py | 111 ++++++++++++++------------- tools/predictive_indicators.py | 110 +++++++++++++++++++++++++++ tools/processing.py | 133 +++++++++++++++++++++------------ 4 files changed, 258 insertions(+), 107 deletions(-) create mode 100644 tools/predictive_indicators.py diff --git a/tools/database.py b/tools/database.py index 5ef8bb9..0b8a8c0 100644 --- a/tools/database.py +++ b/tools/database.py @@ -353,20 +353,19 @@ class Database: # Get the output data_sysvar, data_trace = Extractor.extract() data = data_trace if f.trace else data_sysvar - merged: pd.DataFrame = pd.concat(data.values()) # Add labels - data_len = len(merged[merged.columns[0]]) - merged["Sample_rate"] = [ f.sampling_time ] * data_len - merged["run"] = [ index + i ] * data_len - merged["trace"] = [ 1 if f.trace else 0 ] * data_len + data_len = len(data[data.columns[0]]) + data["Sample_rate"] = [ f.sampling_time ] * data_len + data["run"] = [ index + i ] * data_len + data["trace"] = [ 1 if f.trace else 0 ] * data_len Chrono.tick(True) # Add to database print(prefix, "Saving file", f.file_name, end=" ... ") table = f.robot.replace(" ", "_") - merged.to_sql(table, self.db, if_exists="append", index=False) + data.to_sql(table, self.db, if_exists="append", index=False) Chrono.tick(True) # raise Exception("") diff --git a/tools/extractor.py b/tools/extractor.py index e087df3..e9b1758 100644 --- a/tools/extractor.py +++ b/tools/extractor.py @@ -3,6 +3,7 @@ import traceback from typing import List, Dict, Tuple import os from pathlib import Path +import numpy as np class CorrectionCoefficient: @@ -314,7 +315,7 @@ class DataExtractor: neo["Class"] = sysvar["Load"] neo["Speed"] = [ int(speed[:-1]) for speed in sysvar["Speed"] ] neo["MovingMotor"] = ( - sysvar["A1"] + 1 * sysvar["A1"] + 2 * sysvar["A2"] + 3 * sysvar["A3"] + 4 * sysvar["A4"] @@ -338,7 +339,7 @@ class DataExtractor: neo["Sample_time"] = trace["Sample_time"] neo["Class"] = trace["Load"] neo["Speed"] = trace["Speed"] - neo["MovingMotor"] = trace["AnalogOut1"] + neo["MovingMotor"] = trace["AnalogOut1"] % 7 for i in range(1,7): neo[f'Position_A{i}'] = trace[f'Position_A{i}'] @@ -349,72 +350,76 @@ class DataExtractor: return neo - def extract (self) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: + def extract (self) -> Tuple[pd.DataFrame | None, pd.DataFrame | None]: """Extracts only the moments when the motors are moving Returns: Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: The resulting extraction """ + output_sysvar = None + output_trace = None - output_sysvar = {} - output_trace = {} + if self.data_sysvar is not None: + output_sysvar = self.normalize_sysvar(self.data_sysvar) + if self.data_trace is not None: + output_trace = self.normalize_trace(self.data_trace) + + # for i in range(0, 8): + # motor = f'A{i}' + # if self.data_sysvar is not None: + # output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1]) + # if self.data_trace is not None: + # output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i]) - for i in range(1, 7): - motor = f'A{i}' - if self.data_sysvar is not None: - output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1]) - if self.data_trace is not None: - output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i]) - return output_sysvar, output_trace - def export (self): - """Extracts and saves to files the moments when the motors are moving - """ + # def export (self): + # """Extracts and saves to files the moments when the motors are moving + # """ - sysvar, trace = self.extract() + # sysvar, trace = self.extract() - # Preparing the file name - folder = Path(self.export_folder, self.export_prefix) + # # Preparing the file name + # folder = Path(self.export_folder, self.export_prefix) - if not os.path.exists(folder): - os.mkdir(folder) + # if not os.path.exists(folder): + # os.mkdir(folder) - if sysvar is not None: + # if sysvar is not None: - # Exporting sys vars data - for motor in sysvar.keys(): - file = folder.joinpath(f"SysVars - {motor}") + # # Exporting sys vars data + # for motor in sysvar.keys(): + # file = folder.joinpath(f"SysVars - {motor}") - if self.excel: - sysvar[motor].to_excel(file.with_suffix(".xlsx")) - else: - sysvar[motor].to_csv(file.with_suffix(".csv")) - - if trace is not None: - - # Exporting trace data - for motor in trace.keys(): - file = folder.joinpath(f"Trace - {motor}") - if self.excel: - trace[motor].to_excel(file.with_suffix(".xlsx")) - else: - trace[motor].to_csv(file.with_suffix(".csv")) - - - def run (self, sysvar: str | Path, trace: str | Path = None): - """Loads, corrects and exports the specified data - - Args: - sysvar (str): The file path to System Variables data - trace (str, optional): The file path to Trace data. Defaults to None. - """ - - self.load(sysvar, False) - if trace is not None: - self.load(trace, True) - self.apply_correction() - self.export() + # if self.excel: + # sysvar[motor].to_excel(file.with_suffix(".xlsx")) + # else: + # sysvar[motor].to_csv(file.with_suffix(".csv")) + + # if trace is not None: + + # # Exporting trace data + # for motor in trace.keys(): + # file = folder.joinpath(f"Trace - {motor}") + # if self.excel: + # trace[motor].to_excel(file.with_suffix(".xlsx")) + # else: + # trace[motor].to_csv(file.with_suffix(".csv")) + + + # def run (self, sysvar: str | Path, trace: str | Path = None): + # """Loads, corrects and exports the specified data + + # Args: + # sysvar (str): The file path to System Variables data + # trace (str, optional): The file path to Trace data. Defaults to None. + # """ + + # self.load(sysvar, False) + # if trace is not None: + # self.load(trace, True) + # self.apply_correction() + # self.export() diff --git a/tools/predictive_indicators.py b/tools/predictive_indicators.py new file mode 100644 index 0000000..0fcd990 --- /dev/null +++ b/tools/predictive_indicators.py @@ -0,0 +1,110 @@ +""" +This files contains the logic to compute the indicators used for the +predictive diagnosis. +""" + +# %% - Include Dependencies +import numpy as np +import pandas as pd +from typing import List, Dict, Tuple, Any +import re + +from .database import Database +from .processing import * +from .plots import * +from time import time + +# %% - Constants +QUERY_DROP_TABLE = lambda r: f"drop table if exists Robot{r}_indicators;" +QUERY_RUNS = lambda r: f'SELECT `index`, `Name` FROM DataFiles WHERE `Robot` = "Robot_{r}";' +COMPUTED_COLUMS = [ + "Time", + "RunTime", + "Speed", + "Axis", + "RMS", "KMeans", "PeakFactor", "MeanTemperature" +] +# for m in range(1,7): +# for v in [ "RMS", "KMeans", "PeakFactor", "MeanTemperature" ]: +# COMPUTED_COLUMS.append(f"{v}_A{m}") +QUERY_INDICATORS = lambda r: f"SELECT * FROM Robot{r}_indicators;" + +# %% - Computation for one run +def compute_run (DB: Database, robot: int, run: int, t0: int): + + out = [] + + data = DB.robot(robot).by_run(run).run() + + # Compute sub-arrays + subs: List[pd.DataFrame] = [ + data[data["MovingMotor"] == m] for m in range(1,7) + ] + + # Compute for whole experiment + # line = [ t0, 0 ] + for m in range(1,7): + sub = subs[m - 1] + c = sub[f"Current_A{m}"].to_numpy() + temp = sub[f"Temperature_A{m}"].to_numpy() + line = [ + # *line, + t0, t0, 0, + f"A{m}", RMS(c), kmeans(c), peak_factor(c), np.mean(temp) + ] + out.append(line) + + # out.append(line) + + # Compute for each speed + speeds = data['Speed'].unique() + speed_data = [ + data[data["Speed"] == s] for s in speeds + ] + for i, s in enumerate(speeds): + d = speed_data[i] + # time = d["Sample_time"].to_numpy()[0] + t0 + # line = [ time, s ] + for m in range(1,7): + sub = d[d["MovingMotor"] == m] + time = d["Sample_time"].to_numpy()[0] + t0 + c = sub[f"Current_A{m}"].to_numpy() + temp = sub[f"Temperature_A{m}"].to_numpy() + line = [ + # *line, + time, t0, s, f"A{m}", + RMS(c), kmeans(c), peak_factor(c), np.mean(temp) + ] + out.append(line) + # out.append(line) + + return out + +# %% - Transformation function +def compute_indicators (DB: Database, robot: int): + + if not (1 <= robot <= 3): + print("Robot must be between 1 and 3") + return + + # Remove any existing computation + DB.db.execute(QUERY_DROP_TABLE(robot)) + + # Get the list of runs + files = pd.read_sql(QUERY_RUNS(robot), DB.db) + runs = files['index'].to_list() + times = [ name.split(" ")[0] for name in files["Name"] ] + times = [ t.split("h") for t in times ] + times = [ 3600 * int(t[0]) + 60 * int(t[1]) for t in times ] + + # Compute + out = [] + for i, run in enumerate(runs): + print("Robot", robot, "run n°", run) + out = [ *out, *compute_run(DB, robot, run, times[i]) ] + + # Save + df = pd.DataFrame(out, columns=COMPUTED_COLUMS) + df.to_sql(f"Robot{robot}_indicators", DB.db) + + return df diff --git a/tools/processing.py b/tools/processing.py index b2904a7..3c4addc 100644 --- a/tools/processing.py +++ b/tools/processing.py @@ -1,4 +1,4 @@ -from typing import List, Tuple, Dict, Any +from typing import List, Tuple, Dict, Any, Callable import pandas as pd import numpy as np import matplotlib.pyplot as plt @@ -268,7 +268,7 @@ def find_plateau (mask: np.ndarray, start: int, backwards: bool = False, W: int # %% - def FFT_by ( data: pd.DataFrame, by: str, - time: str, position: str, current: str, Ts=4 + time: str, position: str, current: str, Ts=4, filter: np.ndarray = None ): """Makes an FFT for each of the values taken by the "by" variable @@ -308,8 +308,12 @@ def FFT_by ( P = subdata[position].to_numpy() C = subdata[current].to_numpy() + TP, [CP], PP, fmov, periods, plateaus = make_periodic(T, P, [C]) + if type(filter) == np.ndarray: + CP = Filter(CP, filter) + times.append(TP) positions.append(PP) currents.append(CP) @@ -376,22 +380,60 @@ def make_periodic (time: np.ndarray, position: np.ndarray, currents: List[np.nda return XP, CP, PP, fmov, periods, plateaus -# %% -def RMS (signal: np.ndarray) -> float: - """Computes the root mean square value of the given signal +# %% - +def Spectrogram ( + signal: np.ndarray, + widow_width: float = 1, Ts: float = 0.004, no_overlap=False + ) -> Tuple[np.ndarray, scs.ShortTimeFFT]: + + """Returns a 2D Spectrogram of the signal Args: - signal (np.ndarray): Input signal + time (np.ndarray): The time axis + signal (np.ndarray): The signal to analyse + widow_width (float): The length of the window (in s, 1 by defaut) + Ts (float): The sampling rate (in s, 0.004 by default) + no_overlap (bool): Disables window overlapping Returns: - float: The RMS of the signal + Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object """ - return np.sqrt(np.mean(signal ** 2)) -# %% -def moving_rms ( + # Sampling frequency + Fs = 1 / Ts + + # Width in n° of samples + width = int(widow_width / Ts) + + # Overlap + hop = width if no_overlap else width // 4 + + # Window + window = np.hanning(width) + + # Zero-padding + pad = len(signal) * 5 + + # Get the spectrogram + SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude") + Sx = SFT.stft(signal) + + return Sx, SFT + +def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray): + signal = signal - signal.mean() + b,a = scs.butter(7,0.99, btype='low', analog=False) + signal = scs.filtfilt(b,a,signal) + x,y = autocorrelation(time, signal) + signal_windowed = moving_window(y, 1) + return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) ) + + +#%% +def moving_function ( time: np.ndarray, signal: np.ndarray, + function: Callable[[np.ndarray], float], window: float = 2, overlap: float = 0.5 ) -> Tuple[np.ndarray, np.ndarray]: @@ -401,6 +443,7 @@ def moving_rms ( Args: time (np.ndarray): Time axis signal (np.ndarray): Signal to compute the RMS from + function (Callable[[np.ndarray], np.ndarray]): Function to move on the signal window (float, optional): Width of the window (in s). Defaults to 2s. overlap (float): percentage of the window overlapping with the previous one @@ -420,54 +463,48 @@ def moving_rms ( for k in range(rms_segments): a = hops[k] b = a + int(rms_width) - rms_over_time[k] = RMS(signal[a:b]) + rms_over_time[k] = function(signal[a:b]) return rms_time, rms_over_time -# %% - -def Spectrogram ( - signal: np.ndarray, - widow_width: float = 1, Ts: float = 0.004, no_overlap=False - ) -> Tuple[np.ndarray, scs.ShortTimeFFT]: - - """Returns a 2D Spectrogram of the signal +# %% +def RMS (signal: np.ndarray) -> float: + """Computes the root mean square value of the given signal Args: - time (np.ndarray): The time axis - signal (np.ndarray): The signal to analyse - widow_width (float): The length of the window (in s, 1 by defaut) - Ts (float): The sampling rate (in s, 0.004 by default) - no_overlap (bool): Disables window overlapping + signal (np.ndarray): Input signal Returns: - Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object + float: The RMS of the signal """ + return np.sqrt(np.mean(signal ** 2)) - # Sampling frequency - Fs = 1 / Ts - - # Width in n° of samples - width = int(widow_width / Ts) - - # Overlap - hop = width if no_overlap else width // 4 - - # Window - window = np.hanning(width) +# %% +def kmeans (x: np.ndarray) -> float: + return np.mean(((x - np.mean(x)) / np.std(x)) ** 4) - # Zero-padding - pad = len(signal) * 5 +#%% +def peak_factor (x: np.ndarray) -> float: + return np.max(x) / RMS(x) - # Get the spectrogram - SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude") - Sx = SFT.stft(signal) +# %% +def moving_rms ( + time: np.ndarray, + signal: np.ndarray, + window: float = 2, + overlap: float = 0.5 + ) -> Tuple[np.ndarray, np.ndarray]: + """Computes the RMS value of the signal using a moving window, + with 50% overlap. The RMS sampling rate is window / 2. - return Sx, SFT + Args: + time (np.ndarray): Time axis + signal (np.ndarray): Signal to compute the RMS from + window (float, optional): Width of the window (in s). Defaults to 2s. + overlap (float): percentage of the window overlapping with the previous one + + Returns: + Tuple[np.ndarray, np.ndarray]: Time and RMS axis + """ + return moving_function(time, signal, RMS, window, overlap) -def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray): - signal = signal - signal.mean() - b,a = scs.butter(7,0.99, btype='low', analog=False) - signal = scs.filtfilt(b,a,signal) - x,y = autocorrelation(time, signal) - signal_windowed = moving_window(y, 1) - return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) ) -- GitLab