Commit 6f80cfb3 authored by VulcanixFR

[WIP] Adding new R64 converter

parent 50e2750f
@@ -7,3 +7,5 @@ ui/__pycache__
ui/__pycache__/*
kuka/__pycache__/
kuka/__pycache__/*
temp/*/*
temp/*/
@@ -340,22 +340,17 @@ class KUKA_DataReader:
Tuple[pd.DataFrame, int]: The collected data and the number of samples
"""
data_trace = self.trace.Trace_Download(dir, False)
data_trace = self.trace.Trace_Download()
dataset_length = len(data_trace['Sample'])
data_trace['Speed'] = [f'{speed}%'] * dataset_length
dataset_length = len(data_trace['Sample_time'])
data_trace['Speed'] = [int(speed)] * dataset_length
if load == 0:
data_trace['Faulty'] = [0] * dataset_length
else:
data_trace['Faulty'] = [1] * dataset_length
for index in range(len(data_trace['Main Category'])):
if data_trace['Main Category'][index] == 0:
data_trace['Main Category'][index] = load
del data_trace['Sample']
data_trace['Sample'] = [ sampling * (i + sampling_offset) for i in range(dataset_length) ]
data_trace['Load'] = [load] * dataset_length
return (pd.DataFrame(data_trace), dataset_length)
@@ -408,7 +403,8 @@ class KUKA_DataReader:
lock.acquire()
# KUKA Trace
file_name = now + f"[{speed}]"
cell = self.handler.ipAddress.split(".")[3][-1]
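# Assumption: the last digit of the controller's final IP octet identifies the robot,
# hence the "_R{cell}" suffix added to the trace file name on the next line.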
file_name = now + f"[{speed}]_R{cell}"
self.trace.Trace_Config([ file_name, trace_config , "600" ])
self.tracing = self.trace.Trace_Start()
if self.tracing:
@@ -423,7 +419,7 @@ class KUKA_DataReader:
# KUKA Trace
if self.tracing:
# self.trace.Trace_Stop()
sleep(5)
# sleep(5)
data_trace, _ = self.get_trace_data(speed, load, trace_sampling, temp_dir)
# Indicating the end of this run
@@ -455,7 +451,8 @@ class KUKA_DataReader:
print(f"Run with speed {start}")
# KUKA Trace
file_name = now + f"[{start}]"
cell = self.handler.ipAddress.split(".")[3][-1]
file_name = now + f"[{start}]_R{cell}"
self.trace.Trace_Config([file_name, trace_config, "600"])
self.tracing = self.trace.Trace_Start()
if self.tracing:
@@ -472,7 +469,7 @@ class KUKA_DataReader:
if self.tracing:
self.trace.Trace_Stop()
sleep(0.1)
data_trace, size = self.get_trace_data(speed, load, trace_sampling, temp_dir, trace_offset)
data_trace, size = self.get_trace_data(start, load, trace_sampling, temp_dir, trace_offset)
# Updating the offset
trace_offset += size
......
@@ -2,8 +2,25 @@ from .handler import KUKA_Handler
import xml.etree.ElementTree as et
from time import sleep
import os
import shutil
import re
import numpy as np
from typing import List, Dict, Tuple
from pathlib import Path
import pandas as pd
class DatFile:
sampling: float = 0
col241: List[float] = []
traces: List[float] = []
length: float = 0
def __init__(self) -> None:
self.sampling = 0
self.col241 = []
self.traces = []
self.length = 0
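# DatFile is a small container for what read_dat extracts from one KRC .dat header:
# `sampling` is the sampling period of the time ("Zeit") channel, `traces` the
# translated channel names, `col241` the numeric factors from the "241" header rows
# (apparently per-channel scale factors), and `length` the sample count from the
# first "220" row. The class-level defaults are re-assigned per instance in __init__.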
class KUKA_Trace:
"""
@@ -11,6 +28,23 @@ class KUKA_Trace:
download new_data to desired location.
"""
# Directory to store the trace file before processing
temp_folder: Path = Path(os.getcwd(), "temp")
# The folder of the robot containing the traces
trace_root: Path = None
# Translations from German to English
translations = {
"Sollposition": "Position_Command",
"Istposition": "Position",
"Positionsschleppfehler": "Position_Error",
"Geschwindigkeitsdifferenz": "Velocity_Error",
"Motortemperatur": "Temperature",
"Istmoment": "Torque",
"Iststrom": "Current"
}
def __init__(self, rob_instance: KUKA_Handler):
self.name = None
self.config = None
@@ -41,6 +75,8 @@ class KUKA_Trace:
if self.enable:
config_path = fr'\\{self.rob_instance.ipAddress}\roboter\TRACE\{configuration}.xml'
self.trace_root = Path(f'\\\\{self.rob_instance.ipAddress}\\roboter\\TRACE\\')
# try: # Commented out so the xml file is not modified
# tree = et.parse(config_path)
# root = tree.getroot()
@@ -129,125 +165,223 @@ class KUKA_Trace:
else:
return False
def Trace_Download(self, directory, delete_rob_file):
"""
Downloads the .r64 and .dat files with previously set self.name from KRC's shared folder IP/roboter/TRACE into
new folder.
:param directory: directory of trace new_data folder
:param delete_rob_file: If true, system will delete trace recording from KRC.
:return:
"""
if self.enable:
extensions = ['.dat', '.r64', '.trc']
file_paths = []
axis_paths = []
if self.name is not None:
for extension in extensions:
axis_paths.append(fr"\\{self.rob_instance.ipAddress}\roboter\TRACE\{self.name}_KRCIpo{extension}")
for axis in range(1, 7):
file_paths.append(
fr"\\{self.rob_instance.ipAddress}\roboter\TRACE\{self.name}_NextGenDrive#{axis}{extension}")
file_paths.append(fr"\\{self.rob_instance.ipAddress}\roboter\TRACE\{self.name}_PROG.TXT")
def Trace_Download(self):
else:
print('Configure Trace before downloading')
return False
file_path = directory + rf'\{self.name}.csv'
data_buffer = self.r64_converter(file_paths)
active_axis_raw = self.r64_converter(axis_paths)
active_axis_raw['Main Category'] = [0] * len(active_axis_raw['AnalogOut1'])
for axis in range(1,7):
active_axis_raw[f'A{axis}'] = [0]*len(active_axis_raw['AnalogOut1'])
for sample in range(0,len(active_axis_raw['AnalogOut1'])):
axis_number = str(int(active_axis_raw['AnalogOut1'][sample]))
if axis_number != '0':
active_axis_raw[f'A{axis_number}'][sample] = 1
result = self.read_traces(self.name)
return result
def translate (self, value: str) -> str:
if value in self.translations:
return self.translations[value]
return value
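# e.g. self.translate("Istmoment") returns "Torque"; names without an entry in
# `translations` pass through unchanged.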
def copy_to_local (self, pairs: List[List[Path]], name: str):
src_folder = None
if type (self.trace_root) == str :
src_folder = self.trace_root
else:
src_folder = self.trace_root.absolute()
self.dest_folder = self.temp_folder.joinpath(name).absolute()
if not self.dest_folder.exists():
self.dest_folder.mkdir(parents=True)
for pair in pairs:
for file in pair:
src = None
if type(src_folder) == str:
src = src_folder + str(file)
else:
active_axis_raw['Main Category'][sample] = -1
src = src_folder.joinpath(file)
dest = self.dest_folder.joinpath(file)
shutil.copyfile(src, dest)
src.unlink()
def find_pairs (self, name: str):
del active_axis_raw['Sample']
del active_axis_raw['AnalogOut1']
result = data_buffer | active_axis_raw
extensions = ['.dat', '.r64', ".trc"]
file_names = [
"KRCIpo",
*[ f"NextGenDrive#{i}" for i in range(1,7) ]
]
files = []
lengths = [len(values) for values in result.values()]
min_data_length = min(lengths)
for file_name in file_names:
path = Path(f'{name}_{file_name}')
files.append([ path.with_suffix(s) for s in extensions ])
for key in result.keys():
result[key] = result[key][:min_data_length-1]
return files
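# For a hypothetical trace named "run1" this yields one [.dat, .r64, .trc] triple per
# recorded source: run1_KRCIpo.* plus run1_NextGenDrive#1.* through run1_NextGenDrive#6.*.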
if delete_rob_file:
file_paths = file_paths + axis_paths
for file_path in file_paths:
os.remove(file_path)
return result
def read_dat (self, dat: Path, suffix: str = "") -> DatFile:
def r64_converter(self, file_names):
data = {}
for file in file_names:
if '#' in file:
axis_number = re.search(r'#(.)', file).group(1)
channel_name = f'_A{axis_number}'
else:
axis_number = ''
channel_name = ''
if '.dat' in file:
with open(file, 'r') as dat_file:
config = [line.strip() for line in dat_file.readlines()]
trace_names = []
found_sampling_period = False
for line in config:
if '200,' in line:
trace_name_DE = line.split(',')[1]
match trace_name_DE:
case "Sollposition":
trace_names.append("Position_Command")
case "Istposition":
trace_names.append("Position")
case 'Positionsschleppfehler':
trace_names.append('Position_Error')
case 'Geschwindigkeitsdifferenz':
trace_names.append('Velocity_Error')
case "Motortemperatur":
trace_names.append("Temperature")
case 'Istmoment':
trace_names.append('Torque')
case 'Iststrom':
trace_names.append('Current')
case _:
trace_names.append(trace_name_DE)
if '241,' in line:
if not found_sampling_period:
sampling_period = int(float(line.split(',')[1]) * 1000)
found_sampling_period = True
for name in trace_names:
if name != 'Zeit':
data[f'{name}{channel_name}'] = []
if '.r64' in file:
channels = list(data.keys())
current_axis_channels = []
for channel in channels:
if axis_number in channel:
current_axis_channels.append(channel)
with open(file, 'rb') as real64_file:
all_samples = np.fromfile(real64_file, dtype='float64')
# number_of_samples = int(len(all_samples) / len(current_axis_channels))
channel_number = 0
for sample in all_samples:
data[current_axis_channels[channel_number]].append(sample)
if channel_number < len(current_axis_channels) - 1:
channel_number += 1
else:
channel_number = 0
for channel in data.keys(): # magic-constants section
if 'Motortemperatur' in channel:
data[channel] = [sample - 273.15 for sample in data[channel]]
if 'Position' in channel:
data[channel] = [sample / 1000000 for sample in data[channel]]
if 'Velocity' in channel:
data[channel] = [sample / 6 for sample in data[channel]] # don't ask... it just is that way
data['Sample'] = [x * sampling_period for x in range(len(data[channels[0]]))]
return data
out = DatFile()
with open(dat, "r") as dat_file:
config = [ line.strip() for line in dat_file.readlines() ]
inChannel = False
isZeit = False
for line in config:
if line == "#BEGINCHANNELHEADER":
inChannel = True
continue
if line == "#ENDCHANNELHEADER":
inChannel = False
continue
if not inChannel:
continue
code, value = line.split(",")
if isZeit:
if code == "241":
out.sampling = float(value)
isZeit = False
continue
match code:
case "200":
if value == "Zeit":
isZeit = True
continue
out.traces.append(self.translate(value) + suffix)
case "220":
l = int(value)
if out.length == 0:
out.length = l
case "241":
out.col241.append(float(value))
return out
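# Sketch of the .dat header layout this parser assumes: each channel block is delimited
# by #BEGINCHANNELHEADER / #ENDCHANNELHEADER; row "200" holds the channel name (the time
# channel "Zeit" is skipped), row "220" the sample count, and row "241" a numeric
# factor -- the sampling period for "Zeit", a per-channel scale factor otherwise.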
def convert_r64 (self, r64: Path, dat: DatFile) -> Dict[str, List[float]]:
out: Dict[str, List[float]] = {}
for col in dat.traces:
out[col] = []
N = len(dat.traces)
with open(r64, "rb") as file:
samples = np.fromfile(file, dtype='float64')
length = len(samples) // N
for i in range(length):
for n in range(N):
col = dat.traces[n]
out[col].append(samples[i * N + n] * dat.col241[n])
return out
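# The .r64 payload is read as one flat float64 stream with the file's channels
# interleaved sample by sample, e.g. for traces [T1, T2, T3] the stream is
# [T1[0], T2[0], T3[0], T1[1], ...]; convert_r64 de-interleaves it and scales
# each value by the matching col241 factor.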
def linear_interpolation (self, data: List[float], ratio: int = 1):
if ratio == 1:
return data
data_len = len(data)
neo_len = data_len * ratio
neo = np.zeros(neo_len)
neo[::ratio] = data
for i in range(1,neo_len):
# Skip existing data points
if i % ratio == 0:
continue
k = i // ratio
if (k + 1) >= data_len:
neo[i] = data[k]
continue
a = data[k+1] - data[k]
b = data[k+1] - a
neo[i] = ((i % ratio) / ratio) * a + b # fraction of the way from data[k] to data[k+1]
return neo
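# Upsampling sketch: with the formula above, linear_interpolation([0.0, 1.0], 2)
# gives array([0.0, 0.5, 1.0, 1.0]) -- the last slot has no right-hand neighbour,
# so the final original value is repeated.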
def read_traces (self, name: str):
pairs = self.find_pairs(name)
self.copy_to_local(pairs, name)
self.copy_to_local([[f'{name}_PROG.TXT']], name)
data: List[Tuple[DatFile, Dict[str, List[float]]]] = []
for pair in pairs:
dat_path = self.dest_folder.joinpath(pair[0])
suffix = ""
if '#' in dat_path.stem:
n = re.search(r'#(.)', dat_path.stem).group(1)
suffix = f'_A{n}'
dat = self.read_dat(dat_path, suffix)
r64_path = self.dest_folder.joinpath(pair[1])
r64 = self.convert_r64(r64_path, dat)
data.append((dat, r64))
min_sampling = data[0][0].sampling
max_sampling = data[0][0].sampling
min_length = data[0][0].length
for d in data:
min_sampling = min(d[0].sampling, min_sampling)
max_sampling = max(d[0].sampling, max_sampling)
min_length = min(d[0].length, min_length)
ratio = int(max_sampling // min_sampling)
length = min_length * ratio
print(length, ratio, min_sampling, max_sampling, min_length)
dataframe = pd.DataFrame()
for d in data:
dat = d[0]
ratio = int(dat.sampling // min_sampling)
values = d[1]
for col in values:
if ratio > 1:
if "AnalogOut" in col:
# Step interpolation
temp = np.zeros(len(values[col]) * ratio)
for i in range(ratio):
temp[i::ratio] = values[col]
dataframe[col] = temp[:length]
else:
# Linear interpolation
dataframe[col] = self.linear_interpolation(values[col], ratio)[:length]
else:
dataframe[col] = np.float64(values[col])[:length]
T = len(dataframe[dataframe.columns[0]])
dataframe["Sample_time"] = np.arange(T) * min_sampling
return dataframe[[dataframe.columns[-1], *dataframe.columns[:-1]]]
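# Minimal end-to-end usage sketch (assuming a connected KUKA_Handler named `handler`
# and that `self.enable` is truthy; the trace name "run1" and the configuration name
# "12_ms" are hypothetical):
#
#   trace = KUKA_Trace(handler)
#   trace.Trace_Config(["run1", "12_ms", "600"])
#   if trace.Trace_Start():
#       ...                           # run the motion to record
#       trace.Trace_Stop()
#       df = trace.Trace_Download()   # DataFrame, "Sample_time" is the first column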
@@ -217,7 +217,7 @@ class MainProgram (MainWindow):
try:
self.dataframe = pd.read_excel(path)
self.data.update_layout_on_columns(self, self.dataframe.columns)
self.dataframe['Sample_time_s'] = self.dataframe['Sample']/1000 if 'TRACE' in path else self.dataframe['Sample_time']/1000
self.dataframe['Sample_time_s'] = self.dataframe['Sample_time']/1000
sg.popup("Done")
self.data.enable_plot_buttons(True)
except Exception as e:
......