# NOTE: header recovered from the web viewer this file was copied from.
# Viewer notice: "An error occurred while loading the file. Please try again."
# VulcanixFR authored 36042633
# extractor.py 14.17 KiB
import pandas as pd
import traceback
from typing import List, Dict, Tuple
import os
from pathlib import Path
import numpy as np
class CorrectionCoefficient:
    """Linear correction for one measured value of one motor.

    The extractor applies a correction to a raw column as
    ``coefficient * (raw - pre_offset) + post_offset``.

    Attributes:
        motor (int): The motor number the correction belongs to.
        value (str): The name of the corrected value (e.g. ``"Torque"``);
            combined with the motor number it forms the column name
            ``<value>_A<motor>``.
        coefficient (float): The multiplicative factor.
        pre_offset (float): Subtracted from the raw value before scaling.
        post_offset (float): Added after scaling.
    """

    motor: int
    value: str
    coefficient: float
    pre_offset: float
    post_offset: float

    def __init__(
        self,
        motor: int,
        value: str,
        coefficient: float,
        pre_offset: float = 0,
        post_offset: float = 0,
    ):
        """Stores the correction parameters.

        Args:
            motor (int): The motor needing the correction
            value (str): The corrected value
            coefficient (float): The correction coefficient
            pre_offset (float, optional): Offset removed before scaling. Defaults to 0.
            post_offset (float, optional): Offset added after scaling. Defaults to 0.
        """
        self.motor = motor
        self.value = value
        self.coefficient = coefficient
        self.pre_offset = pre_offset
        self.post_offset = post_offset

    def __repr__(self) -> str:
        # Readable form for the debug prints of correction lists.
        return (
            f"{type(self).__name__}(motor={self.motor!r}, value={self.value!r}, "
            f"coefficient={self.coefficient!r}, pre_offset={self.pre_offset!r}, "
            f"post_offset={self.post_offset!r})"
        )
class DataExtractor:
    """Loads robot recordings, corrects their raw values and normalizes them.

    Two recordings are handled side by side, each held as a DataFrame:
    "system variables" data (``data_sysvar``) and "trace" data
    (``data_trace``). Per-motor columns are named ``<value>_A<motor>``
    (e.g. ``Torque_A1``), with motors numbered 1 to 6.

    A correction is applied to a column as
    ``coefficient * (raw - pre_offset) + post_offset``.
    """

    # Motor numbers; they are the "_A<n>" suffix of the per-motor columns.
    _MOTORS = range(1, 7)

    # Default (min, max) position range per motor, in motor order, used by
    # auto_fit_trace_positions().
    _POSITION_RANGES: Tuple[Tuple[float, float], ...] = (
        (-45, 45), (-45, 15), (-30, 25), (-50, 50), (-35, 30), (-90, 90),
    )

    data_sysvar: pd.DataFrame | None = None
    data_trace: pd.DataFrame | None = None
    # The correction lists are created per instance in __init__: a list
    # defined here would be shared (and mutated) by every instance.
    sysvar_correction: List[CorrectionCoefficient]
    trace_correction: List[CorrectionCoefficient]
    export_folder: Path = Path(os.getcwd(), "export")
    export_prefix: str = ""
    excel = True
    debug = False

    def __init__(self) -> None:
        """Creates the export folder and installs the default corrections."""
        self.sysvar_correction = []
        self.trace_correction = []

        # exist_ok avoids the check-then-create race; parents covers a
        # nested export folder.
        Path(self.export_folder).mkdir(parents=True, exist_ok=True)

        self._install_default_corrections()

    def _install_default_corrections(self) -> None:
        """Registers the built-in (hard-coded) correction coefficients."""
        # MAGIC NUMBERS
        torque_coefficients = (-75, 75, 75, -75, -75, 75)
        current_coefficients = (0.11, 0.11, 0.065, 0.035, 0.035, 0.035)  # % -> A

        for motor, coeff in zip(self._MOTORS, torque_coefficients):
            self.set_trace_coefficient(motor, "Torque", coeff)
        for motor, coeff in zip(self._MOTORS, current_coefficients):
            self.set_sysvar_coefficient(motor, "Current", coeff)

        for motor in self._MOTORS:
            self.set_sysvar_coefficient(motor, "Temperature", 0.1)  # °C
            # K -> °C: pre_offset is SUBTRACTED from the raw value, so it
            # must be +273 (a negative offset would add 273 instead).
            self.set_trace_coefficient(motor, "Temperature", 1, 273)
        # Position columns get no static coefficient here; they are fitted
        # from the data by auto_fit_trace_positions().

    def reset(self):
        """Resets this class

        Drops the loaded data and every correction coefficient, and puts the
        export settings and flags back to their initial values. The default
        coefficients installed by the constructor are NOT re-created.
        """
        self.data_sysvar = None
        self.data_trace = None
        self.sysvar_correction = []
        self.trace_correction = []
        self.export_folder = Path(os.getcwd(), "export")
        self.export_prefix = ""
        self.excel = True
        self.debug = False

    def load(self, file: str | Path, trace: bool = False) -> bool:
        """Loads a dataset

        The workbook is read with its first column as index. The file stem
        becomes the export prefix, even when loading fails.

        Args:
            file (str | Path): The Excel file containing the collected data
            trace (bool, optional): The file is a trace (otherwise it is
                system variables data). Defaults to False.

        Returns:
            bool: Returns True when properly loaded, else False
        """
        path = file if isinstance(file, Path) else Path(file)
        self.export_prefix = path.stem

        try:
            frame = pd.read_excel(path, index_col=0)
        except Exception as e:
            # Best effort: report the failure and let the caller decide.
            print("Failed to load " + str(path))
            traceback.print_exception(e)
            return False

        if trace:
            self.data_trace = frame
        else:
            self.data_sysvar = frame
        return True

    @staticmethod
    def _upsert_correction(
        corrections: List[CorrectionCoefficient],
        motor: int, value: str, coeff: float,
        pre_offset: float, post_offset: float,
    ) -> None:
        """Updates the entry for (motor, value) in place, or appends a new one."""
        for c in corrections:
            if c.motor == motor and c.value == value:
                c.coefficient = coeff
                c.pre_offset = pre_offset
                c.post_offset = post_offset
                return
        corrections.append(
            CorrectionCoefficient(motor, value, coeff, pre_offset, post_offset)
        )

    @staticmethod
    def _corrections_by_column(
        corrections: List[CorrectionCoefficient],
    ) -> Dict[str, Tuple[float, float, float]]:
        """Maps each ``<value>_A<motor>`` column to (coefficient, pre_offset, post_offset)."""
        return {
            f'{c.value}_A{c.motor}': (c.coefficient, c.pre_offset, c.post_offset)
            for c in corrections
        }

    @staticmethod
    def _find_correction(
        corrections: List[CorrectionCoefficient], motor: int, value: str,
    ) -> CorrectionCoefficient | None:
        """Returns the first entry matching (motor, value), or None."""
        for c in corrections:
            if c.motor == motor and c.value == value:
                return c
        return None

    def set_trace_coefficient(self, motor: int, value: str, coeff: float, pre_offset: float = 0, post_offset: float = 0) -> None:
        """Defines a coefficient to apply to a value for a specific motor on trace data

        An existing entry for the same motor and value is overwritten.

        Args:
            motor (int): The motor needing the correction
            value (str): The corrected value
            coeff (float): The correction coefficient
            pre_offset (float, optional): Subtracted before scaling. Defaults to 0.
            post_offset (float, optional): Added after scaling. Defaults to 0.
        """
        self._upsert_correction(
            self.trace_correction, motor, value, coeff, pre_offset, post_offset
        )

    def set_sysvar_coefficient(self, motor: int, value: str, coeff: float, pre_offset: float = 0, post_offset: float = 0) -> None:
        """Defines a coefficient to apply to a value for a specific motor on system variables data

        An existing entry for the same motor and value is overwritten.

        Args:
            motor (int): The motor needing the correction
            value (str): The corrected value
            coeff (float): The correction coefficient
            pre_offset (float, optional): Subtracted before scaling. Defaults to 0.
            post_offset (float, optional): Added after scaling. Defaults to 0.
        """
        self._upsert_correction(
            self.sysvar_correction, motor, value, coeff, pre_offset, post_offset
        )

    def get_trace_correction(self) -> Dict[str, Tuple[float, float, float]]:
        """Generates a dictionary of the defined trace data correction

        Returns:
            Dict[str, Tuple[float, float, float]]: (coefficient, pre_offset,
                post_offset) according to the column name
        """
        return self._corrections_by_column(self.trace_correction)

    def get_sysvar_correction(self) -> Dict[str, Tuple[float, float, float]]:
        """Generates a dictionary of the defined system variables data correction

        Returns:
            Dict[str, Tuple[float, float, float]]: (coefficient, pre_offset,
                post_offset) according to the column name
        """
        return self._corrections_by_column(self.sysvar_correction)

    def find_trace_correction(self, motor: int, value: str) -> CorrectionCoefficient | None:
        """Returns the Correction Coefficient for a trace value if it exists

        Args:
            motor (int): The motor
            value (str): The value

        Returns:
            CorrectionCoefficient | None: The coefficient
        """
        return self._find_correction(self.trace_correction, motor, value)

    def find_sysvar_correction(self, motor: int, value: str) -> CorrectionCoefficient | None:
        """Returns the Correction Coefficient for a system variable value if it exists

        Args:
            motor (int): The motor
            value (str): The value

        Returns:
            CorrectionCoefficient | None: The coefficient
        """
        return self._find_correction(self.sysvar_correction, motor, value)

    def tune(
        self, motor: int, value: str,
        min_val: float, max_val: float
    ) -> Tuple[float, float, float]:
        """Finds the correction coefficients to fit the desired range while
        not altering the relative offset from 0

        The trace column is scaled so that its span equals
        ``max_val - min_val``. The pre-offset is the midpoint of the data and
        the post-offset is that midpoint scaled, so once applied the two
        cancel out and a raw 0 stays at 0.

        Args:
            motor (int): The motor
            value (str): The value to fit
            min_val (float): The new minimum
            max_val (float): The new maximum

        Returns:
            Tuple[float, float, float]: (coefficient, pre_offset, post_offset).
                The identity correction (1, 0, 0) is returned when the column
                is missing or has no usable range.
        """
        identity = (1, 0, 0)

        axis = f"{value}_A{motor}"
        if self.data_trace is None or axis not in self.data_trace.columns:
            print("No axis", axis, "in data")
            return identity

        column = self.data_trace[axis]
        lowest, highest = column.min(), column.max()
        actual_range = highest - lowest

        # A constant column (range 0) or an empty/all-NaN one (range NaN)
        # would yield an infinite or NaN scaling factor.
        if not np.isfinite(actual_range) or actual_range == 0:
            print("Cannot tune axis", axis, ": the data has no usable range")
            return identity

        desired_range = max_val - min_val
        pre_offset = (highest + lowest) / 2
        scaling_factor = desired_range / actual_range
        post_offset = pre_offset * scaling_factor

        return scaling_factor, pre_offset, post_offset

    def auto_fit_trace_positions(self, ranges: List[Tuple[float, float]] | None = None):
        """Updates the correction coefficients for position values in the trace data

        Does nothing when no trace data is loaded. The position error reuses
        the tuning computed for the position.

        Args:
            ranges (List[Tuple[float, float]], optional): The (min, max)
                range of each motor, in motor order. Defaults to the
                built-in ranges.
        """
        if self.data_trace is None:
            return

        if ranges is None:
            ranges = self._POSITION_RANGES

        for motor, (min_val, max_val) in zip(self._MOTORS, ranges):
            tuning_pos = self.tune(motor, "Position", min_val, max_val)
            tuning_cmd = self.tune(motor, "Position_Command", min_val, max_val)
            self.set_trace_coefficient(motor, "Position", *tuning_pos)
            self.set_trace_coefficient(motor, "Position_Error", *tuning_pos)
            self.set_trace_coefficient(motor, "Position_Command", *tuning_cmd)

    def _apply_corrections(
        self, frame: pd.DataFrame,
        corrections: Dict[str, Tuple[float, float, float]],
        label: str,
    ) -> None:
        """Corrects, in place, every column of ``frame`` that has a correction."""
        if self.debug:
            print(f"{label} correction :", corrections)

        for column, (coefficient, pre_offset, post_offset) in corrections.items():
            if column not in frame.columns:
                continue
            if self.debug:
                print(f"Correcting {label} column ", column)
            frame[column] = (coefficient * (frame[column] - pre_offset)) + post_offset

    def apply_correction_sysvar(self):
        """Applies the correction on the loaded sysvar data

        The data is modified in place, so calling this twice applies the
        correction twice. Does nothing when no sysvar data is loaded.
        """
        if self.data_sysvar is None:
            return
        self._apply_corrections(
            self.data_sysvar, self.get_sysvar_correction(), "Sys Vars"
        )

    def apply_correction_trace(self):
        """Applies the correction on the loaded trace data

        The data is modified in place, so calling this twice applies the
        correction twice. Does nothing when no trace data is loaded.
        """
        if self.data_trace is None:
            return
        self._apply_corrections(
            self.data_trace, self.get_trace_correction(), "Trace"
        )

    def apply_correction(self):
        """Applies the correction on the loaded data
        """
        self.apply_correction_sysvar()
        self.apply_correction_trace()

    def normalize_sysvar(self, sysvar: pd.DataFrame) -> pd.DataFrame:
        """Builds the normalized table from system variables data

        Args:
            sysvar (pd.DataFrame): The system variables data

        Returns:
            pd.DataFrame: Sample_time, Class, Speed, MovingMotor, then for
                each motor its position, position error, position command,
                current and temperature
        """
        neo = pd.DataFrame()
        neo["Sample_time"] = sysvar["Sample_time"]
        neo["Class"] = sysvar["Load"]
        # Speed is text whose last character is dropped before the integer
        # conversion (presumably a trailing "%" -- confirm against the data).
        neo["Speed"] = [int(speed[:-1]) for speed in sysvar["Speed"]]
        # Weighted sum of the A1..A6 columns; it equals the moving motor's
        # number assuming these are 0/1 flags with at most one set per row.
        neo["MovingMotor"] = sum(
            motor * sysvar[f'A{motor}'] for motor in self._MOTORS
        )

        for i in self._MOTORS:
            neo[f'Position_A{i}'] = sysvar[f'Position_A{i}']
            # The sysvar data has no error column: derive it as command - actual.
            neo[f'Position_Error_A{i}'] = sysvar[f'Position_Command_A{i}'] - sysvar[f'Position_A{i}']
            neo[f'Position_Command_A{i}'] = sysvar[f'Position_Command_A{i}']
            neo[f'Current_A{i}'] = sysvar[f'Current_A{i}']
            neo[f'Temperature_A{i}'] = sysvar[f'Temperature_A{i}']

        return neo

    def normalize_trace(self, trace: pd.DataFrame) -> pd.DataFrame:
        """Builds the normalized table from trace data

        Args:
            trace (pd.DataFrame): The trace data

        Returns:
            pd.DataFrame: Sample_time, Class, Speed, MovingMotor, then for
                each motor its position, position error, position command,
                current and temperature
        """
        neo = pd.DataFrame()
        neo["Sample_time"] = trace["Sample_time"]
        neo["Class"] = trace["Load"]
        neo["Speed"] = trace["Speed"]
        # The moving motor is carried by the AnalogOut1 channel, modulo 7.
        neo["MovingMotor"] = trace["AnalogOut1"] % 7

        for i in self._MOTORS:
            for value in ("Position", "Position_Error", "Position_Command", "Current", "Temperature"):
                neo[f'{value}_A{i}'] = trace[f'{value}_A{i}']

        return neo

    def extract(self) -> Tuple[pd.DataFrame | None, pd.DataFrame | None]:
        """Normalizes the loaded data

        The rows are not filtered per motor; each loaded dataset is returned
        as one normalized table.

        Returns:
            Tuple[pd.DataFrame | None, pd.DataFrame | None]: The normalized
                system variables data and trace data, None for a dataset
                that is not loaded
        """
        output_sysvar = None
        output_trace = None

        if self.data_sysvar is not None:
            output_sysvar = self.normalize_sysvar(self.data_sysvar)
        if self.data_trace is not None:
            output_trace = self.normalize_trace(self.data_trace)

        return output_sysvar, output_trace
# def export (self):
# """Extracts and saves to files the moments when the motors are moving
# """
# sysvar, trace = self.extract()
# # Preparing the file name
# folder = Path(self.export_folder, self.export_prefix)
# if not os.path.exists(folder):
# os.mkdir(folder)
# if sysvar is not None:
# # Exporting sys vars data
# for motor in sysvar.keys():
# file = folder.joinpath(f"SysVars - {motor}")
# if self.excel:
# sysvar[motor].to_excel(file.with_suffix(".xlsx"))
# else:
# sysvar[motor].to_csv(file.with_suffix(".csv"))
# if trace is not None:
# # Exporting trace data
# for motor in trace.keys():
# file = folder.joinpath(f"Trace - {motor}")
# if self.excel:
# trace[motor].to_excel(file.with_suffix(".xlsx"))
# else:
# trace[motor].to_csv(file.with_suffix(".csv"))
# def run (self, sysvar: str | Path, trace: str | Path = None):
# """Loads, corrects and exports the specified data
# Args:
# sysvar (str): The file path to System Variables data
# trace (str, optional): The file path to Trace data. Defaults to None.
# """
# self.load(sysvar, False)
# if trace is not None:
# self.load(trace, True)
# self.apply_correction()
# self.export()
if __name__ == "__main__":
    # Manual smoke run on one pair of sample recordings.
    sysvar_file = "data3/[2024-05-23] 10h25 data [50%-80%] [24ms] [class 0] [3 3 3 3 3 3] - Robot 3.xlsx"
    trace_file = "data3/[2024-05-23] 10h25 data [50%-80%] [24ms] [class 0] [3 3 3 3 3 3] - Robot 3_TRACE.xlsx"

    extractor = DataExtractor()
    extractor.debug = True

    # DataExtractor defines neither run() nor compare_and_export(), so the
    # pipeline is driven through the methods that do exist.
    sysvar_loaded = extractor.load(sysvar_file)
    trace_loaded = extractor.load(trace_file, trace=True)

    if sysvar_loaded or trace_loaded:
        extractor.apply_correction()
        sysvar, trace = extractor.extract()
        for label, frame in (("Sys Vars", sysvar), ("Trace", trace)):
            if frame is not None:
                print(label, "extracted :", frame.shape)