Skip to content
Snippets Groups Projects
Commit d5d26822 authored by antcsny's avatar antcsny
Browse files

Cleanup, comments and readme update

parent 44c941d2
Branches
No related merge requests found
import PySimpleGUI as sg
import os
import pandas as pd
import matplotlib.pyplot as plt
sg.theme("SystemDefaultForReal")
class MainWindow (sg.Window):
dataframe = []
dataframe.append(None)
dataframe.append(None)
# Flags to enable the plot of variables in datasets : torque, curent, temperature, command position, measured position
_do_trace = [True, True, True, False, False]
def __init__ (self, *args, **kwargs):
"""
With __make_layout, generate the pysimplegui window of the GUI
Return : sg.Window
"""
super().__init__("Data Plot", self.__make_layout(), finalize = True, *args, *kwargs)
def __make_layout (self):
# plot des variables qu'on veut : combo selon les colonnes du dataframe ouvert
# plot des données d'une seule variable sur un seul axe
self.data = self.def_file_browse('data1')
self.data2 = self.def_file_browse('data2')
self.do_tq = sg.Checkbox('Motor torque', key='-do_tq-', size=(25, 1), enable_events=True, default=True)
self.do_curr = sg.Checkbox('Motor current', key='-do_curr-', size=(25, 1), enable_events=True, default=True)
self.do_tptr = sg.Checkbox('Motor temperature', key='-do_tprt-', size=(25, 1),enable_events=True, default=True)
self.do_posact = sg.Checkbox('Robot command position', key='-do_posact-', size=(25, 1), enable_events=True, default=False)
self.do_posreal = sg.Checkbox('Robot measured position', key='-do_posreal-', size=(25, 1), enable_events=True, default=False)
self.do_axis = [ sg.Checkbox(f'A{i}', key=f'-do_axis{i}-', enable_events=True, default=True) for i in range(1,7)]
self._layout = [
[ self.data ],
[ self.data2 ],
[ sg.Frame("Variables to plot :", border_width=0, layout=[
[self.do_tq],
[self.do_curr],
[self.do_tptr],
[self.do_posact],
[self.do_posreal]
]),
],
[ sg.Text('Axis to plot :'), *self.do_axis ],
[ sg.Button('Trace Selected variables', key='-trace_selvar-', disabled=False, expand_x=True) ]
]
return self._layout
def def_file_browse(self, key:str):
data_path = os.path.dirname(os.path.realpath(__file__)) + "/data"
self._input_data = sg.Input(default_text=data_path, key=f'-path_{key}-', size=(67, 1), font=("Consolas", 10), enable_events=True)
self.import_xlsx = sg.FileBrowse("Browse EXCEL",initial_folder=data_path, file_types=(('Excel Files', '*.xlsx'),), key=f'-browse_{key}-')
self.open_xlsx = sg.Button("OPEN", key=f'-open_{key}-')
return [self._input_data, self.import_xlsx, self.open_xlsx]
def open_xlsx_file (self, path: str, dfnb:int):
"""Tries to load a .xslx file collected from the system variables
Args:
path (str): The path to the file to open
"""
try:
self.dataframe[dfnb-1] = pd.read_excel(path)
if 'TRACE' in path :
self.dataframe[dfnb-1]['Sample_time_s'] = self.dataframe[dfnb-1]['Sample']/1000
self.enable_var_buttons(True, 'TRACE')
else:
self.dataframe[dfnb-1]['Sample_time_s'] = self.dataframe[dfnb-1]['Sample_time']/1000
self.enable_var_buttons(True, 'default')
self.update_do()
self.data_col = self.dataframe[dfnb-1].columns
except Exception as e:
self.enable_var_buttons(False)
sg.popup(e)
def update_do (self):
"""_summary_
Update the state of the variables do_XXX if a checkbox is clicked
"""
i=0
for checkbox in [self.do_tq, self.do_curr, self.do_tptr, self.do_posact, self.do_posreal]:
self._do_trace[i] = bool(checkbox.get()) and checkbox.Disabled == False
checkbox.update(value = self._do_trace[i])
i+=1
def trace_selected_variables (self, dfnb:int):
plt.close('all')
a = self.get_selected_axis()
if(self._do_trace[0]):
self.dataframe[dfnb-1].plot(x="Sample_time_s",y=[f"Torque_A{i}" for i in a], grid=True),plt.ylabel("Motor Torque (N.m)")
if(self._do_trace[1]):
self.dataframe[dfnb-1].plot(x="Sample_time_s",y=[f"Current_A{i}" for i in a], grid=True),plt.ylabel("Motor Current (%)")
if(self._do_trace[2]):
self.dataframe[dfnb-1].plot(x="Sample_time_s",y=[f"Temperature_A{i}" for i in a], grid=True),plt.ylabel("Motor Temperature (°K)")
if(self._do_trace[3]):
self.dataframe[dfnb-1].plot(x="Sample_time_s",y=[f"Position_Command_A{i}" for i in a], grid=True),plt.ylabel("Robot command position in grid (mm))")
if(self._do_trace[4]):
self.dataframe[dfnb-1].plot(x="Sample_time_s",y=[f"Position_A{i}" for i in a], grid=True),plt.ylabel("Real Robot position in grid (mm))")
plt.pause(0.1) # Alternative to plt.show() that is not blocking
def get_selected_axis (self):
i=1
res = []
for axis in self.do_axis:
if axis.get():
res.append(i)
i+=1
return res
def enable_var_buttons (self, enable:bool, case:str = ''):
e = not(enable and (case == 'TRACE' or case == 'default'))
f = not(enable and case != 'TRACE')
self.do_tq.update(disabled=e)
self.do_curr.update(disabled=e)
self.do_tptr.update(disabled=f)
self.do_posact.update(disabled=f)
self.do_posreal.update(disabled=f)
window = MainWindow()
while True:
event, values = window.read(100)
if event == sg.WIN_CLOSED:
break
if event == '-open_data1-':
window.open_xlsx_file(values['-path_data1-'], 1)
if event == '-open_data2-':
window.open_xlsx_file(values['-path_data2-'], 2)
if '-do_' in event:
window.update_do()
if event == '-trace_selvar-':
window.trace_selected_variables(1)
if 'path' in event:
window[event].Widget.xview_moveto(1)
window.close()
\ No newline at end of file
......@@ -4,28 +4,30 @@ This tool is made to collect and plot sensor data from KUKA KR 3 R540 robotic ar
It collects the system vairables via a custom KRL submodule and trace data from
the KUKA Trace module of the robot controler.
For the tests, we can use three idetntical KUKA Cell with robotic arms, as one is shown here :
For the tests, we use three idetntical KUKA Cell with robotic arms, as one is shown here :
![Robot Image](./images/KUKA.jpg)
The robot is driven by a controler, that takes care of the power supply of each DC
motor independantly, robot program, memory and security. A robot program is stored in
a text format in .src file, associated with a .dat file for program data, each written
in KRL programming language. Global variables, which can be accessed by Python, are
located in $config.dat file. Among global variables there are system variables in read
only access, and this Python tool use it to collect data from the robot.
in KRL programming language.
As the purpose of this tool is to make a diagnosis on the robot arm health, the robot
program is a full-range motion on each axis of the robot. One motor iteration consists
of a axis movement by a defined angle from the default position, on positive then negative
side. Data analysis for diagnosis is curenly in developement.
For each of the 6 motors of the robot arm, the following variables are measured by system variables:
This tool uses a KRL submodule 'Data Collector' to store periodicly the values of intern
variables into buffers, giving informations about the following variables for the 6 motors
of the robotic arm:
- Position
- Torque
- Current draw
- Temperature
The buffers are global variables that can be accessed by Python, and are located in $config.dat file.
As the purpose of this tool is to make a diagnosis on the robot arm health, the robot
program is a full-range motion on each axis of the robot. One motor iteration consists
of a axis movement by a defined angle from the default position, on positive then negative
side. Data analysis for diagnosis is curenly in developement.
This tool can collect the data in real time with system variables. The main latency
factor is the network quality, so an ethernet connection is preffered over Wi-Fi. The
data is buffered on the robot side but has a hard-coded limit of 20000 samples. The
......@@ -34,7 +36,7 @@ the network connection quality. If the write index in the buffers reach the maxi
(size of the buffer), it will loop back to the begining of the buffer. Data from system
variables is lost when the write index reach the read index
Besides the system variables, the robot controler can measure the traces of the robot.
Besides the system variables, the robot controler can measure data via the KUKA traces of the robot.
It is a way intended by KUKA to recover data from the robot, that can measure the folowwing variables :
- Position : Command, measured, error
- Velocity error
......@@ -52,7 +54,7 @@ able to recover the traces.
The data collected by the program can be found in the /data folder,
each data file having an explicit name with the date of the collection
and details on the configuration.
and details on the configuration. See an example below of the files naming format.
**Example** :
`[2024-05-17] 10h57 data [20%-60%] [36ms] [class 0] [10 10 10 10 10 10] - Robot 2`
......@@ -65,6 +67,7 @@ and details on the configuration.
Summary
---
- [Measure Sequence](#measure-sequence)
- [Library requirements](#library-requirements)
- [UI Description and details](#ui-description-and-details)
- [Collection settings](#collection-settings)
- [KUKA traces](#kuka-traces)
......@@ -79,9 +82,21 @@ Summary
- [KUKA Trace Configuration](#kuka-trace-configuration)
- The `Axis_Main.src` program
## Library requirements
This Python application uses multiple librairies that may not be in your default Python install package an are important:
- PySimpleGUI 5.0.4+
- Numpy 1.26.4+
- Pandas 2.2.2+
- Openpyxl 3.1.2+
- Matplotlib 3.8.4+
- optional : scipy, scikit-learn, seaborn for data processing
To install theses librairies, enter `pip install <librairy_name>` in the python terminal. Pysimplegui may be the more complicated to use on first time, because it demands a licence. To access the licence asking, install the librairy and lauch this application, and follow the steps on the licence request website.
## Measure Sequence
One measure run consists of a repetition of axis movement at a given speed.
One measure run consists of a succession of axis movement at a given speed.
The numer of iterations by axis is variable and is defined by the user before
launching the sequence. The speed of the robot can be modified to test the robot
movement in diferent stress conditions.\
......@@ -112,10 +127,10 @@ green, red otherwise.
With multiple testings, we sometimes noticed connection difficulties with the robot
cell 1. It can appear after a measure sequence or after a period of time doing nothing,
so make sure the connection is not broken for this robot. If connection problems occurs,
**C3 bridge** on the cell has to be restarted by **Minimizing HMI** on the control panel
(in settings). When Windows is showed, right click using a mouse on **C3** on the right
side of the task bar.
so make sure the connection is not broken for this robot by clicking on the robot connection
button. If connection problems occurs, **C3 bridge** on the cell has to be restarted by
**Minimizing HMI** on the control panel (in settings). When Windows is showed, right
click using a mouse on **C3** on the right side of the task bar.
### Collection settings
......@@ -125,9 +140,7 @@ The following parameters can be configured:
- Number of movement iteration per axis
- Sampling rate
- Speed range for the test
Due to software limitations from the KRL environment, the sampling rate is
bound to be a multiple of 12.
- Robot Load (only for data analysis)
The speed of the motors during the acquisition varies according to the confifuration.
This value is defined as a percentage of the max value (i.e. 0 to 100), Note that
......@@ -141,7 +154,8 @@ by a step.
A test at multiple speed is equivalent to the data merge of multiple single speed runs. So,
check the configurations before launching an acquisition, even if it is able to stop between
the runs if a problem has occurred.
the runs if a problem has occurred. The whole program will be stoped and all the windows will close
if the button 'Abort' is clicked during a robot run.
During a measure sequence, system variables and KUKA traces can run at the same time to compare
the data. If one of the collection method is prefered, we give the ability to chose which one
......@@ -153,9 +167,13 @@ on multiple speed runs.
### KUKA traces
A combo selector is used to select the KUKA Trace configuration that will be
run for data collection. The collected data is the same
as with the system variables but with more precision and reliability.
The acquisition is done by the RTOS of the robot.
run for data collection. The collected variables are the same as with the system variables but
with more precision and reliability. The difference between the configurations 4_ms and 12_ms
is in the smaple rate of the aquisition. Note that due to internal limitation, KUKA traces size
cannot exceed 50000 samples. So, 12 ms sample rate collection is limited to 600 seconds, and 200
seconds at 4 ms. Time shouldn't be a worry for this application, as one axis movement sequence is
stored in a single trace file. It just has to not exceed the time limit, that can be reached during
a high number of axis iteration at low robot speed.
### Robots loads
......@@ -180,7 +198,11 @@ robot is not connected
### Collected data plot
Accessible without robot connection, this section plots data. First, a excel file containing data has to be selected with the Browse button, then OPEN. Variables available on robot axes will be shown as checkboxes in the frame 'varriables to plot' to give the user the choice to plot a unique variable. As variables provide data on each axis, each one of them can be disabled by unchecking the checkboxes 'Axis to plot'.
Accessible without robot connection, this section plots data. First, a excel file containing
data has to be selected with the Browse button, then OPEN. Variables available on robot axes
will be shown as checkboxes in the frame 'varriables to plot' to give the user the choice to
plot a unique variable. As variables provide data on each axis, each one of them can be disabled
by unchecking the checkboxes 'Axis to plot'.
## Program Structure
......@@ -203,8 +225,8 @@ measurement are in [`Measure_latency`](./ui/measure_latency.py) and
[`Measure_robot`](./ui/measure_robot.py).
The [`Measure_robot`](./ui/measure_robot.py) class contains the functions to
execute a data collection,
with the dynamic UI in [`CollectionGraphWindow`](./ui/graph_window.py)
showing the robot data buffer state.
with the dynamic UI in [`CollectionGraphWindow`](./ui/graph_window.py),
showing the robot data buffer state if system variable collection is enabled.
The [`main`](main.py) python file contains the main loop at the basis of the UI.
It is done by a main class, and allows organization with class variables and
......@@ -282,21 +304,21 @@ DECL E6AXIS ColBUFFER_POS_MEAS[20000]
To allow data collection with system variables, please be sure to add the `Data_collector.sub`
to the list of running submodules. This submodules takes up to **1 minute** to
properly initialize the first values of the internal data buffers.
properly initialize the first values of the internal data buffers, prenventing invalid memory writings during the run.
This progam can be found in [`robot/KRL/`](./robot/KRL).
### KUKA Trace Configuration
Copy the provided configuration files found in [`robot/configurations/`](./robot/configurations) to
the `TRACE` folder of the robot.
the `TRACE` folder of the robot. Theses files will be needed to launch the trace collection method.
**Example path** : `\\192.168.1.151\roboter\TRACE\`
### The `Axis_Main.src` program
The data collection uses the `Axis_Main_dataset.src` program to create the data
to collect. It must be running in `AUT` mode at `100%` of run speed on the control panel to produce
valid data.
The data collection uses the `Axis_Main_dataset.src` program to perform robot movement. It must be running in `AUT` mode at `100%` of run speed on the control panel to produce
valid data. On robot side, select the program for its run and press play button twice to run the
initialisation and the main loop.
This progam can be found in [`robot/KRL/`](./robot/KRL)`.
\ No newline at end of file
......@@ -352,6 +352,8 @@ class KUKA_DataReader:
return pd.DataFrame(data_trace)
def run_single_speed (self, A_iter, speed, sampling, next, done, load, lock, now, trace_config, trace_sampling, temp_dir, trace_offset):
""" Subsection of function acquire """
# Sync with other robots
if lock is not None:
lock.acquire()
......
......@@ -169,6 +169,9 @@ class KUKA_Trace:
return False
def Trace_Download(self):
"""
Download the traces corresponding to the previous run and return the data
"""
if self.enable:
result = self.read_traces(self.name)
return result
......@@ -205,6 +208,14 @@ class KUKA_Trace:
src.unlink()
def find_pairs (self, name: str):
"""
Find trace files
Args:
name (str): data file name
Returns:
_type_: array containing the trace fies names
"""
extensions = ['.dat', '.r64', ".trc"]
file_names = [
......
<?xml version="1.0"?>
<Trace xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<Time>200</Time>
<PreTrigger>0</PreTrigger>
<Basename>test_4_ms_v2</Basename>
<Modules>
<Module>
<Name>KRCIpo</Name>
<Rate>12000</Rate>
<Channel>500</Channel>
<Channel>43</Channel>
<Channel>44</Channel>
<Channel>45</Channel>
<Channel>46</Channel>
<Channel>47</Channel>
<Channel>48</Channel>
</Module>
<Module>
<Name>NextGenDrive#1</Name>
<Rate>4000</Rate>
<Channel>1</Channel>
<Channel>2</Channel>
<Channel>3</Channel>
<Channel>8</Channel>
<Channel>14</Channel>
<Channel>200</Channel>
<Channel>202</Channel>
</Module>
<Module>
<Name>NextGenDrive#2</Name>
<Rate>4000</Rate>
<Channel>1</Channel>
<Channel>2</Channel>
<Channel>3</Channel>
<Channel>8</Channel>
<Channel>14</Channel>
<Channel>200</Channel>
<Channel>202</Channel>
</Module>
<Module>
<Name>NextGenDrive#3</Name>
<Rate>4000</Rate>
<Channel>1</Channel>
<Channel>2</Channel>
<Channel>3</Channel>
<Channel>8</Channel>
<Channel>14</Channel>
<Channel>200</Channel>
<Channel>202</Channel>
</Module>
<Module>
<Name>NextGenDrive#4</Name>
<Rate>4000</Rate>
<Channel>1</Channel>
<Channel>2</Channel>
<Channel>3</Channel>
<Channel>8</Channel>
<Channel>14</Channel>
<Channel>200</Channel>
<Channel>202</Channel>
</Module>
<Module>
<Name>NextGenDrive#5</Name>
<Rate>4000</Rate>
<Channel>1</Channel>
<Channel>2</Channel>
<Channel>3</Channel>
<Channel>8</Channel>
<Channel>14</Channel>
<Channel>200</Channel>
<Channel>202</Channel>
</Module>
<Module>
<Name>NextGenDrive#6</Name>
<Rate>4000</Rate>
<Channel>1</Channel>
<Channel>2</Channel>
<Channel>3</Channel>
<Channel>8</Channel>
<Channel>14</Channel>
<Channel>200</Channel>
<Channel>202</Channel>
</Module>
</Modules>
<Trigger>
<ModuleName>KRCIpo</ModuleName>
<ID>1</ID>
</Trigger>
</Trace>
\ No newline at end of file
import PySimpleGUI as sg
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from threading import Semaphore
class CollectionGraphWindow (sg.Window):
lock = Semaphore(1)
def __init__(self, cell: int):
self.cell = cell
self._title = sg.Text(f'Robot {self.cell}', key="-TITLE-", font='Helvetica 20')
self._subtitle = sg.Text("Collecting sample n°...", key="Subtitle")
self._canvas_elem = sg.Canvas(size=(480,360), key="-CANVAS-")
self._status = sg.Text("",key="-colstatus-",text_color="#000", font="Helvetica 15")
self._exit = sg.Button("Exit", key='-colexit-',font="Helvetica 11", size=(15,1))
_graph_layout = [
[ sg.Push(), self._title, sg.Push() ],
[ sg.Push(), self._subtitle, sg.Push() ],
[ self._canvas_elem ],
[ sg.Push(), self._status, sg.Push() ],
[ sg.Push(), self._exit, sg.Push() ]
]
self._data_buffer = []
self._data_latency = []
self._s = 0
super().__init__(f'Robot {cell}', _graph_layout, finalize=True)
self._canvas = self._canvas_elem.TKCanvas
self._figure: Figure = Figure()
self._ax = self._figure.add_subplot(2, 1, 1)
self._ay = self._figure.add_subplot(2, 1, 2)
self._ax.set_xlabel("Sample")
self._ax.set_ylabel("Number of buffered values")
self._ax.grid()
self._ay.set_xlabel("Sample")
self._ay.set_ylabel("Network latency (ms)")
self._ay.grid()
self._fig_agg = FigureCanvasTkAgg(self._figure, self._canvas)
self._fig_agg.draw()
self._fig_agg.get_tk_widget().pack(side='top', fill='both', expand=1)
def add (self, buffer: int, latency: float):
self.lock.acquire()
self._data_buffer.append(buffer)
self._data_latency.append(latency)
self._s += 1
self.lock.release()
def redraw (self):
self.lock.acquire()
a = [*self._data_buffer]
b = [*self._data_latency]
self.lock.release()
self._ax.cla()
self._ax.set_xlabel("Sample")
self._ax.set_ylabel("Number of buffered values")
self._ax.grid()
self._ax.plot(range(len(a)), a)
self._ay.cla()
self._ay.set_xlabel("Sample")
self._ay.set_ylabel("Network latency")
self._ay.grid()
self._ay.plot(range(len(b)), b)
self._fig_agg.draw()
self._subtitle.update(f"Collecting sample n°{self._s}")
if __name__ == "__main__":
import math
import random
import time
win = CollectionGraphWindow(1)
win.read(timeout=100)
for i in range(200):
event, value = win.read(timeout=10)
if event == sg.WIN_CLOSED:
exit(0)
win.add(math.sin(2 * math.pi * 0.1 * i), random.randint(1,100) / 10)
time.sleep(1/30)
win.close()
import PySimpleGUI as sg
import numpy as np
import threading as th
from datetime import datetime
from dateutil import tz
from time import time, sleep, time_ns
import os
import pandas as pd
import matplotlib.pyplot as plt
from kuka import KUKA_DataReader, KUKA_Handler
import traceback
from graph_window import CollectionGraphWindow
from threading import Semaphore
from typing import Callable
TZ = tz.gettz("Europe/Prague")
sg.theme("SystemDefaultForReal")
class UI_Collection_Settings (sg.Frame):
# Class flags
_disabled = False
_constant_speed = True
_do_tq = True
_do_curr = True
_do_temp = True
_do_posact = False
_do_posreal = False
# Robot staus variables
_not_connected = "#99c"
_connected = "#3f3"
_errored = "#f33"
_robot_status = [ _not_connected, _not_connected, _not_connected ]
def __init__ (self):
"""_summary_ : Class constructor
With __make_layout, generate a pysimplegui frame to be integrated in the main window for the configuration related elements
sg elements like checkbox and buttons are stored in the class to be accessed easily
Return : sg.Frame
"""
super().__init__("Collection Settings", self.__make_layout())
def __make_layout (self):
now = self.now()
self._input_dataset_name = sg.InputText(now, key='-dataset_name-', size=(30, 1), font=("Consolas", 10))
dir_path = os.path.dirname(os.path.realpath(__file__)) + "/data"
self._input_working_dir = sg.Input(default_text=dir_path, key='-user_path-', size=(40, 1), font=("Consolas", 10))
self._input_browse_dir = sg.FolderBrowse(initial_folder=dir_path, key='-browse-')
self._input_iterations = []
for i in range(1,7):
self._input_iterations.append(sg.InputText('1',key=f'num_of_iter{i}',size=(5,1), font=("Consolas", 10)))
self.do_tq = sg.Checkbox('$TORQUE_AXIS_ACT', key='-do_tq-', size=(25, 1), enable_events=True, default=True)
self.do_curr = sg.Checkbox('$CURR_ACT', key='-do_curr-', size=(25, 1), enable_events=True, default=True)
self.do_tptr = sg.Checkbox('$MOT_TEMP', key='-do_tprt-', size=(25, 1),enable_events=True, default=True)
self.do_posact = sg.Checkbox('$AXIS_ACT', key='-do_posact-', size=(25, 1), enable_events=True, default=False)
self.do_posreal = sg.Checkbox('$AXIS_ACT_MEAS', key='-do_posreal-', size=(25, 1), enable_events=True, default=False)
self.sample_rate = sg.Combo(['12','24', '36', '48', '60'], default_value='12', key='-Sys_sampling-')
self.doconst_speed = sg.Checkbox('Constant', key='-doconst_speed-', size=(25, 1),enable_events=True, default=True)
self._input_min_speed = sg.InputText('30', key='-rob_speed_min-', size=(3, 1), font=("Consolas", 10))
self._input_max_speed = sg.InputText('40', key='-rob_speed_max-', size=(3, 1), font=("Consolas", 10), disabled=True)
self._input_speed_step = sg.InputText('10', key='-rob_speed_step-', size=(3, 1), font=("Consolas", 10), disabled=True)
self._robot_selector = [ sg.Button(f"Robot {i}", key=f"-rob_select:{i}-") for i in range(1,4) ]
self._layout = [
[ sg.Text("Dataset name :"), self._input_dataset_name, sg.Button("Auto Name", key="-dataset_auto-name-", font=("Consolas", 10)) ],
[ sg.Text("Working directory :"), self._input_working_dir, self._input_browse_dir ],
[ sg.Text("Number of iterations from axis A1 to A6 : "), sg.Push(), *self._input_iterations, sg.Button("All = A1",key='-equal_iter-',font=("Consolas", 10)) ],
[
[sg.Text('Variables to collect :')],
[self.do_tq, sg.Text('Motor torque of an axis (torque mode)')],
[self.do_curr, sg.Text('Motor current of an axis')],
[self.do_tptr, sg.Text('Motor temperature of an axis (~ precision)')],
[self.do_posact, sg.Text('Axis-specific setpoint position of the robot')],
[self.do_posreal, sg.Text('Axis-specific real position of the robot')]
],
[ sg.Text('Variable sampling rate (ms):'), self.sample_rate],
[
sg.Text("Robot speed :"), self.doconst_speed, sg.Push(),
sg.Text("From :"), self._input_min_speed,
sg.Text("% To :"), self._input_max_speed,
sg.Text("% Step :"), self._input_speed_step
],
[ sg.Text("Selected robots : "), sg.Push(), *self._robot_selector ]
]
return self._layout
def now (self):
"""_summary_
Give the formated date time
Returns: _type_: str
"""
return datetime.now(tz=TZ).strftime("[%Y-%m-%d] %Hh%M data")
def update_speed_entry(self):
"""_summary_
Update the sate (disabled or enabled) of the inputs of robot speed depending on the state of the checkbox "Constant"
If constant, the Min speed will be considerated in the robot run
"""
self._constant_speed = bool(self.doconst_speed.get())
self._input_max_speed.update(disabled=self._constant_speed, text_color="#000")
self._input_speed_step.update(disabled=self._constant_speed, text_color="#000")
def update_robot_buttons (self):
"""_summary_:
updates the color of the button clicked to select a robot depending on the status
"""
for i in range(len(self._robot_selector)):
btn = self._robot_selector[i]
btn.update(button_color=self._robot_status[i])
def set_robot_connected (self, cell: int, connected: bool, errored: bool = False):
"""_summary_
updates the state of the variable _robot_status, giving the state of the robots
Args:
cell (int): nuber of the robot (1,2,3)
connected (bool): give True to indicate that the connection to the robot is successful
errored (bool, optional): Defaults to False, give True if the robot connection is errored
"""
if errored:
self._robot_status[cell] = self._errored
return
self._robot_status[cell] = self._connected if connected else self._not_connected
def update_do (self):
"""_summary_
Update the state of the variables do_XXX if a checkbox is clicked
"""
self._do_tq = bool(self.do_tq.get())
self._do_curr = bool(self.do_curr.get())
self._do_temp = bool(self.do_tptr.get())
self._do_posact = bool(self.do_posact.get())
self._do_posreal = bool(self.do_posreal.get())
def check_configuration(self):
"""_summary_
Configuration check before launching a mesure of the robot variables, insure that all the parameters are correct :
axis iterations, robot speed
Returns:
_type_: configuration correct ? True/False
"""
config_errors = []
try:
for axis in self._input_iterations:
if not 99 >= int(axis.get()) >= 0:
config_errors.append('- Number of axis iteration must be from 0 to 99\n')
break
except ValueError:
config_errors.append('- Number of axis iteration must be integer value\n')
try:
if not 100 >= int(self._input_max_speed.get()) >= 1:
config_errors.append('- Robot max speed must be from 1 to 100 %\n')
if not 100 >= int(self._input_min_speed.get()) >= 1:
config_errors.append('- Robot max speed must be from 1 to 100 %\n')
if not 100 >= int(self._input_speed_step.get()) >= 1:
config_errors.append('- Robot step speed must be from 1 to 100 %\n')
except ValueError:
config_errors.append('- Robot speed be integer value\n')
if not bool(config_errors):
return True
else:
sg.popup_ok(''.join(config_errors), title='ERROR', font=16, modal=True)
return False
@property # called when : print(disabled)
def disabled (self):
return self._disabled
@disabled.setter # called when : disabled = True/False
def disabled (self, v: bool):
"""_summary_
Disable all the interactions with widgets of the frame to prevent modifications
Make it clear to the user that no robot is connected
Args:
v (bool): disable the entries of the entire frame ?
"""
self._disabled = v
self.do_tq.update(disabled=v)
self.do_curr.update(disabled=v)
self.do_tptr.update(disabled=v)
self.do_posact.update(disabled=v)
self.do_posreal.update(disabled=v)
self._input_dataset_name.update(disabled=v, text_color="#000")
self._input_working_dir.update(disabled=v, text_color="#000")
self._input_browse_dir.update(disabled=v)
self._input_min_speed.update(disabled=v, text_color="#000")
self._input_max_speed.update(disabled=self._constant_speed or v, text_color="#000")
self._input_speed_step.update(disabled=self._constant_speed or v, text_color="#000")
self.doconst_speed.update(disabled=v)
self.sample_rate.update(disabled=v)
for s in self._input_iterations:
if not isinstance(s, sg.Text):
s.update(disabled=v, text_color="#000")
@property ##################################################################################### A completer #######################################################################""
def speed (self) -> str | slice:
"""Returns the speed configuration. If the speed is marked as constant,
it returns a string. If the speed is not constant, it returns a splice
object which contains the minimum speed, the maximum speed and the step.
Returns:
str | slice: The speed configuration
"""
if self._constant_speed:
return self._input_min_speed.get()
min = int(self._input_min_speed.get())
max = int(self._input_max_speed.get())
step = int(self._input_speed_step.get())
return slice(min, max, step)
class UI_KUKATrace( sg.Frame):
_disabled = False
def __init__ (self):
"""_summary_ : Class constructor
With __make_layout, generate a pysimplegui frame to be integrated in the main window for the Kuka trace related elements
sg elements like checkbox and buttons are stored in the class to be accessed easily
Return : sg.Frame
"""
super().__init__(f"KUKA Trace", self.__make_layout(), expand_x=True)
def __make_layout (self):
self._sampling_rate = sg.Combo(['12_ms','12_ms_v2','4_ms'], key='-Trace_config-', default_value='12_ms', size=(20, 1))
self._do_delfile = sg.Checkbox('Delete files from KRC?',key='-delete-',default=True, disabled=True)
self._layout = [
[ sg.Text('Trace Sampling rate:'), self._sampling_rate, self._do_delfile ]
]
return self._layout
@property # called when : print(disabled)
def disabled (self):
return self._disabled
@disabled.setter # called when : disabled = True/False
def disabled (self, v: bool):
""" Give the disabled state v to the Combo entry for the sampling rate to disable it"""
self._disabled = v
self._sampling_rate.update(disabled=v)
class UI_RobotLoad (sg.Frame):
_disabled = False
def __init__ (self, i: int):
"""_summary_ : Class constructor
With __make_layout, generate a pysimplegui frame to be integrated in the main window for one Robot load parameters
sg elements like checkbox and buttons are stored in the class to be accessed easily
Return : sg.Frame
"""
self.i = i
super().__init__(f"Robot {i}", self.__make_layout())
def __make_layout (self):
self._input_load = sg.Combo(['0','0.5','1','2'],default_value='0',key=f'-load_robot{self.i}-',size=(5,1))
self._input_bungee_yellow = sg.Checkbox('Yellow',key=f'-WrapY_robot{self.i}-')
self._input_bungee_red = sg.Checkbox('Red',key=f'-WrapR_robot{self.i}-')
self._layout = [
[ sg.Text("Load :"), sg.Push(), self._input_load , sg.Text("kg") ],
[ sg.Text("Bungee cords :") ],
[ self._input_bungee_yellow ],
[ self._input_bungee_red ]
]
return self._layout
@property
def disabled (self): # called when : print(disabled)
return self._disabled
@disabled.setter # called when : disabled = True/False
def disabled (self, v: bool):
""" Give the disabled state v to the entries and combo frome the layout to disable it"""
self._disabled = v
self._input_load.update(disabled=v)
self._input_bungee_yellow.update(disabled=v)
self._input_bungee_red.update(disabled=v)
@property
def setup (self):
"""_summary_
Returns:
_type_: array containing the load parameter on the given robot : load weight, bungee cords
"""
return [
float(self._input_load.get()),
1 if self._input_bungee_yellow.get() else 0,
1 if self._input_bungee_red.get() else 0,
]
class UI_Data (sg.Frame):
_disabled = False
def __init__ (self):
"""_summary_ : Class constructor
With __make_layout, generate a pysimplegui frame to be integrated in the main window for launching measuring sequence and data printing
sg elements like checkbox and buttons are stored in the class to be accessed easily
To print collection results from an excel file, select it with "Browse" and store it in python with "OPEN"
Then show the graphs with buttons below
Return : sg.Frame
"""
super().__init__("Collected Data", self.__make_layout(), expand_x=True)
def __make_layout (self):
data_path = os.path.dirname(os.path.realpath(__file__)) + "/data"
self._btn_start = sg.Button('START SEQUENCE', key='-BTN_start-', button_color='white', size=(25, 2),expand_x=True)
self._input_data = sg.Input(default_text=data_path, key='-data_path-', size=(40, 1), font=("Consolas", 10))
self.import_xlsx = sg.FileBrowse("Browse EXCEL",initial_folder=data_path, file_types=(('Excel Files', '*.xlsx'),), key='-browse_xlsx-')
self.open_xlsx = sg.Button("OPEN", key='-open_xlsx-')
self.trace_selvariables = sg.Button('Trace Selected variables', key='-trace_selvar-', disabled=True, expand_x=True)
self.trace_samples = sg.Button('Trace Sample collection history', key='-trace_sample-', disabled=True, expand_x=True)
self.trace_latency = sg.Button('Trace Sample latency distribution', key='-trace_latency-', disabled=True, expand_x=True)
self._layout = [
[ self._btn_start ],
[sg.Frame("", border_width=0, layout = [
[ self._input_data, self.import_xlsx, self.open_xlsx ],
[ self.trace_selvariables, self.trace_samples ],
[ self.trace_latency ]
])
]
]
return self._layout
@property
def disabled (self): # called when : print(disabled)
return self._disabled
@disabled.setter # called when : disabled = True/False
def disabled (self, v: bool):
""" Give the disabled state v to the lauch button to disable it
Data plot functionality is accessible if a robot is not connected
"""
self._disabled = v
self._btn_start.update(disabled=v)
def enable_plot_buttons(self, enable):
"""_summary_
Updates the state of the plot butons to prevent data plot if no data is already stored in python
Args:
enable (bool): True/False to enable/disable buttons
"""
self.trace_selvariables.update(disabled=not(enable))
self.trace_samples.update(disabled=not(enable))
self.trace_latency.update(disabled=not(enable))
class UI_Latency(sg.Frame):
_disabled = False
def __init__ (self):
"""_summary_ : Class constructor
With __make_layout, generate a pysimplegui frame to be integrated in the main window for launching latency measuring sequence
Containing one single button to launch measuring on the connected robots
Return : sg.Frame
"""
super().__init__("Measure Latency", self.__make_layout(), expand_x= True)
def __make_layout (self):
self.start_measure = sg.Button("Start Latency Measure on connected robots", key = '-BTN_latency-')
self.layout = [[self.start_measure]]
return self.layout
@property
def disabled (self):
return self._disabled
@disabled.setter
def disabled (self, v: bool):
self._disabled = v
self.start_measure.update(disabled=v)
class UI_Gripper (sg.Frame):
_disabled = False
def __init__ (self):
"""_summary_ : Class constructor
With __make_layout, generate a pysimplegui frame to be integrated in the main window to control the gripper state of a robot
If robot selected in the Combo is connected, open or close the gripper with buttons
Return : sg.Frame
"""
super().__init__("Gripper", self.__make_layout())
def __make_layout (self):
self.robot_choice = sg.Combo(['Robot 1', 'Robot 2', 'Robot 3'], default_value='Robot 2', key='-Rob_choice-')
self._btn_open = sg.Button('Open', key='-BTN_open_gripper-')
self._btn_close = sg.Button('Close', key='-BTN_close_gripper-')
self._layout = [
[ self.robot_choice, self._btn_open, self._btn_close]
]
return self._layout
@property
def disabled (self):
return self._disabled
@disabled.setter
def disabled (self, v: bool):
self._disabled = v
self._btn_close.update(disabled=v)
self._btn_open.update(disabled=v)
@property
def _robot_choice (self):
""" Returns the number of the selected robot in the combo for further use """
target = self.robot_choice.get()
for i in range(len(self.robot_choice.Values)):
if target == self.robot_choice.Values[i]:
return i+1
return self._disabled
class MainWindow (sg.Window):
def __init__ (self, *args, **kwargs):
"""_summary_ : Class constructor
With __make_layout, generate the pysimplegui window of the GUI containing all the frames from the other UI class
Frames are stored in the class variables to be accessed
Return : sg.Window
"""
super().__init__("Data Collection", self.__make_layout(), finalize = True, *args, *kwargs)
def __make_layout (self):
self.collection_settings = UI_Collection_Settings()
self.kukatrace = UI_KUKATrace()
self.robots = [ UI_RobotLoad(i) for i in range(1, 4) ]
self.latency = UI_Latency()
self.data = UI_Data()
self.gripper = UI_Gripper()
self._layout = [
[ self.collection_settings ],
[ self.kukatrace ],
[ self.robots[0], sg.Push(), self.robots[1], sg.Push(), self.robots[2] ],
[ self.latency, self.gripper],
[ sg.Push(), self.data, sg.Push() ],
[ sg.Text("COS0028 - PIE0073, from the work of Bc. Adam Batrla BAT0050, 2024")]
]
return self._layout
def get_category (self, cell: int = 1):
"""_summary_
Give the category of the chosen load for data processing
Args:
cell (int, optional): Robot cell number. Defaults to 1.
Returns:
_type_: category of the robot load
"""
if cell - 1 > len(self.robots):
return -1
robot = self.robots[cell - 1]
setup = robot.setup # method from UI_robotload
match setup:
case [0, 0, 0]:
return 0
case [0.5, 0, 0]:
return 1
case [1, 0, 0]:
return 2
case [2, 0, 0]:
return 3
case [0, 1, 0]:
return 4
case [0.5, 1, 0]:
return 5
case [1, 1, 0]:
return 6
case [2, 1, 0]:
return 7
case [0, 0, 1]:
return 8
case [0.5, 0, 1]:
return 9
case [1, 0, 1]:
return 10
case [2, 0, 1]:
return 11
case [0, 1, 1]:
return 12
case [0.5, 1, 1]:
return 13
case [1, 1, 1]:
return 14
case [2, 1, 1]:
return 15
# UI TEST by launching this file as main
if __name__ == "__main__":
dataframe = None
win = MainWindow()
while True:
event, values = win.read(100)
if event == sg.WIN_CLOSED:
break
if event == '-open_xlsx-':
try:
dataframe = pd.read_excel(values['-data_path-'])
dataframe['EXEC_TIME_s'] = dataframe['EXEC_TIME']/1000
sg.popup("Done")
win.data.enable_plot_buttons(True)
except Exception as e:
sg.popup(e)
win.data.enable_plot_buttons(False)
if event == '-trace_selvar-':
if(win.collection_settings._do_tq):
dataframe.plot(x="EXEC_TIME_s",y=[f"TQ_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Motor Torque (N.m)")
if(win.collection_settings._do_curr):
dataframe.plot(x="EXEC_TIME_s",y=[f"CURR_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Motor Current (%)")
if(win.collection_settings._do_temp):
dataframe.plot(x="EXEC_TIME_s",y=[f"TEMP_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Motor Temperature (°K)")
if(win.collection_settings._do_posact):
dataframe.plot(x="EXEC_TIME_s",y=[f"POS_ACT_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Actual Robot position in grid (mm))")
if(win.collection_settings._do_posreal):
dataframe.plot(x="EXEC_TIME_s",y=[f"POS_MEAS_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Real Robot position in grid (mm))")
plt.show()
if event == '-trace_sample-':
dataframe.plot(x="EXEC_TIME_s",y=["Queue_Read", "Queue_Write"], grid=False),plt.ylabel("Samples")
plt.xlabel("Collection execution time (s)")
plt.twinx(), plt.plot(dataframe["EXEC_TIME_s"],dataframe['Total Request Time'], alpha=0.05), plt.ylabel("Request time of the sample (ms)")
plt.title("Samples in the buffer and red by Python")
plt.text(2,25,f"Sample time : {dataframe['EXEC_TIME'].diff().median()}")
plt.show()
if event == '-trace_latency-':
dataframe.hist(column='Total Request Time', grid=False, bins=30)
plt.title("Distribution of the collection time of a sample")
plt.xlabel(f"Request time (ms) : mean = {dataframe['Total Request Time'].mean().__round__(2)}"),plt.ylabel("Number of samples")
plt.show()
if event == '-BTN_open_gripper-':
print(win.gripper._robot_choice.get)
if event == '-BTN_start-':
continue
\ No newline at end of file
import numpy as np
import re
import pandas as pd
import PySimpleGUI as sg
import matplotlib.pyplot as plt
import os
#######################################################################################################################
######################################### F U N C T I O N S ######################################################
#######################################################################################################################
def plot_selector(data_file):
layout = []
layout.append([sg.Text('Select channels to plot:', font=(15))])
for channel in data_file.keys():
if channel != 'Sample':
layout.append([sg.Checkbox(text=channel, key=str(channel), default=True)])
layout.append([sg.Button('Select All', key='-BTN_ALL-'),
sg.Button('Deselect All', key='-BTN_NONE-')])
layout.append([sg.Button('Ok', key='-Ok-'), sg.Exit()])
window_s = sg.Window('Select Channels', layout, finalize=True)
while True:
event, values = window_s.read()
if event == '-Ok-':
output_data = {'Sample': data_file['Sample']}
for channel in data_file.keys():
if ('_' or 'time') in channel:
# if not ('filename' in channel or 'Sample' in channel):
if values[channel]:
output_data[channel] = data_file[channel]
window_s.Close()
return output_data
if event == '-BTN_ALL-':
for channel in data_file.keys():
if channel != 'Sample':
window_s[channel].update(True)
if event == '-BTN_NONE-':
for channel in data_file.keys():
if channel != 'Sample':
window_s[channel].update(False)
if event == sg.WIN_CLOSED or event == 'Exit':
break
def item_counter(list):
value_count = 0
output = []
for index in range(1, len(list)):
new_value = list[index]
last_value = list[index - 1]
if new_value == last_value:
value_count += 1
else:
value_count += 1
output.append(value_count)
value_count = 0
count_dict = {}
for item in output:
if item in count_dict:
count_dict[item] += 1
else:
count_dict[item] = 1
for key, value in count_dict.items():
print(f"Prvek {key} se vyskytuje {value}x.")
print('******************************************')
def create_figure(data_dict):
plt.rcParams.update({'font.size': 11})
plt.figure(figsize=(10, 4))
color_pallet = ['b', 'r', 'g', 'k', 'c', 'm', 'y']
plot_color = 0
if not isinstance(data_dict['Sample'][1], float):
data_dict['Sample'] = [x / 1000 for x in data_dict['Sample']]
for channel in data_dict:
label = channel
# label = channel.split('(')[0]
# label = ['A1', 'A2', 'A3', 'A4', 'A5', 'A6']
# label = ['A1 setpoint position (°)', 'A1 actual position (°)']
# label = ['No load','Bungee Cord 1' ,'Bungee Cord 1 + 2']
# label = ['0 g','2000 g', '1000 g']
# label = channel
if channel != 'Sample':
# if 'AXIS_ACT' in channel:
# if 'TORQUE' in channel:
# label = channel + ' (Nm)'
# else:
# label = channel + ' (°)'
# if 'MOT_TEMP' in channel:
# label = channel + ' (°C)'
# if 'CURR' in channel:
# label = channel + ' (%)'
# if 'time' in channel:
# label = channel.split('(')[0] + ' (ms)'
# item_counter(data_dict[channel])
plt.plot(data_dict['Sample'], data_dict[channel], color=color_pallet[plot_color % len(color_pallet)],
label=label) # label=label[plot_color]
plot_color += 1
# plt.suptitle('Values of the Selected Variables Over Time')
# plt.xlabel('Time (ms)')
plt.xlabel('Time (s)')
plt.ylabel('Axis A5, Actual torque (Nm)')
# plt.ylabel('Variable')
plt.legend(loc='upper right', ncol=1)
# plt.legend(bbox_to_anchor=(1, 1.2),loc='upper right',ncol = 3)
plt.grid(True)
plt.xlim([0, 15])
directory = r'D:\VSB\Data collection app - THESIS\data'
plt.savefig(directory + r'\graf.pdf', format="pdf", bbox_inches="tight")
plt.show()
def import_data(files):
dictionaries = []
layout = []
for index in range(len(files)):
dataset = {}
dictionaries.append(dataset)
for file, index in zip(files, range(len(files))):
df = pd.read_csv(file, sep=';')
for channel in df.columns:
dictionaries[index][channel] = df[channel].tolist()
dictionaries[index]['Filename'] = file.split('/')[-1]
trace_channels = [[] for _ in dictionaries]
for dataset, index in zip(dictionaries, range(len(dictionaries))):
for channel in dataset.keys():
if not ('Filename' in channel or 'Sample' in channel):
trace_channels[index].append(channel)
tabs = [[] for _ in dictionaries]
tab_groups = []
for index in range(len(dictionaries)):
tabs[index].append([sg.Text('Number of samples: ' + str(len(dictionaries[index]['Sample'])))])
tabs[index].append([sg.Text('Sampling period: ' + str(dictionaries[index]['Sample'][1]) + ' ms')])
for channel in dictionaries[index].keys():
if not ('Filename' in channel or 'Sample' in channel):
tabs[index].append([sg.Checkbox(text=channel, key=str(channel + str(index)))])
layout.append([sg.Text('Select channels to plot:', font=(15))])
for dataset, index in zip(dictionaries, range(len(dictionaries))):
tab_groups.append(sg.Tab(dataset['Filename'], tabs[index]))
layout.append([sg.TabGroup([tab_groups])])
layout.append([sg.Button('Ok', key='-Ok-'), ])
window_s = sg.Window('Select Channels', layout, finalize=True, grab_anywhere=True)
same_sampling = [dictionaries[index]['Sample'][1] == dictionaries[0]['Sample'][1] for index in
range(len(dictionaries))] # Check, if all the datasets have the same sampling rate
if not all(same_sampling):
sg.Popup('Not all imported datasets have the same sampling rate.\n'
'Comparing such channels is irrelevant.', title='Warning', font=13)
while True:
event, values = window_s.read()
if event == '-Ok-':
sample_channel = []
for index in range(len(dictionaries)):
sample_channel.append(dictionaries[index]['Sample'])
output_data = {'Sample': min(sample_channel, key=len)}
shortest_channel_length = len(output_data['Sample'])
for index in range(len(dictionaries)):
for channel in dictionaries[index]:
if not ('Filename' in channel or 'Sample' in channel) and values[channel + str(index)]:
output_data[channel + ' (' + str(dictionaries[index]['Filename']) + ')'] = \
dictionaries[index][channel][0:shortest_channel_length]
window_s.Close()
return output_data
if event == sg.WIN_CLOSED or event == 'Exit':
window_s.Close()
return None
def convert_trace_data(file_names):
data = {}
for file in file_names:
if '#' in file:
axis_number = re.search(r'#(.)', file).group(1)
channel_name = f'_A{axis_number}'
else:
axis_number = ''
channel_name = ''
if '.dat' in file:
with open(file, 'r') as dat_file:
config = [line.strip() for line in dat_file.readlines()]
trace_names = []
trace_units = []
found_sampling_period = False
for line in config:
if '200,' in line:
trace_names.append(line.split(',')[1])
if '202,' in line:
trace_units.append(line.split(',')[1])
if '241,' in line:
if not found_sampling_period:
sampling_period = int(float(line.split(',')[1]) * 1000)
found_sampling_period = True
for name, unit in zip(trace_names, trace_units):
if name != 'Zeit':
data[f'{name}{channel_name} ({unit})'] = []
if '.r64' in file:
channels = list(data.keys())
current_axis_channels = []
for channel in channels:
if axis_number in channel:
current_axis_channels.append(channel)
with open(file, 'rb') as real64_file:
all_samples = np.fromfile(real64_file, dtype='float64')
number_of_samples = int(len(all_samples) / len(current_axis_channels))
channel_number = 0
for sample in all_samples:
data[current_axis_channels[channel_number]].append(sample)
if channel_number < len(current_axis_channels) - 1:
channel_number += 1
else:
channel_number = 0
for channel in data.keys():
if 'Motortemperatur' in channel:
data[channel] = [sample - 273.15 for sample in data[channel]]
if 'position' in channel:
data[channel] = [sample / 1000000 for sample in data[channel]]
data['Sample'] = [x * sampling_period for x in range(len(data[channels[0]]))]
return data
def config_dialog():
default_directory = r'D:\VSB\Data collection app - THESIS\data'
sg.theme('GrayGrayGray')
layout = [
[sg.Text('Please insert IP address and Port number of the robot controller:')],
[sg.Text('IP Address:', size=(10, 1)), sg.InputText(size=(15, 1), key='-IP-', default_text='192.168.1.152')],
[sg.Text('Port Number:', size=(10, 1)), sg.InputText(size=(15, 1), key='-port-', default_text='7000')],
[sg.T('')],
[sg.Text('Choose working directory:')],
[sg.Input(default_text=default_directory, key='-user_path-', size=(60, 1)),
sg.FolderBrowse(initial_folder=default_directory, key='-browse-')],
[sg.T('')], [sg.Text('Run Data Collection Application in:')],
[sg.Button('Online mode', key='-Confirm_on-', button_color='#77DB00'),
sg.Button('Offline mode', key='-Confirm_off-')]]
window = sg.Window('Initial Setup', layout, keep_on_top=True, modal=True,
element_justification='c', icon='V2/V2.5/icon/setup_icon.ico')
while True:
event, values = window.read()
config_errors = []
if event == sg.WIN_CLOSED:
quit()
if 'Confirm' in event:
try:
if os.path.exists(values['-user_path-']):
directory = values['-user_path-']
trace_path = os.path.join(directory, "trace")
if not os.path.exists(trace_path):
os.makedirs(trace_path)
csv_path = os.path.join(directory, "csv")
if not os.path.exists(csv_path):
os.makedirs(csv_path)
else:
directory = ''
config_errors.append('- Working directory does not exist\n')
if 'off' in event:
offline_mode = True
if not bool(config_errors):
window.close()
return 0, 0, directory, offline_mode
else:
sg.popup_ok(''.join(config_errors), title='ERROR', font=14, modal=True,
icon='V2/V2.5/icon/error_icon.ico', keep_on_top=True)
else:
ip_address = values['-IP-']
parts = ip_address.split('.')
if len(parts) != 4:
config_errors.append('- Invalid IPv4 Address Format.\n')
else:
try:
for part in parts:
if not 0 <= int(part) <= 255:
config_errors.append('- IP Address number out of range.\n')
break
except ValueError:
config_errors.append('- IP address must contain integer values only.\n')
try:
port_number = int(values['-port-'])
if not 0 <= port_number <= 65535:
config_errors.append('- Port number out of range.')
except ValueError:
config_errors.append('- Port number must be an integer value.\n')
else:
offline_mode = False
if not bool(config_errors):
window.close()
return ip_address, port_number, directory, offline_mode
else:
sg.popup_ok(''.join(config_errors), title='ERROR', font=14, modal=True,
icon='V2/V2.5/icon/error_icon.ico', keep_on_top=True)
except Exception as e:
sg.popup_ok(e)
def open_help():
sg.theme('GrayGrayGray')
help_path = r'D:\VSB\Data collection app - THESIS\data'
with open(help_path, 'r', encoding='utf-8') as file:
help_text = file.read()
layout = [[sg.Multiline(default_text=help_text, size=(90, 40), disabled=True, font=('Consolas', 14))],
[sg.Button('Close', )]]
window_help = sg.Window('Help', layout, font=16, element_justification='c')
while True:
event, values = window_help.read()
if event in (sg.WIN_CLOSED, 'Close'):
window_help.close()
break
\ No newline at end of file
if __name__ == "__main__":
from .kuka import KUKA_Handler, KUKA_DataReader
import traceback
from progress.bar import Bar
h = KUKA_Handler("192.168.1.152", 7000)
if not h.KUKA_Open():
exit(1)
collection = KUKA_DataReader(h)
collection.READ_TQ[1:7] = True
collection.READ_TEMP[1:7] = True
collection.READ_CURR[1:7] = True
collection.READ_POS_ACT = True
collection.READ_POS_MEAS = True
try:
print(collection._data_available,collection._read_done)
print(collection.ColRUN)
print(collection.READ_TQ[1:7])
print(collection.READ_TEMP[1:7])
print(collection.READ_CURR[1:7])
print(collection.READ_POS_ACT)
print(collection.READ_POS_MEAS)
print(collection.handler.KUKA_ReadVar("__TAB_1[]"))
# print("Waiting to collect ...")
# with Bar("Collection ...", max=20000) as bar:
# data = collection.run(lambda: bar.next())
except Exception as e:
traceback.print_exception(e)
h.KUKA_Close()
\ No newline at end of file
# Python Librairies
import PySimpleGUI as sg
import pandas as pd
import matplotlib.pyplot as plt
import threading as th
from typing import List
import traceback
# User Librairies
from ui import MainWindow
from kuka import KUKA_Handler
from measure import Measure_robot, Measure_latency
from classes_functions import open_help
trace_enable = False
sys_var_enable = True
# GLOBAL VARIABLES
data_buffer = {}
collecting_data_done = False
reading_data_done = False
reading_progress = 0
measuring_delay_done = False
delay_progress = 0
abort_process = False
sync_done = 0
sync_number = 0
sync_sem = th.Semaphore(0)
window = MainWindow()
window_latency = []
robot_windows = []
robot_handlers: List[KUKA_Handler] = [ None ] * 3
######################################### FUNCTION DECLARATION ###################################################
def connect_robot_cell (window: MainWindow, cell: int):
handler = KUKA_Handler(f"192.168.1.15{cell}", 7000)
try:
ok = handler.KUKA_Open()
if not ok:
window.write_event_value(f"-rob_errored:{cell}-", None)
return
except Exception as e:
traceback.print_exception(e)
sg.popup_ok("\n".join(traceback.format_exception(e)))
window.write_event_value(f"-rob_errored:{cell}-", None)
return
robot_handlers[cell - 1] = handler
window.write_event_value(f"-rob_connected:{cell}-", None)
def update_disabled_ui (window: MainWindow):
disabled = True
for i in range(len(robot_handlers)):
r = robot_handlers[i]
disabled = disabled and (r is None)
window.robots[i].disabled = (r is None)
window.collection_settings.disabled = disabled
window.kukatrace.disabled = disabled
window.latency.disabled = disabled
window.gripper.disabled = disabled
window.data.disabled = disabled
window.collection_settings.update_robot_buttons()
def done ():
global sync_done
global sync_number
global sync_sem
sync_done += 1
if sync_done == sync_number:
sync_done = 0
sync_sem.release(sync_number)
################################################## MAIN PROGRAM ##################################################
update_disabled_ui(window)
plt.figure()
plt.close() # To adjust DPI of GUI
while True:
event, values = window.read(timeout=1)
### Unconditionnal updates ###
######################## Core Event ########################
if event in (sg.WINDOW_CLOSE_ATTEMPTED_EVENT, sg.WIN_CLOSED, 'Exit'):
for r in robot_handlers:
if r is not None:
r.KUKA_Close()
plt.close('all')
break
if event == '-BTN_help-':
open_help()
if event == '-equal_iter-':
for i in range(1, 7):
window[f'num_of_iter{i}'].update(value=values['num_of_iter1'])
if event == '-BTN_start-':
if not window.collection_settings.check_configuration():
continue
file_path = values["-user_path-"] + "/" + values["-dataset_name-"]
robot_windows = [ ]
for i in range(len(robot_handlers)):
r = robot_handlers[i]
if r is not None:
robot_windows.append(Measure_robot(r, i + 1, file_path))
sync_number = len(robot_windows)
sync_sem = th.Semaphore(0)
sync_done = 0
A_iter = [ values[f'num_of_iter{i}'] for i in range(1,7) ]
speed = window.collection_settings.speed
sampling = values["-Sys_sampling-"]
trace_sampling = values["-Trace_config-"]
for r in robot_windows:
r._poll()
t = th.Thread(
target=r.measure_sequence,
args=[ A_iter, speed, sampling, trace_sampling, window.get_category(r.cell), sync_sem, done ],
daemon=False)
t.start()
sync_sem.release(sync_number)
for i in range(len(window_latency)):
w = window_latency[i]
if w is not None:
if w._poll() is False:
window_latency[i] = None
for i in range(len(robot_windows)):
win = robot_windows[i]
# Window closed
if win is None:
continue
# Window closed
if not win._poll():
# Récupérer les données
robot_windows[i] = None
continue
if event == "-dataset_auto-name-":
window.collection_settings._input_dataset_name.update(value=window.collection_settings.now())
if event == '-BTN_latency-':
try:
window_latency = []
for i in range(len(robot_handlers)):
r = robot_handlers[i]
if r is not None:
w = Measure_latency(f"Latency measurement for Robot {i + 1}")
w._poll()
window_latency.append(w)
t = th.Thread(target=w.measure_latency, args=[r], daemon=False)
t.start()
except Exception as e:
sg.popup_ok(e)
if event == '-doconst_speed-':
window.collection_settings.update_speed_entry()
if 'do' in event:
window.collection_settings.update_do()
######################## Robot Selector Events #################
if "rob_select" in event:
cell = int(event.split(':')[1][:-1])
if robot_handlers[cell - 1] is not None:
print(f"Trying to disconnect from robot {cell}")
window.collection_settings.set_robot_connected(cell - 1, False)
robot_handlers[cell - 1].KUKA_Close()
robot_handlers[cell - 1] = None
update_disabled_ui(window)
continue
update_disabled_ui(window)
print(f"Trying to connect to robot {cell}")
window.perform_long_operation(lambda: connect_robot_cell(window, cell), "-connect-call-end-")
if "rob_connected" in event:
cell = int(event.split(':')[1][:-1])
window.collection_settings.set_robot_connected(cell - 1, True)
update_disabled_ui(window)
if "rob_errored" in event:
cell = int(event.split(':')[1][:-1])
window.collection_settings.set_robot_connected(cell - 1, False, True)
update_disabled_ui(window)
######################## Gripper Events ########################
if event == '-BTN_open_gripper-':
if robot_handlers[window.gripper._robot_choice - 1] is not None:
robot_handlers[window.gripper._robot_choice - 1].KUKA_WriteVar('PyOPEN_GRIPPER', True)
window.gripper._btn_open.update(disabled=True)
window.gripper._btn_close.update(disabled=False)
if event == '-BTN_close_gripper-':
if robot_handlers[window.gripper._robot_choice - 1] is not None:
robot_handlers[window.gripper._robot_choice - 1].KUKA_WriteVar('PyCLOSE_GRIPPER', True)
window.gripper._btn_open.update(disabled=False)
window.gripper._btn_close.update(disabled=True)
######################## Import Excel Trace ########################
if event == '-open_xlsx-':
try:
dataframe = pd.read_excel(values['-data_path-'])
dataframe['Sample_time_s'] = dataframe['Sample_time']/1000
sg.popup("Done")
window.data.enable_plot_buttons(True)
except Exception as e:
sg.popup(e)
window.data.enable_plot_buttons(False)
if event == '-trace_selvar-':
if(window.collection_settings._do_tq):
dataframe.plot(x="Sample_time_s",y=[f"Torque_A{i}" for i in range(1,7,1)], grid=True),plt.ylabel("Motor Torque (N.m)")
if(window.collection_settings._do_curr):
dataframe.plot(x="Sample_time_s",y=[f"Current_A{i}" for i in range(1,7,1)], grid=True),plt.ylabel("Motor Current (%)")
if(window.collection_settings._do_temp):
dataframe.plot(x="Sample_time_s",y=[f"Temperature_A{i}" for i in range(1,7,1)], grid=True),plt.ylabel("Motor Temperature (°K)")
if(window.collection_settings._do_posact):
dataframe.plot(x="Sample_time_s",y=[f"Command_A{i}" for i in range(1,7,1)], grid=True),plt.ylabel("Robot command position in grid (mm))")
if(window.collection_settings._do_posreal):
dataframe.plot(x="Sample_time_s",y=[f"Position_A{i}" for i in range(1,7,1)], grid=True),plt.ylabel("Real Robot position in grid (mm))")
plt.show()
if event == '-trace_sample-':
dataframe.plot(x="Sample_time_s",y=["Queue_Read", "Queue_Write"], grid=True),plt.ylabel("Samples")
plt.xlabel("Collection execution time (s)")
plt.twinx(), plt.plot(dataframe["Sample_time_s"],dataframe['Read_time'], alpha=0.05), plt.ylabel("Request time of the sample (ms)")
plt.title("Samples in the buffer and red by Python")
plt.text(10,10,f"Mean ample time : {dataframe['Sample_time'].diff().mean()}")
plt.show()
if event == '-trace_latency-':
dataframe.hist(column='Read_time', grid=True, bins=30)
plt.title("Distribution of the collection time of a sample")
plt.xlabel(f"Request time (ms) : mean = {dataframe['Read_time'].mean().__round__(2)}"),plt.ylabel("Number of samples")
plt.show()
for r in robot_handlers:
if r is not None:
r.KUKA_Close()
window.close()
from kuka import KUKA_Trace, KUKA_Handler
h = KUKA_Handler("192.168.1.153", 7000)
trace = KUKA_Trace(h)
trace.name = "2024-05-27_16-06-53[70]_R2"
pairs = trace.find_pairs(trace.name)
dats = []
for pair in pairs:
dat_path = trace.temp_folder.joinpath(trace.name).joinpath(pair[0])
dat = trace.read_dat(dat_path)
dats.append(dat)
min_sampling = 1
for dat in dats:
min_sampling = min(min_sampling, dat.sampling)
min_len = 1e9
for dat in dats:
ratio = int(dat.sampling // min_sampling)
min_len = min(min_len, dat.length * ratio)
print(min_len)
\ No newline at end of file
from ui import MainWindow
import PySimpleGUI as sg
import pandas as pd
import matplotlib.pyplot as plt
# UI TEST by launching this file as main
if __name__ == "__main__":
dataframe = None
win = MainWindow()
while True:
event, values = win.read(100)
if event == sg.WIN_CLOSED:
break
if event == '-open_xlsx-':
try:
dataframe = pd.read_excel(values['-data_path-'])
dataframe['EXEC_TIME_s'] = dataframe['EXEC_TIME']/1000
sg.popup("Done")
win.data.enable_plot_buttons(True)
except Exception as e:
sg.popup(e)
win.data.enable_plot_buttons(False)
if event == '-trace_selvar-':
if(win.collection_settings._do_tq):
dataframe.plot(x="EXEC_TIME_s",y=[f"TQ_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Motor Torque (N.m)")
if(win.collection_settings._do_curr):
dataframe.plot(x="EXEC_TIME_s",y=[f"CURR_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Motor Current (%)")
if(win.collection_settings._do_temp):
dataframe.plot(x="EXEC_TIME_s",y=[f"TEMP_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Motor Temperature (°K)")
if(win.collection_settings._do_posact):
dataframe.plot(x="EXEC_TIME_s",y=[f"POS_ACT_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Actual Robot position in grid (mm))")
if(win.collection_settings._do_posreal):
dataframe.plot(x="EXEC_TIME_s",y=[f"POS_MEAS_A{i}" for i in range(1,7,1)], grid=False),plt.ylabel("Real Robot position in grid (mm))")
plt.show()
if event == '-trace_sample-':
dataframe.plot(x="EXEC_TIME_s",y=["Queue_Read", "Queue_Write"], grid=False),plt.ylabel("Samples")
plt.xlabel("Collection execution time (s)")
plt.twinx(), plt.plot(dataframe["EXEC_TIME_s"],dataframe['Total Request Time'], alpha=0.05), plt.ylabel("Request time of the sample (ms)")
plt.title("Samples in the buffer and red by Python")
plt.text(2,25,f"Sample time : {dataframe['EXEC_TIME'].diff().median()}")
plt.show()
if event == '-trace_latency-':
dataframe.hist(column='Total Request Time', grid=False, bins=30)
plt.title("Distribution of the collection time of a sample")
plt.xlabel(f"Request time (ms) : mean = {dataframe['Total Request Time'].mean().__round__(2)}"),plt.ylabel("Number of samples")
plt.show()
if event == '-BTN_open_gripper-':
print(win.gripper._robot_choice.get)
if event == '-BTN_start-':
continue
\ No newline at end of file
......@@ -45,6 +45,10 @@ class CollectionGraphWindow (sg.Window):
self._fig_agg.get_tk_widget().pack(side='top', fill='both', expand=1)
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self._title = sg.Text(f'Robot {self.cell}', key="-TITLE-", font='Helvetica 20', size=(20,3), justification='center')
self._subtitle = sg.Text("Collecting Data ...", key="Subtitle")
if self.enable:
......
......@@ -20,6 +20,10 @@ class MainWindow (sg.Window):
super().__init__("Data Collection", self.__make_layout(), finalize = True, *args, *kwargs)
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self.collection_settings = UI_Collection_Settings()
self.kukatrace = UI_KUKATrace()
......
......@@ -25,6 +25,10 @@ class UI_Collection_Settings (sg.Frame):
super().__init__("Collection Settings", self.__make_layout(), expand_x=True)
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
now = self.now()
self._input_dataset_name = sg.InputText(now, key='-dataset_name-', size=(30, 1), font=("Consolas", 10))
......
......@@ -21,6 +21,10 @@ class UI_Data (sg.Frame):
super().__init__("Collected Data", self.__make_layout(), expand_x=True)
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self._btn_start = sg.Button('START SEQUENCE', key='-BTN_start-', button_color='white', size=(15, 2),expand_x=True)
self.import_data = self.__make_file_browse('1')
......
......@@ -13,6 +13,10 @@ class UI_Gripper (sg.Frame):
super().__init__("Gripper", self.__make_layout())
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self.robot_choice = sg.Combo(['Robot 1', 'Robot 2', 'Robot 3'], default_value='Robot 2', key='-Rob_choice-')
self._btn_open = sg.Button('Open', key='-BTN_open_gripper-')
......
......@@ -13,6 +13,10 @@ class UI_Latency(sg.Frame):
super().__init__("Measure Latency", self.__make_layout(), expand_x= True)
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self.start_measure = sg.Button("Start Latency Measure on connected robots", key = '-BTN_latency-')
self.layout = [[self.start_measure]]
......
......@@ -14,6 +14,10 @@ class UI_RobotLoad (sg.Frame):
super().__init__(f"Robot {i}", self.__make_layout())
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self._input_load = sg.Combo(['0','0.5','1','2'],default_value='0',key=f'-load_robot{self.i}-',size=(5,1))
......
......@@ -12,6 +12,10 @@ class UI_KUKATrace( sg.Frame):
super().__init__(f"KUKA Trace", self.__make_layout(), expand_x=True)
def __make_layout (self):
""" Defines the window layout
Returns:
List : created layout
"""
self._sampling_rate = sg.Combo([
'12_ms_v2',
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment