libs.data.initialization
libs/data/initialization.py
1""" 2libs/data/initialization.py 3""" 4 5import json 6import logging 7import os 8from importlib.resources import files 9from typing import TYPE_CHECKING, cast 10 11import libs.global_value as g 12from libs.utils import dbutil 13 14if TYPE_CHECKING: 15 from configparser import ConfigParser 16 17 from libs.types import GradeTableDict 18 19 20def initialization_resultdb() -> None: 21 """DB初期化 & マイグレーション""" 22 23 resultdb = dbutil.connection(g.cfg.setting.database_file) 24 memdb = dbutil.connection(":memory:") 25 26 table_list = { 27 "member": "CREATE_TABLE_MEMBER", # メンバー登録テーブル 28 "alias": "CREATE_TABLE_ALIAS", # 別名定義テーブル 29 "team": "CREATE_TABLE_TEAM", # チーム定義テーブル 30 "result": "CREATE_TABLE_RESULT", # データ取り込みテーブル 31 "remarks": "CREATE_TABLE_REMARKS", # メモ格納テーブル 32 "words": "CREATE_TABLE_WORDS", # レギュレーションワード登録テーブル 33 } 34 for table_name, keyword in table_list.items(): 35 # テーブル作成 36 resultdb.execute(dbutil.query(keyword)) 37 memdb.execute(dbutil.query(keyword)) 38 39 # スキーマ比較 40 actual_cols = dbutil.table_info(resultdb, table_name) 41 expected_cols = dbutil.table_info(memdb, table_name) 42 for col_name, col_data in expected_cols.items(): 43 if col_name not in actual_cols: 44 if col_data["notnull"] and col_data["dflt_value"] is None: # NOT NULL かつ DEFAULT 未指定だと追加できないので回避 45 logging.warning("migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified", table_name, col_name) 46 continue 47 col_type = col_data["type"] 48 notnull = "NOT NULL" if col_data["notnull"] else "" 49 dflt = f"DEFAULT {col_data["dflt_value"]}" if col_data["dflt_value"] is not None else "" 50 resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};") 51 logging.info("migration: table=%s, column=%s", table_name, col_name) 52 53 # wordsテーブル情報読み込み(regulations) 54 if cast("ConfigParser", getattr(g.cfg, "_parser")).has_section("regulations"): 55 resultdb.execute("delete from words;") 56 for k, v in cast("ConfigParser", getattr(g.cfg, "_parser")).items("regulations"): 57 match k: 58 case "undefined": 59 g.cfg.undefined_word = int(v) 60 case "type0" | "yakuman": 61 words_list = {x.strip() for x in v.split(",")} 62 for word in words_list: 63 resultdb.execute( 64 "insert into words(word, type, ex_point) values (?, 0, NULL);", 65 (word,) 66 ) 67 logging.debug("regulations table(type0): %s", words_list) 68 case "type2": 69 words_list = {x.strip() for x in v.split(",")} 70 for word in words_list: 71 resultdb.execute( 72 "insert into words(word, type, ex_point) values (?, 2, NULL);", 73 (word,) 74 ) 75 logging.debug("regulations table(type2): %s", words_list) 76 case _: 77 word = k.strip() 78 ex_point = int(v) 79 resultdb.execute( 80 "insert into words(word, type, ex_point) values (?, 1, ?);", 81 (word, ex_point,) 82 ) 83 logging.debug("regulations table(type1): %s, %s", word, ex_point) 84 85 # VIEW 86 rows = resultdb.execute("select name from sqlite_master where type = 'view';") 87 for row in rows.fetchall(): 88 resultdb.execute(f"drop view if exists '{row["name"]}';") 89 resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS")) 90 resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS")) 91 resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO")) 92 resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS").format(undefined_word=g.cfg.undefined_word)) 93 94 # ゲスト設定チェック 95 ret = resultdb.execute("select * from member where id=0;") 96 data = ret.fetchall() 97 98 if len(data) == 0: 99 logging.info("ゲスト設定: %s", g.cfg.member.guest_name) 100 sql = "insert into member (id, name) values (0, ?);" 101 resultdb.execute(sql, (g.cfg.member.guest_name,)) 102 elif data[0][1] != g.cfg.member.guest_name: 103 logging.info("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name) 104 sql = "update member set name=? where id=0;" 105 resultdb.execute(sql, (g.cfg.member.guest_name,)) 106 107 resultdb.commit() 108 resultdb.close() 109 memdb.close() 110 read_grade_table() 111 112 113def read_grade_table() -> None: 114 """段位テーブル読み込み""" 115 116 # テーブル選択 117 match table_name := g.cfg.badge.grade.table_name: 118 case "": 119 return 120 case "mahjongsoul" | "雀魂": 121 tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json")) 122 case "tenho" | "天鳳": 123 tbl_file = str(files("files.gradetable").joinpath("tenho.json")) 124 case _: 125 tbl_file = os.path.join(g.cfg.config_dir, table_name) 126 if not os.path.isfile(tbl_file): 127 return 128 129 with open(tbl_file, encoding="utf-8") as f: 130 try: 131 tbl_data: "GradeTableDict" = json.load(f) 132 except json.JSONDecodeError as err: 133 logging.error(err) 134 return 135 136 if not isinstance(tbl_list := tbl_data.get("table"), list): 137 logging.error("undefined key [table]") 138 return 139 140 for x in tbl_list: 141 if isinstance(x, dict): 142 x["demote"] = x.get("demote", True) 143 if {"grade", "point", "acquisition", "demote"} == set(x.keys()): 144 if not isinstance(x.get("grade"), str): 145 tbl_data = {} 146 break 147 point = x.get("point") 148 if not isinstance(point, list) or len(point) != 2: 149 logging.error("point is not match") 150 tbl_data = {} 151 break 152 acquisition = x.get("acquisition") 153 if not isinstance(acquisition, list) or len(acquisition) != 4: 154 logging.error("acquisition is not match") 155 tbl_data = {} 156 break 157 else: 158 logging.error("undefined key [grade, point, acquisition]") 159 tbl_data = {} 160 break 161 else: 162 tbl_data = {} 163 break 164 165 g.cfg.badge.grade.table = tbl_data
def
initialization_resultdb() -> None:
21def initialization_resultdb() -> None: 22 """DB初期化 & マイグレーション""" 23 24 resultdb = dbutil.connection(g.cfg.setting.database_file) 25 memdb = dbutil.connection(":memory:") 26 27 table_list = { 28 "member": "CREATE_TABLE_MEMBER", # メンバー登録テーブル 29 "alias": "CREATE_TABLE_ALIAS", # 別名定義テーブル 30 "team": "CREATE_TABLE_TEAM", # チーム定義テーブル 31 "result": "CREATE_TABLE_RESULT", # データ取り込みテーブル 32 "remarks": "CREATE_TABLE_REMARKS", # メモ格納テーブル 33 "words": "CREATE_TABLE_WORDS", # レギュレーションワード登録テーブル 34 } 35 for table_name, keyword in table_list.items(): 36 # テーブル作成 37 resultdb.execute(dbutil.query(keyword)) 38 memdb.execute(dbutil.query(keyword)) 39 40 # スキーマ比較 41 actual_cols = dbutil.table_info(resultdb, table_name) 42 expected_cols = dbutil.table_info(memdb, table_name) 43 for col_name, col_data in expected_cols.items(): 44 if col_name not in actual_cols: 45 if col_data["notnull"] and col_data["dflt_value"] is None: # NOT NULL かつ DEFAULT 未指定だと追加できないので回避 46 logging.warning("migration skip: table=%s, column=%s, reason='NOT NULL' and 'DEFAULT' unspecified", table_name, col_name) 47 continue 48 col_type = col_data["type"] 49 notnull = "NOT NULL" if col_data["notnull"] else "" 50 dflt = f"DEFAULT {col_data["dflt_value"]}" if col_data["dflt_value"] is not None else "" 51 resultdb.execute(f"alter table {table_name} add column {col_name} {col_type} {notnull} {dflt};") 52 logging.info("migration: table=%s, column=%s", table_name, col_name) 53 54 # wordsテーブル情報読み込み(regulations) 55 if cast("ConfigParser", getattr(g.cfg, "_parser")).has_section("regulations"): 56 resultdb.execute("delete from words;") 57 for k, v in cast("ConfigParser", getattr(g.cfg, "_parser")).items("regulations"): 58 match k: 59 case "undefined": 60 g.cfg.undefined_word = int(v) 61 case "type0" | "yakuman": 62 words_list = {x.strip() for x in v.split(",")} 63 for word in words_list: 64 resultdb.execute( 65 "insert into words(word, type, ex_point) values (?, 0, NULL);", 66 (word,) 67 ) 68 logging.debug("regulations table(type0): %s", words_list) 69 case "type2": 70 words_list = {x.strip() for x in v.split(",")} 71 for word in words_list: 72 resultdb.execute( 73 "insert into words(word, type, ex_point) values (?, 2, NULL);", 74 (word,) 75 ) 76 logging.debug("regulations table(type2): %s", words_list) 77 case _: 78 word = k.strip() 79 ex_point = int(v) 80 resultdb.execute( 81 "insert into words(word, type, ex_point) values (?, 1, ?);", 82 (word, ex_point,) 83 ) 84 logging.debug("regulations table(type1): %s, %s", word, ex_point) 85 86 # VIEW 87 rows = resultdb.execute("select name from sqlite_master where type = 'view';") 88 for row in rows.fetchall(): 89 resultdb.execute(f"drop view if exists '{row["name"]}';") 90 resultdb.execute(dbutil.query("CREATE_VIEW_INDIVIDUAL_RESULTS")) 91 resultdb.execute(dbutil.query("CREATE_VIEW_GAME_RESULTS")) 92 resultdb.execute(dbutil.query("CREATE_VIEW_GAME_INFO")) 93 resultdb.execute(dbutil.query("CREATE_VIEW_REGULATIONS").format(undefined_word=g.cfg.undefined_word)) 94 95 # ゲスト設定チェック 96 ret = resultdb.execute("select * from member where id=0;") 97 data = ret.fetchall() 98 99 if len(data) == 0: 100 logging.info("ゲスト設定: %s", g.cfg.member.guest_name) 101 sql = "insert into member (id, name) values (0, ?);" 102 resultdb.execute(sql, (g.cfg.member.guest_name,)) 103 elif data[0][1] != g.cfg.member.guest_name: 104 logging.info("ゲスト修正: %s -> %s", data[0][1], g.cfg.member.guest_name) 105 sql = "update member set name=? where id=0;" 106 resultdb.execute(sql, (g.cfg.member.guest_name,)) 107 108 resultdb.commit() 109 resultdb.close() 110 memdb.close() 111 read_grade_table()
DB初期化 & マイグレーション
def
read_grade_table() -> None:
114def read_grade_table() -> None: 115 """段位テーブル読み込み""" 116 117 # テーブル選択 118 match table_name := g.cfg.badge.grade.table_name: 119 case "": 120 return 121 case "mahjongsoul" | "雀魂": 122 tbl_file = str(files("files.gradetable").joinpath("mahjongsoul.json")) 123 case "tenho" | "天鳳": 124 tbl_file = str(files("files.gradetable").joinpath("tenho.json")) 125 case _: 126 tbl_file = os.path.join(g.cfg.config_dir, table_name) 127 if not os.path.isfile(tbl_file): 128 return 129 130 with open(tbl_file, encoding="utf-8") as f: 131 try: 132 tbl_data: "GradeTableDict" = json.load(f) 133 except json.JSONDecodeError as err: 134 logging.error(err) 135 return 136 137 if not isinstance(tbl_list := tbl_data.get("table"), list): 138 logging.error("undefined key [table]") 139 return 140 141 for x in tbl_list: 142 if isinstance(x, dict): 143 x["demote"] = x.get("demote", True) 144 if {"grade", "point", "acquisition", "demote"} == set(x.keys()): 145 if not isinstance(x.get("grade"), str): 146 tbl_data = {} 147 break 148 point = x.get("point") 149 if not isinstance(point, list) or len(point) != 2: 150 logging.error("point is not match") 151 tbl_data = {} 152 break 153 acquisition = x.get("acquisition") 154 if not isinstance(acquisition, list) or len(acquisition) != 4: 155 logging.error("acquisition is not match") 156 tbl_data = {} 157 break 158 else: 159 logging.error("undefined key [grade, point, acquisition]") 160 tbl_data = {} 161 break 162 else: 163 tbl_data = {} 164 break 165 166 g.cfg.badge.grade.table = tbl_data
段位テーブル読み込み