diff --git a/tools/database.py b/tools/database.py
index 5ef8bb9e0d44c4206e50fa72b408178a0e28cce2..0b8a8c07d9f0ef5df60e27e984fd3c76f0dc95af 100644
--- a/tools/database.py
+++ b/tools/database.py
@@ -353,20 +353,19 @@ class Database:
# Get the output
data_sysvar, data_trace = Extractor.extract()
data = data_trace if f.trace else data_sysvar
- merged: pd.DataFrame = pd.concat(data.values())
# Add labels
- data_len = len(merged[merged.columns[0]])
- merged["Sample_rate"] = [ f.sampling_time ] * data_len
- merged["run"] = [ index + i ] * data_len
- merged["trace"] = [ 1 if f.trace else 0 ] * data_len
+ data_len = len(data[data.columns[0]])
+ data["Sample_rate"] = [ f.sampling_time ] * data_len
+ data["run"] = [ index + i ] * data_len
+ data["trace"] = [ 1 if f.trace else 0 ] * data_len
Chrono.tick(True)
# Add to database
print(prefix, "Saving file", f.file_name, end=" ... ")
table = f.robot.replace(" ", "_")
- merged.to_sql(table, self.db, if_exists="append", index=False)
+ data.to_sql(table, self.db, if_exists="append", index=False)
Chrono.tick(True)
# raise Exception("")
diff --git a/tools/extractor.py b/tools/extractor.py
index e087df37cc78ed8ecf03e7671eb375e4d74eb150..e9b1758a9e58b2538b7105482069f2cda73c131f 100644
--- a/tools/extractor.py
+++ b/tools/extractor.py
@@ -3,6 +3,7 @@ import traceback
from typing import List, Dict, Tuple
import os
from pathlib import Path
+import numpy as np
class CorrectionCoefficient:
@@ -314,7 +315,7 @@ class DataExtractor:
neo["Class"] = sysvar["Load"]
neo["Speed"] = [ int(speed[:-1]) for speed in sysvar["Speed"] ]
neo["MovingMotor"] = (
- sysvar["A1"]
+ 1 * sysvar["A1"]
+ 2 * sysvar["A2"]
+ 3 * sysvar["A3"]
+ 4 * sysvar["A4"]
@@ -338,7 +339,7 @@ class DataExtractor:
neo["Sample_time"] = trace["Sample_time"]
neo["Class"] = trace["Load"]
neo["Speed"] = trace["Speed"]
- neo["MovingMotor"] = trace["AnalogOut1"]
+ neo["MovingMotor"] = trace["AnalogOut1"] % 7
for i in range(1,7):
neo[f'Position_A{i}'] = trace[f'Position_A{i}']
@@ -349,72 +350,76 @@ class DataExtractor:
return neo
- def extract (self) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
+ def extract (self) -> Tuple[pd.DataFrame | None, pd.DataFrame | None]:
"""Extracts only the moments when the motors are moving
Returns:
Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: The resulting extraction
"""
+ output_sysvar = None
+ output_trace = None
- output_sysvar = {}
- output_trace = {}
+ if self.data_sysvar is not None:
+ output_sysvar = self.normalize_sysvar(self.data_sysvar)
+ if self.data_trace is not None:
+ output_trace = self.normalize_trace(self.data_trace)
+
+ # for i in range(0, 8):
+ # motor = f'A{i}'
+ # if self.data_sysvar is not None:
+ # output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
+ # if self.data_trace is not None:
+ # output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])
- for i in range(1, 7):
- motor = f'A{i}'
- if self.data_sysvar is not None:
- output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
- if self.data_trace is not None:
- output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])
-
return output_sysvar, output_trace
- def export (self):
- """Extracts and saves to files the moments when the motors are moving
- """
+ # def export (self):
+ # """Extracts and saves to files the moments when the motors are moving
+ # """
- sysvar, trace = self.extract()
+ # sysvar, trace = self.extract()
- # Preparing the file name
- folder = Path(self.export_folder, self.export_prefix)
+ # # Preparing the file name
+ # folder = Path(self.export_folder, self.export_prefix)
- if not os.path.exists(folder):
- os.mkdir(folder)
+ # if not os.path.exists(folder):
+ # os.mkdir(folder)
- if sysvar is not None:
+ # if sysvar is not None:
- # Exporting sys vars data
- for motor in sysvar.keys():
- file = folder.joinpath(f"SysVars - {motor}")
+ # # Exporting sys vars data
+ # for motor in sysvar.keys():
+ # file = folder.joinpath(f"SysVars - {motor}")
- if self.excel:
- sysvar[motor].to_excel(file.with_suffix(".xlsx"))
- else:
- sysvar[motor].to_csv(file.with_suffix(".csv"))
-
- if trace is not None:
-
- # Exporting trace data
- for motor in trace.keys():
- file = folder.joinpath(f"Trace - {motor}")
- if self.excel:
- trace[motor].to_excel(file.with_suffix(".xlsx"))
- else:
- trace[motor].to_csv(file.with_suffix(".csv"))
-
-
- def run (self, sysvar: str | Path, trace: str | Path = None):
- """Loads, corrects and exports the specified data
-
- Args:
- sysvar (str): The file path to System Variables data
- trace (str, optional): The file path to Trace data. Defaults to None.
- """
-
- self.load(sysvar, False)
- if trace is not None:
- self.load(trace, True)
- self.apply_correction()
- self.export()
+ # if self.excel:
+ # sysvar[motor].to_excel(file.with_suffix(".xlsx"))
+ # else:
+ # sysvar[motor].to_csv(file.with_suffix(".csv"))
+
+ # if trace is not None:
+
+ # # Exporting trace data
+ # for motor in trace.keys():
+ # file = folder.joinpath(f"Trace - {motor}")
+ # if self.excel:
+ # trace[motor].to_excel(file.with_suffix(".xlsx"))
+ # else:
+ # trace[motor].to_csv(file.with_suffix(".csv"))
+
+
+ # def run (self, sysvar: str | Path, trace: str | Path = None):
+ # """Loads, corrects and exports the specified data
+
+ # Args:
+ # sysvar (str): The file path to System Variables data
+ # trace (str, optional): The file path to Trace data. Defaults to None.
+ # """
+
+ # self.load(sysvar, False)
+ # if trace is not None:
+ # self.load(trace, True)
+ # self.apply_correction()
+ # self.export()
diff --git a/tools/predictive_indicators.py b/tools/predictive_indicators.py
new file mode 100644
index 0000000000000000000000000000000000000000..0fcd990b9ae8e64e301b9919deb1eab238e74f3a
--- /dev/null
+++ b/tools/predictive_indicators.py
@@ -0,0 +1,110 @@
+"""
+This files contains the logic to compute the indicators used for the
+predictive diagnosis.
+"""
+
+# %% - Include Dependencies
+import numpy as np
+import pandas as pd
+from typing import List, Dict, Tuple, Any
+import re
+
+from .database import Database
+from .processing import *
+from .plots import *
+from time import time
+
+# %% - Constants
+QUERY_DROP_TABLE = lambda r: f"drop table if exists Robot{r}_indicators;"
+QUERY_RUNS = lambda r: f'SELECT `index`, `Name` FROM DataFiles WHERE `Robot` = "Robot_{r}";'
+COMPUTED_COLUMS = [
+ "Time",
+ "RunTime",
+ "Speed",
+ "Axis",
+ "RMS", "KMeans", "PeakFactor", "MeanTemperature"
+]
+# for m in range(1,7):
+# for v in [ "RMS", "KMeans", "PeakFactor", "MeanTemperature" ]:
+# COMPUTED_COLUMS.append(f"{v}_A{m}")
+QUERY_INDICATORS = lambda r: f"SELECT * FROM Robot{r}_indicators;"
+
+# %% - Computation for one run
+def compute_run (DB: Database, robot: int, run: int, t0: int):
+
+ out = []
+
+ data = DB.robot(robot).by_run(run).run()
+
+ # Compute sub-arrays
+ subs: List[pd.DataFrame] = [
+ data[data["MovingMotor"] == m] for m in range(1,7)
+ ]
+
+ # Compute for whole experiment
+ # line = [ t0, 0 ]
+ for m in range(1,7):
+ sub = subs[m - 1]
+ c = sub[f"Current_A{m}"].to_numpy()
+ temp = sub[f"Temperature_A{m}"].to_numpy()
+ line = [
+ # *line,
+ t0, t0, 0,
+ f"A{m}", RMS(c), kmeans(c), peak_factor(c), np.mean(temp)
+ ]
+ out.append(line)
+
+ # out.append(line)
+
+ # Compute for each speed
+ speeds = data['Speed'].unique()
+ speed_data = [
+ data[data["Speed"] == s] for s in speeds
+ ]
+ for i, s in enumerate(speeds):
+ d = speed_data[i]
+ # time = d["Sample_time"].to_numpy()[0] + t0
+ # line = [ time, s ]
+ for m in range(1,7):
+ sub = d[d["MovingMotor"] == m]
+ time = d["Sample_time"].to_numpy()[0] + t0
+ c = sub[f"Current_A{m}"].to_numpy()
+ temp = sub[f"Temperature_A{m}"].to_numpy()
+ line = [
+ # *line,
+ time, t0, s, f"A{m}",
+ RMS(c), kmeans(c), peak_factor(c), np.mean(temp)
+ ]
+ out.append(line)
+ # out.append(line)
+
+ return out
+
+# %% - Transformation function
+def compute_indicators (DB: Database, robot: int):
+
+ if not (1 <= robot <= 3):
+ print("Robot must be between 1 and 3")
+ return
+
+ # Remove any existing computation
+ DB.db.execute(QUERY_DROP_TABLE(robot))
+
+ # Get the list of runs
+ files = pd.read_sql(QUERY_RUNS(robot), DB.db)
+ runs = files['index'].to_list()
+ times = [ name.split(" ")[0] for name in files["Name"] ]
+ times = [ t.split("h") for t in times ]
+ times = [ 3600 * int(t[0]) + 60 * int(t[1]) for t in times ]
+
+ # Compute
+ out = []
+ for i, run in enumerate(runs):
+ print("Robot", robot, "run n°", run)
+ out = [ *out, *compute_run(DB, robot, run, times[i]) ]
+
+ # Save
+ df = pd.DataFrame(out, columns=COMPUTED_COLUMS)
+ df.to_sql(f"Robot{robot}_indicators", DB.db)
+
+ return df
diff --git a/tools/processing.py b/tools/processing.py
index b2904a7b0a1737b4df46de7cf4f7bb77aa5398bd..3c4addc7710980ee7ae2c9f51877f28c9ef22c4f 100644
--- a/tools/processing.py
+++ b/tools/processing.py
@@ -1,4 +1,4 @@
-from typing import List, Tuple, Dict, Any
+from typing import List, Tuple, Dict, Any, Callable
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
@@ -268,7 +268,7 @@ def find_plateau (mask: np.ndarray, start: int, backwards: bool = False, W: int
# %% -
def FFT_by (
data: pd.DataFrame, by: str,
- time: str, position: str, current: str, Ts=4
+ time: str, position: str, current: str, Ts=4, filter: np.ndarray = None
):
"""Makes an FFT for each of the values taken by the "by" variable
@@ -308,8 +308,12 @@ def FFT_by (
P = subdata[position].to_numpy()
C = subdata[current].to_numpy()
+
TP, [CP], PP, fmov, periods, plateaus = make_periodic(T, P, [C])
+ if type(filter) == np.ndarray:
+ CP = Filter(CP, filter)
+
times.append(TP)
positions.append(PP)
currents.append(CP)
@@ -376,22 +380,60 @@ def make_periodic (time: np.ndarray, position: np.ndarray, currents: List[np.nda
return XP, CP, PP, fmov, periods, plateaus
-# %%
-def RMS (signal: np.ndarray) -> float:
- """Computes the root mean square value of the given signal
+# %% -
+def Spectrogram (
+ signal: np.ndarray,
+ widow_width: float = 1, Ts: float = 0.004, no_overlap=False
+ ) -> Tuple[np.ndarray, scs.ShortTimeFFT]:
+
+ """Returns a 2D Spectrogram of the signal
Args:
- signal (np.ndarray): Input signal
+ time (np.ndarray): The time axis
+ signal (np.ndarray): The signal to analyse
+ widow_width (float): The length of the window (in s, 1 by defaut)
+ Ts (float): The sampling rate (in s, 0.004 by default)
+ no_overlap (bool): Disables window overlapping
Returns:
- float: The RMS of the signal
+ Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object
"""
- return np.sqrt(np.mean(signal ** 2))
-# %%
-def moving_rms (
+ # Sampling frequency
+ Fs = 1 / Ts
+
+ # Width in n° of samples
+ width = int(widow_width / Ts)
+
+ # Overlap
+ hop = width if no_overlap else width // 4
+
+ # Window
+ window = np.hanning(width)
+
+ # Zero-padding
+ pad = len(signal) * 5
+
+ # Get the spectrogram
+ SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude")
+ Sx = SFT.stft(signal)
+
+ return Sx, SFT
+
+def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray):
+ signal = signal - signal.mean()
+ b,a = scs.butter(7,0.99, btype='low', analog=False)
+ signal = scs.filtfilt(b,a,signal)
+ x,y = autocorrelation(time, signal)
+ signal_windowed = moving_window(y, 1)
+ return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) )
+
+
+#%%
+def moving_function (
time: np.ndarray,
signal: np.ndarray,
+ function: Callable[[np.ndarray], float],
window: float = 2,
overlap: float = 0.5
) -> Tuple[np.ndarray, np.ndarray]:
@@ -401,6 +443,7 @@ def moving_rms (
Args:
time (np.ndarray): Time axis
signal (np.ndarray): Signal to compute the RMS from
+ function (Callable[[np.ndarray], np.ndarray]): Function to move on the signal
window (float, optional): Width of the window (in s). Defaults to 2s.
overlap (float): percentage of the window overlapping with the previous one
@@ -420,54 +463,48 @@ def moving_rms (
for k in range(rms_segments):
a = hops[k]
b = a + int(rms_width)
- rms_over_time[k] = RMS(signal[a:b])
+ rms_over_time[k] = function(signal[a:b])
return rms_time, rms_over_time
-# %% -
-def Spectrogram (
- signal: np.ndarray,
- widow_width: float = 1, Ts: float = 0.004, no_overlap=False
- ) -> Tuple[np.ndarray, scs.ShortTimeFFT]:
-
- """Returns a 2D Spectrogram of the signal
+# %%
+def RMS (signal: np.ndarray) -> float:
+ """Computes the root mean square value of the given signal
Args:
- time (np.ndarray): The time axis
- signal (np.ndarray): The signal to analyse
- widow_width (float): The length of the window (in s, 1 by defaut)
- Ts (float): The sampling rate (in s, 0.004 by default)
- no_overlap (bool): Disables window overlapping
+ signal (np.ndarray): Input signal
Returns:
- Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object
+ float: The RMS of the signal
"""
+ return np.sqrt(np.mean(signal ** 2))
- # Sampling frequency
- Fs = 1 / Ts
-
- # Width in n° of samples
- width = int(widow_width / Ts)
-
- # Overlap
- hop = width if no_overlap else width // 4
-
- # Window
- window = np.hanning(width)
+# %%
+def kmeans (x: np.ndarray) -> float:
+ return np.mean(((x - np.mean(x)) / np.std(x)) ** 4)
- # Zero-padding
- pad = len(signal) * 5
+#%%
+def peak_factor (x: np.ndarray) -> float:
+ return np.max(x) / RMS(x)
- # Get the spectrogram
- SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude")
- Sx = SFT.stft(signal)
+# %%
+def moving_rms (
+ time: np.ndarray,
+ signal: np.ndarray,
+ window: float = 2,
+ overlap: float = 0.5
+ ) -> Tuple[np.ndarray, np.ndarray]:
+ """Computes the RMS value of the signal using a moving window,
+ with 50% overlap. The RMS sampling rate is window / 2.
- return Sx, SFT
+ Args:
+ time (np.ndarray): Time axis
+ signal (np.ndarray): Signal to compute the RMS from
+ window (float, optional): Width of the window (in s). Defaults to 2s.
+ overlap (float): percentage of the window overlapping with the previous one
+
+ Returns:
+ Tuple[np.ndarray, np.ndarray]: Time and RMS axis
+ """
+ return moving_function(time, signal, RMS, window, overlap)
-def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray):
- signal = signal - signal.mean()
- b,a = scs.butter(7,0.99, btype='low', analog=False)
- signal = scs.filtfilt(b,a,signal)
- x,y = autocorrelation(time, signal)
- signal_windowed = moving_window(y, 1)
- return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) )