# reader.py
import dateutil.tz
from time import sleep, time
import pandas as pd
from typing import Callable, List, Tuple
from threading import Semaphore
from datetime import datetime
import numpy as np

from .handler import KUKA_Handler
from .trace import KUKA_Trace

TZ = dateutil.tz.gettz("Europe/Prague")

def trad_bool(input: List[bytes]) -> list:
    """Translate textual TRUE/FALSE values into Python booleans.

    Args:
        input (List[bytes]): The raw values to translate. Each element is
            compared against ``b'TRUE'`` and ``b'FALSE'``.

    Returns:
        list: One entry per input element: ``True`` for ``b'TRUE'``,
        ``False`` for ``b'FALSE'`` and ``0`` for anything else.
    """
    # NOTE: the parameter keeps its historical name (which shadows the
    # builtin) so that callers passing it by keyword keep working.
    return [
        True if item == b'TRUE' else False if item == b'FALSE' else 0
        for item in input
    ]


class KUKA_DataReader:
    """Reads samples from the live data collection system.

    The class-level values below are defaults; `__init__`, `init` and
    `run_sysvar` overwrite the flags on the instance.
    """
    
    # Flags used to sync the robot state with the local state
    _read_done = False          # Done status taken from the last TAB1 reading (see run_sysvar)
    _data_available = False     # Data available flag taken from the last TAB1 reading
    _speed = 0                  # Speed of the current run, set by init() and used for the "Speed" label
    
    # Collection methods to use, set by __init__
    _dosysvar = False           # Collect through system variables
    _dotrace = False            # Collect through the Kuka Trace

    # The columns order of the read data, used to create the DataFrame.
    # It matches the list built by get_data() followed by the four labels
    # appended by read().
    _columns = [
        "Sample_time",
        "Position_Command_A1", "Position_A1", "Torque_A1", "Current_A1", "Temperature_A1",
        "Position_Command_A2", "Position_A2", "Torque_A2", "Current_A2", "Temperature_A2",
        "Position_Command_A3", "Position_A3", "Torque_A3", "Current_A3", "Temperature_A3",
        "Position_Command_A4", "Position_A4", "Torque_A4", "Current_A4", "Temperature_A4",
        "Position_Command_A5", "Position_A5", "Torque_A5", "Current_A5", "Temperature_A5",
        "Position_Command_A6", "Position_A6", "Torque_A6", "Current_A6", "Temperature_A6",
        "A1", "A2", "A3", "A4", "A5", "A6" ,     
        "Queue_Read", "Queue_Write", "Load", "Faulty", "Speed", "Read_time",
    ]

    # TAB1 Length: number of values expected in a "__TAB_1[]" reading
    __TAB1_LEN = 36
    
    # TAB1 data indexes (positions inside the converted "__TAB_1[]" reading)
    __TAB1_SAMPLE = 0               # Sample time
    __TAB1_SAMPLE_READ = 32         # Queue read index
    __TAB1_SAMPLE_WRITE = 33        # Queue write index
    __TAB1_DATA_START = 1           # First per-axis value (slice start)
    __TAB1_DATA_END = 31            # End of the per-axis values (slice end, excluded)
    __TAB1_MOTOR = 31               # Value expanded into the "A1".."A6" columns
    __TAB1_DATA_AVAILABLE = 34      # Data available flag (compared with 1)
    __TAB1_DONE = 35                # Done status

    def __init__(self, handler: KUKA_Handler, dosysvar: bool, dotrace: bool) -> None:
        """Builds a Data Reader bound to an existing connection

        Args:
            handler (KUKA_Handler): The C3 Bridge Connection handler
            dosysvar (bool): Whether data is collected using system variables
            dotrace (bool): Whether data is collected using the Kuka Trace
        """

        # Connection and collection method switches
        self.handler = handler
        self._dosysvar = dosysvar
        self._dotrace = dotrace

        # KUKA TRACE: created on the same connection, enabled on demand
        trace = KUKA_Trace(handler)
        self.trace = trace
        trace.Trace_Enable(dotrace)

    def __read (self, name: str, default = None):
        """Tries to read a value from the C3 interface, or returns a default
        value.

        Args:
            name (str): The name of the varaible to read
            default (_type_, optional): The default value to return. Defaults to None.

        Returns:
            The variable read or the default value
        """        

        r = self.handler.KUKA_ReadVar(name)
        if r is None or r == b'':
            return default
        return r
    
    ### This section defines setter/getters to change the variables used by ###
    ### the data collector sub                                              ###

    ## PyDONE
    @property
    def PyDONE (self) -> bool:
        return self.__read("PyDONE", False)

    ## __PYTHON_HAS_READ
    @property
    def HAS_READ (self) -> int:
        return self.__read("__PYTHON_HAS_READ", 0)
    
    @HAS_READ.setter
    def HAS_READ (self, value: int):
        self.handler.KUKA_WriteVar("__PYTHON_HAS_READ", value)

    ### ------------------------------------------------------------------- ###

    def __try_get_data (self, name: str) -> bytes:
        """Tries to read a variable via the C3 interface up to 10 times

        Args:
            name (str): The name of the variable to read

        Raises:
            Exception: The read operation failed 10 times in a row

        Returns:
            bytes: The read data
        """        

        tries = 0
        r: bytes = b''
        while (r == b'' or r is None) and tries < 10:
            r = self.handler.KUKA_ReadVar(name)
            tries += 1
        if (r == b'' or r is None):
            raise Exception("Failed to read " + name)
        return r

    def __format_motor_value (self, motor: int):
        """Converts the analog output reading to 7 columns boolean values

        Args:
            motor (int): The analog value

        Returns:
            List[int]: The 7 columns reprensenting the motor states
        """        

        return [ (1 if i == motor else 0) for i in range(1,7) ]

    ## Get data
    def get_data (self) -> Tuple[List[float|int], bool, int]:
        """Collects the latest sample made available by the Data collector sub

        Raises:
            Exception: "Incomplete Tab 1 !": Not all columns have been read

        Returns:
            List[float|int]: The sample
        """        

        r1: bytes = self.__try_get_data("__TAB_1[]")
        raw = r1.split(b" ")[:-1]
        if len(raw) != self.__TAB1_LEN:
            print("Invaild raw data :", raw)
            raise Exception("Incomplete Tab 1 !")
        
        TAB1 = [ float(raw[i]) for i in range(len(raw)) ]           # Bytes to float conversion

        data = [
            TAB1[self.__TAB1_SAMPLE],                               # Sample time
            *TAB1[self.__TAB1_DATA_START:self.__TAB1_DATA_END],     # Per-axis data
            *self.__format_motor_value(TAB1[self.__TAB1_MOTOR]),    # Motor status columns
            TAB1[self.__TAB1_SAMPLE_READ],                          # Queue Read 
            TAB1[self.__TAB1_SAMPLE_WRITE],                         # Queue Write
        ]

        return (
                data, 
                TAB1[self.__TAB1_DATA_AVAILABLE] == 1,              # Data available flag
                TAB1[self.__TAB1_DONE]                        # PyDone status
                )

    ## Reading function for the queue
    def read (self, time_before, load: int = 0) -> Tuple[List[float|int], bool, int]:
        """Reads a sample from the data collection sub

        Args:
            time_before (_type_): The time at which the latest successful read has occured (for latency calculation)
            load (int, optional): The the class of the sample. Defaults to 0.

        Returns:
            Tuple[List[float|int], bool, bool]: The read data, data available flag and PyDone flag
        """        

        data, data_available, done = self.get_data()
        now = time()
        data.append(load)                       # Load label
        data.append(1 if load == 0 else 0)      # Faulty label
        data.append(f'{self._speed}%')          # Speed Label
        data.append((now - time_before) * 1000) # Request Time
        return data, data_available, done
        

    def reset (self):
        """Resets the data collection sub
        """        

        self._read_done = False
        self._data_available = False
        self.handler.KUKA_WriteVar("SAMPLE_NUMBER", 1)
        self.handler.KUKA_WriteVar("SAMPLE_READ", 1)
        self.handler.KUKA_WriteVar("__PyResetTimer", True)
                                       
    def init (self, A_iter: List[str], speed: str, sampling: str):
        """Prepares and starts data collection

        Args:
            A_iter (List[str]): The number of iterations per axis
            speed (str): The speed for this run
            sampling (str): The sampling rate in ms
        """        

        self.rate = int(sampling)/1000
        
        # Writing parameters        
        for i in range(1,7):
            self.handler.KUKA_WriteVar(f'PyITER[{i}]', A_iter[i - 1])

        self.handler.KUKA_WriteVar('PySPEED', speed)
        self._speed = int(speed)
        if self._dosysvar:
            self.handler.KUKA_WriteVar('ColSAMPLING', sampling)
            # Waiting for Reset
            self.handler.KUKA_WriteVar('ColRESET', True)
            sleep(0.1)
            while not self.handler.KUKA_ReadVar('ColRESET_DONE'):
                sleep(0.2)
            sleep(0.5)
            self.handler.KUKA_WriteVar('ColRUN', True)
        
        self.handler.KUKA_WriteVar('PyRUN', True)
        sleep(0.1)

    def run_sysvar (
            self, 
            next: Callable[[float, int, int], None] = None, 
            load: int = -1
        ) -> pd.DataFrame:
        """Runs data collection using system variables

        Args:
            next (Callable[[float, int, int], None], optional): A function used to update the user interface in order to show the current progress. Defaults to None.
            load (int, optional): The class of the sample. Defaults to -1.

        Returns:
            pd.DataFrame: The collected data 
        """               
        
        # Indexes of control data in the raw readings
        __SAMPLE_READ_INDEX = -6
        __SAMPLE_WRITE_INDEX = -5
        __SAMPLE_LATENCY = -1

        # Buffers
        frames = []
        
        # Time data to calculate latency
        now = time()

        # Flag indicating the state of the collection
        self._read_done = False
        trace_stoped = False
        
        while self._read_done != 1 :
            
            # Getting our sample
            data, self._data_available, self._read_done = self.read(now, load)
            if self._read_done == 2 and not(trace_stoped): # stop de trace if robot movement done
                trace_stoped = True
                self.trace.Trace_Stop()
            
            # Checking if some data is available
            if (self._data_available):
                
                # Getting the last sample number. Defaults to 0 which does not exist in KRL
                before = 0 if len(frames) == 0 else frames[-1][__SAMPLE_READ_INDEX]

                # Getting the current sample number. Starts at 1
                after = data[__SAMPLE_READ_INDEX]

                # Ignore duplicates
                if before == after:
                    sleep(self.rate)
                    continue
                
                # Indicating to the sub that we read the sample
                self.HAS_READ = after
                
                # Resetting the current time to measure the next request delay
                now = time()
                
                # Storing the currently measured data
                frames.append(data)

                # Callback to give a visual feedback on current data collection
                if next is not None:
                    next(data[__SAMPLE_LATENCY], data[__SAMPLE_READ_INDEX], data[__SAMPLE_WRITE_INDEX])
            
                # Sleeping not to slow down KRL
                sleep(self.rate/2)

            else:
                # Sleeping to wait for the next data to be sampled
                sleep(self.rate/2)
        
        # Creating a data frame
        return pd.DataFrame(frames, columns=self._columns)
    
    def get_trace_data (
            self, 
            speed: int | str, 
            load: int,
            sampling: int,
            dir: str = ".\\temp",
            sampling_offset: int = 0
        ) -> Tuple[pd.DataFrame, int]:
        """Gets the Kuka Trace data

        Args:
            speed (int | str): The speed at which the run was made
            load (int): The class of the sample
            sampling (int): The sampling rate of the Kuka Trace
            dir (str, optional): The folder in which to store the files to process. Defaults to ".\temp".
            sampling_offset (int, optional): The index of the first sample. Defaults to 0.

        Returns:
            Tuple[pd.DataFrame, int]: The collected data and the number of samples
        """        

        data_trace = self.trace.Trace_Download()
        
        dataset_length = len(data_trace['Sample_time'])
        data_trace['Speed'] = [int(speed)] * dataset_length

        if load == 0:
            data_trace['Faulty'] = [0] * dataset_length
        else:
            data_trace['Faulty'] = [1] * dataset_length
        
        data_trace['Load'] = [load] * dataset_length

        sampling = sampling / 1000
        # print(sampling, sampling_offset * sampling)
        # data_trace["Sample_time"] += (sampling_offset * sampling)

        return pd.DataFrame(data_trace)
    
    def run_single_speed (self, A_iter, speed, sampling, next, done, load, lock, now, trace_config, trace_sampling, temp_dir, trace_offset):
        """ Subsection of function acquire """
        
        # Sync with other robots
        if lock is not None:
            lock.acquire()
        
        # Print current speed to the terminal    
        print(f"Run with speed {speed}")
        
        cell = self.handler.ipAddress.split(".")[3][-1]
        file_name = now + f"[{speed}]_R{cell}"
        
        # KUKA Trace setup
        self.tracing = False
        if self._dotrace:
            self.trace.Trace_Config([ file_name, trace_config , "600" ])
            self.tracing = self.trace.Trace_Start()
            if self.tracing:
                print("Trace start for " + self.handler.ipAddress)
            else:
                print("Could not start trace for " + self.handler.ipAddress)
        
        # Robot init and launch
        self.init(A_iter, speed, sampling)
        
        # KRL System Variables collection
        if self._dosysvar :
            data_vars = self.run_sysvar(next, load) 
        else:
            data_vars = None
        
        # KUKA Trace collection
        data_trace = None
        if self._dotrace:
            if not self._dosysvar :
                # check PyDONE robot variable to stop the trace, as it is done in sysvar collection method
                sleep(1)
                while self.PyDONE !=True:
                    sleep(0.1)
                self.trace.Trace_Stop()
            if self.tracing:
                sleep(1)
                data_trace = self.get_trace_data(speed, load, trace_sampling, temp_dir, trace_offset)
        
        # Indicating that this run is done
        if done is not None:
            done()
        
        # Returning the two collected DataFrames
        return data_vars, data_trace
        
    def acquire (
            self, 
            A_iter: List[str], 
            speed: str | int | slice, 
            sampling: str, 
            trace_config = "12_ms",
            next: Callable[[float], None] = None,
            done: Callable = None,
            load: int = -1,
            lock: Semaphore = None,
            temp_dir: str = None
        ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Collects a full dataset using both system variables and Kuka Traces

        Args:
            A_iter (List[str]): The number of iteration per axis
            speed (str | int | slice): The speed (range) at which to run the iterations
            sampling (str): The system variables sampling time
            trace_config (str, optional): The trace configuration file name. Defaults to "12_ms".
            next (Callable[[float], None], optional): A function used to update the user interface in order to show the current progress. Defaults to None.
            done (Callable, optional): A function which is called at the end of a run. Used to sync multiple robots. Defaults to None.
            load (int, optional): The dataset class. Defaults to -1.
            lock (Semaphore, optional): A lock used to sync multiple robots. Defaults to None.
            temp_dir (str, optional): The folder in which to store the files to process. Defaults to None.

        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: The system variables Dataframe and the Kuka Trace DataFrame
        """
        
        # Reset the sub
        self.reset()

        # Using current date as a unique file name for the Kuka Traces
        now = datetime.now(tz=TZ).strftime("%Y-%m-%d_%H-%M-%S")

        # Making sure that Kuka Trace is stopped
        self.trace.Trace_Stop()

        # Getting the Kuka Trace sampling rate from the file name
        trace_sampling = int(trace_config.split("_")[0])

        ## ---- Run for a single speed ---- ##
        if type(speed) == str or type(speed) == int:
            return self.run_single_speed(A_iter, speed, sampling, next, done, load, lock, now, trace_config, trace_sampling, temp_dir, 0)

        ## ---- Run for multiple speeds ---- ##

        start = speed.start if speed.start is not None else 20
        step = speed.step if speed.step is not None else 10
        stop = speed.stop if speed.stop is not None else start
       
        # Buffers 
        sysvar_dataframes = []
        trace_dataframes = []
        trace_offset = 0
        data_sysvar : pd.DataFrame = None
        data_trace : pd.DataFrame = None
        
        while start <= stop:
            data_sysvar, data_trace = self.run_single_speed(A_iter, start, sampling, next, done, load, lock, now, trace_config, trace_sampling, temp_dir, trace_offset)
            
            # KUKA Trace
            if self.tracing and self._dotrace:
                # Updating the offset
                trace_offset += data_trace.shape[0]

                # Storing the resulting DataFrame in the buffer
                trace_dataframes.append(data_trace)
            
            # Storing the resulting DataFrame in the buffer
            if self._dosysvar:
                sysvar_dataframes.append(data_sysvar)  
            start += step 

        # Merging the results for each speed into one monolithic DataFrame for each method
        if self._dosysvar:
            sys_data = pd.concat(sysvar_dataframes)
        else:
            sys_data = None
        if self._dotrace:
            trace_data = pd.concat(trace_dataframes)
            trace_data["Sample_time"] = np.arange(len(trace_data["Sample_time"])) * (trace_sampling / 1000)
        else:
            trace_data = None

        return sys_data, trace_data