【sqlite3】PythonでDB操作:トランザクション管理とロックレベルの詳細

概要

sqlite3モジュールを使用して、どのようにトランザクション管理をするのかをまとめた。 トランザクションモードの種類や、意味、使用方法について扱っている。

 

 

前提

DB環境構築~DBファイルの作成ができていること。

あわせて読みたい

概要 sqlite3モジュールを使用してDB更新する方法についてまとめた。 以下の操作について扱う。 ・テーブル作成/削除 ・レコード作成/更新/削除   前提 DB環境構築~DBファイルの作成ができていること。[…]

【sqlite3】PythonでDB操作:テーブルとレコードの作成、更新、削除方法

 

テーブル定義

以下のUserテーブルに対して操作を行う。


CREATE TABLE user ( 
  id INTEGER PRIMARY KEY, 
  name TEXT, 
  kana TEXT, 
  memo TEXT 
);

 

 

トランザクション制御

トランザクション開始によりDBをロックした状態で障害等が起きた場合、トランザクション開始前の状態に戻すrollback)。 正しくトランザクションの処理ができた場合、処理を確定するcommit)。

トランザクションのロックレベル

connectメソッドisolation_level属性を設定することで、トランザクションのロックレベルを設定可能。 ロックレベルは同時に複数のトランザクションが行われる際に、それらがどの程度互いに見えるか(つまり「分離」されているか)を制御する。

con = sqlite3.connect(sqlite_path, isolation_level=‘XXXX’)

 

ロックレベル 説明
None オートコミットモード。各SQL文が独立したトランザクションとして即座にコミットされる。
DEFERRED 新たなトランザクションが始まったときにロックは取得されず、データが実際に読み書きされるまでロックの取得は遅延される。
IMMEDIATE 新たなトランザクションが始まったときにすぐにロックが取得される。これにより他のデータベース接続からの書き込みがブロックされるが、読み取りは可能。
EXCLUSIVE 新たなトランザクションが始まったときにすぐにロックが取得される。これにより他のデータベース接続からの読み取りおよび書き込みがブロックされる。

 

トランザクションの開始タイミング

SQLiteでのトランザクションを開始するタイミングは、以下のいずれかで自動的に開始される

①connection.executeが実行されたとき
②BEGIN TRANSACTION文が実行されたとき
①の場合、ロックレベルがNoneモードだとトランザクション管理ができない
一方で②でトランザクションを明示的に開始すると、トランザクション内でのDB操作を一括で管理するためロックレベルがNoneモードでもトランザクション管理ができるようになる

 

 

ロックレベルによる挙動

None

オートコミットモード。
各SQL文が独立したトランザクションとして即座にコミットされる。

トランザクション開始タイミング①

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # NONEモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level=None)
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM USER")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        # 即座にコミットされる
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 4
#レコード数: 5
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

トランザクション開始タイミング②

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # NONEモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level=None)
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        cur.execute('BEGIN TRANSACTION;')
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        # 即座にコミットされない
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

DEFERRED

新たなトランザクションが始まったときにロックは取得されず、データが実際に読み書きされるまでロックの取得は遅延される。

トランザクション開始タイミング①

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # DEFERREDモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level="DEFERRED")
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

トランザクション開始タイミング②

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # DEFERREDモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level="DEFERRED")
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        cur.execute('BEGIN TRANSACTION;')
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

IMMEDIATE

新たなトランザクションが始まったときにすぐにロックが取得される。 これにより他のデータベース接続からの書き込みがブロックされるが、読み取りは可能。

トランザクション開始タイミング①

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # IMMEDIATEモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level="IMMEDIATE")
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

トランザクション開始タイミング②

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # IMMEDIATEモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level="IMMEDIATE")
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        cur.execute('BEGIN TRANSACTION;')
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

EXCLUSIVE

新たなトランザクションが始まったときにすぐにロックが取得される。 これにより他のデータベース接続からの読み取りおよび書き込みがブロックされる。

トランザクション開始タイミング①

 


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # EXCLUSIVEモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level="EXCLUSIVE")
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

トランザクション開始タイミング②


import sqlite3
import os

def get_con():
    """ DB接続してconnectionオブジェクトを返却 """
    sqlite_path = os.path.join("db", "test.db")
    # EXCLUSIVEモードで接続
    con = sqlite3.connect(sqlite_path, isolation_level="EXCLUSIVE")
    return con

def show_user_count(cur):
    """ userテーブルのレコード数を表示する """
    # userテーブルのレコード数確認
    cur.execute("SELECT COUNT(*) FROM user")
    print("レコード数:", cur.fetchone()[0])

def main():
    """ メイン実行関数 """
    con = get_con()
    cur = con.cursor()
    try:
        print("----トランザクション開始----")
        cur.execute('BEGIN TRANSACTION;')
        # レコード数確認
        show_user_count(cur)
        # 1件レコード追加
        cur.execute("INSERT INTO user (name, kana, memo) VALUES (?, ?, ?)", ("てすと", "テスト", "memo"))
        # レコード数確認
        show_user_count(cur)
        # 例外を発生させる(トランザクション中のエラー)
        print("----例外発生----")
        raise ValueError("例外を発生させる")
        # 到達しない
        con.commit()
    except ValueError:
        print("----ロールバック開始----")
        # ロールバック
        con.rollback()
        # レコード数確認
        cur = get_con().cursor()
        show_user_count(cur)
    finally:
        # DBとの接続切断
        con.close()

if __name__ == "__main__":
    main()
# --------------------------------------
#出力
#----トランザクション開始----
#レコード数: 5
#レコード数: 6
#----例外発生----
#----ロールバック開始----
#レコード数: 5

 

その他補足

isolation_levelを指定しない場合、デフォルトでNoneになるという説明ページもあったがNoneになっていなかった。
即座にコミットがされておらず、isolation_level属性を参照してもNoneになっていなかった。

isolation_levelを指定しないでDB更新処理(読み取り以外)を行っていると、「database is locked」が発生するケースがあった。
その場合、jupyter labを「File>Shut Down」して再度起動することでデータベースのロックを解除できた。

スポンサーリンク