Skip to content
Snippets Groups Projects
Commit f324afdf authored by antcsny's avatar antcsny
Browse files
parents 5c4af7b0 3de03613
No related merge requests found
......@@ -317,7 +317,7 @@ class Database:
# Adds the file description to the database
try:
md = pd.read_sql_query(f"SELECT `index` FROM {_DB_METADATA_TABLE}")
md = pd.read_sql_query(f"SELECT `index` FROM {_DB_METADATA_TABLE}", self.db)
index = md["index"].to_numpy()[-1] + 1
except:
index = 0
......@@ -353,20 +353,19 @@ class Database:
# Get the output
data_sysvar, data_trace = Extractor.extract()
data = data_trace if f.trace else data_sysvar
merged: pd.DataFrame = pd.concat(data.values())
# Add labels
data_len = len(merged[merged.columns[0]])
merged["Sample_rate"] = [ f.sampling_time ] * data_len
merged["run"] = [ index + i ] * data_len
merged["trace"] = [ 1 if f.trace else 0 ] * data_len
data_len = len(data[data.columns[0]])
data["Sample_rate"] = [ f.sampling_time ] * data_len
data["run"] = [ index + i ] * data_len
data["trace"] = [ 1 if f.trace else 0 ] * data_len
Chrono.tick(True)
# Add to database
print(prefix, "Saving file", f.file_name, end=" ... ")
table = f.robot.replace(" ", "_")
merged.to_sql(table, self.db, if_exists="append", index=False)
data.to_sql(table, self.db, if_exists="append", index=False)
Chrono.tick(True)
# raise Exception("")
......
......@@ -3,6 +3,7 @@ import traceback
from typing import List, Dict, Tuple
import os
from pathlib import Path
import numpy as np
class CorrectionCoefficient:
......@@ -314,7 +315,7 @@ class DataExtractor:
neo["Class"] = sysvar["Load"]
neo["Speed"] = [ int(speed[:-1]) for speed in sysvar["Speed"] ]
neo["MovingMotor"] = (
sysvar["A1"]
1 * sysvar["A1"]
+ 2 * sysvar["A2"]
+ 3 * sysvar["A3"]
+ 4 * sysvar["A4"]
......@@ -338,7 +339,7 @@ class DataExtractor:
neo["Sample_time"] = trace["Sample_time"]
neo["Class"] = trace["Load"]
neo["Speed"] = trace["Speed"]
neo["MovingMotor"] = trace["AnalogOut1"]
neo["MovingMotor"] = trace["AnalogOut1"] % 7
for i in range(1,7):
neo[f'Position_A{i}'] = trace[f'Position_A{i}']
......@@ -349,72 +350,76 @@ class DataExtractor:
return neo
def extract (self) -> Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]:
def extract (self) -> Tuple[pd.DataFrame | None, pd.DataFrame | None]:
"""Extracts only the moments when the motors are moving
Returns:
Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: The resulting extraction
"""
output_sysvar = None
output_trace = None
output_sysvar = {}
output_trace = {}
if self.data_sysvar is not None:
output_sysvar = self.normalize_sysvar(self.data_sysvar)
if self.data_trace is not None:
output_trace = self.normalize_trace(self.data_trace)
# for i in range(0, 8):
# motor = f'A{i}'
# if self.data_sysvar is not None:
# output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
# if self.data_trace is not None:
# output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])
for i in range(1, 7):
motor = f'A{i}'
if self.data_sysvar is not None:
output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
if self.data_trace is not None:
output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])
return output_sysvar, output_trace
def export (self):
"""Extracts and saves to files the moments when the motors are moving
"""
# def export (self):
# """Extracts and saves to files the moments when the motors are moving
# """
sysvar, trace = self.extract()
# sysvar, trace = self.extract()
# Preparing the file name
folder = Path(self.export_folder, self.export_prefix)
# # Preparing the file name
# folder = Path(self.export_folder, self.export_prefix)
if not os.path.exists(folder):
os.mkdir(folder)
# if not os.path.exists(folder):
# os.mkdir(folder)
if sysvar is not None:
# if sysvar is not None:
# Exporting sys vars data
for motor in sysvar.keys():
file = folder.joinpath(f"SysVars - {motor}")
# # Exporting sys vars data
# for motor in sysvar.keys():
# file = folder.joinpath(f"SysVars - {motor}")
if self.excel:
sysvar[motor].to_excel(file.with_suffix(".xlsx"))
else:
sysvar[motor].to_csv(file.with_suffix(".csv"))
if trace is not None:
# Exporting trace data
for motor in trace.keys():
file = folder.joinpath(f"Trace - {motor}")
if self.excel:
trace[motor].to_excel(file.with_suffix(".xlsx"))
else:
trace[motor].to_csv(file.with_suffix(".csv"))
def run (self, sysvar: str | Path, trace: str | Path = None):
"""Loads, corrects and exports the specified data
Args:
sysvar (str): The file path to System Variables data
trace (str, optional): The file path to Trace data. Defaults to None.
"""
self.load(sysvar, False)
if trace is not None:
self.load(trace, True)
self.apply_correction()
self.export()
# if self.excel:
# sysvar[motor].to_excel(file.with_suffix(".xlsx"))
# else:
# sysvar[motor].to_csv(file.with_suffix(".csv"))
# if trace is not None:
# # Exporting trace data
# for motor in trace.keys():
# file = folder.joinpath(f"Trace - {motor}")
# if self.excel:
# trace[motor].to_excel(file.with_suffix(".xlsx"))
# else:
# trace[motor].to_csv(file.with_suffix(".csv"))
# def run (self, sysvar: str | Path, trace: str | Path = None):
# """Loads, corrects and exports the specified data
# Args:
# sysvar (str): The file path to System Variables data
# trace (str, optional): The file path to Trace data. Defaults to None.
# """
# self.load(sysvar, False)
# if trace is not None:
# self.load(trace, True)
# self.apply_correction()
# self.export()
......
"""
This files contains the logic to compute the indicators used for the
predictive diagnosis.
"""
# %% - Include Dependencies
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Any
import re
from .database import Database
from .processing import *
from .plots import *
from time import time, mktime
from datetime import datetime
# %% - Constants
QUERY_DROP_TABLE = lambda r: f"drop table if exists Robot{r}_indicators;"
QUERY_RUNS = lambda r: f'SELECT `index`, `Name`, `Year`, `Month`, `Day` FROM DataFiles WHERE `Robot` = "Robot_{r}";'
COMPUTED_COLUMS = [
"Time",
"RunTime",
"Speed",
"Class",
"Date",
"Axis",
"RMS", "KMeans", "PeakFactor", "MeanTemperature"
]
# for m in range(1,7):
# for v in [ "RMS", "KMeans", "PeakFactor", "MeanTemperature" ]:
# COMPUTED_COLUMS.append(f"{v}_A{m}")
QUERY_INDICATORS = lambda r: f"SELECT * FROM Robot{r}_indicators;"
QUERY_ROBOTS = "SELECT DISTINCT `Robot` from DataFiles"
# %% - Computation for one run
def compute_run (DB: Database, robot: int, run: int, t0: int, date: str, next: Callable[[str], None] = None):
if next is not None:
next(f"Robot {robot} run n°{run} from {date}")
out = []
data = DB.robot(robot).by_run(run).run()
# Compute sub-arrays
subs: List[pd.DataFrame] = [
data[data["MovingMotor"] == m] for m in range(1,7)
]
runTime = t0 + mktime(datetime.strptime(date, "%Y-%m-%d").timetuple())
# Compute for whole experiment
# line = [ t0, 0 ]
for m in range(1,7):
sub = subs[m - 1]
c = sub[f"Current_A{m}"].to_numpy()
temp = sub[f"Temperature_A{m}"].to_numpy()
line = [
# *line,
runTime, runTime, 0, data["Class"][0], date,
f"A{m}", RMS(c), kmeans(c), peak_factor(c), np.mean(temp)
]
out.append(line)
# out.append(line)
# Compute for each speed
speeds = data['Speed'].unique()
speed_data = [
data[data["Speed"] == s] for s in speeds
]
for i, s in enumerate(speeds):
d = speed_data[i]
# time = d["Sample_time"].to_numpy()[0] + t0
# line = [ time, s ]
for m in range(1,7):
sub = d[d["MovingMotor"] == m]
time = d["Sample_time"].to_numpy()[0] + runTime
c = sub[f"Current_A{m}"].to_numpy()
temp = sub[f"Temperature_A{m}"].to_numpy()
line = [
# *line,
time, runTime, s, data["Class"][0], date,
f"A{m}", RMS(c), kmeans(c), peak_factor(c), np.mean(temp)
]
out.append(line)
# out.append(line)
return out
# %% - Transformation function
def compute_indicators (DB: Database, robot: int, next: Callable[[str], None] = None):
if not (1 <= robot <= 3):
print("Robot must be between 1 and 3")
return
# Remove any existing computation
DB.db.execute(QUERY_DROP_TABLE(robot))
# Get the list of runs
files = pd.read_sql(QUERY_RUNS(robot), DB.db)
runs = files['index'].to_list()
times = [ name.split(" ")[0] for name in files["Name"] ]
times = [ t.split("h") for t in times ]
times = [ 3600 * int(t[0]) + 60 * int(t[1]) for t in times ]
dates = (files["Year"].astype(str) + "-" + files["Month"].astype(str) + "-" + files["Day"].astype(str)).to_numpy()
# Compute
out = []
for i, run in enumerate(runs):
print("Robot", robot, "run n°", run)
out = [ *out, *compute_run(DB, robot, run, times[i], dates[i], next) ]
# Save
df = pd.DataFrame(out, columns=COMPUTED_COLUMS)
df.to_sql(f"Robot{robot}_indicators", DB.db)
return df
#%%
def compute_indicators_all_robots (DB: Database, next: Callable[[str], None] = None):
if (next is not None):
print("Has next function")
next("Initializing ...")
robots: List[str] = pd.read_sql(QUERY_ROBOTS, DB.db)["Robot"].to_list()
for robot in robots:
print("Calculating indicators for", robot)
n = int(robot.split("_")[1])
compute_indicators(DB, n, next)
\ No newline at end of file
from typing import List, Tuple, Dict, Any
from typing import List, Tuple, Dict, Any, Callable
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
......@@ -268,7 +268,7 @@ def find_plateau (mask: np.ndarray, start: int, backwards: bool = False, W: int
# %% -
def FFT_by (
data: pd.DataFrame, by: str,
time: str, position: str, current: str, Ts=4
time: str, position: str, current: str, Ts=4, filter: np.ndarray = None
):
"""Makes an FFT for each of the values taken by the "by" variable
......@@ -308,8 +308,12 @@ def FFT_by (
P = subdata[position].to_numpy()
C = subdata[current].to_numpy()
TP, [CP], PP, fmov, periods, plateaus = make_periodic(T, P, [C])
if type(filter) == np.ndarray:
CP = Filter(CP, filter)
times.append(TP)
positions.append(PP)
currents.append(CP)
......@@ -376,22 +380,60 @@ def make_periodic (time: np.ndarray, position: np.ndarray, currents: List[np.nda
return XP, CP, PP, fmov, periods, plateaus
# %%
def RMS (signal: np.ndarray) -> float:
"""Computes the root mean square value of the given signal
# %% -
def Spectrogram (
signal: np.ndarray,
widow_width: float = 1, Ts: float = 0.004, no_overlap=False
) -> Tuple[np.ndarray, scs.ShortTimeFFT]:
"""Returns a 2D Spectrogram of the signal
Args:
signal (np.ndarray): Input signal
time (np.ndarray): The time axis
signal (np.ndarray): The signal to analyse
widow_width (float): The length of the window (in s, 1 by defaut)
Ts (float): The sampling rate (in s, 0.004 by default)
no_overlap (bool): Disables window overlapping
Returns:
float: The RMS of the signal
Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object
"""
return np.sqrt(np.mean(signal ** 2))
# %%
def moving_rms (
# Sampling frequency
Fs = 1 / Ts
# Width in n° of samples
width = int(widow_width / Ts)
# Overlap
hop = width if no_overlap else width // 4
# Window
window = np.hanning(width)
# Zero-padding
pad = len(signal) * 5
# Get the spectrogram
SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude")
Sx = SFT.stft(signal)
return Sx, SFT
def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray):
signal = signal - signal.mean()
b,a = scs.butter(7,0.99, btype='low', analog=False)
signal = scs.filtfilt(b,a,signal)
x,y = autocorrelation(time, signal)
signal_windowed = moving_window(y, 1)
return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) )
#%%
def moving_function (
time: np.ndarray,
signal: np.ndarray,
function: Callable[[np.ndarray], float],
window: float = 2,
overlap: float = 0.5
) -> Tuple[np.ndarray, np.ndarray]:
......@@ -401,6 +443,7 @@ def moving_rms (
Args:
time (np.ndarray): Time axis
signal (np.ndarray): Signal to compute the RMS from
function (Callable[[np.ndarray], np.ndarray]): Function to move on the signal
window (float, optional): Width of the window (in s). Defaults to 2s.
overlap (float): percentage of the window overlapping with the previous one
......@@ -420,54 +463,48 @@ def moving_rms (
for k in range(rms_segments):
a = hops[k]
b = a + int(rms_width)
rms_over_time[k] = RMS(signal[a:b])
rms_over_time[k] = function(signal[a:b])
return rms_time, rms_over_time
# %% -
def Spectrogram (
signal: np.ndarray,
widow_width: float = 1, Ts: float = 0.004, no_overlap=False
) -> Tuple[np.ndarray, scs.ShortTimeFFT]:
"""Returns a 2D Spectrogram of the signal
# %%
def RMS (signal: np.ndarray) -> float:
"""Computes the root mean square value of the given signal
Args:
time (np.ndarray): The time axis
signal (np.ndarray): The signal to analyse
widow_width (float): The length of the window (in s, 1 by defaut)
Ts (float): The sampling rate (in s, 0.004 by default)
no_overlap (bool): Disables window overlapping
signal (np.ndarray): Input signal
Returns:
Tuple[np.ndarray, scs.ShortTimeFFT: The 2D spectrogram and the STFT object
float: The RMS of the signal
"""
return np.sqrt(np.mean(signal ** 2))
# Sampling frequency
Fs = 1 / Ts
# Width in n° of samples
width = int(widow_width / Ts)
# Overlap
hop = width if no_overlap else width // 4
# Window
window = np.hanning(width)
# %%
def kmeans (x: np.ndarray) -> float:
return np.mean(((x - np.mean(x)) / np.std(x)) ** 4)
# Zero-padding
pad = len(signal) * 5
#%%
def peak_factor (x: np.ndarray) -> float:
return np.max(x) / RMS(x)
# Get the spectrogram
SFT = scs.ShortTimeFFT(window, hop, Fs, mfft=pad, scale_to="magnitude")
Sx = SFT.stft(signal)
# %%
def moving_rms (
time: np.ndarray,
signal: np.ndarray,
window: float = 2,
overlap: float = 0.5
) -> Tuple[np.ndarray, np.ndarray]:
"""Computes the RMS value of the signal using a moving window,
with 50% overlap. The RMS sampling rate is window / 2.
return Sx, SFT
Args:
time (np.ndarray): Time axis
signal (np.ndarray): Signal to compute the RMS from
window (float, optional): Width of the window (in s). Defaults to 2s.
overlap (float): percentage of the window overlapping with the previous one
Returns:
Tuple[np.ndarray, np.ndarray]: Time and RMS axis
"""
return moving_function(time, signal, RMS, window, overlap)
def Density_spectrum (time: pd.Series | np.ndarray, signal: pd.Series | np.ndarray):
signal = signal - signal.mean()
b,a = scs.butter(7,0.99, btype='low', analog=False)
signal = scs.filtfilt(b,a,signal)
x,y = autocorrelation(time, signal)
signal_windowed = moving_window(y, 1)
return FFT(signal_windowed ,(time[1]-time[0])*1000, 32*len(signal_windowed) )
......@@ -4,7 +4,10 @@
import PySimpleGUI as sg
from pathlib import Path
from threading import Thread, Semaphore
from tools.database import Database, init_database
from tools.predictive_indicators import compute_indicators_all_robots
from .DB_Metadata import DBFileManagerWindow
class DBSelectWin (sg.Window):
......@@ -15,6 +18,14 @@ class DBSelectWin (sg.Window):
self.edit_window = None
self.indicator_popup: sg.Window = None
self.indicator_popup_lock = Semaphore()
self.indicator_progress: int = 0
self.indicator_max_progress: int = 0
self.indicator_run_name: str = "None"
self.db_use_done = False
super().__init__("Database Selector", self._layout(), finalize=True)
self.update_display()
......@@ -25,7 +36,7 @@ class DBSelectWin (sg.Window):
self.buttons = {
"-open-close-": sg.Button("Open", key="-open-close-"),
"-edit-": sg.Button("Edit Data", key="-edit-"),
"-plot-": sg.Button("Plot Data", key="-plot-"),
"-indicators-": sg.Button("Compute indicators", key="-indicators-"),
"-new-": sg.Button("New Database", key="-new-"),
"-browse-": sg.FileBrowse(
key="-browse-",
......@@ -47,7 +58,7 @@ class DBSelectWin (sg.Window):
[
self.buttons["-open-close-"],
self.buttons["-edit-"],
self.buttons["-plot-"],
self.buttons["-indicators-"],
sg.Push(),
self.buttons["-new-"],
]
......@@ -63,14 +74,14 @@ class DBSelectWin (sg.Window):
"""
has_db = self.db is not None
using_data = (self.edit_window is not None)
using_data = (self.edit_window is not None) or (self.indicator_popup is not None)
self.buttons["-edit-"].update(
disabled=not has_db or using_data,
button_color=sg.theme_button_color()
)
self.buttons["-plot-"].update(
self.buttons["-indicators-"].update(
disabled=not has_db or using_data,
button_color=sg.theme_button_color()
)
......@@ -96,6 +107,15 @@ class DBSelectWin (sg.Window):
button_color=sg.theme_button_color()
)
self.indicator_popup_lock.acquire()
if self.indicator_popup is not None:
self.indicator_popup["-run-name-"].update(value=self.indicator_run_name)
self.indicator_popup["-progress-"].update(current_count=self.indicator_progress)
self.indicator_popup.read(1)
self.indicator_popup_lock.release()
def open_database (self, path: str):
"""Tries to open a Database
......@@ -154,6 +174,57 @@ class DBSelectWin (sg.Window):
self.inputs["-db_path-"].update(value=text)
self.open_database(text)
def compute_indicators (self):
self.indicator_max_progress = len(self.db.list_files()) + 1
self.indicator_progress = 0
popup = sg.Window("Computing indicators", [
[ sg.Push(), sg.Text("Computing indicators."), sg.Push() ],
[
sg.Push(),
sg.Text(
"This might take a while",
justification="center"
),
sg.Push()
],
[
sg.Push(),
sg.Text("Waiting ...", key="-run-name-"),
sg.Push()
],
[
sg.ProgressBar(self.indicator_max_progress, expand_x=True, key="-progress-")
]
], finalize=True, disable_close=True, disable_minimize=True)
popup.read(1)
self.indicator_popup = popup
self.db.close()
def compute_thread (self: DBSelectWin):
def next (name: str):
print("Next", name)
self.indicator_popup_lock.acquire()
self.indicator_run_name = name
self.indicator_progress += 1
self.indicator_popup_lock.release()
print("Released lock")
self.db.open()
compute_indicators_all_robots(self.db, next)
self.db.close()
self.indicator_popup_lock.acquire()
self.db_use_done = True
self.indicator_popup_lock.release()
Thread(target=compute_thread, daemon=True, args=[self]).start()
def poll (self):
"""Runs the window
......@@ -178,7 +249,19 @@ class DBSelectWin (sg.Window):
self.edit_window.close()
self.edit_window = None
self.update_display()
# DB management
self.indicator_popup_lock.acquire()
if self.db_use_done == True:
self.db_use_done = False
self.indicator_popup.close()
self.indicator_popup = None
self.db.open()
self.indicator_popup_lock.release()
# Update display
self.update_display()
return True
# User opens the Database
......@@ -200,9 +283,10 @@ class DBSelectWin (sg.Window):
self.edit_window = DBFileManagerWindow(self.db)
# User wants to plot some data
if event == "-plot-":
pass
if event == "-indicators-":
self.compute_indicators()
# Update this window display
self.update_display()
return True
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment