libs.data.initialization

libs/data/initialization.py

  1"""
  2libs/data/initialization.py
  3"""
  4
  5import json
  6import logging
  7import os
  8from importlib.resources import files
  9from typing import TYPE_CHECKING, cast
 10
 11import libs.global_value as g
 12from libs.utils import dbutil
 13
 14if TYPE_CHECKING:
 15    from configparser import ConfigParser
 16
 17    from libs.types import GradeTableDict
 18
 19
 20def initialization_resultdb() -> None:
 21    """DB初期化 & マイグレーション"""
 22
 23    resultdb = dbutil.connection(g.cfg.setting.database_file)
 24    memdb = dbutil.connection(":memory:")
 25
 26    table_list = {
 27        "member": "CREATE_TABLE_MEMBER",  # メンバー登録テーブル
 28        "alias": "CREATE_TABLE_ALIAS",  # 別名定義テーブル
 29        "team": "CREATE_TABLE_TEAM",  # チーム定義テーブル
 30        "result": "CREATE_TABLE_RESULT",  # データ取り込みテーブル
 31        "remarks": "CREATE_TABLE_REMARKS",  # メモ格納テーブル
 32        "words": "CREATE_TABLE_WORDS",  # レギュレーションワード登録テーブル
 33    }
 34    for table_name, keyword in table_list.items():
 35        # テーブル作成
 36        resultdb.execute(dbutil.query(keyword))
 37        memdb.execute(dbutil.query(keyword))
 38
 39        # スキーマ比較
 40        actual_cols = dbutil.table_info(resultdb, table_name)
 41        expected_cols = dbutil.table_info(memdb, table_name)
 42        for col_name, col_data in expected_cols.items():
 43            if col_name not in actual_cols:
 44                if col_data["notnull"] and col_data["dflt_value"] is None:  # NOT NULL かつ DEFAULT 未指定だと追加できないので回避
 45                    logging.warning("migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified", table_name, col_name)
 46                    continue
 47                col_type = col_data["type"]
 48                notnull = "NOT NULL" if col_data["notnull"] else ""
 49                dflt = f"DEFAULT {col_data["dflt_value"]}" if col_data["dflt_value"] is not None else ""
 50                resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};")
 51                logging.info("migration: table=%s, column=%s", table_name, col_name)
 52
 53    # wordsテーブル情報読み込み(regulations)
 54    if cast("ConfigParser", getattr(g.cfg, "_parser")).has_section("regulations"):
 55        resultdb.execute("delete from words;")
 56        for k, v in cast("ConfigParser", getattr(g.cfg, "_parser")).items("regulations"):
 57            match k:
 58                case "undefined":
 59                    g.cfg.undefined_word = int(v)
 60                case "type0" | "yakuman":
 61                    words_list = {x.strip() for x in v.split(",")}
 62                    for word in words_list:
 63                        resultdb.execute(
 64                            "insert into words(word, type, ex_point) values (?, 0, NULL);",
 65                            (word,)
 66                        )
 67                    logging.debug("regulations table(type0): %s", words_list)
 68                case "type2":
 69                    words_list = {x.strip() for x in v.split(",")}
 70                    for word in words_list:
 71                        resultdb.execute(
 72                            "insert into words(word, type, ex_point) values (?, 2, NULL);",
 73                            (word,)
 74                        )
 75                    logging.debug("regulations table(type2): %s", words_list)
 76                case _:
 77                    word = k.strip()
 78                    ex_point = int(v)
 79                    resultdb.execute(
 80                        "insert into words(word, type, ex_point) values (?, 1, ?);",
 81                        (word, ex_point,)
 82                    )
 83                    logging.debug("regulations table(type1): %s, %s", word, ex_point)
 84
 85    # VIEW
 86    rows = resultdb.execute("select name from sqlite_master where type = 'view';")
 87    for row in rows.fetchall():
 88        resultdb.execute(f"drop view if exists '{row["name"]}';")
 89    resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS"))
 90    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS"))
 91    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO"))
 92    resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS").format(undefined_word=g.cfg.undefined_word))
 93
 94    # ゲスト設定チェック
 95    ret = resultdb.execute("select * from member where id=0;")
 96    data = ret.fetchall()
 97
 98    if len(data) == 0:
 99        logging.info("ゲスト設定: %s", g.cfg.member.guest_name)
100        sql = "insert into member (id, name) values (0, ?);"
101        resultdb.execute(sql, (g.cfg.member.guest_name,))
102    elif data[0][1] != g.cfg.member.guest_name:
103        logging.info("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name)
104        sql = "update member set name=? where id=0;"
105        resultdb.execute(sql, (g.cfg.member.guest_name,))
106
107    resultdb.commit()
108    resultdb.close()
109    memdb.close()
110    read_grade_table()
111
112
113def read_grade_table() -> None:
114    """段位テーブル読み込み"""
115
116    # テーブル選択
117    match table_name := g.cfg.badge.grade.table_name:
118        case "":
119            return
120        case "mahjongsoul" | "雀魂":
121            tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json"))
122        case "tenho" | "天鳳":
123            tbl_file = str(files("files.gradetable").joinpath("tenho.json"))
124        case _:
125            tbl_file = os.path.join(g.cfg.config_dir, table_name)
126            if not os.path.isfile(tbl_file):
127                return
128
129    with open(tbl_file, encoding="utf-8") as f:
130        try:
131            tbl_data: "GradeTableDict" = json.load(f)
132        except json.JSONDecodeError as err:
133            logging.error(err)
134            return
135
136    if not isinstance(tbl_list := tbl_data.get("table"), list):
137        logging.error("undefined key [table]")
138        return
139
140    for x in tbl_list:
141        if isinstance(x, dict):
142            x["demote"] = x.get("demote", True)
143            if {"grade", "point", "acquisition", "demote"} == set(x.keys()):
144                if not isinstance(x.get("grade"), str):
145                    tbl_data = {}
146                    break
147                point = x.get("point")
148                if not isinstance(point, list) or len(point) != 2:
149                    logging.error("point is not match")
150                    tbl_data = {}
151                    break
152                acquisition = x.get("acquisition")
153                if not isinstance(acquisition, list) or len(acquisition) != 4:
154                    logging.error("acquisition is not match")
155                    tbl_data = {}
156                    break
157            else:
158                logging.error("undefined key [grade, point, acquisition]")
159                tbl_data = {}
160                break
161        else:
162            tbl_data = {}
163            break
164
165    g.cfg.badge.grade.table = tbl_data
def initialization_resultdb() -> None:
 21def initialization_resultdb() -> None:
 22    """DB初期化 & マイグレーション"""
 23
 24    resultdb = dbutil.connection(g.cfg.setting.database_file)
 25    memdb = dbutil.connection(":memory:")
 26
 27    table_list = {
 28        "member": "CREATE_TABLE_MEMBER",  # メンバー登録テーブル
 29        "alias": "CREATE_TABLE_ALIAS",  # 別名定義テーブル
 30        "team": "CREATE_TABLE_TEAM",  # チーム定義テーブル
 31        "result": "CREATE_TABLE_RESULT",  # データ取り込みテーブル
 32        "remarks": "CREATE_TABLE_REMARKS",  # メモ格納テーブル
 33        "words": "CREATE_TABLE_WORDS",  # レギュレーションワード登録テーブル
 34    }
 35    for table_name, keyword in table_list.items():
 36        # テーブル作成
 37        resultdb.execute(dbutil.query(keyword))
 38        memdb.execute(dbutil.query(keyword))
 39
 40        # スキーマ比較
 41        actual_cols = dbutil.table_info(resultdb, table_name)
 42        expected_cols = dbutil.table_info(memdb, table_name)
 43        for col_name, col_data in expected_cols.items():
 44            if col_name not in actual_cols:
 45                if col_data["notnull"] and col_data["dflt_value"] is None:  # NOT NULL かつ DEFAULT 未指定だと追加できないので回避
 46                    logging.warning("migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified", table_name, col_name)
 47                    continue
 48                col_type = col_data["type"]
 49                notnull = "NOT NULL" if col_data["notnull"] else ""
 50                dflt = f"DEFAULT {col_data["dflt_value"]}" if col_data["dflt_value"] is not None else ""
 51                resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};")
 52                logging.info("migration: table=%s, column=%s", table_name, col_name)
 53
 54    # wordsテーブル情報読み込み(regulations)
 55    if cast("ConfigParser", getattr(g.cfg, "_parser")).has_section("regulations"):
 56        resultdb.execute("delete from words;")
 57        for k, v in cast("ConfigParser", getattr(g.cfg, "_parser")).items("regulations"):
 58            match k:
 59                case "undefined":
 60                    g.cfg.undefined_word = int(v)
 61                case "type0" | "yakuman":
 62                    words_list = {x.strip() for x in v.split(",")}
 63                    for word in words_list:
 64                        resultdb.execute(
 65                            "insert into words(word, type, ex_point) values (?, 0, NULL);",
 66                            (word,)
 67                        )
 68                    logging.debug("regulations table(type0): %s", words_list)
 69                case "type2":
 70                    words_list = {x.strip() for x in v.split(",")}
 71                    for word in words_list:
 72                        resultdb.execute(
 73                            "insert into words(word, type, ex_point) values (?, 2, NULL);",
 74                            (word,)
 75                        )
 76                    logging.debug("regulations table(type2): %s", words_list)
 77                case _:
 78                    word = k.strip()
 79                    ex_point = int(v)
 80                    resultdb.execute(
 81                        "insert into words(word, type, ex_point) values (?, 1, ?);",
 82                        (word, ex_point,)
 83                    )
 84                    logging.debug("regulations table(type1): %s, %s", word, ex_point)
 85
 86    # VIEW
 87    rows = resultdb.execute("select name from sqlite_master where type = 'view';")
 88    for row in rows.fetchall():
 89        resultdb.execute(f"drop view if exists '{row["name"]}';")
 90    resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS"))
 91    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS"))
 92    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO"))
 93    resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS").format(undefined_word=g.cfg.undefined_word))
 94
 95    # ゲスト設定チェック
 96    ret = resultdb.execute("select * from member where id=0;")
 97    data = ret.fetchall()
 98
 99    if len(data) == 0:
100        logging.info("ゲスト設定: %s", g.cfg.member.guest_name)
101        sql = "insert into member (id, name) values (0, ?);"
102        resultdb.execute(sql, (g.cfg.member.guest_name,))
103    elif data[0][1] != g.cfg.member.guest_name:
104        logging.info("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name)
105        sql = "update member set name=? where id=0;"
106        resultdb.execute(sql, (g.cfg.member.guest_name,))
107
108    resultdb.commit()
109    resultdb.close()
110    memdb.close()
111    read_grade_table()

DB初期化 & マイグレーション

def read_grade_table() -> None:
114def read_grade_table() -> None:
115    """段位テーブル読み込み"""
116
117    # テーブル選択
118    match table_name := g.cfg.badge.grade.table_name:
119        case "":
120            return
121        case "mahjongsoul" | "雀魂":
122            tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json"))
123        case "tenho" | "天鳳":
124            tbl_file = str(files("files.gradetable").joinpath("tenho.json"))
125        case _:
126            tbl_file = os.path.join(g.cfg.config_dir, table_name)
127            if not os.path.isfile(tbl_file):
128                return
129
130    with open(tbl_file, encoding="utf-8") as f:
131        try:
132            tbl_data: "GradeTableDict" = json.load(f)
133        except json.JSONDecodeError as err:
134            logging.error(err)
135            return
136
137    if not isinstance(tbl_list := tbl_data.get("table"), list):
138        logging.error("undefined key [table]")
139        return
140
141    for x in tbl_list:
142        if isinstance(x, dict):
143            x["demote"] = x.get("demote", True)
144            if {"grade", "point", "acquisition", "demote"} == set(x.keys()):
145                if not isinstance(x.get("grade"), str):
146                    tbl_data = {}
147                    break
148                point = x.get("point")
149                if not isinstance(point, list) or len(point) != 2:
150                    logging.error("point is not match")
151                    tbl_data = {}
152                    break
153                acquisition = x.get("acquisition")
154                if not isinstance(acquisition, list) or len(acquisition) != 4:
155                    logging.error("acquisition is not match")
156                    tbl_data = {}
157                    break
158            else:
159                logging.error("undefined key [grade, point, acquisition]")
160                tbl_data = {}
161                break
162        else:
163            tbl_data = {}
164            break
165
166    g.cfg.badge.grade.table = tbl_data

段位テーブル読み込み