-
VulcanixFR authored9cba6dfa
database.py 11.77 KiB
from pathlib import Path
import sqlite3
from typing import Dict, List, Callable
import pandas as pd
import numpy as np
from .utils import from_class_to_text, Chronometer
from .extractor import DataExtractor
from .data_finder import DataFile
_DB_A_COLUMNS = []
for i in range(1,7):
_DB_A_COLUMNS = [
*_DB_A_COLUMNS,
*[ f"{col}_A{i}" for col in
[
"Position",
"Position_Error",
"Position_Command",
"Current",
"Temperature"
]
]
]
_DB_ROBOT_COLUMNS = [
"Sample_time",
"Class",
"Speed",
"MovingMotor",
*_DB_A_COLUMNS,
"Sample_rate",
"run"
]
_DB_METADATA_COLUMNS = [
"Year", "Month", "Day",
"Name",
"Start_Speed", "Stop_Speed", "Constant_Speed",
"Sampling", "Class",
*[ f"Iteration_A{i}" for i in range(1,7) ],
"Robot",
"FileName"
]
_DB_METADATA_TABLE = "DataFiles"
DBAppendCallback = Callable[[str, int, int], None]
class DatabaseQuery:
"""Represents a simple query for the database, used to filter the
requested data.
"""
# The database connection
db: sqlite3.Connection = None
# The selected robot
robot: int = 1
# The defined filters
filters: Dict[str, str] = {}
# The selected columns
columns: List[str] = []
def __init__(self, db: sqlite3.Connection, robot: int) -> None:
self.columns = []
self.filters = {}
self.db = db
self.robot = robot
return
def _make_query_string (self) -> str:
"""Transforms the internal state to a SQLite query string
Returns:
str: The SQLite Query
"""
query = "SELECT " + ("*" if len(self.columns) == 0 else (", ".join(self.columns)))
query += f" FROM Robot_{self.robot}"
filter_columns = self.filters.keys()
if len(filter_columns) > 0:
query += " WHERE "
words = []
for f in filter_columns:
words.append(f"{f} = {self.filters[f]}")
query += " AND ".join(words)
return query
def run (self) -> pd.DataFrame:
"""Runs the query to the SQLite Database
Raises:
Exception: The query configuration is wrong
Returns:
pd.DataFrame: The requested data
"""
# Check if the robot parameter is OK
if self.robot not in range(1,4):
raise Exception("Invalid query parameter: Robot must be between 1 and 3")
# Check if the requested columns are valid
bad_columns = []
for c in self.columns:
if c not in _DB_ROBOT_COLUMNS:
bad_columns.append(c)
if len(bad_columns) > 0:
t = " ".join(bad_columns)
raise Exception(f"Invalid query parameter: invalid column(s): {t}")
# Check if the filters are valid
bad_filters = []
for c in self.filters:
if c not in _DB_ROBOT_COLUMNS:
bad_filters.append(c)
if len(bad_filters) > 0:
t = " ".join(bad_filters)
raise Exception(f"Invalid query parameter: invalid filter(s): {t}")
# Get the requested data
query = self._make_query_string()
dataframe = pd.read_sql(query, self.db)
# Transform some columns to categories
if "Class" in dataframe.columns:
dataframe["Class"] = [
from_class_to_text(c) for c in dataframe["Class"]
]
if "Sample_rate" in dataframe.columns:
dataframe["Sample_rate"] = dataframe["Sample_rate"].astype("category")
return dataframe
def select_column (self, *columns: List[str]):
"""Adds columns to the selection
Returns:
self
"""
for c in columns:
if c not in _DB_ROBOT_COLUMNS:
continue
self.columns.append(c)
return self
def by_class (self, class_number: int):
"""Adds a filter on the class of the samples
Args:
class_number (int): between 0 and 15, the desired sample class
Returns:
self
"""
if 0 <= class_number <= 15 and type(class_number) == int:
self.filters["Class"] = f"{class_number}"
else:
print("Invalid class number", class_number)
return self
def by_speed (self, speed: int):
"""Adds a filter on the running speed of the samples
Args:
speed (int): The desired speed
Returns:
self
"""
if 0 <= speed <= 100 and type(speed) == int:
self.filters["Speed"] = f"{speed}"
else:
print("Invalid speed", speed)
return self
def by_moving_motor (self, moving_motor: int):
"""Adds a filter on the moving motor in the sample
Args:
moving_motor (int): The desired moving motor number
Returns:
self
"""
if type(moving_motor) != int or moving_motor not in range(1,7):
print("Invalid motor number", moving_motor)
return self
self.filters["MovingMotor"] = f"{moving_motor}"
return self
def sample_rate (self, rate: int):
"""Filters the samples by sampling rate
Args:
rate (int): The desired sampling rate (in ms)
Returns:
self
"""
if type(rate) != int or rate not in range(4,61):
print("Invalid sampling rate", rate, "ms")
return self
self.filters["Sample_rate"] = f"{rate}"
return self
def by_run (self, run: int):
"""Adds a filter on the source file
Args:
run (int): The desired source file index
Returns:
self
"""
self.filters["run"] = f"{run}"
return self
def _select_distinct (self, property: str):
value = pd.read_sql_query(f"SELECT DISTINCT `{property}` FROM Robot_{self.robot}", self.db)
return value[property].to_numpy()
def _select_file_names (self):
value = pd.read_sql_query(f"SELECT `FileName` FROM DataFiles WHERE DataFiles.`index` IN (SELECT DISTINCT run FROM Robot_{self.robot})", self.db)
return value["FileName"].to_numpy()
def list_unique_properties (self):
"""Lists all of the unique values taken by the sample labels in
the robot table.
"""
props = [
"Speed", "Class", "MovingMotor", "Sample_rate", "run"
]
out = { }
for p in props:
out[p] = self._select_distinct(p)
out["FileNames"] = self._select_file_names()
return out
class Database:
"""Simple class that ease the use of the produced SQLite Database
"""
# The path
db_file: Path = None
# The database connection
db: sqlite3.Connection = None
def __init__(self, db: Path | str) -> None:
if type(db) != Path:
db = Path(db).absolute()
if not db.exists():
print("The specified database does not exist")
return
self.db_file = db
self.open()
def robot (self, cell: int):
"""Returns a query object for the specified robot data
Args:
cell (int): The cell number (between 1 and 3)
Returns:
The query object or None
"""
if cell not in range(1,4):
print("Invalid cell number", cell)
return None
return DatabaseQuery(self.db, cell)
def list_files (self) -> pd.DataFrame:
"""Lists the file names used to create this database, with their indexes
"""
return pd.read_sql_query(f"SELECT `FileName`, `index` FROM {_DB_METADATA_TABLE}", self.db)
def append_files (self, *files: DataFile, next: DBAppendCallback | None = None):
"""Adds data to the database
"""
# Adds the file description to the database
try:
md = pd.read_sql_query(f"SELECT `index` FROM {_DB_METADATA_TABLE}", self.db)
index = md["index"].to_numpy()[-1] + 1
except:
index = 0
metadata: pd.DataFrame = pd.concat([
DataFile_to_metadata(f) for f in files
])
metadata["index"] = np.arange(len(metadata)) + index
metadata.to_sql(_DB_METADATA_TABLE, self.db, if_exists="append")
# Gets and saves the data from the files
Extractor = DataExtractor()
Chrono = Chronometer()
I = len(files)
for i, f in enumerate(files):
Extractor.reset()
prefix = f"[{i+1}/{I}]"
if next is not None:
next(f.file_name, i+1, I)
Chrono.tick()
# Read Excel file
print(prefix, "Opening file", f.file_name, end=" ... ")
Extractor.load(f.file_path, f.trace)
Chrono.tick(True)
# Format columns
print(prefix, "Formatting file", f.file_name, end=" ... ")
# Extractor.auto_fit_trace_positions()
Extractor.apply_correction()
# Get the output
data_sysvar, data_trace = Extractor.extract()
data = data_trace if f.trace else data_sysvar
# Add labels
data_len = len(data[data.columns[0]])
data["Sample_rate"] = [ f.sampling_time ] * data_len
data["run"] = [ index + i ] * data_len
data["trace"] = [ 1 if f.trace else 0 ] * data_len
Chrono.tick(True)
# Add to database
print(prefix, "Saving file", f.file_name, end=" ... ")
table = f.robot.replace(" ", "_")
data.to_sql(table, self.db, if_exists="append", index=False)
Chrono.tick(True)
# raise Exception("")
def open (self):
"""Opens the database"""
self.db = sqlite3.connect(self.db_file)
def close (self):
"""Closes the database"""
self.db.close()
def DataFile_to_metadata (file: DataFile) -> pd.DataFrame:
"""Converts a data file descriptor to a DataFrame
to be added to a sqlite database
Args:
file (DataFile): The file to format
Returns:
pd.DataFrame: The metadata
"""
constant_speed = type(file.speed) != slice
if constant_speed:
start_speed = file.speed
stop_speed = file.speed
else:
start_speed = file.speed.start
stop_speed = file.speed.stop
robot = file.robot.replace(" ", "_")
cols = _DB_METADATA_COLUMNS
rows = [[
file.year, file.month, file.day,
file.name,
start_speed, stop_speed, constant_speed,
file.sampling_time, file.class_number,
*file.iterations,
robot,
file.file_name
]]
return pd.DataFrame(rows, columns=cols)
def init_database (db_file: Path | str):
"""Initializes a new sqlite database
Args:
db_file (Path | str): The path to the database
"""
DB = sqlite3.connect(db_file)
# Initialize metadata table
DB.execute("""
CREATE TABLE "DataFiles" (
"level_0" INTEGER,
"Year" INTEGER,
"Month" INTEGER,
"Day" INTEGER,
"Name" TEXT,
"Start_Speed" INTEGER,
"Stop_Speed" INTEGER,
"Constant_Speed" INTEGER,
"Sampling" INTEGER,
"Class" INTEGER,
"Iteration_A1" INTEGER,
"Iteration_A2" INTEGER,
"Iteration_A3" INTEGER,
"Iteration_A4" INTEGER,
"Iteration_A5" INTEGER,
"Iteration_A6" INTEGER,
"Robot" TEXT,
"FileName" TEXT,
"index" INTEGER
);
""")
DB.close()