libs.functions.lookup
libs/functions/lookup.py
1""" 2libs/functions/lookup.py 3""" 4 5import logging 6from configparser import ConfigParser 7from contextlib import closing 8from pathlib import Path 9from typing import TYPE_CHECKING, Any, Optional, Union, cast 10 11import libs.global_value as g 12from libs.domain.score import GameResult 13from libs.types import ChannelType, CommandType 14from libs.utils import dbutil 15 16if TYPE_CHECKING: 17 from pathlib import Path 18 19 from integrations.protocols import MessageParserProtocol 20 from libs.bootstrap.section import SubCommands 21 22 23def get_config_value( 24 config_file: "Path", 25 section: str, 26 name: str, 27 val_type: type, 28 fallback: Union[bool, int, float, str, list[Any], None] = None, 29) -> Any: 30 """ 31 設定値取得 32 33 Args: 34 config_file (Path): 設定ファイルパス 35 section (str): セクション名 36 name (str): 項目名 37 val_type (type): 取り込む値の型 (bool, int, float, str, list[Any]) 38 fallback (Union[bool, int, float, str, list[Any]], optional): 項目が見つからない場合に返す値. Defaults to None 39 40 Returns: 41 Any: 取得した値 42 - 実際に返す型: Union[int, float, bool, str, list[Any], None] 43 44 Raises: 45 TypeError: val_type が bool, int, float, str, list 以外の場合 46 47 """ 48 value: Union[int, float, bool, str, list[Any], None] = fallback 49 parser = ConfigParser() 50 parser.read(config_file, encoding="utf-8") 51 52 if parser.has_option(section, name): 53 match val_type: 54 case x if x is int: 55 value = parser.getint(section, name) 56 case x if x is float: 57 value = parser.getfloat(section, name) 58 case x if x is bool: 59 value = parser.getboolean(section, name) 60 case x if x is str: 61 value = parser.get(section, name) 62 case x if x is list: 63 value = [x.strip() for x in parser.get(section, name).split(",")] 64 case _: 65 raise TypeError(f"Unsupported val_type: {val_type}") 66 67 return value 68 69 70def resolve_separate_flag(m: "MessageParserProtocol") -> bool: 71 """ 72 優先度の高いセパレート設定フラグを取得する 73 74 Args: 75 m (MessageParserProtocol): メッセージデータ 76 77 Returns: 78 bool: セパレート設定フラグ 79 80 """ 81 separate_flg: Optional[bool] = None 82 83 # DM / HomeApp(slack) はセパレートしない 84 if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}: 85 return False 86 87 if g.cfg.main_parser.has_section(m.status.source): 88 # チャンネル個別ファイル内設定 89 if channel_config := g.cfg.main_parser[m.status.source].get("channel_config"): 90 separate_flg = get_config_value(config_file=Path(channel_config), section="setting", name="separate", val_type=bool) 91 if separate_flg is not None: 92 return separate_flg 93 # チャンネル設定 94 else: 95 separate_flg = get_config_value(config_file=g.cfg.config_file, section=m.status.source, name="separate", val_type=bool) 96 if separate_flg is not None: 97 return separate_flg 98 99 # サービス別設定 100 separate_flg = get_config_value(config_file=g.cfg.config_file, section=g.adapter.interface_type, name="separate", val_type=bool) 101 if separate_flg is not None: 102 return separate_flg 103 104 # メイン設定 105 separate_flg = get_config_value(config_file=g.cfg.config_file, section="setting", name="separate", val_type=bool) 106 if separate_flg is not None: 107 return separate_flg 108 109 return False 110 111 112def member_info(params: dict[str, Any]) -> dict[str, Any]: 113 """ 114 指定メンバーの記録情報を返す 115 116 Args: 117 params (dict[str, Any]): 対象メンバー 118 119 Returns: 120 dict[str, Any]: 記録情報 121 122 """ 123 ret = dbutil.execute( 124 """ 125 select 126 count() as game_count, 127 min(ts) as first_game, 128 max(ts) as last_game, 129 max(rpoint) as rpoint_max, 130 min(rpoint) as rpoint_min 131 from 132 individual_results 133 where 134 mode = :mode 135 and rule_version in (<<rule_list>>) 136 and playtime between :starttime and :endtime 137 --[separate] and source = :source 138 --[individual] and name = :player_name 139 --[team] and team = :player_name 140 ; 141 """, 142 params, 143 ) 144 145 return ret[0] 146 147 148def get_guest() -> str: 149 """ 150 ゲスト名取得 151 152 Returns: 153 str: ゲスト名 154 155 """ 156 guest_name: str = "" 157 with closing(dbutil.connection(g.cfg.setting.database_file)) as conn: 158 rows = conn.execute("select name from member where id=0") 159 guest_name = str(rows.fetchone()[0]) 160 161 return guest_name 162 163 164def get_current_rule_version(m: "MessageParserProtocol", command_suffix: list[str]) -> str: 165 """ 166 ルール識別子探索 167 168 Args: 169 m (MessageParserProtocol): メッセージデータ 170 command_suffix (list[str]): コマンドサフィックス 171 172 Returns: 173 str: ルール識別子 174 175 """ 176 for suffix in command_suffix: 177 if rule_version := g.cfg.rule.keyword_mapping.get(m.keyword.removesuffix(suffix)): 178 return rule_version 179 180 return g.cfg.setting.default_rule 181 182 183def regulation_list(word_type: int = 0, rule_version: str | None = None) -> list[str]: 184 """ 185 登録済みワードリストを取得する 186 187 Args: 188 word_type (int, optional): 取得するタイプ. Defaults to 0. 189 rule_version (str, optional): ルール識別子 190 191 Returns: 192 list[str]: 取得結果 193 194 """ 195 ret: list[str] = [] 196 197 if not rule_version and not (rule_version := g.params.default_rule): 198 return [] 199 200 with closing(dbutil.connection(g.cfg.setting.database_file)) as cur: 201 rows = cur.execute( 202 """ 203 select 204 word, 205 ex_point 206 from 207 words 208 where 209 type=? 210 and rule_version=? 211 """, 212 (word_type, rule_version), 213 ).fetchall() 214 215 for word, ex_point in rows: 216 if ex_point: 217 point = f"{ex_point:.1f}".replace("-", "▲") 218 ret.append(f"{word}\t{point}pt") 219 else: 220 ret.append(f"{word}") 221 222 return ret 223 224 225def resolve_commands(rule_version: str, command_type: CommandType) -> list[str]: 226 """ 227 ルール識別子で割り当てられているコマンドワードを返す 228 229 Args: 230 rule_version (str): ルール識別子 231 command_type (CommandType): コマンド種別 232 233 Returns: 234 list[str]: コマンドワード 235 236 """ 237 keywords: list[str] = [word for word, rule in g.cfg.rule.keyword_mapping.items() if rule == rule_version] 238 commandwords: list[str] = [] 239 240 if hasattr(g.cfg, command_type): 241 sub_com = cast("SubCommands", getattr(g.cfg, command_type)) 242 commandwords.append(sub_com.default_commandword) 243 commandwords.extend(sub_com.commandword) 244 commandwords.extend([f"{command}{suffix}" for suffix in sub_com.command_suffix for command in keywords]) 245 246 return [x for x in g.keyword_dispatcher if x in commandwords] 247 248 249def exsist_record(ts: str) -> GameResult: 250 """ 251 記録されているゲーム結果を返す 252 253 Args: 254 ts (str): 検索するタイムスタンプ 255 256 Returns: 257 GameResult: スコアデータ 258 259 """ 260 result = GameResult() 261 262 with closing(dbutil.connection(g.cfg.setting.database_file)) as conn: 263 row = conn.execute( 264 """ 265 select 266 ts, 267 p1_name, p1_str, 268 p2_name, p2_str, 269 p3_name, p3_str, 270 p4_name, p4_str, 271 comment, 272 rule_version 273 from 274 result where ts = :ts 275 ; 276 """, 277 {"ts": ts}, 278 ).fetchone() 279 280 if row: 281 result.calc(**dict(row)) 282 283 return result 284 285 286def read_memberslist() -> None: 287 """メンバー情報/チーム情報の再読み込み""" 288 # todo: 最後にアクセスしたDBのメンバーリストが返る 289 g.params.update_from_dict( 290 { 291 "database_file": g.cfg.setting.database_file, 292 "logging_verbose": g.args.verbose, 293 } 294 ) 295 296 g.cfg.member.guest_name = get_guest() 297 g.cfg.member.info = g.cfg.member.get_info 298 g.cfg.team.info = g.cfg.team.get_info 299 300 logging.debug("guest_name: %s", g.cfg.member.guest_name) 301 logging.debug("member_list: %s", g.cfg.member.lists) 302 logging.debug("team_list: %s", g.cfg.team.lists) 303 304 305def enumeration_all_members() -> list[str]: 306 """ 307 メンバーとチームをすべて列挙する 308 309 Returns: 310 list[str]: メンバー名(別名含む)/チーム名のリスト 311 312 """ 313 ret_list: list[str] = [] 314 315 for member in g.cfg.member.info: 316 ret_list.append(member.get("name")) 317 ret_list.extend(member.get("alias")) 318 ret_list.extend([team.get("team") for team in g.cfg.team.info]) 319 320 return list(set(ret_list))
def
get_config_value( config_file: pathlib.Path, section: str, name: str, val_type: type, fallback: bool | int | float | str | list[Any] | None = None) -> Any:
24def get_config_value( 25 config_file: "Path", 26 section: str, 27 name: str, 28 val_type: type, 29 fallback: Union[bool, int, float, str, list[Any], None] = None, 30) -> Any: 31 """ 32 設定値取得 33 34 Args: 35 config_file (Path): 設定ファイルパス 36 section (str): セクション名 37 name (str): 項目名 38 val_type (type): 取り込む値の型 (bool, int, float, str, list[Any]) 39 fallback (Union[bool, int, float, str, list[Any]], optional): 項目が見つからない場合に返す値. Defaults to None 40 41 Returns: 42 Any: 取得した値 43 - 実際に返す型: Union[int, float, bool, str, list[Any], None] 44 45 Raises: 46 TypeError: val_type が bool, int, float, str, list 以外の場合 47 48 """ 49 value: Union[int, float, bool, str, list[Any], None] = fallback 50 parser = ConfigParser() 51 parser.read(config_file, encoding="utf-8") 52 53 if parser.has_option(section, name): 54 match val_type: 55 case x if x is int: 56 value = parser.getint(section, name) 57 case x if x is float: 58 value = parser.getfloat(section, name) 59 case x if x is bool: 60 value = parser.getboolean(section, name) 61 case x if x is str: 62 value = parser.get(section, name) 63 case x if x is list: 64 value = [x.strip() for x in parser.get(section, name).split(",")] 65 case _: 66 raise TypeError(f"Unsupported val_type: {val_type}") 67 68 return value
設定値取得
Arguments:
- config_file (Path): 設定ファイルパス
- section (str): セクション名
- name (str): 項目名
- val_type (type): 取り込む値の型 (bool, int, float, str, list[Any])
- fallback (Union[bool, int, float, str, list[Any]], optional): 項目が見つからない場合に返す値. Defaults to None
Returns:
Any: 取得した値 - 実際に返す型: Union[int, float, bool, str, list[Any], None]
Raises:
- TypeError: val_type が bool, int, float, str, list 以外の場合
71def resolve_separate_flag(m: "MessageParserProtocol") -> bool: 72 """ 73 優先度の高いセパレート設定フラグを取得する 74 75 Args: 76 m (MessageParserProtocol): メッセージデータ 77 78 Returns: 79 bool: セパレート設定フラグ 80 81 """ 82 separate_flg: Optional[bool] = None 83 84 # DM / HomeApp(slack) はセパレートしない 85 if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}: 86 return False 87 88 if g.cfg.main_parser.has_section(m.status.source): 89 # チャンネル個別ファイル内設定 90 if channel_config := g.cfg.main_parser[m.status.source].get("channel_config"): 91 separate_flg = get_config_value(config_file=Path(channel_config), section="setting", name="separate", val_type=bool) 92 if separate_flg is not None: 93 return separate_flg 94 # チャンネル設定 95 else: 96 separate_flg = get_config_value(config_file=g.cfg.config_file, section=m.status.source, name="separate", val_type=bool) 97 if separate_flg is not None: 98 return separate_flg 99 100 # サービス別設定 101 separate_flg = get_config_value(config_file=g.cfg.config_file, section=g.adapter.interface_type, name="separate", val_type=bool) 102 if separate_flg is not None: 103 return separate_flg 104 105 # メイン設定 106 separate_flg = get_config_value(config_file=g.cfg.config_file, section="setting", name="separate", val_type=bool) 107 if separate_flg is not None: 108 return separate_flg 109 110 return False
優先度の高いセパレート設定フラグを取得する
Arguments:
- m (MessageParserProtocol): メッセージデータ
Returns:
bool: セパレート設定フラグ
def
member_info(params: dict[str, typing.Any]) -> dict[str, typing.Any]:
113def member_info(params: dict[str, Any]) -> dict[str, Any]: 114 """ 115 指定メンバーの記録情報を返す 116 117 Args: 118 params (dict[str, Any]): 対象メンバー 119 120 Returns: 121 dict[str, Any]: 記録情報 122 123 """ 124 ret = dbutil.execute( 125 """ 126 select 127 count() as game_count, 128 min(ts) as first_game, 129 max(ts) as last_game, 130 max(rpoint) as rpoint_max, 131 min(rpoint) as rpoint_min 132 from 133 individual_results 134 where 135 mode = :mode 136 and rule_version in (<<rule_list>>) 137 and playtime between :starttime and :endtime 138 --[separate] and source = :source 139 --[individual] and name = :player_name 140 --[team] and team = :player_name 141 ; 142 """, 143 params, 144 ) 145 146 return ret[0]
指定メンバーの記録情報を返す
Arguments:
- params (dict[str, Any]): 対象メンバー
Returns:
dict[str, Any]: 記録情報
def
get_guest() -> str:
149def get_guest() -> str: 150 """ 151 ゲスト名取得 152 153 Returns: 154 str: ゲスト名 155 156 """ 157 guest_name: str = "" 158 with closing(dbutil.connection(g.cfg.setting.database_file)) as conn: 159 rows = conn.execute("select name from member where id=0") 160 guest_name = str(rows.fetchone()[0]) 161 162 return guest_name
ゲスト名取得
Returns:
str: ゲスト名
def
get_current_rule_version( m: integrations.protocols.MessageParserProtocol, command_suffix: list[str]) -> str:
165def get_current_rule_version(m: "MessageParserProtocol", command_suffix: list[str]) -> str: 166 """ 167 ルール識別子探索 168 169 Args: 170 m (MessageParserProtocol): メッセージデータ 171 command_suffix (list[str]): コマンドサフィックス 172 173 Returns: 174 str: ルール識別子 175 176 """ 177 for suffix in command_suffix: 178 if rule_version := g.cfg.rule.keyword_mapping.get(m.keyword.removesuffix(suffix)): 179 return rule_version 180 181 return g.cfg.setting.default_rule
ルール識別子探索
Arguments:
- m (MessageParserProtocol): メッセージデータ
- command_suffix (list[str]): コマンドサフィックス
Returns:
str: ルール識別子
def
regulation_list(word_type: int = 0, rule_version: str | None = None) -> list[str]:
184def regulation_list(word_type: int = 0, rule_version: str | None = None) -> list[str]: 185 """ 186 登録済みワードリストを取得する 187 188 Args: 189 word_type (int, optional): 取得するタイプ. Defaults to 0. 190 rule_version (str, optional): ルール識別子 191 192 Returns: 193 list[str]: 取得結果 194 195 """ 196 ret: list[str] = [] 197 198 if not rule_version and not (rule_version := g.params.default_rule): 199 return [] 200 201 with closing(dbutil.connection(g.cfg.setting.database_file)) as cur: 202 rows = cur.execute( 203 """ 204 select 205 word, 206 ex_point 207 from 208 words 209 where 210 type=? 211 and rule_version=? 212 """, 213 (word_type, rule_version), 214 ).fetchall() 215 216 for word, ex_point in rows: 217 if ex_point: 218 point = f"{ex_point:.1f}".replace("-", "▲") 219 ret.append(f"{word}\t{point}pt") 220 else: 221 ret.append(f"{word}") 222 223 return ret
登録済みワードリストを取得する
Arguments:
- word_type (int, optional): 取得するタイプ. Defaults to 0.
- rule_version (str, optional): ルール識別子
Returns:
list[str]: 取得結果
226def resolve_commands(rule_version: str, command_type: CommandType) -> list[str]: 227 """ 228 ルール識別子で割り当てられているコマンドワードを返す 229 230 Args: 231 rule_version (str): ルール識別子 232 command_type (CommandType): コマンド種別 233 234 Returns: 235 list[str]: コマンドワード 236 237 """ 238 keywords: list[str] = [word for word, rule in g.cfg.rule.keyword_mapping.items() if rule == rule_version] 239 commandwords: list[str] = [] 240 241 if hasattr(g.cfg, command_type): 242 sub_com = cast("SubCommands", getattr(g.cfg, command_type)) 243 commandwords.append(sub_com.default_commandword) 244 commandwords.extend(sub_com.commandword) 245 commandwords.extend([f"{command}{suffix}" for suffix in sub_com.command_suffix for command in keywords]) 246 247 return [x for x in g.keyword_dispatcher if x in commandwords]
ルール識別子で割り当てられているコマンドワードを返す
Arguments:
- rule_version (str): ルール識別子
- command_type (CommandType): コマンド種別
Returns:
list[str]: コマンドワード
250def exsist_record(ts: str) -> GameResult: 251 """ 252 記録されているゲーム結果を返す 253 254 Args: 255 ts (str): 検索するタイムスタンプ 256 257 Returns: 258 GameResult: スコアデータ 259 260 """ 261 result = GameResult() 262 263 with closing(dbutil.connection(g.cfg.setting.database_file)) as conn: 264 row = conn.execute( 265 """ 266 select 267 ts, 268 p1_name, p1_str, 269 p2_name, p2_str, 270 p3_name, p3_str, 271 p4_name, p4_str, 272 comment, 273 rule_version 274 from 275 result where ts = :ts 276 ; 277 """, 278 {"ts": ts}, 279 ).fetchone() 280 281 if row: 282 result.calc(**dict(row)) 283 284 return result
記録されているゲーム結果を返す
Arguments:
- ts (str): 検索するタイムスタンプ
Returns:
GameResult: スコアデータ
def
read_memberslist() -> None:
287def read_memberslist() -> None: 288 """メンバー情報/チーム情報の再読み込み""" 289 # todo: 最後にアクセスしたDBのメンバーリストが返る 290 g.params.update_from_dict( 291 { 292 "database_file": g.cfg.setting.database_file, 293 "logging_verbose": g.args.verbose, 294 } 295 ) 296 297 g.cfg.member.guest_name = get_guest() 298 g.cfg.member.info = g.cfg.member.get_info 299 g.cfg.team.info = g.cfg.team.get_info 300 301 logging.debug("guest_name: %s", g.cfg.member.guest_name) 302 logging.debug("member_list: %s", g.cfg.member.lists) 303 logging.debug("team_list: %s", g.cfg.team.lists)
メンバー情報/チーム情報の再読み込み
def
enumeration_all_members() -> list[str]:
306def enumeration_all_members() -> list[str]: 307 """ 308 メンバーとチームをすべて列挙する 309 310 Returns: 311 list[str]: メンバー名(別名含む)/チーム名のリスト 312 313 """ 314 ret_list: list[str] = [] 315 316 for member in g.cfg.member.info: 317 ret_list.append(member.get("name")) 318 ret_list.extend(member.get("alias")) 319 ret_list.extend([team.get("team") for team in g.cfg.team.info]) 320 321 return list(set(ret_list))
メンバーとチームをすべて列挙する
Returns:
list[str]: メンバー名(別名含む)/チーム名のリスト