Skip to content
Snippets Groups Projects
Commit 36042633 authored by VulcanixFR's avatar VulcanixFR
Browse files

Add functions to processing

parent 5875ed38
Branches
No related merge requests found
......@@ -353,20 +353,19 @@ class Database:
# Get the output
data_sysvar, data_trace = Extractor.extract()
data = data_trace if f.trace else data_sysvar
merged: pd.DataFrame = pd.concat(data.values())
# Add labels
data_len = len(merged[merged.columns[0]])
merged["Sample_rate"] = [ f.sampling_time ] * data_len
merged["run"] = [ index + i ] * data_len
merged["trace"] = [ 1 if f.trace else 0 ] * data_len
data_len = len(data[data.columns[0]])
data["Sample_rate"] = [ f.sampling_time ] * data_len
data["run"] = [ index + i ] * data_len
data["trace"] = [ 1 if f.trace else 0 ] * data_len
Chrono.tick(True)
# Add to database
print(prefix, "Saving file", f.file_name, end=" ... ")
table = f.robot.replace(" ", "_")
merged.to_sql(table, self.db, if_exists="append", index=False)
data.to_sql(table, self.db, if_exists="append", index=False)
Chrono.tick(True)
# raise Exception("")
......
......@@ -3,6 +3,7 @@ import traceback
from typing import List, Dict, Tuple
import os
from pathlib import Path
import numpy as np
class CorrectionCoefficient:
......@@ -314,7 +315,7 @@ class DataExtractor:
neo["Class"] = sysvar["Load"]
neo["Speed"] = [ int(speed[:-1]) for speed in sysvar["Speed"] ]
neo["MovingMotor"] = (
sysvar["A1"]
1 * sysvar["A1"]
+ 2 * sysvar["A2"]
+ 3 * sysvar["A3"]
+ 4 * sysvar["A4"]
......@@ -338,7 +339,7 @@ class DataExtractor:
neo["Sample_time"] = trace["Sample_time"]
neo["Class"] = trace["Load"]
neo["Speed"] = trace["Speed"]
neo["MovingMotor"] = trace["AnalogOut1"]
neo["MovingMotor"] = trace["AnalogOut1"] % 7
for i in range(1,7):
neo[f'Position_A{i}'] = trace[f'Position_A{i}']
......@@ -349,72 +350,76 @@ class DataExtractor:
return neo
def extract (self) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
def extract (self) -> Tuple[pd.DataFrame | None, pd.DataFrame | None]:
"""Extracts only the moments when the motors are moving
Returns:
Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: The resulting extraction
"""
output_sysvar = None
output_trace = None
output_sysvar = {}
output_trace = {}
if self.data_sysvar is not None:
output_sysvar = self.normalize_sysvar(self.data_sysvar)
if self.data_trace is not None:
output_trace = self.normalize_trace(self.data_trace)
# for i in range(0, 8):
# motor = f'A{i}'
# if self.data_sysvar is not None:
# output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
# if self.data_trace is not None:
# output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])
for i in range(1, 7):
motor = f'A{i}'
if self.data_sysvar is not None:
output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
if self.data_trace is not None:
output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])
return output_sysvar, output_trace
def export (self):
"""Extracts and saves to files the moments when the motors are moving
"""
# def export (self):
# """Extracts and saves to files the moments when the motors are moving
# """
sysvar, trace = self.extract()
# sysvar, trace = self.extract()
# Preparing the file name
folder = Path(self.export_folder, self.export_prefix)
# # Preparing the file name
# folder = Path(self.export_folder, self.export_prefix)
if not os.path.exists(folder):
os.mkdir(folder)
# if not os.path.exists(folder):
# os.mkdir(folder)
if sysvar is not None:
# if sysvar is not None:
# Exporting sys vars data
for motor in sysvar.keys():
file = folder.joinpath(f"SysVars - {motor}")
# # Exporting sys vars data
# for motor in sysvar.keys():
# file = folder.joinpath(f"SysVars - {motor}")
if self.excel:
sysvar[motor].to_excel(file.with_suffix(".xlsx"))
else:
sysvar[motor].to_csv(file.with_suffix(".csv"))
if trace is not None:
# Exporting trace data
for motor in trace.keys():
file = folder.joinpath(f"Trace - {motor}")
if self.excel:
trace[motor].to_excel(file.with_suffix(".xlsx"))
else:
trace[motor].to_csv(file.with_suffix(".csv"))
def run (self, sysvar: str | Path, trace: str | Path = None):
"""Loads, corrects and exports the specified data
Args:
sysvar (str): The file path to System Variables data
trace (str, optional): The file path to Trace data. Defaults to None.
"""
self.load(sysvar, False)
if trace is not None:
self.load(trace, True)
self.apply_correction()
self.export()
# if self.excel:
# sysvar[motor].to_excel(file.with_suffix(".xlsx"))
# else:
# sysvar[motor].to_csv(file.with_suffix(".csv"))
# if trace is not None:
# # Exporting trace data
# for motor in trace.keys():
# file = folder.joinpath(f"Trace - {motor}")
# if self.excel:
# trace[motor].to_excel(file.with_suffix(".xlsx"))
# else:
# trace[motor].to_csv(file.with_suffix(".csv"))
# def run (self, sysvar: str | Path, trace: str | Path = None):
# """Loads, corrects and exports the specified data
# Args:
# sysvar (str): The file path to System Variables data
# trace (str, optional): The file path to Trace data. Defaults to None.
# """
# self.load(sysvar, False)
# if trace is not None:
# self.load(trace, True)
# self.apply_correction()
# self.export()
......
"""
This files contains the logic to compute the indicators used for the
predictive diagnosis.
"""
# %% - Include Dependencies
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Any
import re
from .database import Database
from .processing import *
from .plots import *
from time import time
# %% - Constants
QUERY_DROP_TABLE = lambda r: f"drop table if exists Robot{r}_indicators;"
QUERY_RUNS = lambda r: f'SELECT `index`, `Name` FROM DataFiles WHERE `Robot` = "Robot_{r}";'
COMPUTED_COLUMS = [
"Time",
"RunTime",
"Speed",
"Axis",
"RMS", "KMeans", "PeakFactor", "MeanTemperature"
]
# for m in range(1,7):
# for v in [ "RMS", "KMeans", "PeakFactor", "MeanTemperature" ]:
# COMPUTED_COLUMS.append(f"{v}_A{m}")
QUERY_INDICATORS = lambda r: f"SELECT * FROM Robot{r}_indicators;"
# %% - Computation for one run
def compute_run (DB: Database, robot: int, run: int, t0: int):
out = []
data = DB.robot(robot).by_run(run).run()
# Compute sub-arrays
subs: List[pd.DataFrame] = [
data[data["MovingMotor"] == m] for m in range(1,7)
]
# Compute for whole experiment
# line = [ t0, 0 ]
for m in range(1,7):
sub = subs[m - 1]
c = sub[f"Current_A{m}"].to_numpy()
temp = sub[f"Temperature_A{m}"].to_numpy()
line = [
# *line,
t0, t0, 0,
f"A{m}", RMS(c), kmeans(c), peak_factor(c), np.mean(temp)
]
out.append(line)
# out.append(line)
# Compute for each speed
speeds = data['Speed'].unique()
speed_data = [
data[data["Speed"] == s] for s in speeds
]
for i, s in enumerate(speeds):
d = speed_data[i]
# time = d["Sample_time"].to_numpy()[0] + t0
# line = [ time, s ]
for m in range(1,7):
sub = d[d["MovingMotor"] == m]
time = d["Sample_time"].to_numpy()[0] + t0
c = sub[f"Current_A{m}"].to_numpy()
temp = sub[f"Temperature_A{m}"].to_numpy()
line = [
# *line,
time, t0, s, f"A{m}",
RMS(c), kmeans(c), peak_factor(c), np.mean(temp)
]
out.append(line)
# out.append(line)
return out
# %% - Transformation function
def compute_indicators (DB: Database, robot: int):
if not (1 <= robot <= 3):
print("Robot must be between 1 and 3")
return
# Remove any existing computation
DB.db.execute(QUERY_DROP_TABLE(robot))
# Get the list of runs
files = pd.read_sql(QUERY_RUNS(robot), DB.db)
runs = files['index'].to_list()
times = [ name.split(" ")[0] for name in files["Name"] ]
times = [ t.split("h") for t in times ]
times = [ 3600 * int(t[0]) + 60 * int(t[1]) for t in times ]
# Compute
out = []
for i, run in enumerate(runs):
print("Robot", robot, "run n°", run)
out = [ *out, *compute_run(DB, robot, run, times[i]) ]
# Save
df = pd.DataFrame(out, columns=COMPUTED_COLUMS)
df.to_sql(f"Robot{robot}_indicators", DB.db)
return df
from typing import List, Tuple, Dict, Any
from typing import List, Tuple, Dict, Any, Callable
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
......@@ -268,7 +268,7 @@ def find_plateau (mask: np.ndarray, start: int, backwards: bool = False, W: int
# %% -
def FFT_by (
data: pd.DataFrame, by: str,
time: str, position: str, current: str, Ts=4
time: str, position: str, current: str, Ts=4, filter: np.ndarray = None
):
"""Makes an FFT for each of the values taken by the "by" variable
......@@ -308,8 +308,12 @@ def FFT_by (
P = subdata[position].to_numpy()
C = subdata[current].to_numpy()
TP, [CP], PP, fmov, periods, plateaus = make_periodic(T, P, [C])
if type(filter) == np.ndarray:
CP = Filter(CP, filter)
times.append(TP)
positions.append(PP)
currents.append(CP)
......@@ -376,22 +380,60 @@ def make_periodic (time: np.ndarray, position: np.ndarray, currents: List[np.nda
return XP, CP, PP, fmov, periods, plateaus
# %%
def RMS (signal: np.ndarray) -> float:
"""Computes the root mean square value of the given signal
# %% -
def Spectrogram (
signal: np.ndarray,
widow_width: float = 1, Ts: float = 0.004, no_overlap=False
) -> Tuple[np.ndarray, scs.ShortTimeFFT]:
"""Returns a 2D Spectrogram of the signal
Args:
signal (np.ndarray): Input signal
time (np.ndarray): The time axis
signal (np.ndarray): The signal to analyse
widow_width (float): The length of the window (in s, 1 by defaut)
Ts (float): The sampling rate (in s, 0.004 by default)
no_overlap (bool): Disables window overlapping
Returns:
float: The RMS of the signal
Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object
"""
return np.sqrt(np.mean(signal ** 2))
# %%
def moving_rms (
# Sampling frequency
Fs = 1 / Ts
# Width in n° of samples
width = int(widow_width / Ts)
# Overlap
hop = width if no_overlap else width // 4
# Window
window = np.hanning(width)
# Zero-padding
pad = len(signal) * 5
# Get the spectrogram
SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude")
Sx = SFT.stft(signal)
return Sx, SFT
def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray):
signal = signal - signal.mean()
b,a = scs.butter(7,0.99, btype='low', analog=False)
signal = scs.filtfilt(b,a,signal)
x,y = autocorrelation(time, signal)
signal_windowed = moving_window(y, 1)
return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) )
#%%
def moving_function (
time: np.ndarray,
signal: np.ndarray,
function: Callable[[np.ndarray], float],
window: float = 2,
overlap: float = 0.5
) -> Tuple[np.ndarray, np.ndarray]:
......@@ -401,6 +443,7 @@ def moving_rms (
Args:
time (np.ndarray): Time axis
signal (np.ndarray): Signal to compute the RMS from
function (Callable[[np.ndarray], np.ndarray]): Function to move on the signal
window (float, optional): Width of the window (in s). Defaults to 2s.
overlap (float): percentage of the window overlapping with the previous one
......@@ -420,54 +463,48 @@ def moving_rms (
for k in range(rms_segments):
a = hops[k]
b = a + int(rms_width)
rms_over_time[k] = RMS(signal[a:b])
rms_over_time[k] = function(signal[a:b])
return rms_time, rms_over_time
# %% -
def Spectrogram (
signal: np.ndarray,
widow_width: float = 1, Ts: float = 0.004, no_overlap=False
) -> Tuple[np.ndarray, scs.ShortTimeFFT]:
"""Returns a 2D Spectrogram of the signal
# %%
def RMS (signal: np.ndarray) -> float:
"""Computes the root mean square value of the given signal
Args:
time (np.ndarray): The time axis
signal (np.ndarray): The signal to analyse
widow_width (float): The length of the window (in s, 1 by defaut)
Ts (float): The sampling rate (in s, 0.004 by default)
no_overlap (bool): Disables window overlapping
signal (np.ndarray): Input signal
Returns:
Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object
float: The RMS of the signal
"""
return np.sqrt(np.mean(signal ** 2))
# Sampling frequency
Fs = 1 / Ts
# Width in n° of samples
width = int(widow_width / Ts)
# Overlap
hop = width if no_overlap else width // 4
# Window
window = np.hanning(width)
# %%
def kmeans (x: np.ndarray) -> float:
return np.mean(((x - np.mean(x)) / np.std(x)) ** 4)
# Zero-padding
pad = len(signal) * 5
#%%
def peak_factor (x: np.ndarray) -> float:
return np.max(x) / RMS(x)
# Get the spectrogram
SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude")
Sx = SFT.stft(signal)
# %%
def moving_rms (
time: np.ndarray,
signal: np.ndarray,
window: float = 2,
overlap: float = 0.5
) -> Tuple[np.ndarray, np.ndarray]:
"""Computes the RMS value of the signal using a moving window,
with 50% overlap. The RMS sampling rate is window / 2.
return Sx, SFT
Args:
time (np.ndarray): Time axis
signal (np.ndarray): Signal to compute the RMS from
window (float, optional): Width of the window (in s). Defaults to 2s.
overlap (float): percentage of the window overlapping with the previous one
Returns:
Tuple[np.ndarray, np.ndarray]: Time and RMS axis
"""
return moving_function(time, signal, RMS, window, overlap)
def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray):
signal = signal - signal.mean()
b,a = scs.butter(7,0.99, btype='low', analog=False)
signal = scs.filtfilt(b,a,signal)
x,y = autocorrelation(time, signal)
signal_windowed = moving_window(y, 1)
return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) )
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment