libs.bootstrap.initialization
libs/bootstrap/initialization.py
1""" 2libs/bootstrap/initialization.py 3""" 4 5import json 6import logging 7import os 8from importlib.resources import files 9from pathlib import Path 10from typing import TYPE_CHECKING, Any, Union 11 12import libs.global_value as g 13from libs.utils import dbutil 14 15if TYPE_CHECKING: 16 from libs.types import GradeTableDict 17 18 19def main(init_db: bool) -> None: 20 """ 21 初期化処理 22 23 Args: 24 init_db (bool): setup処理の実行有無 25 26 """ 27 if init_db: 28 setup_resultdb(g.cfg.setting.database_file) # DB初期化 29 30 setup_grade_table() # 段位テーブル取り込み 31 setup_rule_data() # ルールデータ取り込み 32 setup_regulations(g.cfg.setting.database_file) # レギュレーション設定取り込み 33 34 35def setup_resultdb(database_file: Union[str, Path]) -> None: 36 """ 37 DB初期化 & マイグレーション 38 39 Args: 40 database_file (Union[str, Path]): データベース接続パス 41 42 """ 43 resultdb = dbutil.connection(database_file) 44 memdb = dbutil.connection(":memory:") 45 46 # 旧テーブル削除 47 resultdb.execute("drop table if exists rule;") 48 resultdb.execute("drop table if exists words;") 49 50 table_list = { 51 "member": "CREATE_TABLE_MEMBER", # メンバー登録テーブル 52 "alias": "CREATE_TABLE_ALIAS", # 別名定義テーブル 53 "team": "CREATE_TABLE_TEAM", # チーム定義テーブル 54 "result": "CREATE_TABLE_RESULT", # データ取り込みテーブル 55 "remarks": "CREATE_TABLE_REMARKS", # メモ格納テーブル 56 "words": "CREATE_TABLE_WORDS", # レギュレーションワード登録テーブル 57 "rule": "CREATE_TABLE_RULE", # ルールセット登録テーブル 58 } 59 for table_name, keyword in table_list.items(): 60 # テーブル作成 61 resultdb.execute(dbutil.query(keyword)) 62 memdb.execute(dbutil.query(keyword)) 63 64 # スキーマ比較 65 actual_cols = dbutil.table_info(resultdb, table_name) 66 expected_cols = dbutil.table_info(memdb, table_name) 67 for col_name, col_data in expected_cols.items(): 68 if col_name not in actual_cols: 69 # NOT NULL かつ DEFAULT 未指定だと追加できないので回避 70 if col_data["notnull"] and col_data["dflt_value"] is None: 71 logging.warning( 72 "migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified", 73 table_name, 74 col_name, 75 ) 76 continue 
77 col_type = col_data["type"] 78 notnull = "NOT NULL" if col_data["notnull"] else "" 79 dflt = f"DEFAULT {col_data['dflt_value']}" if col_data["dflt_value"] is not None else "" 80 resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};") 81 logging.info("migration: table=%s, column=%s", table_name, col_name) 82 83 # 追加カラムデータ更新 84 resultdb.execute("update result set mode = 4 where mode isnull and p4_name != '' and p4_str != '';") 85 86 # VIEW 87 rows = resultdb.execute("select name from sqlite_master where type = 'view';") 88 for row in rows.fetchall(): 89 resultdb.execute(f"drop view if exists '{row['name']}';") 90 resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS").replace("<time_adjust>", str(g.cfg.setting.time_adjust))) 91 resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS").replace("<time_adjust>", str(g.cfg.setting.time_adjust))) 92 resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO")) 93 resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS")) 94 95 # INDEX 96 resultdb.execute(dbutil.query("CREATE_INDEX")) 97 98 # ゲスト設定チェック 99 ret = resultdb.execute("select * from member where id=0;") 100 data = ret.fetchall() 101 102 if len(data) == 0: 103 logging.info("ゲスト設定: %s", g.cfg.member.guest_name) 104 sql = "insert into member (id, name) values (0, ?);" 105 resultdb.execute(sql, (g.cfg.member.guest_name,)) 106 elif data[0][1] != g.cfg.member.guest_name: 107 logging.warning("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name) 108 sql = "update member set name=? 
where id=0;" 109 resultdb.execute(sql, (g.cfg.member.guest_name,)) 110 111 resultdb.commit() 112 resultdb.close() 113 memdb.close() 114 115 116def setup_rule_data() -> None: 117 """ルールデータ取り込み""" 118 119 # メイン設定ファイルから取り込み 120 if g.cfg.main_parser.has_section("mahjong"): 121 section_data = dict(g.cfg.main_parser["mahjong"]) 122 if rule_version := section_data.get("rule_version"): 123 g.cfg.rule.data_set("mahjong", rule_data=section_data) 124 125 # ルール設定ファイル探索 & 取り込み 126 if g.cfg.setting.rule_config: 127 if not g.cfg.setting.rule_config.exists(): 128 if (new_conf := g.cfg.config_dir / str(g.cfg.setting.rule_config)) and new_conf.exists(): 129 g.cfg.setting.rule_config = new_conf 130 elif (new_conf := g.cfg.script_dir / str(g.cfg.setting.rule_config)) and new_conf.exists(): 131 g.cfg.setting.rule_config = new_conf 132 elif (new_conf := Path.cwd() / str(g.cfg.setting.rule_config)) and new_conf.exists(): 133 g.cfg.setting.rule_config = new_conf 134 else: 135 g.cfg.setting.rule_config = None 136 if g.cfg.setting.rule_config: 137 g.cfg.rule.read_config(g.cfg.setting.rule_config) 138 139 # ルールセットがなければプリセットから取り込み 140 if not g.cfg.rule.rule_list: 141 if (new_conf := g.cfg.config_dir / "files/default_rule.ini") and new_conf.exists(): 142 g.cfg.setting.rule_config = new_conf 143 elif (new_conf := g.cfg.script_dir / "files/default_rule.ini") and new_conf.exists(): 144 g.cfg.setting.rule_config = new_conf 145 146 if g.cfg.setting.rule_config: 147 g.cfg.rule.read_config(g.cfg.setting.rule_config) 148 else: 149 raise TypeError("Preset not found.") 150 151 # デフォルトルール定義 152 if not g.cfg.setting.default_rule: 153 g.cfg.setting.default_rule = g.cfg.rule.rule_list[0] 154 155 # マッピング生成 156 for rule_version in g.cfg.rule.rule_list: 157 for keyword in g.cfg.rule.keywords(rule_version): 158 g.cfg.rule.keyword_mapping.update({keyword: rule_version}) 159 160 if g.cfg.main_parser.has_section("keyword_mapping"): 161 for keyword, rule_version in 
dict(g.cfg.main_parser["keyword_mapping"]).items(): 162 if not rule_version: 163 g.cfg.rule.keyword_mapping.update({keyword: g.cfg.setting.default_rule}) 164 elif rule_version in g.cfg.rule.data: 165 g.cfg.rule.keyword_mapping.update({keyword: rule_version}) 166 167 g.cfg.rule.status_update(g.params.placeholder()) 168 g.cfg.rule.remarks_words_update(g.cfg.setting.remarks_suffix) 169 g.cfg.rule.register_to_database() 170 171 172def setup_regulations(database_file: Union[str, Path]) -> None: 173 """ 174 レギュレーション設定取り込み 175 176 Args: 177 database_file (Union[str, Path]): データベース接続パス 178 179 """ 180 181 def _db_set() -> None: 182 params: dict[str, Any] = {} 183 for k, v in parser.items(section): 184 match k: 185 case "yakuman_list": 186 words_list = {x.strip() for x in v.split(",")} 187 for word in words_list: 188 params = {"word": word, "type": 0, "ex_point": None, "rule_version": rule} 189 resultdb.execute(dbutil.query("WORDS_INSERT"), params) 190 logging.debug("regulations table(type0): %s", words_list) 191 case "word_list": 192 words_list = {x.strip() for x in v.split(",")} 193 for word in words_list: 194 params = {"word": word, "type": 1, "ex_point": None, "rule_version": rule} 195 resultdb.execute(dbutil.query("WORDS_INSERT"), params) 196 logging.debug("regulations table(type1): %s", words_list) 197 case _: 198 params = {"word": k.strip(), "type": regulation_type, "ex_point": int(v), "rule_version": rule} 199 resultdb.execute(dbutil.query("WORDS_INSERT"), params) 200 logging.debug("regulations table(type%s): %s, %s", regulation_type, params["word"], params["ex_point"]) 201 202 resultdb = dbutil.connection(database_file) 203 resultdb.execute("delete from words;") 204 205 for rule in g.cfg.rule.rule_list: 206 # 個人レギュレーション 207 regulation_type = 2 208 section_patterns = [ 209 (g.cfg.rule.config, f"{rule}_regulations"), 210 (g.cfg.rule.config, f"regulations_{rule}"), 211 (g.cfg.main_parser, f"{rule}_regulations"), 212 (g.cfg.main_parser, f"regulations_{rule}"), 213 
(g.cfg.main_parser, "regulations"), 214 ] 215 for parser, section in section_patterns: 216 if section in parser.sections(): 217 _db_set() 218 break 219 220 # チームレギュレーション 221 regulation_type = 3 222 section_patterns = [ 223 (g.cfg.rule.config, f"{rule}_regulations_team"), 224 (g.cfg.rule.config, f"regulations_team_{rule}"), 225 (g.cfg.main_parser, f"{rule}_regulations_team"), 226 (g.cfg.main_parser, f"regulations_team_{rule}"), 227 (g.cfg.main_parser, "regulations_team"), 228 ] 229 for parser, section in section_patterns: 230 if section in parser.sections(): 231 _db_set() 232 break 233 234 resultdb.commit() 235 resultdb.close() 236 237 238def setup_grade_table() -> None: 239 """段位テーブル取り込み""" 240 # テーブル選択 241 match table_name := g.cfg.badge.grade.table_name: 242 case "": 243 return 244 case "mahjongsoul" | "雀魂": 245 tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json")) 246 case "tenho" | "天鳳": 247 tbl_file = str(files("files.gradetable").joinpath("tenho.json")) 248 case _: 249 tbl_file = os.path.join(g.cfg.config_dir, table_name) 250 if not os.path.isfile(tbl_file): 251 return 252 253 with open(tbl_file, encoding="utf-8") as f: 254 try: 255 tbl_data: "GradeTableDict" = json.load(f) 256 except json.JSONDecodeError as err: 257 logging.warning("JSONDecodeError: %s", err) 258 return 259 260 if not isinstance(tbl_list := tbl_data.get("table"), list): 261 logging.warning("undefined key [table]") 262 return 263 264 for x in tbl_list: 265 if isinstance(x, dict): 266 x["demote"] = x.get("demote", True) 267 if {"grade", "point", "acquisition", "demote"} == set(x.keys()): 268 if not isinstance(x.get("grade"), str): 269 tbl_data = {} 270 break 271 point = x.get("point") 272 if not isinstance(point, list) or len(point) != 2: 273 logging.warning("point is not match") 274 tbl_data = {} 275 break 276 acquisition = x.get("acquisition") 277 if not isinstance(acquisition, list) or len(acquisition) != 4: 278 logging.warning("acquisition is not match") 279 tbl_data = {} 
280 break 281 else: 282 logging.warning("undefined key [grade, point, acquisition]") 283 tbl_data = {} 284 break 285 else: 286 tbl_data = {} 287 break 288 289 g.cfg.badge.grade.table = tbl_data
def
main(init_db: bool) -> None:
def main(init_db: bool) -> None:
    """
    Run the bootstrap sequence.

    The database schema is (re)built only on request; the grade table, the
    rule data and the regulation words are loaded on every call, in that order.

    Args:
        init_db (bool): When true, run setup_resultdb() before the loaders.

    """
    if init_db:
        # Create / migrate the result database
        setup_resultdb(g.cfg.setting.database_file)

    # Loaders, in their original order: grade table -> rule data -> regulations
    setup_grade_table()
    setup_rule_data()
    setup_regulations(g.cfg.setting.database_file)
初期化処理
Arguments:
- init_db (bool): setup処理の実行有無
def
setup_resultdb(database_file: str | pathlib.Path) -> None:
def _migrate_tables(resultdb: Any, memdb: Any) -> None:
    """Create the managed tables and add any columns the stored schema lacks."""
    table_list = {
        "member": "CREATE_TABLE_MEMBER",  # member registration table
        "alias": "CREATE_TABLE_ALIAS",  # alias definition table
        "team": "CREATE_TABLE_TEAM",  # team definition table
        "result": "CREATE_TABLE_RESULT",  # imported result data table
        "remarks": "CREATE_TABLE_REMARKS",  # memo storage table
        "words": "CREATE_TABLE_WORDS",  # regulation word table
        "rule": "CREATE_TABLE_RULE",  # rule set table
    }
    for table_name, keyword in table_list.items():
        # The same DDL is run against the in-memory database so that it always
        # holds the schema the current code expects.
        create_sql = dbutil.query(keyword)
        resultdb.execute(create_sql)
        memdb.execute(create_sql)

        # Compare the stored schema with the expected one
        actual_cols = dbutil.table_info(resultdb, table_name)
        expected_cols = dbutil.table_info(memdb, table_name)
        for col_name, col_data in expected_cols.items():
            if col_name in actual_cols:
                continue
            # A NOT NULL column without a DEFAULT cannot be added to an
            # existing table, so it is skipped with a warning.
            if col_data["notnull"] and col_data["dflt_value"] is None:
                logging.warning(
                    "migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified",
                    table_name,
                    col_name,
                )
                continue
            col_type = col_data["type"]
            notnull = "NOT NULL" if col_data["notnull"] else ""
            dflt = f"DEFAULT {col_data['dflt_value']}" if col_data["dflt_value"] is not None else ""
            resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};")
            logging.info("migration: table=%s, column=%s", table_name, col_name)

    # Back-fill the added "mode" column for rows that have a fourth player
    resultdb.execute("update result set mode = 4 where mode isnull and p4_name != '' and p4_str != '';")


def _rebuild_views(resultdb: Any) -> None:
    """Drop every existing view, then recreate the views and the index."""
    rows = resultdb.execute("select name from sqlite_master where type = 'view';")
    for row in rows.fetchall():
        resultdb.execute(f"drop view if exists '{row['name']}';")

    # Two view definitions carry a <time_adjust> placeholder
    time_adjust = str(g.cfg.setting.time_adjust)
    for keyword in ("CREATE_VIEW_INDIVIDUAL_RESULTS", "CREATE_VIEW_GAME_RESULTS"):
        resultdb.execute(dbutil.query(keyword).replace("<time_adjust>", time_adjust))
    resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO"))
    resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS"))

    resultdb.execute(dbutil.query("CREATE_INDEX"))


def _ensure_guest_member(resultdb: Any) -> None:
    """Make sure member id 0 exists and carries the configured guest name."""
    data = resultdb.execute("select * from member where id=0;").fetchall()
    guest_name = g.cfg.member.guest_name

    if not data:
        logging.info("ゲスト設定: %s", guest_name)
        resultdb.execute("insert into member (id, name) values (0, ?);", (guest_name,))
    # NOTE(review): index 1 of a "select *" row is taken to be the name column
    elif data[0][1] != guest_name:
        logging.warning("ゲスト修正: %s -> %s", data[0][1], guest_name)
        resultdb.execute("update member set name=? where id=0;", (guest_name,))


def setup_resultdb(database_file: Union[str, Path]) -> None:
    """
    Initialise the result database and migrate its schema.

    Steps, in order:
      1. drop the "rule" and "words" tables (they are recreated in step 2);
      2. create every managed table and add columns missing from the stored
         schema, using an in-memory database as the reference schema;
      3. drop all views and recreate them, then create the index;
      4. insert or correct the guest member row (id 0);
      5. commit.

    Both connections are closed even when a step raises; in that case nothing
    is committed.

    Args:
        database_file (Union[str, Path]): Database connection path.

    """
    resultdb = dbutil.connection(database_file)
    try:
        # Drop old tables
        resultdb.execute("drop table if exists rule;")
        resultdb.execute("drop table if exists words;")

        memdb = dbutil.connection(":memory:")
        try:
            _migrate_tables(resultdb, memdb)
        finally:
            memdb.close()

        _rebuild_views(resultdb)
        _ensure_guest_member(resultdb)
        resultdb.commit()
    finally:
        resultdb.close()
DB初期化 & マイグレーション
Arguments:
- database_file (Union[str, Path]): データベース接続パス
def
setup_rule_data() -> None:
def setup_rule_data() -> None:
    """
    Load rule sets and build the keyword -> rule-version mapping.

    Sources, in order: the "mahjong" section of the main configuration, the
    rule file named by the rule_config setting, and - only when no rule set
    has been loaded by then - the bundled "files/default_rule.ini" preset.

    Raises:
        TypeError: No rule set was loaded and no rule file is available.

    """
    cfg = g.cfg

    def _base_dirs(include_cwd: bool):
        # Generator, so each directory is looked up only when it is reached
        yield cfg.config_dir
        yield cfg.script_dir
        if include_cwd:
            yield Path.cwd()

    # Rule definition embedded in the main configuration file
    if cfg.main_parser.has_section("mahjong"):
        embedded = dict(cfg.main_parser["mahjong"])
        if embedded.get("rule_version"):
            cfg.rule.data_set("mahjong", rule_data=embedded)

    # Locate and load the external rule file
    if cfg.setting.rule_config:
        if not cfg.setting.rule_config.exists():
            relative = str(cfg.setting.rule_config)
            for base_dir in _base_dirs(include_cwd=True):
                candidate = base_dir / relative
                if candidate.exists():
                    cfg.setting.rule_config = candidate
                    break
            else:
                # Not found anywhere: forget the setting
                cfg.setting.rule_config = None
        if cfg.setting.rule_config:
            cfg.rule.read_config(cfg.setting.rule_config)

    # Fall back to the preset when no rule set has been loaded
    if not cfg.rule.rule_list:
        for base_dir in _base_dirs(include_cwd=False):
            preset = base_dir / "files/default_rule.ini"
            if preset.exists():
                cfg.setting.rule_config = preset
                break

        # NOTE(review): when no preset exists but rule_config still points at
        # a user file, that file is read a second time instead of raising.
        if cfg.setting.rule_config:
            cfg.rule.read_config(cfg.setting.rule_config)
        else:
            raise TypeError("Preset not found.")

    # Default rule: first loaded rule set unless configured explicitly
    if not cfg.setting.default_rule:
        cfg.setting.default_rule = cfg.rule.rule_list[0]

    # Keyword mapping derived from the rule sets themselves
    for version in cfg.rule.rule_list:
        cfg.rule.keyword_mapping.update({keyword: version for keyword in cfg.rule.keywords(version)})

    # Overrides from the main configuration; unknown rule versions are ignored
    if cfg.main_parser.has_section("keyword_mapping"):
        for keyword, version in dict(cfg.main_parser["keyword_mapping"]).items():
            if not version:
                cfg.rule.keyword_mapping.update({keyword: cfg.setting.default_rule})
            elif version in cfg.rule.data:
                cfg.rule.keyword_mapping.update({keyword: version})

    cfg.rule.status_update(g.params.placeholder())
    cfg.rule.remarks_words_update(cfg.setting.remarks_suffix)
    cfg.rule.register_to_database()
ルールデータ取り込み
def
setup_regulations(database_file: str | pathlib.Path) -> None:
173def setup_regulations(database_file: Union[str, Path]) -> None: 174 """ 175 レギュレーション設定取り込み 176 177 Args: 178 database_file (Union[str, Path]): データベース接続パス 179 180 """ 181 182 def _db_set() -> None: 183 params: dict[str, Any] = {} 184 for k, v in parser.items(section): 185 match k: 186 case "yakuman_list": 187 words_list = {x.strip() for x in v.split(",")} 188 for word in words_list: 189 params = {"word": word, "type": 0, "ex_point": None, "rule_version": rule} 190 resultdb.execute(dbutil.query("WORDS_INSERT"), params) 191 logging.debug("regulations table(type0): %s", words_list) 192 case "word_list": 193 words_list = {x.strip() for x in v.split(",")} 194 for word in words_list: 195 params = {"word": word, "type": 1, "ex_point": None, "rule_version": rule} 196 resultdb.execute(dbutil.query("WORDS_INSERT"), params) 197 logging.debug("regulations table(type1): %s", words_list) 198 case _: 199 params = {"word": k.strip(), "type": regulation_type, "ex_point": int(v), "rule_version": rule} 200 resultdb.execute(dbutil.query("WORDS_INSERT"), params) 201 logging.debug("regulations table(type%s): %s, %s", regulation_type, params["word"], params["ex_point"]) 202 203 resultdb = dbutil.connection(database_file) 204 resultdb.execute("delete from words;") 205 206 for rule in g.cfg.rule.rule_list: 207 # 個人レギュレーション 208 regulation_type = 2 209 section_patterns = [ 210 (g.cfg.rule.config, f"{rule}_regulations"), 211 (g.cfg.rule.config, f"regulations_{rule}"), 212 (g.cfg.main_parser, f"{rule}_regulations"), 213 (g.cfg.main_parser, f"regulations_{rule}"), 214 (g.cfg.main_parser, "regulations"), 215 ] 216 for parser, section in section_patterns: 217 if section in parser.sections(): 218 _db_set() 219 break 220 221 # チームレギュレーション 222 regulation_type = 3 223 section_patterns = [ 224 (g.cfg.rule.config, f"{rule}_regulations_team"), 225 (g.cfg.rule.config, f"regulations_team_{rule}"), 226 (g.cfg.main_parser, f"{rule}_regulations_team"), 227 (g.cfg.main_parser, f"regulations_team_{rule}"), 
228 (g.cfg.main_parser, "regulations_team"), 229 ] 230 for parser, section in section_patterns: 231 if section in parser.sections(): 232 _db_set() 233 break 234 235 resultdb.commit() 236 resultdb.close()
レギュレーション設定取り込み
Arguments:
- database_file (Union[str, Path]): データベース接続パス
def
setup_grade_table() -> None:
239def setup_grade_table() -> None: 240 """段位テーブル取り込み""" 241 # テーブル選択 242 match table_name := g.cfg.badge.grade.table_name: 243 case "": 244 return 245 case "mahjongsoul" | "雀魂": 246 tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json")) 247 case "tenho" | "天鳳": 248 tbl_file = str(files("files.gradetable").joinpath("tenho.json")) 249 case _: 250 tbl_file = os.path.join(g.cfg.config_dir, table_name) 251 if not os.path.isfile(tbl_file): 252 return 253 254 with open(tbl_file, encoding="utf-8") as f: 255 try: 256 tbl_data: "GradeTableDict" = json.load(f) 257 except json.JSONDecodeError as err: 258 logging.warning("JSONDecodeError: %s", err) 259 return 260 261 if not isinstance(tbl_list := tbl_data.get("table"), list): 262 logging.warning("undefined key [table]") 263 return 264 265 for x in tbl_list: 266 if isinstance(x, dict): 267 x["demote"] = x.get("demote", True) 268 if {"grade", "point", "acquisition", "demote"} == set(x.keys()): 269 if not isinstance(x.get("grade"), str): 270 tbl_data = {} 271 break 272 point = x.get("point") 273 if not isinstance(point, list) or len(point) != 2: 274 logging.warning("point is not match") 275 tbl_data = {} 276 break 277 acquisition = x.get("acquisition") 278 if not isinstance(acquisition, list) or len(acquisition) != 4: 279 logging.warning("acquisition is not match") 280 tbl_data = {} 281 break 282 else: 283 logging.warning("undefined key [grade, point, acquisition]") 284 tbl_data = {} 285 break 286 else: 287 tbl_data = {} 288 break 289 290 g.cfg.badge.grade.table = tbl_data
段位テーブル取り込み