(127) MySQL DBドライバのラッパークラスを作る。

投稿者: | 2025年4月14日

583 views

この記事は最終更新から 369日 が経過しています。

1. やりたいこと

Pythonプログラムで MySQL DBにアクセスするとき、ここ数年は SQLAlchemy を使っている。
今回、たまたま mysql-connector-python を使うことになったので、これを使う便利クラスでラップしたい。

2. やってみる

個人的な備忘録なので、解説無しで結果だけを記録しておく。
※今後の改良時に随時更新しよう。

(1/5) logger_config.py

import logging

logger = logging.getLogger('MyApp')
logger.setLevel(logging.INFO)

handler   = logging.FileHandler('myapp.log', mode='a', encoding='utf-8')
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

(2/5) DBSession.py

DB接続管理クラス

from logger_config import logger
import mysql.connector
from mysql.connector import Error

class DBSession:
    def __init__(self, host, user, password, database):
        try:
            self.conn = mysql.connector.connect(
                host=host,
                user=user,
                password=password,
                database=database
            )
            self._cursor = self.conn.cursor(dictionary=True)
            logger.info("DBSession 接続成功")
        except Error as e:
            logger.error(f"DBSession 接続エラー: {e}")
            self.conn = None
            self._cursor = None
            raise

    @property
    def cursor(self):
        if not self.conn or not self._cursor:
            raise RuntimeError("DB接続またはカーソルが無効です")
        return self._cursor

    def commit(self):
        if self.conn:
            self.conn.commit()

    def rollback(self):
        if self.conn:
            self.conn.rollback()

    def close(self):
        if self._cursor:
            self._cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("DBSession 切断完了")

(3/5) DBTableBase.py

各テーブル用DAOの親クラス

from logger_config import logger
from typing import List, Dict, Any
import re

class DBTableBase:
    def __init__(self, session, table_name: str, id_column: str):
        if not re.match(r'^\w+$', table_name):
            raise ValueError("Invalid table name")
        if not re.match(r'^\w+$', id_column):
            raise ValueError("Invalid id column")
        self.session = session
        self._table_name = table_name
        self._id_column = id_column
        self._ary_value = []

    @property
    def table_name(self):
        return self._table_name

    @property
    def id_column(self):
        return self._id_column

    @property
    def cursor(self):
        if not self.session.conn:
            raise RuntimeError("DB接続が無効です")
        return self.session.cursor

    @property
    def fetchedData(self):
        return self._ary_value

    def execute(self, sql: str, params: tuple = ()): 
        self.cursor.execute(sql, params)

    def insert(self, columns: List[str], values: List[Any]):
        try:
            placeholders = ", ".join(["%s"] * len(values))
            column_names = ", ".join(columns)
            sql = f"INSERT INTO {self.table_name} ({column_names}) VALUES ({placeholders})"
            self.execute(sql, tuple(values))
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] INSERTエラー: {e}")
            raise

    def insert_dict(self, data: Dict[str, Any]):
        self.insert(list(data.keys()), list(data.values()))

    def select_by_id(self, id_value: Any) -> Dict[str, Any]:
        try:
            sql = f"SELECT * FROM {self.table_name} WHERE {self.id_column} = %s"
            self.execute(sql, (id_value,))
            self._ary_value = [self.cursor.fetchone()]
            return self._ary_value[0]
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] SELECTエラー: {e}")
            raise

    def select_by_conditions(self, conditions: Dict[str, Any]) -> List[Dict[str, Any]]:
        try:
            where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
            sql = f"SELECT * FROM {self.table_name} WHERE {where_clause}"
            self.execute(sql, tuple(conditions.values()))
            self._ary_value = self.cursor.fetchall()
            return self._ary_value
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] 条件検索エラー: {e}")
            raise

    def select_all(self) -> List[Dict[str, Any]]:
        try:
            sql = f"SELECT * FROM {self.table_name} ORDER BY {self.id_column}"
            self.execute(sql)
            self._ary_value = self.cursor.fetchall()
            return self._ary_value
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] 全件取得エラー: {e}")
            raise

    def update(self, updates: Dict[str, Any], conditions: Dict[str, Any]):
        try:
            set_clause = ", ".join([f"{k} = %s" for k in updates.keys()])
            where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
            sql = f"UPDATE {self.table_name} SET {set_clause} WHERE {where_clause}"
            values = list(updates.values()) + list(conditions.values())
            self.execute(sql, tuple(values))
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] UPDATEエラー: {e}")
            raise

    def update_by_id(self, id_value: Any, update_column: str, new_value: Any):
        self.update({update_column: new_value}, {self.id_column: id_value})

    def delete_by_id(self, id_value: Any):
        try:
            sql = f"DELETE FROM {self.table_name} WHERE {self.id_column} = %s"
            self.execute(sql, (id_value,))
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] DELETEエラー: {e}")
            raise

(4/5) PriceTable.py

個別のテーブル用DAOクラス
※必ずDBTableBaseを継承すること

from DBTableBase import DBTableBase
from datetime import date
from typing import Union

class PriceTable(DBTableBase):
    def __init__(self, session):
        super().__init__(session, 'tbl_price', 'p_id')

    def insert_price(self, pd_id: int, p_date: date, p_price: Union[int, float], s_id: int):
        self.insert_dict({
            'pd_id':   pd_id,
            'p_date':  p_date,
            'p_price': p_price,
            's_id':    s_id,
        })

    def update_price(self, p_id: int, new_price: Union[int, float]):
        self.update_by_id(p_id, 'p_price', new_price)

(5/5) _usage.py

使用例

from datetime import date
from DBSession import DBSession
from ShopTable import ShopTable
from ProductTable import ProductTable

# DB接続
session = DBSession('localhost', 'xxxxx', 'yyyyy', 'my_db')
shop    = ShopTable(session)
product = ProductTable(session)

# (1) shopテーブルのレコードを表示する。
ary_res1 = shop.select_all()
print(ary_res1)
ary_res2 = shop.fetchedData
print(ary_res2)

# (2) productテーブルにレコードを追加する。
product.insert_product('Red Apple', '1')
session.commit()

3. MariaDB公式ライブラリを使う場合

MariaDB用に最適化されているので、MariaDBを使う場合はこちらの方が良いはず。

元が同じソフトウェアなので大きな差はない。
DBSession.pyの importだけ変えれば動く。(2025.04.14時点)

from logger_config import logger
import mariadb
from mariadb import Error

class DBSession:
    def __init__(self, host, user, password, database):
        try:
            self.conn = mariadb.connect(
                host=host,
                user=user,
                password=password,
                database=database
            )
            self._cursor = self.conn.cursor(dictionary=True)
            logger.info("DBSession 接続成功")
        except Error as e:
            logger.error(f"DBSession 接続エラー: {e}")
            self.conn = None
            self._cursor = None
            raise

    @property
    def cursor(self):
        if not self.conn or not self._cursor:
            raise RuntimeError("DB接続またはカーソルが無効です")
        return self._cursor

    def commit(self):
        if self.conn:
            self.conn.commit()

    def rollback(self):
        if self.conn:
            self.conn.rollback()

    def close(self):
        if self._cursor:
            self._cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("DBSession 切断完了")

アクセス数(直近7日): ※試験運用中、BOT除外簡易実装済
  • 2026-04-19: 1回
  • 2026-04-18: 0回
  • 2026-04-17: 1回
  • 2026-04-16: 0回
  • 2026-04-15: 0回
  • 2026-04-14: 0回
  • 2026-04-13: 0回
  • コメントを残す

    メールアドレスが公開されることはありません。 が付いている欄は必須項目です