583 views
この記事は最終更新から 369日 が経過しています。
1. やりたいこと
Pythonプログラムで MySQL DBにアクセスするとき、ここ数年は SQLAlchemy を使っている。
今回、たまたま mysql-connector-python を使うことになったので、これを使う便利クラスでラップしたい。
2. やってみる
個人的な備忘録なので、解説無しで結果だけを記録しておく。
※今後の改良時に随時更新しよう。
(1/5) logger_config.py
import logging
# Application-wide logger: INFO and above are appended to myapp.log (UTF-8).
# The formatter and handler are built first, then attached to the logger.
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s')

handler = logging.FileHandler('myapp.log', mode='a', encoding='utf-8')
handler.setFormatter(formatter)

logger = logging.getLogger('MyApp')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
(2/5) DBSession.py
DB接続管理クラス
from logger_config import logger
import mysql.connector
from mysql.connector import Error
class DBSession:
    """Own one MySQL connection and one cursor opened with ``dictionary=True``.

    The session can be used as a context manager; leaving the ``with`` block
    closes the cursor and the connection. Nothing is committed implicitly:
    callers must call :meth:`commit` themselves.
    """

    def __init__(self, host, user, password, database):
        """Connect to the database and open the cursor.

        Args:
            host: Server host name passed to ``mysql.connector.connect``.
            user: Login user name.
            password: Login password.
            database: Name of the database to use.

        Raises:
            mysql.connector.Error: Re-raised after being logged when the
                connection or the cursor cannot be created.
        """
        # Pre-set both handles so the error path can tell how far we got.
        self.conn = None
        self._cursor = None
        try:
            self.conn = mysql.connector.connect(
                host=host,
                user=user,
                password=password,
                database=database,
            )
            self._cursor = self.conn.cursor(dictionary=True)
            logger.info("DBSession 接続成功")
        except Error as e:
            logger.error(f"DBSession 接続エラー: {e}")
            # connect() may have succeeded before cursor() failed; do not
            # leak that connection.
            conn, self.conn, self._cursor = self.conn, None, None
            if conn is not None:
                try:
                    conn.close()
                except Error:
                    # Best effort only: the original error is the one that
                    # must reach the caller.
                    pass
            raise

    def __enter__(self):
        """Return the session itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session; exceptions from the block are not suppressed."""
        self.close()
        return False

    @property
    def cursor(self):
        """Return the open cursor.

        Raises:
            RuntimeError: If the connection or the cursor is missing, e.g.
                after :meth:`close`.
        """
        if not self.conn or not self._cursor:
            raise RuntimeError("DB接続またはカーソルが無効です")
        return self._cursor

    def commit(self):
        """Commit the current transaction; a no-op without a connection."""
        if self.conn:
            self.conn.commit()

    def rollback(self):
        """Roll back the current transaction; a no-op without a connection."""
        if self.conn:
            self.conn.rollback()

    def close(self):
        """Close the cursor and the connection; safe to call more than once.

        The connection is closed even when closing the cursor raises, and
        both handles are cleared so later use fails with the RuntimeError
        from :attr:`cursor` instead of touching a closed cursor.
        """
        # Detach first so the session reads as closed even if a close() fails.
        cursor, conn = self._cursor, self.conn
        self._cursor = None
        self.conn = None
        try:
            if cursor:
                cursor.close()
        finally:
            if conn:
                conn.close()
        logger.info("DBSession 切断完了")
(3/5) DBTableBase.py
各テーブル用DAOの親クラス
import re
from typing import Any, Dict, List, Optional

from logger_config import logger
class DBTableBase:
    """Base DAO offering simple single-table CRUD helpers over a session.

    Table and column names cannot be sent as bound parameters, so they are
    interpolated into the SQL text. Every identifier is therefore required
    to consist of word characters only; values are always passed as bound
    ``%s`` parameters.
    """

    def __init__(self, session, table_name: str, id_column: str):
        """Bind the DAO to *session* for one table.

        Args:
            session: Object exposing ``conn`` and ``cursor`` attributes.
            table_name: Name of the table; word characters only.
            id_column: Name of the id column; word characters only.

        Raises:
            ValueError: If either identifier contains other characters.
        """
        # fullmatch (unlike match with '$') also rejects a trailing newline.
        if not re.fullmatch(r'\w+', table_name):
            raise ValueError("Invalid table name")
        if not re.fullmatch(r'\w+', id_column):
            raise ValueError("Invalid id column")
        self.session = session
        self._table_name = table_name
        self._id_column = id_column
        self._ary_value = []

    @staticmethod
    def _checked_columns(columns) -> List[str]:
        """Return *columns* as a list after checking each is a plain identifier."""
        names = list(columns)
        for name in names:
            if not isinstance(name, str) or not re.fullmatch(r'\w+', name):
                raise ValueError(f"Invalid column name: {name!r}")
        return names

    @property
    def table_name(self):
        """Validated table name given to the constructor."""
        return self._table_name

    @property
    def id_column(self):
        """Validated id column name given to the constructor."""
        return self._id_column

    @property
    def cursor(self):
        """Cursor of the underlying session.

        Raises:
            RuntimeError: If the session has no connection.
        """
        if not self.session.conn:
            raise RuntimeError("DB接続が無効です")
        return self.session.cursor

    @property
    def fetchedData(self):
        """Rows stored by the most recent select call on this DAO."""
        return self._ary_value

    def execute(self, sql: str, params: tuple = ()):
        """Run *sql* with bound *params* on the session cursor."""
        self.cursor.execute(sql, params)

    def insert(self, columns: List[str], values: List[Any]):
        """Insert one row.

        Args:
            columns: Column names; word characters only.
            values: Values bound positionally to *columns*.

        Raises:
            ValueError: On an invalid column name or a length mismatch.
        """
        names = self._checked_columns(columns)
        if len(names) != len(values):
            raise ValueError("columns and values must have the same length")
        try:
            placeholders = ", ".join(["%s"] * len(values))
            column_names = ", ".join(names)
            sql = f"INSERT INTO {self.table_name} ({column_names}) VALUES ({placeholders})"
            self.execute(sql, tuple(values))
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] INSERTエラー: {e}")
            raise

    def insert_dict(self, data: Dict[str, Any]):
        """Insert one row given as a ``{column: value}`` mapping."""
        self.insert(list(data.keys()), list(data.values()))

    def select_by_id(self, id_value: Any) -> Optional[Dict[str, Any]]:
        """Fetch the row whose id column equals *id_value*.

        Returns:
            The row, or ``None`` when no row matches. ``fetchedData`` is set
            to a one-element list on a hit and to an empty list on a miss.
        """
        try:
            sql = f"SELECT * FROM {self.table_name} WHERE {self.id_column} = %s"
            self.execute(sql, (id_value,))
            row = self.cursor.fetchone()
            # Keep the cache a list of real rows; never store a None entry.
            self._ary_value = [] if row is None else [row]
            return row
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] SELECTエラー: {e}")
            raise

    def select_by_conditions(self, conditions: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch all rows matching every ``column = value`` pair (AND-ed).

        Raises:
            ValueError: On an invalid column name, or when *conditions* is
                empty (use :meth:`select_all` for an unfiltered read).
        """
        names = self._checked_columns(conditions)
        if not names:
            raise ValueError("conditions must not be empty; use select_all()")
        try:
            where_clause = " AND ".join(f"{k} = %s" for k in names)
            sql = f"SELECT * FROM {self.table_name} WHERE {where_clause}"
            self.execute(sql, tuple(conditions.values()))
            self._ary_value = self.cursor.fetchall()
            return self._ary_value
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] 条件検索エラー: {e}")
            raise

    def select_all(self) -> List[Dict[str, Any]]:
        """Fetch every row of the table ordered by the id column."""
        try:
            sql = f"SELECT * FROM {self.table_name} ORDER BY {self.id_column}"
            self.execute(sql)
            self._ary_value = self.cursor.fetchall()
            return self._ary_value
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] 全件取得エラー: {e}")
            raise

    def update(self, updates: Dict[str, Any], conditions: Dict[str, Any]):
        """Update the rows matching *conditions* with the values in *updates*.

        Raises:
            ValueError: On an invalid column name, or when *updates* or
                *conditions* is empty (an UPDATE without SET or WHERE terms
                is refused rather than sent to the server).
        """
        set_names = self._checked_columns(updates)
        where_names = self._checked_columns(conditions)
        if not set_names:
            raise ValueError("updates must not be empty")
        if not where_names:
            raise ValueError("conditions must not be empty")
        try:
            set_clause = ", ".join(f"{k} = %s" for k in set_names)
            where_clause = " AND ".join(f"{k} = %s" for k in where_names)
            sql = f"UPDATE {self.table_name} SET {set_clause} WHERE {where_clause}"
            values = list(updates.values()) + list(conditions.values())
            self.execute(sql, tuple(values))
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] UPDATEエラー: {e}")
            raise

    def update_by_id(self, id_value: Any, update_column: str, new_value: Any):
        """Set one column of the row whose id column equals *id_value*."""
        self.update({update_column: new_value}, {self.id_column: id_value})

    def delete_by_id(self, id_value: Any):
        """Delete the row whose id column equals *id_value*."""
        try:
            sql = f"DELETE FROM {self.table_name} WHERE {self.id_column} = %s"
            self.execute(sql, (id_value,))
        except Exception as e:
            logger.error(f"[{self.__class__.__name__}] DELETEエラー: {e}")
            raise
(4/5) PriceTable.py
個別のテーブル用DAOクラス
※必ずDBTableBaseを継承すること
from DBTableBase import DBTableBase
from datetime import date
from typing import Union
class PriceTable(DBTableBase):
    """DAO for the ``tbl_price`` table, keyed by ``p_id``."""

    def __init__(self, session):
        """Bind the DAO to *session* with the table and id column fixed."""
        table, key = 'tbl_price', 'p_id'
        super().__init__(session, table, key)

    def insert_price(self, pd_id: int, p_date: date, p_price: Union[int, float], s_id: int):
        """Insert one price row built from the given column values."""
        columns = ('pd_id', 'p_date', 'p_price', 's_id')
        values = (pd_id, p_date, p_price, s_id)
        self.insert_dict(dict(zip(columns, values)))

    def update_price(self, p_id: int, new_price: Union[int, float]):
        """Overwrite ``p_price`` of the row identified by *p_id*."""
        target_column = 'p_price'
        self.update_by_id(p_id, target_column, new_price)
(5/5) _usage.py
使用例（ShopTable / ProductTable は本記事では未掲載。PriceTable と同様に DBTableBase を継承して作成したDAOクラスを想定）
from datetime import date
from DBSession import DBSession
from ShopTable import ShopTable
from ProductTable import ProductTable
# Open the DB connection.
session = DBSession('localhost', 'xxxxx', 'yyyyy', 'my_db')
try:
    shop = ShopTable(session)
    product = ProductTable(session)

    # (1) Show the records of the shop table.
    ary_res1 = shop.select_all()
    print(ary_res1)
    # fetchedData presumably returns the rows kept by the preceding select
    # (true for DBTableBase; assumes ShopTable derives from it).
    ary_res2 = shop.fetchedData
    print(ary_res2)

    # (2) Add a record to the product table.
    product.insert_product('Red Apple', '1')
    session.commit()
except Exception:
    # Discard the partial transaction before propagating the failure.
    session.rollback()
    raise
finally:
    # Always release the cursor and the connection, even on failure.
    session.close()
3. MariaDB公式ライブラリを使う場合
MariaDB用に最適化されているので、MariaDBを使う場合はこちらの方が良いはず。
元が同じソフトウェアなので大きな差はない。
DBSession.py の import と connect() の呼び出し（mysql.connector.connect → mariadb.connect）だけ変えれば動く。(2025.04.14時点)
from logger_config import logger
import mariadb
from mariadb import Error
class DBSession:
    """Own one MariaDB connection and one cursor opened with ``dictionary=True``.

    The session can be used as a context manager; leaving the ``with`` block
    closes the cursor and the connection. Nothing is committed implicitly:
    callers must call :meth:`commit` themselves.
    """

    def __init__(self, host, user, password, database):
        """Connect to the database and open the cursor.

        Args:
            host: Server host name passed to ``mariadb.connect``.
            user: Login user name.
            password: Login password.
            database: Name of the database to use.

        Raises:
            mariadb.Error: Re-raised after being logged when the connection
                or the cursor cannot be created.
        """
        # Pre-set both handles so the error path can tell how far we got.
        self.conn = None
        self._cursor = None
        try:
            self.conn = mariadb.connect(
                host=host,
                user=user,
                password=password,
                database=database,
            )
            self._cursor = self.conn.cursor(dictionary=True)
            logger.info("DBSession 接続成功")
        except Error as e:
            logger.error(f"DBSession 接続エラー: {e}")
            # connect() may have succeeded before cursor() failed; do not
            # leak that connection.
            conn, self.conn, self._cursor = self.conn, None, None
            if conn is not None:
                try:
                    conn.close()
                except Error:
                    # Best effort only: the original error is the one that
                    # must reach the caller.
                    pass
            raise

    def __enter__(self):
        """Return the session itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session; exceptions from the block are not suppressed."""
        self.close()
        return False

    @property
    def cursor(self):
        """Return the open cursor.

        Raises:
            RuntimeError: If the connection or the cursor is missing, e.g.
                after :meth:`close`.
        """
        if not self.conn or not self._cursor:
            raise RuntimeError("DB接続またはカーソルが無効です")
        return self._cursor

    def commit(self):
        """Commit the current transaction; a no-op without a connection."""
        if self.conn:
            self.conn.commit()

    def rollback(self):
        """Roll back the current transaction; a no-op without a connection."""
        if self.conn:
            self.conn.rollback()

    def close(self):
        """Close the cursor and the connection; safe to call more than once.

        The connection is closed even when closing the cursor raises, and
        both handles are cleared so later use fails with the RuntimeError
        from :attr:`cursor` instead of touching a closed cursor.
        """
        # Detach first so the session reads as closed even if a close() fails.
        cursor, conn = self._cursor, self.conn
        self._cursor = None
        self.conn = None
        try:
            if cursor:
                cursor.close()
        finally:
            if conn:
                conn.close()
        logger.info("DBSession 切断完了")
アクセス数(直近7日): ※試験運用中、BOT除外簡易実装済2026-04-19: 1回 2026-04-18: 0回 2026-04-17: 1回 2026-04-16: 0回 2026-04-15: 0回 2026-04-14: 0回 2026-04-13: 0回