Skip to content
Snippets Groups Projects
Commit 5c4af7b0 authored by antcsny's avatar antcsny
Browse files

Adding movement extractor and treatment for condition monitoring

parent 5875ed38
No related merge requests found
.vscode
.vscode/*
db
db/*
__pycache__
......
File added
File added
File added
#%% Imports and variables
import matplotlib.pyplot as plt
from tools.database import Database
DB = Database(f"db\db_torque.sqlite")
# %% Plot by Torque versus current
plt.figure()
Speed = 40
Motors = [1,2,3,4,5,6]
for Motor in Motors:
df = DB.robot(2).by_speed(Speed).by_moving_motor(Motor).run()
plt.scatter(df[f'Current_A{Motor}'], df[f'Torque_A{Motor}'], alpha = 0.5)
plt.legend([f'Motor {A}' for A in Motors])
plt.xlabel('Current (A)'), plt.ylabel('Torque(N.m)')
plt.title('Motor torque (N.m) versus motor current (A)')
plt.grid(), plt.show()
# %% Plot Torque/Current by temperature : values of the torque constants of the motors
plt.figure()
Speed = 40
Motors = [1,2,3,4,5,6]
for Motor in Motors:
df = DB.robot(2).by_speed(Speed).by_moving_motor(Motor).run()
plt.scatter(df[f'Temperature_A{Motor}']-273.15,df[f'Torque_A{Motor}']/df[f'Current_A{Motor}'], alpha = 0.5)
plt.legend([f'Motor {A}' for A in Motors])
plt.xlabel('Temperature (°C)'), plt.ylabel('Torque/Current (N.m/A)')
plt.title('Motor torque constant (Torque/Current) versus motor temperature')
plt.grid(), plt.show()
\ No newline at end of file
......@@ -31,17 +31,16 @@ plt.show()
plt.show() """
# %% Weight analysis
fig1 = plot_all_axis(DB,'Current','Class',[0,2],MovingMotor=1, saving=True)
for Motor in range (2,7,1):
fig1 = plot_all_axis(DB,'Current','Class',[0,1,2,3],Speed=40 ,MovingMotor=Motor, saving=True)
fig2 = plot_grouped_load(DB, 'Current', [[0,2],[4,6],[8,10],[12,14]], 1, 60, doposition=True, saving=True)
fig3 = plot_moving_axes(DB,'Current','Class',[0,2], doposition=True,saving=True)
fig3 = plot_moving_axes(DB,'Current','Class',[0,1,2,3],Speed=40 ,doposition=True,saving=True)
plt.show()
# %% Bungee analysis
fig2 = plot_grouped_load(DB, 'Current', [[0,4],[0,8],[0,12]], 1, 60, doposition=True, saving=True)
fig1 = plot_all_axis(DB,'Current','Class',[0,8],MovingMotor=1, saving=True)
fig1 = plot_all_axis(DB,'Current','Class',[0,4,8],MovingMotor=1, saving=True)
fig2 = plot_grouped_load(DB, 'Current', [[0,4],[0,8]], 1, 60, doposition=True, saving=True)
fig3 = plot_moving_axes(DB,'Current','Class',[0,4,8], doposition=True,saving=True)
plt.show()
# %%
fig1 = plot_all_axis(DB,'Current','Class',[0] , MovingMotor=2, doposition=True, saving=True)
# %%
#%% Imports and variables
from pathlib import Path
from array import array
from turtle import pos
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.signal as scs
from tools.database import Database
from tools.plots import Spectrogram_plot
from tools.processing import RMS, FFT
DB = Database(f"D:\VSB\Programs\DB\Robot2-LoadTest.sqlite")
# %% Motor movement extraction class
"""
The goal of the class is to extract the motor movements from a trace file whatever
the movements the robot has achieved. With a database of movements in a normal use,
analysis of the new movements can perform condition monitoring.
See movements_treatment.py for the movement analysis
"""
class Movement_Extractor:
time : np.ndarray
position : np.ndarray
velocity : np.ndarray
raw_acceleration : np.ndarray
acceleration : np.ndarray
movements : array
data_in_file : pd.DataFrame
sub_data_in_file : pd.DataFrame
def __init__(self, file_path: Path | str = ''):
"""
Initialize the class variables.
If provided with a excel file path, the class will store the data inside the file in it's memory
Args:
file_path (Path | str, optional): Excel file path. Defaults to ''.
"""
self.time = []
self.position = []
self.velocity = []
self.raw_acceleration = []
self.acceleration = []
self.movements = []
if '.xlsx' in file_path:
print(f'Processing file : {file_path}')
self.data_in_file = pd.read_excel(file_path)
# load data from a exel file or direcly from a database ?
def get_kinematics(self, data:pd.DataFrame, Motor:int, plot:str = ''):
"""
Get the kinematics variables of a motor with it's position data
Computes the first and second order differencial to get angular velocity and acceleration
Args:
data (pd.DataFrame): position data and associated sample time
Motor (int): Motor number to analyse
plot (str, optional): plots the computed position, velocity or acceleration. Defaults to ''.
"""
self.time = data['Sample_time'].to_numpy() - data['Sample_time'].min()
sampling = (self.time[1] - self.time[0])
self.position = data[f'Position_A{Motor}'].to_numpy()
self.velocity = np.diff(self.position)/sampling * np.pi/180 # velocity : deg/s -> rad/s
self.raw_acceleration = np.diff(self.velocity)
acceleration_filt = np.convolve(np.hanning(20),self.raw_acceleration)[10:] # filtering the noise in the signal to recover clean peaks
self.acceleration = acceleration_filt / acceleration_filt.max()
if plot:
fig = plt.figure()
if plot == 'position':
plt.plot(self.position), plt.ylabel('Position (°)')
if plot == 'velocity':
plt.plot(self.velocity), plt.ylabel('Velocity (rad/s)')
if plot == 'acceleration':
plt.plot(self.acceleration), plt.ylabel('Velocity (rad/s)')
if plot:
plt.xlabel('Samples')
plt.title(f"Extracted motor {plot}")
fig.tight_layout()
plt.pause(0.1)
return self
def movement_indexes(self):
"""
Extract the movement indexes from the acceleration data by finding its peaks
Result : self.movements, array of tuples
Indexes in a tuple : movement start, movement stop, constant velocity start, constant velocity stop
The last value of the tuple is a boolean to indicate the direction of the motor rotation
"""
if len(self.acceleration) == 0 :
print("No acceleration data, run self.get_kinematics")
return
peaks = scs.find_peaks(abs(self.acceleration),height=0.1,distance=15)[0] # Acceleration peaks extraction
i = 0
self.movements = []
while(i<len(peaks)-1):
if self.acceleration[peaks[i]]*self.acceleration[peaks[i+1]] < 0:
mvt_start = const_vel_start = peaks[i]
mvt_stop = const_vel_stop = peaks[i+1]
# Acceleration spikes comes in pairs, forward-reverse or reverse-forward
forward = self.acceleration[mvt_start] > self.acceleration[mvt_stop]
if forward :
while(self.acceleration[mvt_start]>0.05): # start of the acceleration
mvt_start -= 2
while(self.acceleration[mvt_stop]<-0.05): # end of the acceleration
mvt_stop += 2
while(self.acceleration[const_vel_start]>0.2): # start of the velocity plateau
const_vel_start += 2
while(self.acceleration[const_vel_stop]<-0.2): # end of the velocity plateau
const_vel_stop -= 2
else :
while(self.acceleration[mvt_start]<-0.05):
mvt_start -= 2
while(self.acceleration[mvt_stop]>0.05):
mvt_stop += 2
while(self.acceleration[const_vel_start]<-0.2):
const_vel_start += 2
while(self.acceleration[const_vel_stop]>0.2):
const_vel_stop -= 2
self.movements.append((mvt_start,mvt_stop,const_vel_start,const_vel_stop,forward))
i+=2
continue
i+=1
return self
def movement_data(self, data:pd.DataFrame, Motor:int, plot:str = '') -> pd.DataFrame :
"""
Computes the movement data from the extracted movements.
If plot is position, velocity, acceleration, current, temperature, it will plot the extracted data in a matplotlib figure
Args:
data (pd.DataFrame): Data containing the motor movements (with columns Sample_time, Load, Position, Current, Temperature)
Motor (int): Motor number to analyse
plot (str, optional): see summary. Defaults to ''.
Returns:
pd.DataFrame: Extracted movement data
"""
MOVEMENT_COLUMNS = [
'Load_Class', 'Motor',
'Delta_Position', 'Const_Velocity_Mean',
'Peak_Start_Current', 'Peak_Break_Current',
'Mean_Const_Current', 'RMS_Const_Current', 'STD_Const_Current',
'Mean_Position_Error', 'STD_Position_Error',
'Mean_Temperature'
]
if len(self.movements) == 0 :
print("Movement indexes does not exists, run self.movement_indexes")
return
if plot:
fig = plt.figure()
Movements_data = [] # Data Buffer
for i, (mvt_st, mvt_sp ,vel_st ,vel_sp ,forward) in enumerate(self.movements):
if mvt_st == mvt_sp or vel_st == vel_sp:
print(f'Not enough points for movement {i} of motor {Motor}')
continue
current = data[f'Current_A{Motor}'].to_numpy()
temperature = data[f'Temperature_A{Motor}'].to_numpy()
load_class = data['Load'].to_numpy()
pos_error = data[f'Position_Error_A{Motor}']
movement = []
movement.append(load_class[0]) # Load
movement.append(Motor) # Motor number
movement.append(np.abs(self.position[mvt_sp] - self.position[mvt_st])) # Delta Position
movement.append(np.abs(np.mean(self.velocity[vel_st:vel_sp]))) # Mean velocity
current_mvt_data = current[mvt_st:mvt_sp]
if forward: # Current peaks
movement.append(current_mvt_data.max())
movement.append(current_mvt_data.min())
else:
movement.append(current_mvt_data.min())
movement.append(current_mvt_data.max())
current_mvt_data = current[vel_st:vel_sp] # Current statistics
movement.append(current_mvt_data.mean())
movement.append(RMS(current_mvt_data))
movement.append(np.std(current_mvt_data))
movement.append(np.mean(pos_error[mvt_st:mvt_sp])) # Position Error
movement.append(np.std(pos_error[mvt_st:mvt_sp]))
movement.append(temperature[mvt_st:mvt_sp].mean()) # Temperature
Movements_data.append(movement)
if plot == 'position':
plt.plot(self.time[mvt_st:mvt_sp],self.position[mvt_st:mvt_sp]), plt.ylabel('Position (°)')
if plot == 'velocity':
plt.plot(self.time[mvt_st:mvt_sp],self.velocity[mvt_st:mvt_sp]), plt.ylabel('Velocity (rad/s)')
if plot == 'acceleration':
plt.plot(self.time[mvt_st:mvt_sp],self.acceleration[mvt_st:mvt_sp]), plt.ylabel('Acceleration (normalized)')
if plot == 'current':
plt.plot(self.time[mvt_st:mvt_sp],current[mvt_st:mvt_sp]), plt.ylabel('Current (A)')
if plot == 'temperature':
plt.plot(self.time[mvt_st:mvt_sp],temperature[mvt_st:mvt_sp]), plt.ylabel('Temperature (°K)')
if plot:
plt.xlabel('Samples')
plt.title(f"Extracted motor {plot}")
fig.tight_layout()
plt.pause(0.1)
return pd.DataFrame(Movements_data, columns = MOVEMENT_COLUMNS)
def movements_from_file(self, plot:str = []) -> pd.DataFrame:
"""
Realize the whole movement data extraction from the data stored in self.data_in_file
This data is stored in the class when analysing a excel data file
Args:
plot (str, optional): plots the extracted position, velocity, acceleration, current or temperature. Defaults to ''.
Returns:
pd.DataFrame: output movements data
"""
DataMovementFile = pd.DataFrame()
for Motor in range(1,7,1):
sub_columns = ['Sample_time', 'Load', f'Position_A{Motor}', f'Current_A{Motor}', f'Temperature_A{Motor}', f'Position_Error_A{Motor}']
self.sub_data_in_file = self.data_in_file[sub_columns]
Extractor = Movement_Extractor()
Data = Extractor.get_kinematics(self.sub_data_in_file, Motor).movement_indexes().movement_data(self.sub_data_in_file, Motor, plot)
DataMovementFile = pd.concat([DataMovementFile,Data])
if plot :
plt.show()
return DataMovementFile
# Example of uses :
if __name__ == "__main__":
# %% Extract movement from exel file and store it in Data_Movements.xlsx
Extractor = Movement_Extractor(fr"D:\VSB\Programs\data-collection\data\[2024-06-18] 10h53 data [30%-80%] [4ms] [class 4] [10 10 10 10 10 10] - Robot 2_TRACE.xlsx")
Total_Data = Extractor.movements_from_file(plot = 'current') # plots the extracted current
Total_Data.to_excel("Data_Movements.xlsx")
# %% Extract Motor X movements stored in 'Dataframe', containing Motor X position, current, temeprature, position error, sample time and motor load
"""
Extractor = Movement_Extractor()
Extractor.get_kinematics(Dataframe, X)
Extractor.movement_indexes()
Extractor.movement_data(Dataframe, X)
"""
\ No newline at end of file
# %%
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.signal as scs
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
# Database of movements (can be sqlite in the future but is excel format for now)
Movements_Data_File = pd.read_excel(fr'D:\VSB\Programs\data-processing\Data_Weight_loads.xlsx',index_col=0)
# New movements to analyse
Data_2kg = pd.read_excel(fr"D:\VSB\Programs\data-processing\Data_Weight_2kg.xlsx",index_col=0)
Data_Redbungee = pd.read_excel(fr"D:\VSB\Programs\data-processing\Data_Red_Bungee.xlsx",index_col=0)
# %% Functions
def correlation_heatmap(Dataframe : pd.DataFrame):
"""
Draw the corelation heatmap between the data in the columns of the passed dataframe
Args:
Dataframe (pd.DataFrame): Data
"""
C = pd.DataFrame.corr(Dataframe, method='pearson')
if 'Load_Class' in Dataframe.columns and 'Motor' in Dataframe.columns :
C = C.drop(['Load_Class', 'Motor'], axis = 1)
C = C.drop(['Load_Class', 'Motor'], axis = 0)
plt.figure()
sns.heatmap(C, vmin=-1, vmax=1, cmap='coolwarm', annot=True)
plt.title("Corelation heatmap between the varriables")
plt.show()
def explained_variance_ratio(obj, p: int):
"""Trace the explained variance ratio of the transformation
Args:
obj (_type_): PCA() or LinearDiscriminantAnalysis()
p (int): number of componentss produced by the transformation
"""
plt.figure()
plt.bar(np.arange(1, p+1), obj.explained_variance_ratio_)
plt.plot(np.arange(1, p+1), np.cumsum(obj.explained_variance_ratio_))
plt.ylabel("Variance explained in ratio and cumulation")
plt.xlabel("Number of factors")
plt.show()
def corelations_circle(Xraw:pd.DataFrame, Xtransform:np.ndarray, p:int) -> pd.DataFrame:
"""Trace the correlations circle between the raw variables and the final ones
Args:
Xraw (pd.DataFrame): Raw data
Xtransform (np.ndarray): Fit transform data
p (int): Number of componentss produced by the transformation
Returns:
pd.DataFrame: Corelations between raw variables and output ones
"""
XArray = pd.DataFrame.to_numpy(Xraw)
corvar = np.zeros((p, 2))
for k in range(2):
for j in range(p):
corvar[j, k] = np.corrcoef(Xtransform[:, k], XArray[:, j])[0, 1]
# Correlations
fig, axes = plt.subplots(figsize=(8, 8))
axes.set_xlim(-1, 1)
axes.set_ylim(-1, 1)
plt.scatter(corvar[:, 0], corvar[:, 1])
# Variables name tags
for j in range(p):
plt.annotate(Xraw.columns[j], (corvar[j, 0], corvar[j, 1]))
# Axes
plt.plot([-1, 1], [0, 0], color='silver', linestyle='-', linewidth=1)
plt.plot([0, 0], [-1, 1], color='silver', linestyle='-', linewidth=1)
# Circle
axes.add_artist(plt.Circle((0, 0), 1, color='blue', fill=False))
plt.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0.4, hspace=0.4)
plt.xlabel('Component 1')
plt.ylabel('Component 2')
plt.show()
return pd.DataFrame({'id': Xraw.columns, 'COR_1': corvar[:, 0], 'COR_2': corvar[:, 1]})
def scatter_points(Xtransform:np.ndarray, Group_vect : pd.Series, Add_points : np.ndarray = []):
"""Plots the transformed points
Args:
Xtransform (np.ndarray): Transformed points
Group_vect (pd.Series): Column that separates the data in Xtransform
Add_points (np.ndarray, optional): Points without a group to add to the plot (example: result of lda.transform). Default to []
"""
Data = pd.DataFrame(Xtransform)
Data['Group'] = Group_vect.to_numpy()
fig = plt.figure()
sns.scatterplot(Data, x=0, y = 1, hue = 'Group')
if len(Add_points) > 0:
sns.scatterplot(pd.DataFrame(Add_points), x=0,y=1, alpha=0.5)
plt.xlabel('Component 1')
plt.ylabel('Component 2')
plt.show()
return fig
# %% LDA per motors____________________________________________
for Motor in range(1,7,1):
lda = LinearDiscriminantAnalysis()
Movements_Data_Motor = Movements_Data_File.loc[Movements_Data_File['Motor'] == Motor].copy()
Group_Data = Movements_Data_Motor['Load_Class']
Movements_Data_Motor = Movements_Data_Motor.drop(columns=['Load_Class', 'Motor', 'Mean_Temperature'])
Xlda = lda.fit_transform(Movements_Data_Motor,Group_Data) # builds the ratios between variables
[n,p]=Xlda.shape
Additional_Data = Data_2kg.loc[Data_2kg['Motor'] == Motor].copy() # Preparing new points for the scatter plot
Additional_Data = Additional_Data.drop(columns=['Load_Class', 'Motor', 'Mean_Temperature'])
New_points = lda.transform(Additional_Data)
explained_variance_ratio(lda, p)
coef = corelations_circle(Movements_Data_Motor, Xlda, Movements_Data_Motor.shape[1])
scatter_points(Xlda, Group_Data, New_points)
# print(coef)
# %% ACP per motors (less revelating than LDA but still interesting) ____________________________________________
for Motor in range(1,7,1):
acp = PCA()
Movements_Data_Motor = Movements_Data_File.loc[Movements_Data_File['Motor'] == Motor].copy()
Group_Data = Movements_Data_Motor['Load_Class']
Movements_Data_Motor = Movements_Data_Motor.drop(columns=['Load_Class', 'Motor', 'Mean_Temperature'])
Movements_Data_Motor = (Movements_Data_Motor - Movements_Data_Motor.mean())/ Movements_Data_Motor.std()
Xacp = acp.fit_transform(Movements_Data_Motor)
[n,p]=Xacp.shape
Additional_Data = Data_2kg.loc[Data_2kg['Motor'] == Motor].copy()
Additional_Data = Additional_Data.drop(columns=['Load_Class', 'Motor', 'Mean_Temperature'])
New_points = acp.transform(Additional_Data)
explained_variance_ratio(acp, p)
corelations_circle(Movements_Data_Motor, Xacp, Movements_Data_Motor.shape[1])
scatter_points(Xacp, Group_Data)
# %% LDA on all the data (no dinstinction between motors) ___________________________
lda = LinearDiscriminantAnalysis()
Group_Data = Movements_Data_File['Load_Class']
Movements_Data = Movements_Data_File.drop(columns=['Load_Class', 'Motor'])
Xlda = lda.fit_transform(Movements_Data,Group_Data)
[n,p]=Xlda.shape
Additional_Data = Data_2kg.drop(columns=['Load_Class', 'Motor'])
New_points = lda.transform(Additional_Data)
explained_variance_ratio(lda, p)
coef = corelations_circle(Movements_Data, Xlda, Movements_Data.shape[1])
fig = scatter_points(Xlda, Group_Data, New_points)
# %% ACP on all the data ___________________________
acp = PCA()
Group_Data = Movements_Data_File['Load_Class']
Movements_Data = Movements_Data_File.drop(columns=['Load_Class', 'Motor', 'Mean_Temperature'])
Xacp = acp.fit_transform(Movements_Data)
[n,p]=Xacp.shape
Additional_Data = Data_2kg.drop(columns=['Load_Class', 'Motor', 'Mean_Temperature'])
New_points = acp.transform(Additional_Data)
explained_variance_ratio(acp, p)
coef = corelations_circle(Movements_Data, Xacp, Movements_Data.shape[1])
fig = scatter_points(Xacp, Group_Data, New_points)
# %%
......@@ -261,12 +261,12 @@ def plot_all_axis(DB, variable:str, byvariable:str, byvalues:List, MovingMotor:i
df = DB.robot(2).by_class(Class).by_speed(v).by_moving_motor(MovingMotor).select_column(*columns).run()
df['Speed'] = df.Speed.astype('category')
df['Sample_time'] -= df['Sample_time'].min() # limits data to 2-3 iterations
dataframe = pd.concat([dataframe, df[0:500]])
dataframe = pd.concat([dataframe, df[0:500]]) # hardcoded limit data to 2-3 iterations
if byvariable == 'Class':
df = DB.robot(2).by_class(v).by_speed(Speed).by_moving_motor(MovingMotor).run()
df['Class'] = df.Class.astype('category')
df['Sample_time'] -= df['Sample_time'].min()
dataframe = pd.concat([dataframe, df[300:900]]) # limits data to 2-3 iterations
dataframe = pd.concat([dataframe, df[300:900]]) # hardcoded limit data to 2-3 iterations
rms.append(np.sqrt(np.mean(df[f'{variable}_A{MovingMotor}']**2)))
print(f'Motor {MovingMotor} runs rms :',rms)
......@@ -334,13 +334,13 @@ def plot_moving_axes(DB, variable:str, byvariable:str, byvalues:List, Speed:int
df = DB.robot(2).by_class(Class).by_speed(v).by_moving_motor(Motor).select_column(*columns).run()
df['Speed'] = df.Speed.astype('category')
df['Sample_time'] -= df['Sample_time'].min()
dataframe = pd.concat([dataframe, df[0:500]]) # limits data to 2-3 iterations
dataframe = pd.concat([dataframe, df[0:500]]) # hardcoded limit data to 2-3 iterations
if byvariable == 'Class':
df = DB.robot(2).by_class(v).by_speed(Speed).by_moving_motor(Motor).select_column(*columns).run()
df['Class'] = df.Class.astype('category')
df['Sample_time'] -= df['Sample_time'].min()
dataframe = pd.concat([dataframe, df[300:900]])
rms.append(np.sqrt(np.mean(df[f'{variable}_A{Motor}']**2))) # limits data to 2-3 iterations
rms.append(np.sqrt(np.mean(df[f'{variable}_A{Motor}']**2))) # hardcoded limit data to 2-3 iterations
print(f'Motor {Motor} runs rms :',rms)
correct_positions(dataframe)
......@@ -405,7 +405,7 @@ def plot_grouped_load(DB, variable:str, Classes:List[List], Motor:int, Speed:int
df = DB.robot(2).by_class(c).by_speed(Speed).by_moving_motor(Motor).select_column(*columns).run()
df['Sample_time'] -= df['Sample_time'].min()
df['Class'] = df.Class.astype('category')
dataframe = pd.concat([dataframe, df[300:900]]) # limits data to 2-3 iterations
dataframe = pd.concat([dataframe, df[300:900]]) # hardcoded limit data to 2-3 iterations
rms.append(np.sqrt(np.mean(df[f'{variable}_A{Motor}']**2)))
print(f'Motor {Motor} runs rms :',rms)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment