# database.py
from pathlib import Path
import sqlite3
from typing import Dict, List, Callable
import pandas as pd
import numpy as np
from .utils import from_class_to_text, Chronometer
from .extractor import DataExtractor
from .data_finder import DataFile

# Per-axis measurement columns: every measurement name below is repeated
# with the suffixes _A1 ... _A6, grouped by suffix.
_DB_A_COLUMNS = []
for i in range(1, 7):
    _DB_A_COLUMNS.extend(
        f"{col}_A{i}"
        for col in (
            "Position",
            "Position_Error",
            "Position_Command",
            "Current",
            "Temperature",
        )
    )

# Column names accepted for selection and filtering on the Robot_<n> tables.
_DB_ROBOT_COLUMNS = (
    ["Sample_time", "Class", "Speed", "MovingMotor"]
    + _DB_A_COLUMNS
    + ["Sample_rate", "run"]
)

# Column names of the file-description (metadata) rows, in row order.
_DB_METADATA_COLUMNS = [
    "Year", "Month", "Day",
    "Name",
    "Start_Speed", "Stop_Speed", "Constant_Speed",
    "Sampling", "Class",
]
_DB_METADATA_COLUMNS += [f"Iteration_A{axis}" for axis in range(1, 7)]
_DB_METADATA_COLUMNS += ["Robot", "FileName"]

# Name of the table holding one row per imported data file.
_DB_METADATA_TABLE = "DataFiles"

# Progress callback signature: (file name, 1-based position, total count).
DBAppendCallback = Callable[[str, int, int], None]

class DatabaseQuery:
    """Represents a simple query for the database, used to filter the
    requested data.

    A query targets one ``Robot_<n>`` table. Selected columns and equality
    filters are accumulated through the chainable ``select_column``,
    ``by_*`` and ``sample_rate`` methods; ``run`` executes the query.
    """

    # The database connection
    db: sqlite3.Connection = None
    # The selected robot
    robot: int = 1

    # The defined filters (column name -> value rendered as text)
    filters: Dict[str, str] = {}

    # The selected columns
    columns: List[str] = []

    def __init__(self, db: sqlite3.Connection, robot: int) -> None:
        # Rebind the containers per instance so that queries never share
        # the class-level default objects.
        self.columns = []
        self.filters = {}

        self.db = db
        self.robot = robot

    @staticmethod
    def _is_integer(value) -> bool:
        """Tell whether ``value`` is a Python or NumPy integer (not a bool)."""
        return (
            isinstance(value, (int, np.integer))
            and not isinstance(value, bool)
        )

    @staticmethod
    def _as_sql_param(value):
        """Convert a stored filter value to the object bound to the query.

        Numeric text becomes ``int`` or ``float`` so the comparison behaves
        like the numeric literal it represents; anything else is bound as
        plain text and therefore can never be interpreted as SQL.
        """
        text = str(value)
        for cast in (int, float):
            try:
                return cast(text)
            except ValueError:
                continue
        return text

    def _make_query_string(self) -> str:
        """Transforms the internal state to a SQLite query string

        The filter values are written inline. ``run`` does not execute this
        string: it uses ``_make_query``, which binds the values instead.

        Returns:
            str: The SQLite Query
        """
        selected = ", ".join(self.columns) if self.columns else "*"
        query = f"SELECT {selected} FROM Robot_{self.robot}"

        if self.filters:
            query += " WHERE " + " AND ".join(
                f"{name} = {value}" for name, value in self.filters.items()
            )

        return query

    def _make_query(self) -> tuple[str, list]:
        """Builds the query with ``?`` placeholders for the filter values.

        Returns:
            tuple[str, list]: The SQL text and the values to bind, in the
            same order as the placeholders.
        """
        selected = ", ".join(self.columns) if self.columns else "*"
        query = f"SELECT {selected} FROM Robot_{self.robot}"

        params = []
        if self.filters:
            query += " WHERE " + " AND ".join(
                f"{name} = ?" for name in self.filters
            )
            params = [self._as_sql_param(v) for v in self.filters.values()]

        return query, params

    def run(self) -> pd.DataFrame:
        """Runs the query to the SQLite Database

        Raises:
            Exception: The query configuration is wrong

        Returns:
            pd.DataFrame: The requested data
        """

        # Check if the robot parameter is OK
        if self.robot not in range(1, 4):
            raise Exception("Invalid query parameter: Robot must be between 1 and 3")

        # Check if the requested columns are valid
        bad_columns = [c for c in self.columns if c not in _DB_ROBOT_COLUMNS]
        if bad_columns:
            t = " ".join(bad_columns)
            raise Exception(f"Invalid query parameter: invalid column(s): {t}")

        # Check if the filters are valid
        bad_filters = [c for c in self.filters if c not in _DB_ROBOT_COLUMNS]
        if bad_filters:
            t = " ".join(bad_filters)
            raise Exception(f"Invalid query parameter: invalid filter(s): {t}")

        # Identifiers were checked against the known column list above;
        # the filter values are bound as parameters, never interpolated.
        query, params = self._make_query()
        dataframe = pd.read_sql(query, self.db, params=params)

        # Transform some columns to categories
        if "Class" in dataframe.columns:
            dataframe["Class"] = [
                from_class_to_text(c) for c in dataframe["Class"]
            ]

        if "Sample_rate" in dataframe.columns:
            dataframe["Sample_rate"] = dataframe["Sample_rate"].astype("category")

        return dataframe

    def select_column(self, *columns: str) -> "DatabaseQuery":
        """Adds columns to the selection

        Names that are not known robot-table columns are silently skipped.

        Returns:
            self
        """
        self.columns.extend(c for c in columns if c in _DB_ROBOT_COLUMNS)
        return self

    def by_class(self, class_number: int) -> "DatabaseQuery":
        """Adds a filter on the class of the samples

        Args:
            class_number (int): between 0 and 15, the desired sample class

        Returns:
            self
        """
        # The type is checked first so that a non-numeric argument is
        # reported as invalid instead of failing in the comparison.
        if self._is_integer(class_number) and 0 <= class_number <= 15:
            self.filters["Class"] = f"{int(class_number)}"
        else:
            print("Invalid class number", class_number)

        return self

    def by_speed(self, speed: int) -> "DatabaseQuery":
        """Adds a filter on the running speed of the samples

        Args:
            speed (int): The desired speed, between 0 and 100

        Returns:
            self
        """
        if self._is_integer(speed) and 0 <= speed <= 100:
            self.filters["Speed"] = f"{int(speed)}"
        else:
            print("Invalid speed", speed)

        return self

    def by_moving_motor(self, moving_motor: int) -> "DatabaseQuery":
        """Adds a filter on the moving motor in the sample

        Args:
            moving_motor (int): The desired moving motor number (1 to 6)

        Returns:
            self
        """
        if not self._is_integer(moving_motor) or not 1 <= moving_motor <= 6:
            print("Invalid motor number", moving_motor)
            return self

        self.filters["MovingMotor"] = f"{int(moving_motor)}"

        return self

    def sample_rate(self, rate: int) -> "DatabaseQuery":
        """Filters the samples by sampling rate

        Args:
            rate (int): The desired sampling rate (in ms), between 4 and 60

        Returns:
            self
        """
        if not self._is_integer(rate) or not 4 <= rate <= 60:
            print("Invalid sampling rate", rate, "ms")
            return self

        self.filters["Sample_rate"] = f"{int(rate)}"

        return self

    def by_run(self, run: int) -> "DatabaseQuery":
        """Adds a filter on the source file

        The value is stored as given; ``run`` binds it as a query parameter,
        so an unexpected value simply matches no row.

        Args:
            run (int): The desired source file index

        Returns:
            self
        """
        self.filters["run"] = f"{run}"

        return self

    def _select_distinct(self, property: str):
        """Returns the distinct values of one column of the robot table."""
        value = pd.read_sql_query(f"SELECT DISTINCT `{property}` FROM Robot_{self.robot}", self.db)
        return value[property].to_numpy()

    def _select_file_names(self):
        """Returns the names of the files referenced by the robot table."""
        value = pd.read_sql_query(f"SELECT `FileName` FROM DataFiles WHERE DataFiles.`index` IN (SELECT DISTINCT run FROM Robot_{self.robot})", self.db)
        return value["FileName"].to_numpy()

    def list_unique_properties(self):
        """Lists all of the unique values taken by the sample labels in
        the robot table.

        Returns:
            dict: One array of distinct values per label ("Speed", "Class",
            "MovingMotor", "Sample_rate", "run"), plus "FileNames" with the
            names of the files referenced by the table.
        """
        props = ("Speed", "Class", "MovingMotor", "Sample_rate", "run")

        out = {p: self._select_distinct(p) for p in props}
        out["FileNames"] = self._select_file_names()

        return out
    

class Database:
    """Simple class that eases the use of the produced SQLite Database

    The constructor opens the connection when the file exists; otherwise it
    prints a message and leaves ``db`` and ``db_file`` set to ``None``.
    """

    # The path
    db_file: Path = None

    # The database connection
    db: sqlite3.Connection = None

    def __init__(self, db: Path | str) -> None:
        # Normalise whatever was given (str or Path) to an absolute Path.
        db = Path(db).absolute()

        if not db.exists():
            print("The specified database does not exist")
            return

        self.db_file = db

        self.open()

    def robot(self, cell: int):
        """Returns a query object for the specified robot data

        Args:
            cell (int): The cell number (between 1 and 3)

        Returns:
            The query object or None
        """

        if cell not in range(1, 4):
            print("Invalid cell number", cell)
            return None

        return DatabaseQuery(self.db, cell)

    def list_files(self) -> pd.DataFrame:
        """Lists the file names used to create this database, with their indexes
        """

        return pd.read_sql_query(f"SELECT `FileName`, `index` FROM {_DB_METADATA_TABLE}", self.db)

    def _next_file_index(self) -> int:
        """Returns the index to give to the next file added to the database.

        This is one more than the largest stored index, or 0 when the
        metadata table is empty or cannot be queried (e.g. it does not
        exist yet).
        """
        try:
            row = self.db.execute(
                f'SELECT MAX("index") FROM {_DB_METADATA_TABLE}'
            ).fetchone()
        except sqlite3.OperationalError:
            return 0

        # MAX() over an empty table yields a single NULL.
        last = row[0] if row else None
        return 0 if last is None else int(last) + 1

    def append_files(self, *files: DataFile, next: DBAppendCallback | None = None):
        """Adds data to the database

        One metadata row per file is appended to the file-description table,
        then each file is loaded, corrected, labelled and appended to the
        table named after its robot.

        Args:
            *files (DataFile): The files to import
            next (DBAppendCallback | None): Optional progress callback,
                called before each file with the file name, its 1-based
                position and the total number of files
        """

        # Nothing to import: pd.concat would raise on an empty list.
        if not files:
            return

        # Adds the file description to the database
        index = self._next_file_index()

        metadata: pd.DataFrame = pd.concat([
            DataFile_to_metadata(f) for f in files
        ])
        metadata["index"] = np.arange(len(metadata)) + index

        metadata.to_sql(_DB_METADATA_TABLE, self.db, if_exists="append")

        # Gets and saves the data from the files
        extractor = DataExtractor()
        chrono = Chronometer()
        total = len(files)
        for i, f in enumerate(files):
            extractor.reset()
            prefix = f"[{i+1}/{total}]"
            if next is not None:
                next(f.file_name, i + 1, total)
            chrono.tick()

            # Read Excel file
            print(prefix, "Opening file", f.file_name, end=" ... ")
            extractor.load(f.file_path, f.trace)
            chrono.tick(True)

            # Format columns
            print(prefix, "Formatting file", f.file_name, end=" ... ")
            # NOTE: extractor.auto_fit_trace_positions() is currently disabled.
            extractor.apply_correction()

            # Get the output: the trace data or the system-variable data,
            # depending on the kind of file
            data_sysvar, data_trace = extractor.extract()
            data = data_trace if f.trace else data_sysvar

            # Add labels (the same value on every row of this file)
            data["Sample_rate"] = f.sampling_time
            data["run"] = index + i
            data["trace"] = 1 if f.trace else 0

            chrono.tick(True)

            # Add to database
            print(prefix, "Saving file", f.file_name, end=" ... ")
            table = f.robot.replace(" ", "_")
            data.to_sql(table, self.db, if_exists="append", index=False)
            chrono.tick(True)

    def open(self):
        """Opens the database"""
        self.db = sqlite3.connect(self.db_file)

    def close(self):
        """Closes the database (does nothing if it was never opened)"""
        if self.db is not None:
            self.db.close()


def DataFile_to_metadata (file: DataFile) -> pd.DataFrame:
    """Builds the one-row metadata DataFrame describing a data file,
    ready to be appended to the sqlite database.

    A speed given as a slice is recorded through its start and stop
    values; any other speed is recorded as both start and stop and the
    row is flagged as constant speed.

    Args:
        file (DataFile): The file to format

    Returns:
        pd.DataFrame: The metadata
    """

    speed = file.speed
    if isinstance(speed, slice):
        is_constant = False
        first_speed, last_speed = speed.start, speed.stop
    else:
        is_constant = True
        first_speed = last_speed = speed

    # Spaces in the robot name are replaced by underscores
    robot_name = file.robot.replace(" ", "_")

    record = [
        file.year, file.month, file.day,
        file.name,
        first_speed, last_speed, is_constant,
        file.sampling_time, file.class_number,
        *file.iterations,
        robot_name,
        file.file_name,
    ]

    return pd.DataFrame([record], columns=_DB_METADATA_COLUMNS)

def init_database (db_file: Path | str):
    """Initializes a new sqlite database

    Args:
        db_file (Path | str): The path to the database
    """

    DB = sqlite3.connect(db_file)

    # Initialize metadata table
    DB.execute("""
        CREATE TABLE "DataFiles" (
            "level_0"	INTEGER,
            "Year"	INTEGER,
            "Month"	INTEGER,
            "Day"	INTEGER,
            "Name"	TEXT,
            "Start_Speed"	INTEGER,
            "Stop_Speed"	INTEGER,
            "Constant_Speed"	INTEGER,
            "Sampling"	INTEGER,
            "Class"	INTEGER,
            "Iteration_A1"	INTEGER,
            "Iteration_A2"	INTEGER,
            "Iteration_A3"	INTEGER,
            "Iteration_A4"	INTEGER,
            "Iteration_A5"	INTEGER,
            "Iteration_A6"	INTEGER,
            "Robot"	TEXT,
            "FileName"	TEXT,
            "index"	INTEGER
        );
    """)

    DB.close()