.. _db: *************** Base de données *************** Le :ref:`test-python` a montré la limite des structures de données natives de Python pour effectuer des requêtes complexes. Les bases de données sont des outils plus adaptés pour effectuer des requêtes complexes sur de grandes quantités de données. Les bases de données permettent de stocker, d'organiser et de manipuler des données de manière structurée. Elles offrent des fonctionnalités avancées pour effectuer des requêtes complexes, trier, filtrer et agréger des données. Les bases de données de type SQL (Structured Query Language) sont les plus courantes. Elles stockent les données dans des tables, qui sont des structures de données rectangulaires composées de lignes et de colonnes. Les bases de données SQL utilisent le langage SQL pour effectuer des requêtes sur les données. Il existe également d'autres types de bases de données, comme les bases de données NoSQL (Not Only SQL) qui stockent les données dans des documents, des graphes ou des paires clé-valeur. Parmi celles ci, on peut citer MongoDB, Neo4j et Redis. Ces bases de données fonctionne sur un modèle client serveur. Le serveur de base de données écoute sur un port donné et attend des requêtes de la part des clients. Les clients envoient des requêtes SQL au serveur, qui les exécute et renvoie les résultats. SQLite ====== `SQLite `_ est une base de données SQL embarquée, c'est-à-dire qu'elle ne nécessite pas de serveur pour fonctionner. Elle stocke les données dans un fichier unique, ce qui la rend très simple à utiliser et à déployer pour des applications légères. Python possède son wrapper pour SQLite, le module :mod:`sqlite3`. Ce module permet de se connecter à une base de données SQLite, d'exécuter des requêtes SQL et de récupérer les résultats. Cependant SQL a été conçu dans un monde non objet. Il est donc difficile de manipuler les résultats des requêtes SQL avec les structures de données Python. Il y a 3 façons de travailler avec SQLite. Raw SQL ------- La première façon de travailler avec SQlite consiste à écrire des requêtes SQL directement dans le code Python. C'est la méthode la plus simple, mais elle est limitée pour les requêtes complexes et ne tire pas partie du paradigme objet de Python. En particulier les seuls types de données que l'on peut manipuler sont NULL, INTEGER, REAL, TEXT et BLOB. Un exemple. On commence par créer la base de données :: import sqlite3 # Connect to (or create) the database conn = sqlite3.connect("example.db") cursor = conn.cursor() # Create a table cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, age INTEGER NOT NULL, city TEXT NOT NULL ); """) conn.commit() conn.close() Insertion dans la base ...................... On peut ensuite insérer des données dans la base de données :: conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("INSERT INTO users (name, age, city) VALUES ('Alice', 25, 'New York')") cursor.execute("INSERT INTO users (name, age, city) VALUES ('Bob', 30, 'Paris')") conn.commit() conn.close() Récupération, mise à jour et suppression ........................................ On peut afficher l'ensemble des utilisateurs de la base de données :: conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("SELECT * FROM users") rows = cursor.fetchall() conn.close() for row in rows: print(row) ou mettre à jour ou supprimer un utilisateur :: conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("DELETE FROM users WHERE name = 'Bob'") conn.commit() conn.close() Pour ces opérations, on insère la requête SQL directement dans le code Python. Cela peut être pratique pour des requêtes simples, mais cela devient vite compliqué pour des requêtes complexes. Manual Object Mapping --------------------- La deuxième façon de travailler avec SQlite consiste à faire un mapping manuel entre les données de la base et des objets Python. On crée ici une classe :class:`User` qui représente un utilisateur de la base de données. Cette classe implémente des méthodes pour sauvegarder, récupérer, mettre à jour et supprimer des utilisateurs dans la base de données. Cette façon de faire permet de regrouper les opérations sur les données dans une classe, et de manipuler les données à un plus haut niveau d'abstraction. .. code-block:: python class User: def __init__(self, name: str, age: int, city: str, user_id: int = None): self.id = user_id # Can be None for new users self.name = name self.age = age self.city = city def save(self): """Inserts a new user into the database.""" conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("INSERT INTO users (name, age, city) VALUES (?, ?, ?)", (self.name, self.age, self.city)) conn.commit() conn.close() @staticmethod def get_all(): """Fetches all users from the database and returns a list of User objects.""" conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("SELECT id, name, age, city FROM users") rows = cursor.fetchall() conn.close() return [User(user_id=row[0], name=row[1], age=row[2], city=row[3]) for row in rows] @staticmethod def update(user_id: int, new_age: int): """Updates the age of a user by ID.""" conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("UPDATE users SET age = ? WHERE id = ?", (new_age, user_id)) conn.commit() conn.close() @staticmethod def delete(user_id: int): """Deletes a user by ID.""" conn = sqlite3.connect("example.db") cursor = conn.cursor() cursor.execute("DELETE FROM users WHERE id = ?", (user_id,)) conn.commit() conn.close() Insertion dans la base ...................... L'insertion d'un utilisateur dans la base de données se fait en créant une instance de :class:`User` et en appelant la méthode :meth:`save`. .. code-block:: python # Create user objects and save them to the database user1 = User(name="Alice", age=25, city="New York") user1.save() user2 = User(name="Bob", age=30, city="Paris") user2.save() Récupération, mise à jour et suppression ........................................ On peut afficher l'ensemble des utilisateurs de la base de données. .. code-block:: python users = User.get_all() for user in users: print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}, City: {user.city}") ou mettre à jour ou supprimer un utilisateur. .. code-block:: python User.update(user_id=1, new_age=26) User.delete(user_id=2) ORM --- La troisième façon de travailler avec SQlite consiste à utiliser un ORM (Object Relational Mapping) comme `SQLAlchemy `_. SQLAlchemy permet de manipuler les données de la base de données comme des objets Python sans écrire une seule ligne de SQL. C'est la solution la plus puissante, et celle qui sera utilisée ici pour son très haut niveau d'abstraction. Observer comment est défini le modèle de données. .. code-block:: python from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.orm import declarative_base, sessionmaker # Création du moteur SQLite DATABASE_URL = "sqlite:///example.db" engine = create_engine(DATABASE_URL, echo=True) # echo=True pour afficher les requêtes SQL exécutées # Définition de la base Base = declarative_base() # Définition du modèle User class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String, nullable=False) age = Column(Integer, nullable=False) city = Column(String, nullable=False) # Création de la table Base.metadata.create_all(engine) # Création d'une session Session = sessionmaker(bind=engine) session = Session() Insertion dans la base ...................... L'insertion est similaire à la méthode précédente, on crée des instances de :class:`User` et on les ajoute à la session. .. code-block:: python # Création des utilisateurs user1 = User(name="Alice", age=25, city="New York") user2 = User(name="Bob", age=30, city="Paris") # Ajout à la session et commit session.add(user1) session.add(user2) session.commit() Récupération, mise à jour et suppression ........................................ Les autres opérations sont similaires. La récupération de données. .. code-block:: python users = session.query(User).all() for user in users: print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}, City: {user.city}") La modification. .. code-block:: python user = session.query(User).filter_by(name="Alice").first() if user: user.age = 26 session.commit() La suppression. .. code-block:: python user = session.query(User).filter_by(name="Bob").first() if user: session.delete(user) session.commit() .. admonition:: Tutoriel Faire le tutoriel `Data Management With Python, SQLite, and SQLAlchemy `_ SQLAlchemy et Pydantic ====================== Pour profiter des avantages des deux, il est possible d'utiliser Pydantic avec SQLAlchemy bien qu'ils adressent des problèmes différents (Pydantic pour la validation des données et SQLAlchemy pour l'ORM) : - le modèle SQLAlchemy permet d'interagir avec la base de données ; - le modèle Pydantic permet de valider et de sérialiser les données. Pour illustrer ce fonctionnement, on crée une classe :class:`User` avec trois champs : - ``id`` : sera utilisé comme clé primaire - ``name`` : le nom de l'utilisateur - ``email`` : l'adresse email de l'utilisateur La première étape est de définir un modèle de données SQLAlchemy. .. code-block:: python from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String, index=True) email = Column(String, unique=True, index=True) # Création de la base de données SQLite en mémoire engine = create_engine('sqlite:///:memory:') Base.metadata.create_all(bind=engine) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Ensuite, on crée un modèle Pydantic pour valider les données. .. code-block:: python from pydantic import BaseModel class UserCreate(BaseModel): name: str email: str class UserOut(BaseModel): id: int name: str email: str class Config: orm_mode = True # Ceci permet à Pydantic de travailler avec les objets SQLAlchemy On peut alors utiliser ces modèles pour interagir avec la base de données. .. code-block:: python from sqlalchemy.orm import Session def create_user(db: Session, user: UserCreate) -> UserOut: db_user = User(name=user.name, email=user.email) db.add(db_user) db.commit() db.refresh(db_user) return db_user def get_user(db: Session, user_id: int) -> UserOut: return db.query(User).filter(User.id == user_id).first() def get_user_by_email(db: Session, email: str) -> UserOut: return db.query(User).filter(User.email == email).first() def get_users(db: Session, skip: int = 0, limit: int = 100) -> List[UserOut]: return db.query(User).offset(skip).limit(limit).all() def update_user(db: Session, user_id: int, user: UserCreate) -> UserOut: db_user = db.query(User).filter(User.id == user_id).first() db_user.name = user.name db_user.email = user.email db.commit() db.refresh(db_user) return db_user def delete_user(db: Session, user_id: int) -> UserOut: db_user = db.query(User).filter(User.id == user_id).first() db.delete(db_user) db.commit() return db_user Voilà un exemple d'utilisation du code ci dessus avec 3 utilisateurs : .. code-block:: python # Utilisation de la base de données db = SessionLocal() # 1. Création de trois utilisateurs user1 = create_user(db, UserCreate(name="Alice", email="alice@example.com")) user2 = create_user(db, UserCreate(name="Bob", email="bob@example.com")) user3 = create_user(db, UserCreate(name="Charlie", email="charlie@example.com")) print("\n=== Utilisateurs après création ===") for user in get_users(db): print(user) # 2. Mise à jour d'un utilisateur updated_user = update_user(db, user2.id, UserCreate(name="Bobby", email="bobby@example.com")) print("\n=== Utilisateur après mise à jour ===") print(updated_user) # 3. Suppression d'un utilisateur deleted_user = delete_user(db, user1.id) print("\n=== Utilisateur supprimé ===") print(deleted_user) # 4. Liste des utilisateurs restants print("\n=== Utilisateurs après suppression ===") for user in get_users(db): print(user) # Fermeture de la session db.close() Application =========== Utiliser une solution Sqlite/SQLAlchemy/Pydantic pour traiter le :ref:`test-python`. La base de données Sqlite créée pourra être requêtée par le client CLI :: sqlite> SELECT * from population ...> WHERE NOM_OFFICIEL_DEPARTEMENT="Seine-et-Marne" ...> AND POPULATION_TOTALE > 50000 ...> AND ANNEE_DE_RECENSEMENT='2018'; 21406|11|Île-de-France|77|001|77284|Meaux|55416|477|55893|2018|2021|2020|CA du Pays de Meaux|200072130|Seine-et-Marne 46420|11|Île-de-France|77|005|77108|Chelles|55148|293|55441|2018|2021|2020|CA Paris - Vallée de la Marne|200057958|Seine-et-Marne On pourra utiliser la structuration suivante :: sqlalchemy_app/ │ ├── main.py ├── db/ │ ├── models.py │ ├── schema.py │ └── utils.py └── data/ └── population.csv Effectuer la requête précédente via SQLAlchemy et vérifier que le résultat est le même que celui obtenu avec SQLite. .. main.py from db.utils import create_tables, table_exists, clean_column_name from db.models import Commune import pandas as pd from pathlib import Path from sqlalchemy import create_engine, inspect from sqlalchemy.orm import sessionmaker CSV_PATH = Path("data/population.csv") SQLITE_DB_PATH = "sqlite:///population.db" def populate_data(filepath, session): df = pd.read_csv(filepath, delimiter=";") df.columns = [clean_column_name(col) for col in df.columns] for _, row in df.iterrows(): commune = Commune(**row.to_dict()) # Ensure column names match session.add(commune) session.commit() def query_data(session): results = session.query(Commune).filter( Commune.NOM_OFFICIEL_DEPARTEMENT == "Seine-et-Marne", Commune.POPULATION_TOTALE > 50000, Commune.ANNEE_DE_RECENSEMENT == "2018" ).all() for row in results: print(row.CODE_OFFICIEL_COMMUNE___ARRONDISSEMENT_MUNICIPAL, row.NOM_OFFICIEL_COMMUNE___ARRONDISSEMENT_MUNICIPAL, row.POPULATION_TOTALE, row.ANNEE_DE_RECENSEMENT) def main(): engine = create_engine(SQLITE_DB_PATH, echo=False, future=True) # inspector = inspect(engine) Session = sessionmaker(bind=engine) session = Session() if table_exists(engine, "population"): print("✅ Database and table 'population' already exists. Querying...") else: print("Creating and populating the database...") create_tables(engine) populate_data(CSV_PATH, session) return query_data(session) if __name__ == "__main__": main() .. db/models.py from sqlalchemy import Column, Integer, String from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class Commune(Base): __tablename__ = "population" id = Column(Integer, primary_key=True, autoincrement=True) CODE_OFFICIEL_REGION = Column(String) NOM_OFFICIEL_REGION = Column(String) CODE_OFFICIEL_DEPARTEMENT = Column(String) CODE_OFFICIEL_ARRONDISSEMENT_DEPARTEMENTAL = Column(String) CODE_OFFICIEL_COMMUNE___ARRONDISSEMENT_MUNICIPAL = Column(String) NOM_OFFICIEL_COMMUNE___ARRONDISSEMENT_MUNICIPAL = Column(String) POPULATION_MUNICIPALE = Column(Integer) POPULATION_COMPTEE_A_PART = Column(Integer) POPULATION_TOTALE = Column(Integer) ANNEE_DE_RECENSEMENT = Column(String) ANNEE_DENTREE_EN_VIGUEUR = Column(String) ANNEE_DE_REFERENCE_GEOGRAPHIQUE = Column(String) NOM_OFFICIEL_EPCI = Column(String) CODE_OFFICIEL_EPCI = Column(String) NOM_OFFICIEL_DEPARTEMENT = Column(String) .. db/schema.py from pydantic import BaseModel from typing import Optional class CommuneSchema(BaseModel): CODE_OFFICIEL_REGION: Optional[str] NOM_OFFICIEL_REGION: Optional[str] CODE_OFFICIEL_DEPARTEMENT: Optional[str] CODE_OFFICIEL_ARRONDISSEMENT_DEPARTEMENTAL: Optional[str] CODE_OFFICIEL_COMMUNE___ARRONDISSEMENT_MUNICIPAL: Optional[str] NOM_OFFICIEL_COMMUNE___ARRONDISSEMENT_MUNICIPAL: Optional[str] POPULATION_MUNICIPALE: Optional[int] POPULATION_COMPTEE_A_PART: Optional[int] POPULATION_TOTALE: Optional[int] ANNEE_DE_RECENSEMENT: Optional[str] ANNEE_DENTREE_EN_VIGUEUR: Optional[str] ANNEE_DE_REFERENCE_GEOGRAPHIQUE: Optional[str] NOM_OFFICIEL_EPCI: Optional[str] CODE_OFFICIEL_EPCI: Optional[str] NOM_OFFICIEL_DEPARTEMENT: Optional[str] class Config: from_attributes = True .. db/utils.py from sqlalchemy import inspect from .models import Base import unicodedata def table_exists(engine, table_name: str) -> bool: inspector = inspect(engine) return table_name in inspector.get_table_names() def create_tables(engine): Base.metadata.create_all(engine) def clean_column_name(name): name = name.upper().strip() name = name.replace(" ", "_") name = name.replace("/", "_") name = unicodedata.normalize('NFKD', name).encode('ascii', 'ignore').decode('utf-8') return name