diff --git a/.gitignore b/.gitignore index 57372b350cefae9af94fb3335a95daae40a68192..a88c9143dedaebfb1a6ff5605b4c8d5a166f88da 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ ui/__pycache__ ui/__pycache__/* kuka/__pycache__/ kuka/__pycache__/* +temp/*/* +temp/*/ diff --git a/kuka/reader.py b/kuka/reader.py index c69e170dfaa6f8bf8357a9ce58abfe243006ea4f..33e2648ecd120d18d3a0f4642648632bb682f428 100644 --- a/kuka/reader.py +++ b/kuka/reader.py @@ -340,22 +340,17 @@ class KUKA_DataReader: Tuple[pd.DataFrame, int]: The collected data and the number of samples """ - data_trace = self.trace.Trace_Download(dir, False) + data_trace = self.trace.Trace_Download() - dataset_length = len(data_trace['Sample']) - data_trace['Speed'] = [f'{speed}%'] * dataset_length + dataset_length = len(data_trace['Sample_time']) + data_trace['Speed'] = [int(speed)] * dataset_length if load == 0: data_trace['Faulty'] = [0] * dataset_length else: data_trace['Faulty'] = [1] * dataset_length - for index in range(len(data_trace['Main Category'])): - if data_trace['Main Category'][index] == 0: - data_trace['Main Category'][index] = load - - del data_trace['Sample'] - data_trace['Sample'] = [ sampling * (i + sampling_offset) for i in range(dataset_length) ] + data_trace['Load'] = [load] * dataset_length return (pd.DataFrame(data_trace), dataset_length) @@ -408,7 +403,8 @@ class KUKA_DataReader: lock.acquire() # KUKA Trace - file_name = now + f"[{speed}]" + cell = self.handler.ipAddress.split(".")[3][-1] + file_name = now + f"[{speed}]_R{cell}" self.trace.Trace_Config([ file_name, trace_config , "600" ]) self.tracing = self.trace.Trace_Start() if self.tracing: @@ -423,7 +419,7 @@ class KUKA_DataReader: # KUKA Trace if self.tracing: # self.trace.Trace_Stop() - sleep(5) + # sleep(5) data_trace, _ = self.get_trace_data(speed, load, trace_sampling, temp_dir) # Indicating the end of this run @@ -455,7 +451,8 @@ class KUKA_DataReader: print(f"Run with speed {start}") # KUKA Trace - file_name = now + f"[{start}]" + cell = self.handler.ipAddress.split(".")[3][-1] + file_name = now + f"[{start}]_R{cell}" self.trace.Trace_Config([file_name, trace_config, "600"]) self.tracing = self.trace.Trace_Start() if self.tracing: @@ -472,7 +469,7 @@ class KUKA_DataReader: if self.tracing: self.trace.Trace_Stop() sleep(0.1) - data_trace, size = self.get_trace_data(speed, load, trace_sampling, temp_dir, trace_offset) + data_trace, size = self.get_trace_data(start, load, trace_sampling, temp_dir, trace_offset) # Updating the offset trace_offset += size diff --git a/kuka/trace.py b/kuka/trace.py index e2d61f76190112819556f939ad9b0e62080f2415..20f9226ef795bee777ab62cbad4de253ce8b9cbd 100644 --- a/kuka/trace.py +++ b/kuka/trace.py @@ -2,8 +2,25 @@ from .handler import KUKA_Handler import xml.etree.ElementTree as et from time import sleep import os +import shutil import re import numpy as np +from typing import List, Dict, Tuple +from pathlib import Path +import pandas as pd + +class DatFile: + + sampling: float = 0 + col241: List[float] = [] + traces: List[float] = [] + length: float = 0 + + def __init__(self) -> None: + self.sampling = 0 + self.col241 = [] + self.traces = [] + self.length = 0 class KUKA_Trace: """ @@ -11,6 +28,23 @@ class KUKA_Trace: download new_data to desired location. """ + # Directory to store the trace file before processing + temp_folder: Path = Path(os.getcwd(), "temp") + + # The folder of the robot containing the traces + trace_root: Path = None + + # Translations from German to English + translations = { + "Sollposition": "Position_Command", + "Istposition": "Position", + "Positionsschleppfehler": "Position_Error", + "Geschwindigkeitsdifferenz": "Velocity_Error", + "Motortemperatur": "Temperature", + "Istmoment": "Torque", + "Iststrom": "Current" + } + def __init__(self, rob_instance: KUKA_Handler): self.name = None self.config = None @@ -41,6 +75,8 @@ class KUKA_Trace: if self.enable: config_path = fr'\\{self.rob_instance.ipAddress}\roboter\TRACE\{configuration}.xml' + self.trace_root = Path(f'\\\\{self.rob_instance.ipAddress}\\roboter\\TRACE\\') + # try: # Comented to not modify the xml file # tree = et.parse(config_path) # root = tree.getroot() @@ -129,125 +165,223 @@ class KUKA_Trace: else: return False - def Trace_Download(self, directory, delete_rob_file): - """ - Downloads the .r64 and .dat files with previously set self.name from KRC's shared folder IP/roboter/TRACE into - new folder. - :param directory: directory of trace new_data folder - :param delete_rob_file: If true, system will delete trace recording from KRC. - :return: - """ - if self.enable: - extensions = ['.dat', '.r64', '.trc'] - file_paths = [] - axis_paths = [] - - if self.name is not None: - for extension in extensions: - axis_paths.append(fr"\\{self.rob_instance.ipAddress}\roboter\TRACE\{self.name}_KRCIpo{extension}") - for axis in range(1, 7): - file_paths.append( - fr"\\{self.rob_instance.ipAddress}\roboter\TRACE\{self.name}_NextGenDrive#{axis}{extension}") - file_paths.append(fr"\\{self.rob_instance.ipAddress}\roboter\TRACE\{self.name}_PROG.TXT") + def Trace_Download(self): + - else: - print('Configure Trace before downloading') - return False - file_path = directory + rf'\{self.name}.csv' - - data_buffer = self.r64_converter(file_paths) - active_axis_raw = self.r64_converter(axis_paths) - active_axis_raw['Main Category'] = [0] * len(active_axis_raw['AnalogOut1']) - for axis in range(1,7): - active_axis_raw[f'A{axis}'] = [0]*len(active_axis_raw['AnalogOut1']) - for sample in range(0,len(active_axis_raw['AnalogOut1'])): - axis_number = str(int(active_axis_raw['AnalogOut1'][sample])) - if axis_number != '0': - active_axis_raw[f'A{axis_number}'][sample] = 1 + result = self.read_traces(self.name) + + return result + + + def translate (self, value: str) -> str: + + if value in self.translations: + return self.translations[value] + + return value + + def copy_to_local (self, pairs: List[List[Path]], name: str): + + src_folder = None + if type (self.trace_root) == str : + src_folder = self.trace_root + else: + src_folder = self.trace_root.absolute() + + self.dest_folder = self.temp_folder.joinpath(name).absolute() + + if not self.dest_folder.exists(): + self.dest_folder.mkdir(parents=True) + + for pair in pairs: + for file in pair: + src = None + if type(src_folder) == str: + src = src_folder + str(file) else: - active_axis_raw['Main Category'][sample] = -1 + src = src_folder.joinpath(file) + dest = self.dest_folder.joinpath(file) + shutil.copyfile(src, dest) + src.unlink() + + + def find_pairs (self, name: str): - del active_axis_raw['Sample'] - del active_axis_raw['AnalogOut1'] - result = data_buffer | active_axis_raw + extensions = ['.dat', '.r64', ".trc"] + file_names = [ + "KRCIpo", + *[ f"NextGenDrive#{i}" for i in range(1,7) ] + ] + files = [] - lengths = [len(values) for values in result.values()] - min_data_length = min(lengths) + for file_name in file_names: + + path = Path(f'{name}_{file_name}') + files.append([ path.with_suffix(s) for s in extensions ]) - for key in result.keys(): - result[key] = result[key][:min_data_length-1] + return files - if delete_rob_file: - file_paths = file_paths + axis_paths - for file_path in file_paths: - os.remove(file_path) - return result + def read_dat (self, dat: Path, suffix: str = "") -> DatFile: - def r64_converter(self, file_names): - data = {} - for file in file_names: - if '#' in file: - axis_number = re.search(r'#(.)', file).group(1) - channel_name = f'_A{axis_number}' - else: - axis_number = '' - channel_name = '' - if '.dat' in file: - with open(file, 'r') as dat_file: - config = [line.strip() for line in dat_file.readlines()] - trace_names = [] - found_sampling_period = False - for line in config: - if '200,' in line: - trace_name_DE = line.split(',')[1] - match trace_name_DE: - case "Sollposition": - trace_names.append("Position_Command") - case "Istposition": - trace_names.append("Position") - case 'Positionsschleppfehler': - trace_names.append('Position_Error') - case 'Geschwindigkeitsdifferenz': - trace_names.append('Velocity_Error') - case "Motortemperatur": - trace_names.append("Temperature") - case 'Istmoment': - trace_names.append('Torque') - case 'Iststrom': - trace_names.append('Current') - case _: - trace_names.append(trace_name_DE) - if '241,' in line: - if not found_sampling_period: - sampling_period = int(float(line.split(',')[1]) * 1000) - found_sampling_period = True - - for name in trace_names: - if name != 'Zeit': - data[f'{name}{channel_name}'] = [] - if '.r64' in file: - channels = list(data.keys()) - current_axis_channels = [] - for channel in channels: - if axis_number in channel: - current_axis_channels.append(channel) - with open(file, 'rb') as real64_file: - all_samples = np.fromfile(real64_file, dtype='float64') - # number_of_samples = int(len(all_samples) / len(current_axis_channels)) - channel_number = 0 - for sample in all_samples: - - data[current_axis_channels[channel_number]].append(sample) - if channel_number < len(current_axis_channels) - 1: - channel_number += 1 - else: - channel_number = 0 - for channel in data.keys(): #sekce bulharskych konstant - if 'Motortemperatur' in channel: - data[channel] = [sample - 273.15 for sample in data[channel]] - if 'Position' in channel: - data[channel] = [sample / 1000000 for sample in data[channel]] - if 'Velocity' in channel: - data[channel] = [sample / 6 for sample in data[channel]] #neptej se... proste to tak je - data['Sample'] = [x * sampling_period for x in range(len(data[channels[0]]))] - return data + out = DatFile() + + with open(dat, "r") as dat_file: + + config = [ line.strip() for line in dat_file.readlines() ] + + inChannel = False + isZeit = False + + for line in config: + + if line == "#BEGINCHANNELHEADER": + inChannel = True + continue + + if line == "#ENDCHANNELHEADER": + inChannel = False + continue + + if not inChannel: + continue + + code, value = line.split(",") + + if isZeit: + if code == "241": + out.sampling = float(value) + isZeit = False + continue + + match code: + case "200": + if value == "Zeit": + isZeit = True + continue + + out.traces.append(self.translate(value) + suffix) + + case "220": + l = int(value) + if out.length == 0: + out.length = l + + case "241": + out.col241.append(float(value)) + + return out + + def convert_r64 (self, r64: Path, dat: DatFile) -> Dict[str, List[float]]: + + out: Dict[str, List[float]] = {} + + for col in dat.traces: + out[col] = [] + + N = len(dat.traces) + + with open(r64, "rb") as file: + + samples = np.fromfile(file, dtype='float64') + length = len(samples) // N + + for i in range(length): + for n in range(N): + col = dat.traces[n] + out[col].append(samples[i * N + n] * dat.col241[n]) + + return out + + def linear_interpolation (self, data: List[float], ratio: int = 1): + if ratio == 1: + return data + + data_len = len(data) + neo_len = data_len * ratio + + neo = np.zeros(neo_len) + neo[::ratio] = data + + for i in range(1,neo_len): + # Skip existing data points + if i % ratio == 0: + continue + + k = i // ratio + if (k + 1) >= data_len: + neo[i] = data[k] + continue + + a = data[k+1] - data[k] + b = data[k+1] - a + + neo[i] = (k / ratio) * a + b + + return neo + + def read_traces (self, name: str): + + pairs = self.find_pairs(name) + self.copy_to_local(pairs, name) + self.copy_to_local([[f'{name}_PROG.TXT']], name) + + data: List[Tuple[DatFile, Dict[str, List[float]]]] = [] + + for pair in pairs: + + dat_path = self.dest_folder.joinpath(pair[0]) + + suffix = "" + if '#' in dat_path.stem: + n = re.search(r'#(.)', dat_path.stem).group(1) + suffix = f'_A{n}' + + dat = self.read_dat(dat_path, suffix) + + r64_path = self.dest_folder.joinpath(pair[1]) + r64 = self.convert_r64(r64_path, dat) + + data.append((dat, r64)) + + min_sampling = data[0][0].sampling + max_sampling = data[0][0].sampling + min_length = data[0][0].length + for d in data: + min_sampling = min(d[0].sampling, min_sampling) + max_sampling = max(d[0].sampling, max_sampling) + min_length = min(d[0].length, min_length) + + ratio = int(max_sampling // min_sampling) + length = min_length * ratio + + print(length, ratio, min_sampling, max_sampling, min_length) + + dataframe = pd.DataFrame() + + for d in data: + dat = d[0] + ratio = int(dat.sampling // min_sampling) + + values = d[1] + + for col in values: + + if ratio > 1: + + if "AnalogOut" in col: + # Step interpolation + temp = np.zeros(len(values[col]) * ratio) + for i in range(ratio): + temp[i::ratio] = values[col] + dataframe[col] = temp[:length] + else: + # Linear interpolation + dataframe[col] = self.linear_interpolation(values[col], ratio)[:length] + + else: + dataframe[col] = np.float64(values[col])[:length] + + T = len(dataframe[dataframe.columns[0]]) + dataframe["Sample_time"] = np.arange(T) * min_sampling + + return dataframe[[dataframe.columns[-1], *dataframe.columns[:-1]]] diff --git a/main.py b/main.py index 3c560ecac33f4a1dc554b32b05c6f79caed36ccf..00bb431b1760cb67bf62158fef1ce68091f9b9b1 100644 --- a/main.py +++ b/main.py @@ -217,7 +217,7 @@ class MainProgram (MainWindow): try: self.dataframe = pd.read_excel(path) self.data.update_layout_on_columns(self, self.dataframe.columns) - self.dataframe['Sample_time_s'] = self.dataframe['Sample']/1000 if 'TRACE' in path else self.dataframe['Sample_time']/1000 + self.dataframe['Sample_time_s'] = self.dataframe['Sample_time']/1000 sg.popup("Done") self.data.enable_plot_buttons(True) except Exception as e: