libs.bootstrap.initialization

libs/bootstrap/initialization.py

  1"""
  2libs/bootstrap/initialization.py
  3"""
  4
  5import json
  6import logging
  7import os
  8from importlib.resources import files
  9from pathlib import Path
 10from typing import TYPE_CHECKING, Any, Union
 11
 12import libs.global_value as g
 13from libs.utils import dbutil
 14
 15if TYPE_CHECKING:
 16    from libs.types import GradeTableDict
 17
 18
 19def main(init_db: bool) -> None:
 20    """
 21    初期化処理
 22
 23    Args:
 24        init_db (bool): setup処理の実行有無
 25
 26    """
 27    if init_db:
 28        setup_resultdb(g.cfg.setting.database_file)  # DB初期化
 29
 30    setup_grade_table()  # 段位テーブル取り込み
 31    setup_rule_data()  # ルールデータ取り込み
 32    setup_regulations(g.cfg.setting.database_file)  # レギュレーション設定取り込み
 33
 34
 35def setup_resultdb(database_file: Union[str, Path]) -> None:
 36    """
 37    DB初期化 & マイグレーション
 38
 39    Args:
 40        database_file (Union[str, Path]): データベース接続パス
 41
 42    """
 43    resultdb = dbutil.connection(database_file)
 44    memdb = dbutil.connection(":memory:")
 45
 46    # 旧テーブル削除
 47    resultdb.execute("drop table if exists rule;")
 48    resultdb.execute("drop table if exists words;")
 49
 50    table_list = {
 51        "member": "CREATE_TABLE_MEMBER",  # メンバー登録テーブル
 52        "alias": "CREATE_TABLE_ALIAS",  # 別名定義テーブル
 53        "team": "CREATE_TABLE_TEAM",  # チーム定義テーブル
 54        "result": "CREATE_TABLE_RESULT",  # データ取り込みテーブル
 55        "remarks": "CREATE_TABLE_REMARKS",  # メモ格納テーブル
 56        "words": "CREATE_TABLE_WORDS",  # レギュレーションワード登録テーブル
 57        "rule": "CREATE_TABLE_RULE",  # ルールセット登録テーブル
 58    }
 59    for table_name, keyword in table_list.items():
 60        # テーブル作成
 61        resultdb.execute(dbutil.query(keyword))
 62        memdb.execute(dbutil.query(keyword))
 63
 64        # スキーマ比較
 65        actual_cols = dbutil.table_info(resultdb, table_name)
 66        expected_cols = dbutil.table_info(memdb, table_name)
 67        for col_name, col_data in expected_cols.items():
 68            if col_name not in actual_cols:
 69                # NOT NULL かつ DEFAULT 未指定だと追加できないので回避
 70                if col_data["notnull"] and col_data["dflt_value"] is None:
 71                    logging.warning(
 72                        "migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified",
 73                        table_name,
 74                        col_name,
 75                    )
 76                    continue
 77                col_type = col_data["type"]
 78                notnull = "NOT NULL" if col_data["notnull"] else ""
 79                dflt = f"DEFAULT {col_data['dflt_value']}" if col_data["dflt_value"] is not None else ""
 80                resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};")
 81                logging.info("migration: table=%s, column=%s", table_name, col_name)
 82
 83    # 追加カラムデータ更新
 84    resultdb.execute("update result set mode = 4 where mode isnull and p4_name != '' and p4_str != '';")
 85
 86    # VIEW
 87    rows = resultdb.execute("select name from sqlite_master where type = 'view';")
 88    for row in rows.fetchall():
 89        resultdb.execute(f"drop view if exists '{row['name']}';")
 90    resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS").replace("<time_adjust>", str(g.cfg.setting.time_adjust)))
 91    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS").replace("<time_adjust>", str(g.cfg.setting.time_adjust)))
 92    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO"))
 93    resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS"))
 94
 95    # INDEX
 96    resultdb.execute(dbutil.query("CREATE_INDEX"))
 97
 98    # ゲスト設定チェック
 99    ret = resultdb.execute("select * from member where id=0;")
100    data = ret.fetchall()
101
102    if len(data) == 0:
103        logging.info("ゲスト設定: %s", g.cfg.member.guest_name)
104        sql = "insert into member (id, name) values (0, ?);"
105        resultdb.execute(sql, (g.cfg.member.guest_name,))
106    elif data[0][1] != g.cfg.member.guest_name:
107        logging.warning("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name)
108        sql = "update member set name=? where id=0;"
109        resultdb.execute(sql, (g.cfg.member.guest_name,))
110
111    resultdb.commit()
112    resultdb.close()
113    memdb.close()
114
115
def setup_rule_data() -> None:
    """
    Load rule-set data into the global configuration.

    Sources are consulted in this order: the ``mahjong`` section of the main
    configuration file, the configured rule file (searched in the config
    directory, the script directory and the current directory when the path
    does not exist as given), and finally the bundled
    ``files/default_rule.ini`` preset when no rule set has been loaded.
    Afterwards the default rule and the keyword-to-rule mapping are derived
    and the rule data is registered to the database.

    Raises:
        TypeError: No rule set is loaded and no rule file is available.

    """

    def _first_existing(candidates):
        """Return the first candidate path that exists, or None."""
        return next((path for path in candidates if path.exists()), None)

    setting = g.cfg.setting
    rules = g.cfg.rule

    # Import from the main configuration file
    if g.cfg.main_parser.has_section("mahjong"):
        section_data = dict(g.cfg.main_parser["mahjong"])
        if section_data.get("rule_version"):
            rules.data_set("mahjong", rule_data=section_data)

    # Locate and import the rule configuration file
    if setting.rule_config:
        if not setting.rule_config.exists():
            relative = str(setting.rule_config)
            # Base directories are resolved lazily, one at a time, in priority order
            setting.rule_config = _first_existing(
                base() / relative
                for base in (lambda: g.cfg.config_dir, lambda: g.cfg.script_dir, Path.cwd)
            )
        if setting.rule_config:
            rules.read_config(setting.rule_config)

    # Fall back to the preset when no rule set has been loaded
    if not rules.rule_list:
        preset = _first_existing(
            base() / "files/default_rule.ini"
            for base in (lambda: g.cfg.config_dir, lambda: g.cfg.script_dir)
        )
        if preset is not None:
            setting.rule_config = preset

        if not setting.rule_config:
            raise TypeError("Preset not found.")
        rules.read_config(setting.rule_config)

    # Default rule definition
    if not setting.default_rule:
        setting.default_rule = rules.rule_list[0]

    # Build the keyword -> rule version mapping
    for version in rules.rule_list:
        rules.keyword_mapping.update({word: version for word in rules.keywords(version)})

    # Explicit mappings from the main configuration file take precedence
    if g.cfg.main_parser.has_section("keyword_mapping"):
        for word, version in dict(g.cfg.main_parser["keyword_mapping"]).items():
            if not version:
                target = setting.default_rule
            elif version in rules.data:
                target = version
            else:
                # Unknown rule version: leave the mapping untouched
                continue
            rules.keyword_mapping.update({word: target})

    rules.status_update(g.params.placeholder())
    rules.remarks_words_update(setting.remarks_suffix)
    rules.register_to_database()
170
171
172def setup_regulations(database_file: Union[str, Path]) -> None:
173    """
174    レギュレーション設定取り込み
175
176    Args:
177        database_file (Union[str, Path]): データベース接続パス
178
179    """
180
181    def _db_set() -> None:
182        params: dict[str, Any] = {}
183        for k, v in parser.items(section):
184            match k:
185                case "yakuman_list":
186                    words_list = {x.strip() for x in v.split(",")}
187                    for word in words_list:
188                        params = {"word": word, "type": 0, "ex_point": None, "rule_version": rule}
189                        resultdb.execute(dbutil.query("WORDS_INSERT"), params)
190                    logging.debug("regulations table(type0): %s", words_list)
191                case "word_list":
192                    words_list = {x.strip() for x in v.split(",")}
193                    for word in words_list:
194                        params = {"word": word, "type": 1, "ex_point": None, "rule_version": rule}
195                        resultdb.execute(dbutil.query("WORDS_INSERT"), params)
196                    logging.debug("regulations table(type1): %s", words_list)
197                case _:
198                    params = {"word": k.strip(), "type": regulation_type, "ex_point": int(v), "rule_version": rule}
199                    resultdb.execute(dbutil.query("WORDS_INSERT"), params)
200                    logging.debug("regulations table(type%s): %s, %s", regulation_type, params["word"], params["ex_point"])
201
202    resultdb = dbutil.connection(database_file)
203    resultdb.execute("delete from words;")
204
205    for rule in g.cfg.rule.rule_list:
206        # 個人レギュレーション
207        regulation_type = 2
208        section_patterns = [
209            (g.cfg.rule.config, f"{rule}_regulations"),
210            (g.cfg.rule.config, f"regulations_{rule}"),
211            (g.cfg.main_parser, f"{rule}_regulations"),
212            (g.cfg.main_parser, f"regulations_{rule}"),
213            (g.cfg.main_parser, "regulations"),
214        ]
215        for parser, section in section_patterns:
216            if section in parser.sections():
217                _db_set()
218                break
219
220        # チームレギュレーション
221        regulation_type = 3
222        section_patterns = [
223            (g.cfg.rule.config, f"{rule}_regulations_team"),
224            (g.cfg.rule.config, f"regulations_team_{rule}"),
225            (g.cfg.main_parser, f"{rule}_regulations_team"),
226            (g.cfg.main_parser, f"regulations_team_{rule}"),
227            (g.cfg.main_parser, "regulations_team"),
228        ]
229        for parser, section in section_patterns:
230            if section in parser.sections():
231                _db_set()
232                break
233
234    resultdb.commit()
235    resultdb.close()
236
237
238def setup_grade_table() -> None:
239    """段位テーブル取り込み"""
240    # テーブル選択
241    match table_name := g.cfg.badge.grade.table_name:
242        case "":
243            return
244        case "mahjongsoul" | "雀魂":
245            tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json"))
246        case "tenho" | "天鳳":
247            tbl_file = str(files("files.gradetable").joinpath("tenho.json"))
248        case _:
249            tbl_file = os.path.join(g.cfg.config_dir, table_name)
250            if not os.path.isfile(tbl_file):
251                return
252
253    with open(tbl_file, encoding="utf-8") as f:
254        try:
255            tbl_data: "GradeTableDict" = json.load(f)
256        except json.JSONDecodeError as err:
257            logging.warning("JSONDecodeError: %s", err)
258            return
259
260    if not isinstance(tbl_list := tbl_data.get("table"), list):
261        logging.warning("undefined key [table]")
262        return
263
264    for x in tbl_list:
265        if isinstance(x, dict):
266            x["demote"] = x.get("demote", True)
267            if {"grade", "point", "acquisition", "demote"} == set(x.keys()):
268                if not isinstance(x.get("grade"), str):
269                    tbl_data = {}
270                    break
271                point = x.get("point")
272                if not isinstance(point, list) or len(point) != 2:
273                    logging.warning("point is not match")
274                    tbl_data = {}
275                    break
276                acquisition = x.get("acquisition")
277                if not isinstance(acquisition, list) or len(acquisition) != 4:
278                    logging.warning("acquisition is not match")
279                    tbl_data = {}
280                    break
281            else:
282                logging.warning("undefined key [grade, point, acquisition]")
283                tbl_data = {}
284                break
285        else:
286            tbl_data = {}
287            break
288
289    g.cfg.badge.grade.table = tbl_data
def main(init_db: bool) -> None:
def main(init_db: bool) -> None:
    """
    Run the application initialization sequence.

    Args:
        init_db (bool): When true, the result database is created/migrated
            before the configuration data is loaded.

    """
    if init_db:
        # Database initialization
        setup_resultdb(g.cfg.setting.database_file)

    # Grade table import
    setup_grade_table()

    # Rule data import
    setup_rule_data()

    # Regulation settings import
    setup_regulations(g.cfg.setting.database_file)

初期化処理

Arguments:
  • init_db (bool): setup処理の実行有無
def setup_resultdb(database_file: str | pathlib.Path) -> None:
 36def setup_resultdb(database_file: Union[str, Path]) -> None:
 37    """
 38    DB初期化 & マイグレーション
 39
 40    Args:
 41        database_file (Union[str, Path]): データベース接続パス
 42
 43    """
 44    resultdb = dbutil.connection(database_file)
 45    memdb = dbutil.connection(":memory:")
 46
 47    # 旧テーブル削除
 48    resultdb.execute("drop table if exists rule;")
 49    resultdb.execute("drop table if exists words;")
 50
 51    table_list = {
 52        "member": "CREATE_TABLE_MEMBER",  # メンバー登録テーブル
 53        "alias": "CREATE_TABLE_ALIAS",  # 別名定義テーブル
 54        "team": "CREATE_TABLE_TEAM",  # チーム定義テーブル
 55        "result": "CREATE_TABLE_RESULT",  # データ取り込みテーブル
 56        "remarks": "CREATE_TABLE_REMARKS",  # メモ格納テーブル
 57        "words": "CREATE_TABLE_WORDS",  # レギュレーションワード登録テーブル
 58        "rule": "CREATE_TABLE_RULE",  # ルールセット登録テーブル
 59    }
 60    for table_name, keyword in table_list.items():
 61        # テーブル作成
 62        resultdb.execute(dbutil.query(keyword))
 63        memdb.execute(dbutil.query(keyword))
 64
 65        # スキーマ比較
 66        actual_cols = dbutil.table_info(resultdb, table_name)
 67        expected_cols = dbutil.table_info(memdb, table_name)
 68        for col_name, col_data in expected_cols.items():
 69            if col_name not in actual_cols:
 70                # NOT NULL かつ DEFAULT 未指定だと追加できないので回避
 71                if col_data["notnull"] and col_data["dflt_value"] is None:
 72                    logging.warning(
 73                        "migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified",
 74                        table_name,
 75                        col_name,
 76                    )
 77                    continue
 78                col_type = col_data["type"]
 79                notnull = "NOT NULL" if col_data["notnull"] else ""
 80                dflt = f"DEFAULT {col_data['dflt_value']}" if col_data["dflt_value"] is not None else ""
 81                resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};")
 82                logging.info("migration: table=%s, column=%s", table_name, col_name)
 83
 84    # 追加カラムデータ更新
 85    resultdb.execute("update result set mode = 4 where mode isnull and p4_name != '' and p4_str != '';")
 86
 87    # VIEW
 88    rows = resultdb.execute("select name from sqlite_master where type = 'view';")
 89    for row in rows.fetchall():
 90        resultdb.execute(f"drop view if exists '{row['name']}';")
 91    resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS").replace("<time_adjust>", str(g.cfg.setting.time_adjust)))
 92    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS").replace("<time_adjust>", str(g.cfg.setting.time_adjust)))
 93    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO"))
 94    resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS"))
 95
 96    # INDEX
 97    resultdb.execute(dbutil.query("CREATE_INDEX"))
 98
 99    # ゲスト設定チェック
100    ret = resultdb.execute("select * from member where id=0;")
101    data = ret.fetchall()
102
103    if len(data) == 0:
104        logging.info("ゲスト設定: %s", g.cfg.member.guest_name)
105        sql = "insert into member (id, name) values (0, ?);"
106        resultdb.execute(sql, (g.cfg.member.guest_name,))
107    elif data[0][1] != g.cfg.member.guest_name:
108        logging.warning("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name)
109        sql = "update member set name=? where id=0;"
110        resultdb.execute(sql, (g.cfg.member.guest_name,))
111
112    resultdb.commit()
113    resultdb.close()
114    memdb.close()

DB初期化 & マイグレーション

Arguments:
  • database_file (Union[str, Path]): データベース接続パス
def setup_rule_data() -> None:
def setup_rule_data() -> None:
    """
    Load rule-set data into the global configuration.

    Sources are consulted in this order: the ``mahjong`` section of the main
    configuration file, the configured rule file (searched in the config
    directory, the script directory and the current directory when the path
    does not exist as given), and finally the bundled
    ``files/default_rule.ini`` preset when no rule set has been loaded.
    Afterwards the default rule and the keyword-to-rule mapping are derived
    and the rule data is registered to the database.

    Raises:
        TypeError: No rule set is loaded and no rule file is available.

    """

    def _first_existing(candidates):
        """Return the first candidate path that exists, or None."""
        return next((path for path in candidates if path.exists()), None)

    setting = g.cfg.setting
    rules = g.cfg.rule

    # Import from the main configuration file
    if g.cfg.main_parser.has_section("mahjong"):
        section_data = dict(g.cfg.main_parser["mahjong"])
        if section_data.get("rule_version"):
            rules.data_set("mahjong", rule_data=section_data)

    # Locate and import the rule configuration file
    if setting.rule_config:
        if not setting.rule_config.exists():
            relative = str(setting.rule_config)
            # Base directories are resolved lazily, one at a time, in priority order
            setting.rule_config = _first_existing(
                base() / relative
                for base in (lambda: g.cfg.config_dir, lambda: g.cfg.script_dir, Path.cwd)
            )
        if setting.rule_config:
            rules.read_config(setting.rule_config)

    # Fall back to the preset when no rule set has been loaded
    if not rules.rule_list:
        preset = _first_existing(
            base() / "files/default_rule.ini"
            for base in (lambda: g.cfg.config_dir, lambda: g.cfg.script_dir)
        )
        if preset is not None:
            setting.rule_config = preset

        if not setting.rule_config:
            raise TypeError("Preset not found.")
        rules.read_config(setting.rule_config)

    # Default rule definition
    if not setting.default_rule:
        setting.default_rule = rules.rule_list[0]

    # Build the keyword -> rule version mapping
    for version in rules.rule_list:
        rules.keyword_mapping.update({word: version for word in rules.keywords(version)})

    # Explicit mappings from the main configuration file take precedence
    if g.cfg.main_parser.has_section("keyword_mapping"):
        for word, version in dict(g.cfg.main_parser["keyword_mapping"]).items():
            if not version:
                target = setting.default_rule
            elif version in rules.data:
                target = version
            else:
                # Unknown rule version: leave the mapping untouched
                continue
            rules.keyword_mapping.update({word: target})

    rules.status_update(g.params.placeholder())
    rules.remarks_words_update(setting.remarks_suffix)
    rules.register_to_database()

ルールデータ取り込み

def setup_regulations(database_file: str | pathlib.Path) -> None:
173def setup_regulations(database_file: Union[str, Path]) -> None:
174    """
175    レギュレーション設定取り込み
176
177    Args:
178        database_file (Union[str, Path]): データベース接続パス
179
180    """
181
182    def _db_set() -> None:
183        params: dict[str, Any] = {}
184        for k, v in parser.items(section):
185            match k:
186                case "yakuman_list":
187                    words_list = {x.strip() for x in v.split(",")}
188                    for word in words_list:
189                        params = {"word": word, "type": 0, "ex_point": None, "rule_version": rule}
190                        resultdb.execute(dbutil.query("WORDS_INSERT"), params)
191                    logging.debug("regulations table(type0): %s", words_list)
192                case "word_list":
193                    words_list = {x.strip() for x in v.split(",")}
194                    for word in words_list:
195                        params = {"word": word, "type": 1, "ex_point": None, "rule_version": rule}
196                        resultdb.execute(dbutil.query("WORDS_INSERT"), params)
197                    logging.debug("regulations table(type1): %s", words_list)
198                case _:
199                    params = {"word": k.strip(), "type": regulation_type, "ex_point": int(v), "rule_version": rule}
200                    resultdb.execute(dbutil.query("WORDS_INSERT"), params)
201                    logging.debug("regulations table(type%s): %s, %s", regulation_type, params["word"], params["ex_point"])
202
203    resultdb = dbutil.connection(database_file)
204    resultdb.execute("delete from words;")
205
206    for rule in g.cfg.rule.rule_list:
207        # 個人レギュレーション
208        regulation_type = 2
209        section_patterns = [
210            (g.cfg.rule.config, f"{rule}_regulations"),
211            (g.cfg.rule.config, f"regulations_{rule}"),
212            (g.cfg.main_parser, f"{rule}_regulations"),
213            (g.cfg.main_parser, f"regulations_{rule}"),
214            (g.cfg.main_parser, "regulations"),
215        ]
216        for parser, section in section_patterns:
217            if section in parser.sections():
218                _db_set()
219                break
220
221        # チームレギュレーション
222        regulation_type = 3
223        section_patterns = [
224            (g.cfg.rule.config, f"{rule}_regulations_team"),
225            (g.cfg.rule.config, f"regulations_team_{rule}"),
226            (g.cfg.main_parser, f"{rule}_regulations_team"),
227            (g.cfg.main_parser, f"regulations_team_{rule}"),
228            (g.cfg.main_parser, "regulations_team"),
229        ]
230        for parser, section in section_patterns:
231            if section in parser.sections():
232                _db_set()
233                break
234
235    resultdb.commit()
236    resultdb.close()

レギュレーション設定取り込み

Arguments:
  • database_file (Union[str, Path]): データベース接続パス
def setup_grade_table() -> None:
239def setup_grade_table() -> None:
240    """段位テーブル取り込み"""
241    # テーブル選択
242    match table_name := g.cfg.badge.grade.table_name:
243        case "":
244            return
245        case "mahjongsoul" | "雀魂":
246            tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json"))
247        case "tenho" | "天鳳":
248            tbl_file = str(files("files.gradetable").joinpath("tenho.json"))
249        case _:
250            tbl_file = os.path.join(g.cfg.config_dir, table_name)
251            if not os.path.isfile(tbl_file):
252                return
253
254    with open(tbl_file, encoding="utf-8") as f:
255        try:
256            tbl_data: "GradeTableDict" = json.load(f)
257        except json.JSONDecodeError as err:
258            logging.warning("JSONDecodeError: %s", err)
259            return
260
261    if not isinstance(tbl_list := tbl_data.get("table"), list):
262        logging.warning("undefined key [table]")
263        return
264
265    for x in tbl_list:
266        if isinstance(x, dict):
267            x["demote"] = x.get("demote", True)
268            if {"grade", "point", "acquisition", "demote"} == set(x.keys()):
269                if not isinstance(x.get("grade"), str):
270                    tbl_data = {}
271                    break
272                point = x.get("point")
273                if not isinstance(point, list) or len(point) != 2:
274                    logging.warning("point is not match")
275                    tbl_data = {}
276                    break
277                acquisition = x.get("acquisition")
278                if not isinstance(acquisition, list) or len(acquisition) != 4:
279                    logging.warning("acquisition is not match")
280                    tbl_data = {}
281                    break
282            else:
283                logging.warning("undefined key [grade, point, acquisition]")
284                tbl_data = {}
285                break
286        else:
287            tbl_data = {}
288            break
289
290    g.cfg.badge.grade.table = tbl_data

段位テーブル取り込み