# Skip to content
# Snippets Groups Projects
# extractor.py 14.17 KiB
import pandas as pd
import traceback
from typing import List, Dict, Tuple
import os
from pathlib import Path
import numpy as np

class CorrectionCoefficient:
    """Linear correction registered for one quantity of one motor.

    The apply_correction_* methods of DataExtractor in this file apply it
    to a data column ``x`` as ``coefficient * (x - pre_offset) + post_offset``.

    Attributes:
        motor (int): The motor (axis) number the correction belongs to.
        value (str): The name of the corrected quantity, e.g. "Torque".
        coefficient (float): The multiplicative factor.
        pre_offset (float): The offset subtracted before scaling.
        post_offset (float): The offset added after scaling.
    """

    motor: int
    value: str
    coefficient: float
    pre_offset: float
    post_offset: float

    # `value` is a quantity name: it is compared against strings and used to
    # build column names, so it is annotated as str (the original said int).
    def __init__(self, motor: int, value: str, coefficient: float, pre_offset: float = 0, post_offset: float = 0):
        self.motor = motor
        self.value = value
        self.coefficient = coefficient
        self.pre_offset = pre_offset
        self.post_offset = post_offset

    def __repr__(self) -> str:
        """Returns a constructor-like representation, useful in debug output."""
        return (
            f"{type(self).__name__}(motor={self.motor!r}, value={self.value!r}, "
            f"coefficient={self.coefficient!r}, pre_offset={self.pre_offset!r}, "
            f"post_offset={self.post_offset!r})"
        )


class DataExtractor:
    """Loads system-variable and trace spreadsheets, applies per-motor
    linear corrections to their columns and normalizes them to a common
    column layout.
    """

    # Loaded datasets; `load` stores the spreadsheet it reads in one of them
    # depending on its `trace` flag. None until then.
    data_sysvar: pd.DataFrame = None
    data_trace: pd.DataFrame = None

    # Corrections registered through the set_*_coefficient methods.
    # NOTE(review): these defaults are class-level list objects. The
    # set_*_coefficient methods append to whatever list `self.<name>`
    # resolves to, so instances that never rebind the attribute all share
    # (and mutate) the same list.
    sysvar_correction: List[CorrectionCoefficient] = []
    trace_correction: List[CorrectionCoefficient] = []

    # Export directory, created by __init__ when missing. The default is
    # computed once, when the class body is executed.
    export_folder: Path = Path(os.getcwd(), "export")
    # Stem of the file name most recently passed to `load`.
    export_prefix: str = ""

    # Not read by any active method in this file; the commented-out `export`
    # used it to choose .xlsx over .csv output.
    excel = True
    # When True, the apply_correction_* methods print what they correct.
    debug = False

    def __init__(self) -> None:

        if not os.path.exists(self.export_folder):
            os.mkdir(self.export_folder)

        # MAGIC NUMBERS
        self.set_trace_coefficient(1, "Torque", -75)
        self.set_trace_coefficient(2, "Torque", 75)
        self.set_trace_coefficient(3, "Torque", 75)
        self.set_trace_coefficient(4, "Torque", -75)
        self.set_trace_coefficient(5, "Torque", -75)
        self.set_trace_coefficient(6, "Torque", 75)

        self.set_sysvar_coefficient(1, "Current", 0.11)     # % -> A
        self.set_sysvar_coefficient(2, "Current", 0.11)     # % -> A
        self.set_sysvar_coefficient(3, "Current", 0.065)    # % -> A
        self.set_sysvar_coefficient(4, "Current", 0.035)    # % -> A
        self.set_sysvar_coefficient(5, "Current", 0.035)    # % -> A
        self.set_sysvar_coefficient(6, "Current", 0.035)    # % -> A

        for i in range(1,7):
            self.set_sysvar_coefficient(i, "Temperature", 0.1)      # °C
            self.set_trace_coefficient(i, "Temperature", 1, -273)   # K -> °C
            # self.set_trace_coefficient(i, "Position_Command", 0.01)
            # self.set_trace_coefficient(i, "Position", 0.01)

    def reset (self):
        """Resets this class
        """

        self.data_sysvar = None
        self.data_trace = None

        self.sysvar_correction = []
        self.trace_correction = []

        self.export_folder = Path(os.getcwd(), "export")
        self.export_prefix = ""

        self.excel = True
        self.debug = False


    def load (self, file: str | Path, trace: bool = False) -> bool:
        """Loads a dataset

        Args:
            file (str): The file containing the data collected from System variables
            trace (str, optional): The file is a trace. Defaults to False.

        Returns:
            bool: Retruns True when properly loaded, else False
        """

        if not isinstance(file, Path):
            file = Path(file)

        self.export_prefix = file.stem

        try:
            if trace:
                self.data_trace = pd.read_excel(file, index_col=0)
            else:
                self.data_sysvar = pd.read_excel(file, index_col=0)
        except Exception as e:
            print("Failed to load " + str(file))
            traceback.print_exception(e)
            return False
        
        return True

    def set_trace_coefficient (self, motor: int, value: str, coeff: float, pre_offset: float = 0, post_offset: float = 0) -> None:
        """Defines a coefficient to apply to a value for a specific motor on trace data

        Args:
            motor (int): The motor needing the correction
            value (str): The corrected value
            coeff (float): The correction coefficient
        """

        for c in self.trace_correction:
            if c.motor == motor and c.value == value:
                c.coefficient = coeff
                c.pre_offset = pre_offset
                c.post_offset = post_offset
                return
            
        self.trace_correction.append(CorrectionCoefficient(motor, value, coeff, pre_offset, post_offset))

    def set_sysvar_coefficient (self, motor: int, value: str, coeff: float, pre_offset: float = 0, post_offset: float = 0) -> None:
        """Defines a coefficient to apply to a value for a specific motor on system variables data

        Args:
            motor (int): The motor needing the correction
            value (str): The corrected value
            coeff (float): The correction coefficient
        """

        for c in self.sysvar_correction:
            if c.motor == motor and c.value == value:
                c.coefficient = coeff
                c.pre_offset = pre_offset
                c.post_offset = post_offset
                return
            
        self.sysvar_correction.append(CorrectionCoefficient(motor, value, coeff, pre_offset, post_offset))

    def get_trace_correction (self) -> Dict[str, Tuple[float, float]]:
        """Generates a dictionnary of the defined trace data correction

        Returns:
            Dict[str, float]: The correction coefficients according to the column name
        """

        out = { }

        for c in self.trace_correction:
            out[f'{c.value}_A{c.motor}'] = (c.coefficient, c.pre_offset, c.post_offset)

        return out
    
    def get_sysvar_correction (self) -> Dict[str, Tuple[float, float]]:
        """Generates a dictionnary of the defined system variables data correction

        Returns:
            Dict[str, float]: The correction coefficients according to the column name
        """

        out = { }

        for c in self.sysvar_correction:
            out[f'{c.value}_A{c.motor}'] = (c.coefficient, c.pre_offset, c.post_offset)

        return out
    
    def find_trace_correction (self, motor: int, value: str) -> CorrectionCoefficient | None:
        """Returns the Correction Coefficient for a trace value if it exists

        Args:
            motor (int): The motor
            value (str): The value

        Returns:
            CorrectionCoefficient | None: The coefficient
        """

        for c in self.trace_correction:
            if c.motor == motor and c.value == value:
                return c

        return None   
     
    def find_sysvar_correction (self, motor: int, value: str) -> CorrectionCoefficient | None:
        """Returns the Correction Coefficient for a system variable value if it exists

        Args:
            motor (int): The motor
            value (str): The value

        Returns:
            CorrectionCoefficient | None: The coefficient
        """

        for c in self.sysvar_correction:
            if c.motor == motor and c.value == value:
                return c

        return None
    
    def tune (
        self, motor: int, value: str, 
        min_val: float, max_val: float
    ):
        """Finds the correction coefficients to fit the desired range while 
        not altering the relative offset from 0

        Args:
            motor (int): The motor
            value (str): The value to fit
            min_val (float): The new minimum
            max_val (float): The new maximum

        Returns:
            The correction coefficients
        """

        axis = f"{value}_A{motor}"

        if not axis in self.data_trace.columns:
            print("No axis", axis, "in data")
            return 1, 0, 0

        column = self.data_trace[axis]

        desired_range = max_val - min_val
        actual_range = column.max() - column.min()

        pre_offset = (column.max() + column.min()) / 2
        
        scaling_factor = desired_range / actual_range
        post_offset = pre_offset * scaling_factor

        return scaling_factor, pre_offset, post_offset
    

    def auto_fit_trace_positions (self):
        """Updates the correction coefficients for position values in the trace data
        """

        if self.data_trace is None:
            return

        ranges = [
            (-45, 45), (-45, 15), (-30, 25), (-50, 50), (-35, 30), (-90, 90)
        ]

        for i in range(1,7):

            tuning_pos = self.tune(i, "Position", *ranges[i - 1])
            tuning_cmd = self.tune(i, "Position_Command", *ranges[i - 1])

            self.set_trace_coefficient(i, "Position", *tuning_pos)
            self.set_trace_coefficient(i, "Position_Error", *tuning_pos)
            self.set_trace_coefficient(i, "Position_Command", *tuning_cmd)


    def apply_correction_sysvar (self):
        """Applies the correction on the loaded sysvar data
        """

        if self.data_sysvar is None:
            return
        
        sysvar_correction = self.get_sysvar_correction()
        sysvar_cols = self.data_sysvar.columns

        if self.debug:
            print("Sys Vars correction :", sysvar_correction)

        for k in sysvar_correction.keys():
            if k in sysvar_cols:
                if self.debug:
                    print("Correcting Sys Vars column ", k)
                self.data_sysvar[k] = (sysvar_correction[k][0] * (self.data_sysvar[k] - sysvar_correction[k][1])) + sysvar_correction[k][2]

    def apply_correction_trace (self):
        """Applies the correction on the loaded trace data
        """
        
        if self.data_trace is None:
            return
                
        trace_correction = self.get_trace_correction()
        trace_cols = self.data_trace.columns

        if self.debug:
            print("Trace correction :", trace_correction)
        for k in trace_correction.keys():
            if k in trace_cols:
                if self.debug:    
                    print("Correcting Trace column ", k)
                self.data_trace[k] = (trace_correction[k][0] * (self.data_trace[k] - trace_correction[k][1])) + trace_correction[k][2]


    def apply_correction (self):
        """Applies the correction on the loaded data
        """

        self.apply_correction_sysvar()
        self.apply_correction_trace()

    def normalize_sysvar (self, sysvar: pd.DataFrame) -> pd.DataFrame:

        neo = pd.DataFrame()


        neo["Sample_time"]  = sysvar["Sample_time"]
        neo["Class"]        = sysvar["Load"]
        neo["Speed"]        = [ int(speed[:-1]) for speed in sysvar["Speed"] ]
        neo["MovingMotor"]  = (
              1 * sysvar["A1"] 
            + 2 * sysvar["A2"] 
            + 3 * sysvar["A3"] 
            + 4 * sysvar["A4"] 
            + 5 * sysvar["A5"] 
            + 6 * sysvar["A6"] 
        )

        for i in range(1,7):
            neo[f'Position_A{i}']           = sysvar[f'Position_A{i}']
            neo[f'Position_Error_A{i}']     = sysvar[f'Position_Command_A{i}'] - sysvar[f'Position_A{i}']
            neo[f'Position_Command_A{i}']   = sysvar[f'Position_Command_A{i}']
            neo[f'Current_A{i}']            = sysvar[f'Current_A{i}']
            neo[f'Temperature_A{i}']        = sysvar[f'Temperature_A{i}']

        return neo
    
    def normalize_trace (self, trace: pd.DataFrame) -> pd.DataFrame:

        neo = pd.DataFrame()

        neo["Sample_time"]  = trace["Sample_time"]
        neo["Class"]        = trace["Load"]
        neo["Speed"]        = trace["Speed"]
        neo["MovingMotor"]  = trace["AnalogOut1"] % 7

        for i in range(1,7):
            neo[f'Position_A{i}']           = trace[f'Position_A{i}']
            neo[f'Position_Error_A{i}']     = trace[f'Position_Error_A{i}']
            neo[f'Position_Command_A{i}']   = trace[f'Position_Command_A{i}']
            neo[f'Current_A{i}']            = trace[f'Current_A{i}']
            neo[f'Temperature_A{i}']        = trace[f'Temperature_A{i}']

        return neo

    def extract (self) -> Tuple[pd.DataFrame | None, pd.DataFrame | None]:
        """Extracts only the moments when the motors are moving

        Returns:
            Tuple[Dict[str, pd.DataFrame], Dict[str, pd.DataFrame]]: The resulting extraction
        """
        output_sysvar = None
        output_trace = None

        if self.data_sysvar is not None:
            output_sysvar = self.normalize_sysvar(self.data_sysvar)
        if self.data_trace is not None: 
            output_trace = self.normalize_trace(self.data_trace)

        # for i in range(0, 8):
        #     motor = f'A{i}'
        #     if self.data_sysvar is not None:
        #         output_sysvar[motor] = self.normalize_sysvar(self.data_sysvar[self.data_sysvar[motor] == 1])
        #     if self.data_trace is not None:
        #         output_trace[motor] = self.normalize_trace(self.data_trace[self.data_trace["AnalogOut1"] == i])

        return output_sysvar, output_trace

    # def export (self):
    #     """Extracts and saves to files the moments when the motors are moving 
    #     """

    #     sysvar, trace = self.extract()

    #     # Preparing the file name
    #     folder = Path(self.export_folder, self.export_prefix)
        
    #     if not os.path.exists(folder):
    #         os.mkdir(folder)

    #     if sysvar is not None:

    #         # Exporting sys vars data
    #         for motor in sysvar.keys():
    #             file = folder.joinpath(f"SysVars - {motor}")
                
    #             if self.excel:
    #                 sysvar[motor].to_excel(file.with_suffix(".xlsx"))
    #             else:
    #                 sysvar[motor].to_csv(file.with_suffix(".csv"))

    #     if trace is not None:

    #         # Exporting trace data
    #         for motor in trace.keys():
    #             file = folder.joinpath(f"Trace - {motor}")
    #             if self.excel:
    #                 trace[motor].to_excel(file.with_suffix(".xlsx"))
    #             else:
    #                 trace[motor].to_csv(file.with_suffix(".csv"))


    # def run (self, sysvar: str | Path, trace: str | Path = None):
    #     """Loads, corrects and exports the specified data

    #     Args:
    #         sysvar (str): The file path to System Variables data
    #         trace (str, optional): The file path to Trace data. Defaults to None.
    #     """

    #     self.load(sysvar, False)
    #     if trace is not None:
    #         self.load(trace, True)
    #     self.apply_correction()
    #     self.export()
        
    


if __name__ == "__main__":

    sysvar_file = "data3/[2024-05-23] 10h25 data [50%-80%] [24ms] [class 0] [3 3 3 3 3 3] - Robot 3.xlsx"
    trace_file  = "data3/[2024-05-23] 10h25 data [50%-80%] [24ms] [class 0] [3 3 3 3 3 3] - Robot 3_TRACE.xlsx"

    extractor = DataExtractor()
    extractor.debug = True

    # DataExtractor has no `run` method (it is commented out) and no
    # `compare_and_export` method is defined in this file, so calling them
    # raised AttributeError. The pipeline is driven here through the methods
    # that exist; the comparison step is not reproduced because its
    # definition is not available.
    if not extractor.load(sysvar_file, False):
        raise SystemExit(1)
    if not extractor.load(trace_file, True):
        raise SystemExit(1)

    extractor.apply_correction()
    sysvar, trace = extractor.extract()

    # One file per dataset, in <export_folder>/<export_prefix>/
    folder = Path(extractor.export_folder, extractor.export_prefix)
    folder.mkdir(parents=True, exist_ok=True)

    for label, frame in (("SysVars", sysvar), ("Trace", trace)):
        if frame is None:
            continue
        if extractor.excel:
            frame.to_excel(folder / f"{label}.xlsx")
        else:
            frame.to_csv(folder / f"{label}.csv")