libs.functions.lookup

libs/functions/lookup.py

  1"""
  2libs/functions/lookup.py
  3"""
  4
  5import logging
  6from configparser import ConfigParser
  7from contextlib import closing
  8from pathlib import Path
  9from typing import TYPE_CHECKING, Any, Optional, Union, cast
 10
 11import libs.global_value as g
 12from libs.domain.score import GameResult
 13from libs.types import ChannelType, CommandType
 14from libs.utils import dbutil
 15
 16if TYPE_CHECKING:
 17    from pathlib import Path
 18
 19    from integrations.protocols import MessageParserProtocol
 20    from libs.bootstrap.section import SubCommands
 21
 22
 23def get_config_value(
 24    config_file: "Path",
 25    section: str,
 26    name: str,
 27    val_type: type,
 28    fallback: Union[bool, int, float, str, list[Any], None] = None,
 29) -> Any:
 30    """
 31    設定値取得
 32
 33    Args:
 34        config_file (Path): 設定ファイルパス
 35        section (str): セクション名
 36        name (str): 項目名
 37        val_type (type): 取り込む値の型 (bool, int, float, str, list[Any])
 38        fallback (Union[bool, int, float, str, list[Any]], optional): 項目が見つからない場合に返す値. Defaults to None
 39
 40    Returns:
 41        Any: 取得した値
 42            - 実際に返す型: Union[int, float, bool, str, list[Any], None]
 43
 44    Raises:
 45        TypeError: val_type が bool, int, float, str, list 以外の場合
 46
 47    """
 48    value: Union[int, float, bool, str, list[Any], None] = fallback
 49    parser = ConfigParser()
 50    parser.read(config_file, encoding="utf-8")
 51
 52    if parser.has_option(section, name):
 53        match val_type:
 54            case x if x is int:
 55                value = parser.getint(section, name)
 56            case x if x is float:
 57                value = parser.getfloat(section, name)
 58            case x if x is bool:
 59                value = parser.getboolean(section, name)
 60            case x if x is str:
 61                value = parser.get(section, name)
 62            case x if x is list:
 63                value = [x.strip() for x in parser.get(section, name).split(",")]
 64            case _:
 65                raise TypeError(f"Unsupported val_type: {val_type}")
 66
 67    return value
 68
 69
 70def resolve_separate_flag(m: "MessageParserProtocol") -> bool:
 71    """
 72    優先度の高いセパレート設定フラグを取得する
 73
 74    Args:
 75        m (MessageParserProtocol): メッセージデータ
 76
 77    Returns:
 78        bool: セパレート設定フラグ
 79
 80    """
 81    separate_flg: Optional[bool] = None
 82
 83    # DM / HomeApp(slack) はセパレートしない
 84    if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}:
 85        return False
 86
 87    if g.cfg.main_parser.has_section(m.status.source):
 88        # チャンネル個別ファイル内設定
 89        if channel_config := g.cfg.main_parser[m.status.source].get("channel_config"):
 90            separate_flg = get_config_value(config_file=Path(channel_config), section="setting", name="separate", val_type=bool)
 91            if separate_flg is not None:
 92                return separate_flg
 93        # チャンネル設定
 94        else:
 95            separate_flg = get_config_value(config_file=g.cfg.config_file, section=m.status.source, name="separate", val_type=bool)
 96            if separate_flg is not None:
 97                return separate_flg
 98
 99    # サービス別設定
100    separate_flg = get_config_value(config_file=g.cfg.config_file, section=g.adapter.interface_type, name="separate", val_type=bool)
101    if separate_flg is not None:
102        return separate_flg
103
104    # メイン設定
105    separate_flg = get_config_value(config_file=g.cfg.config_file, section="setting", name="separate", val_type=bool)
106    if separate_flg is not None:
107        return separate_flg
108
109    return False
110
111
112def member_info(params: dict[str, Any]) -> dict[str, Any]:
113    """
114    指定メンバーの記録情報を返す
115
116    Args:
117        params (dict[str, Any]): 対象メンバー
118
119    Returns:
120        dict[str, Any]: 記録情報
121
122    """
123    ret = dbutil.execute(
124        """
125        select
126            count() as game_count,
127            min(ts) as first_game,
128            max(ts) as last_game,
129            max(rpoint) as rpoint_max,
130            min(rpoint) as rpoint_min
131        from
132            individual_results
133        where
134            mode = :mode
135            and rule_version in (<<rule_list>>)
136            and playtime between :starttime and :endtime
137            --[separate] and source = :source
138            --[individual] and name = :player_name
139            --[team] and team = :player_name
140        ;
141        """,
142        params,
143    )
144
145    return ret[0]
146
147
148def get_guest() -> str:
149    """
150    ゲスト名取得
151
152    Returns:
153        str: ゲスト名
154
155    """
156    guest_name: str = ""
157    with closing(dbutil.connection(g.cfg.setting.database_file)) as conn:
158        rows = conn.execute("select name from member where id=0")
159        guest_name = str(rows.fetchone()[0])
160
161    return guest_name
162
163
164def get_current_rule_version(m: "MessageParserProtocol", command_suffix: list[str]) -> str:
165    """
166    ルール識別子探索
167
168    Args:
169        m (MessageParserProtocol): メッセージデータ
170        command_suffix (list[str]): コマンドサフィックス
171
172    Returns:
173        str: ルール識別子
174
175    """
176    for suffix in command_suffix:
177        if rule_version := g.cfg.rule.keyword_mapping.get(m.keyword.removesuffix(suffix)):
178            return rule_version
179
180    return g.cfg.setting.default_rule
181
182
183def regulation_list(word_type: int = 0, rule_version: str | None = None) -> list[str]:
184    """
185    登録済みワードリストを取得する
186
187    Args:
188        word_type (int, optional): 取得するタイプ. Defaults to 0.
189        rule_version (str, optional): ルール識別子
190
191    Returns:
192        list[str]: 取得結果
193
194    """
195    ret: list[str] = []
196
197    if not rule_version and not (rule_version := g.params.default_rule):
198        return []
199
200    with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
201        rows = cur.execute(
202            """
203            select
204                word,
205                ex_point
206            from
207                words
208            where
209                type=?
210                and rule_version=?
211            """,
212            (word_type, rule_version),
213        ).fetchall()
214
215    for word, ex_point in rows:
216        if ex_point:
217            point = f"{ex_point:.1f}".replace("-", "▲")
218            ret.append(f"{word}\t{point}pt")
219        else:
220            ret.append(f"{word}")
221
222    return ret
223
224
225def resolve_commands(rule_version: str, command_type: CommandType) -> list[str]:
226    """
227    ルール識別子で割り当てられているコマンドワードを返す
228
229    Args:
230        rule_version (str): ルール識別子
231        command_type (CommandType): コマンド種別
232
233    Returns:
234        list[str]: コマンドワード
235
236    """
237    keywords: list[str] = [word for word, rule in g.cfg.rule.keyword_mapping.items() if rule == rule_version]
238    commandwords: list[str] = []
239
240    if hasattr(g.cfg, command_type):
241        sub_com = cast("SubCommands", getattr(g.cfg, command_type))
242        commandwords.append(sub_com.default_commandword)
243        commandwords.extend(sub_com.commandword)
244        commandwords.extend([f"{command}{suffix}" for suffix in sub_com.command_suffix for command in keywords])
245
246    return [x for x in g.keyword_dispatcher if x in commandwords]
247
248
249def exsist_record(ts: str) -> GameResult:
250    """
251    記録されているゲーム結果を返す
252
253    Args:
254        ts (str): 検索するタイムスタンプ
255
256    Returns:
257        GameResult: スコアデータ
258
259    """
260    result = GameResult()
261
262    with closing(dbutil.connection(g.cfg.setting.database_file)) as conn:
263        row = conn.execute(
264            """
265            select
266                ts,
267                p1_name, p1_str,
268                p2_name, p2_str,
269                p3_name, p3_str,
270                p4_name, p4_str,
271                comment,
272                rule_version
273            from
274                result where ts = :ts
275            ;
276            """,
277            {"ts": ts},
278        ).fetchone()
279
280    if row:
281        result.calc(**dict(row))
282
283    return result
284
285
286def read_memberslist() -> None:
287    """メンバー情報/チーム情報の再読み込み"""
288    # todo: 最後にアクセスしたDBのメンバーリストが返る
289    g.params.update_from_dict(
290        {
291            "database_file": g.cfg.setting.database_file,
292            "logging_verbose": g.args.verbose,
293        }
294    )
295
296    g.cfg.member.guest_name = get_guest()
297    g.cfg.member.info = g.cfg.member.get_info
298    g.cfg.team.info = g.cfg.team.get_info
299
300    logging.debug("guest_name: %s", g.cfg.member.guest_name)
301    logging.debug("member_list: %s", g.cfg.member.lists)
302    logging.debug("team_list: %s", g.cfg.team.lists)
303
304
305def enumeration_all_members() -> list[str]:
306    """
307    メンバーとチームをすべて列挙する
308
309    Returns:
310        list[str]: メンバー名(別名含む)/チーム名のリスト
311
312    """
313    ret_list: list[str] = []
314
315    for member in g.cfg.member.info:
316        ret_list.append(member.get("name"))
317        ret_list.extend(member.get("alias"))
318    ret_list.extend([team.get("team") for team in g.cfg.team.info])
319
320    return list(set(ret_list))
def get_config_value( config_file: pathlib.Path, section: str, name: str, val_type: type, fallback: bool | int | float | str | list[Any] | None = None) -> Any:
24def get_config_value(
25    config_file: "Path",
26    section: str,
27    name: str,
28    val_type: type,
29    fallback: Union[bool, int, float, str, list[Any], None] = None,
30) -> Any:
31    """
32    設定値取得
33
34    Args:
35        config_file (Path): 設定ファイルパス
36        section (str): セクション名
37        name (str): 項目名
38        val_type (type): 取り込む値の型 (bool, int, float, str, list[Any])
39        fallback (Union[bool, int, float, str, list[Any]], optional): 項目が見つからない場合に返す値. Defaults to None
40
41    Returns:
42        Any: 取得した値
43            - 実際に返す型: Union[int, float, bool, str, list[Any], None]
44
45    Raises:
46        TypeError: val_type が bool, int, float, str, list 以外の場合
47
48    """
49    value: Union[int, float, bool, str, list[Any], None] = fallback
50    parser = ConfigParser()
51    parser.read(config_file, encoding="utf-8")
52
53    if parser.has_option(section, name):
54        match val_type:
55            case x if x is int:
56                value = parser.getint(section, name)
57            case x if x is float:
58                value = parser.getfloat(section, name)
59            case x if x is bool:
60                value = parser.getboolean(section, name)
61            case x if x is str:
62                value = parser.get(section, name)
63            case x if x is list:
64                value = [x.strip() for x in parser.get(section, name).split(",")]
65            case _:
66                raise TypeError(f"Unsupported val_type: {val_type}")
67
68    return value

設定値取得

Arguments:
  • config_file (Path): 設定ファイルパス
  • section (str): セクション名
  • name (str): 項目名
  • val_type (type): 取り込む値の型 (bool, int, float, str, list[Any])
  • fallback (Union[bool, int, float, str, list[Any]], optional): 項目が見つからない場合に返す値. Defaults to None
Returns:

Any: 取得した値 - 実際に返す型: Union[int, float, bool, str, list[Any], None]

Raises:
  • TypeError: val_type が bool, int, float, str, list 以外の場合
def resolve_separate_flag(m: integrations.protocols.MessageParserProtocol) -> bool:
 71def resolve_separate_flag(m: "MessageParserProtocol") -> bool:
 72    """
 73    優先度の高いセパレート設定フラグを取得する
 74
 75    Args:
 76        m (MessageParserProtocol): メッセージデータ
 77
 78    Returns:
 79        bool: セパレート設定フラグ
 80
 81    """
 82    separate_flg: Optional[bool] = None
 83
 84    # DM / HomeApp(slack) はセパレートしない
 85    if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}:
 86        return False
 87
 88    if g.cfg.main_parser.has_section(m.status.source):
 89        # チャンネル個別ファイル内設定
 90        if channel_config := g.cfg.main_parser[m.status.source].get("channel_config"):
 91            separate_flg = get_config_value(config_file=Path(channel_config), section="setting", name="separate", val_type=bool)
 92            if separate_flg is not None:
 93                return separate_flg
 94        # チャンネル設定
 95        else:
 96            separate_flg = get_config_value(config_file=g.cfg.config_file, section=m.status.source, name="separate", val_type=bool)
 97            if separate_flg is not None:
 98                return separate_flg
 99
100    # サービス別設定
101    separate_flg = get_config_value(config_file=g.cfg.config_file, section=g.adapter.interface_type, name="separate", val_type=bool)
102    if separate_flg is not None:
103        return separate_flg
104
105    # メイン設定
106    separate_flg = get_config_value(config_file=g.cfg.config_file, section="setting", name="separate", val_type=bool)
107    if separate_flg is not None:
108        return separate_flg
109
110    return False

優先度の高いセパレート設定フラグを取得する

Arguments:
  • m (MessageParserProtocol): メッセージデータ
Returns:

bool: セパレート設定フラグ

def member_info(params: dict[str, typing.Any]) -> dict[str, typing.Any]:
113def member_info(params: dict[str, Any]) -> dict[str, Any]:
114    """
115    指定メンバーの記録情報を返す
116
117    Args:
118        params (dict[str, Any]): 対象メンバー
119
120    Returns:
121        dict[str, Any]: 記録情報
122
123    """
124    ret = dbutil.execute(
125        """
126        select
127            count() as game_count,
128            min(ts) as first_game,
129            max(ts) as last_game,
130            max(rpoint) as rpoint_max,
131            min(rpoint) as rpoint_min
132        from
133            individual_results
134        where
135            mode = :mode
136            and rule_version in (<<rule_list>>)
137            and playtime between :starttime and :endtime
138            --[separate] and source = :source
139            --[individual] and name = :player_name
140            --[team] and team = :player_name
141        ;
142        """,
143        params,
144    )
145
146    return ret[0]

指定メンバーの記録情報を返す

Arguments:
  • params (dict[str, Any]): 対象メンバー
Returns:

dict[str, Any]: 記録情報

def get_guest() -> str:
149def get_guest() -> str:
150    """
151    ゲスト名取得
152
153    Returns:
154        str: ゲスト名
155
156    """
157    guest_name: str = ""
158    with closing(dbutil.connection(g.cfg.setting.database_file)) as conn:
159        rows = conn.execute("select name from member where id=0")
160        guest_name = str(rows.fetchone()[0])
161
162    return guest_name

ゲスト名取得

Returns:

str: ゲスト名

def get_current_rule_version( m: integrations.protocols.MessageParserProtocol, command_suffix: list[str]) -> str:
165def get_current_rule_version(m: "MessageParserProtocol", command_suffix: list[str]) -> str:
166    """
167    ルール識別子探索
168
169    Args:
170        m (MessageParserProtocol): メッセージデータ
171        command_suffix (list[str]): コマンドサフィックス
172
173    Returns:
174        str: ルール識別子
175
176    """
177    for suffix in command_suffix:
178        if rule_version := g.cfg.rule.keyword_mapping.get(m.keyword.removesuffix(suffix)):
179            return rule_version
180
181    return g.cfg.setting.default_rule

ルール識別子探索

Arguments:
  • m (MessageParserProtocol): メッセージデータ
  • command_suffix (list[str]): コマンドサフィックス
Returns:

str: ルール識別子

def regulation_list(word_type: int = 0, rule_version: str | None = None) -> list[str]:
184def regulation_list(word_type: int = 0, rule_version: str | None = None) -> list[str]:
185    """
186    登録済みワードリストを取得する
187
188    Args:
189        word_type (int, optional): 取得するタイプ. Defaults to 0.
190        rule_version (str, optional): ルール識別子
191
192    Returns:
193        list[str]: 取得結果
194
195    """
196    ret: list[str] = []
197
198    if not rule_version and not (rule_version := g.params.default_rule):
199        return []
200
201    with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
202        rows = cur.execute(
203            """
204            select
205                word,
206                ex_point
207            from
208                words
209            where
210                type=?
211                and rule_version=?
212            """,
213            (word_type, rule_version),
214        ).fetchall()
215
216    for word, ex_point in rows:
217        if ex_point:
218            point = f"{ex_point:.1f}".replace("-", "▲")
219            ret.append(f"{word}\t{point}pt")
220        else:
221            ret.append(f"{word}")
222
223    return ret

登録済みワードリストを取得する

Arguments:
  • word_type (int, optional): 取得するタイプ. Defaults to 0.
  • rule_version (str, optional): ルール識別子
Returns:

list[str]: 取得結果

def resolve_commands(rule_version: str, command_type: libs.types.CommandType) -> list[str]:
226def resolve_commands(rule_version: str, command_type: CommandType) -> list[str]:
227    """
228    ルール識別子で割り当てられているコマンドワードを返す
229
230    Args:
231        rule_version (str): ルール識別子
232        command_type (CommandType): コマンド種別
233
234    Returns:
235        list[str]: コマンドワード
236
237    """
238    keywords: list[str] = [word for word, rule in g.cfg.rule.keyword_mapping.items() if rule == rule_version]
239    commandwords: list[str] = []
240
241    if hasattr(g.cfg, command_type):
242        sub_com = cast("SubCommands", getattr(g.cfg, command_type))
243        commandwords.append(sub_com.default_commandword)
244        commandwords.extend(sub_com.commandword)
245        commandwords.extend([f"{command}{suffix}" for suffix in sub_com.command_suffix for command in keywords])
246
247    return [x for x in g.keyword_dispatcher if x in commandwords]

ルール識別子で割り当てられているコマンドワードを返す

Arguments:
  • rule_version (str): ルール識別子
  • command_type (CommandType): コマンド種別
Returns:

list[str]: コマンドワード

def exsist_record(ts: str) -> libs.domain.score.GameResult:
250def exsist_record(ts: str) -> GameResult:
251    """
252    記録されているゲーム結果を返す
253
254    Args:
255        ts (str): 検索するタイムスタンプ
256
257    Returns:
258        GameResult: スコアデータ
259
260    """
261    result = GameResult()
262
263    with closing(dbutil.connection(g.cfg.setting.database_file)) as conn:
264        row = conn.execute(
265            """
266            select
267                ts,
268                p1_name, p1_str,
269                p2_name, p2_str,
270                p3_name, p3_str,
271                p4_name, p4_str,
272                comment,
273                rule_version
274            from
275                result where ts = :ts
276            ;
277            """,
278            {"ts": ts},
279        ).fetchone()
280
281    if row:
282        result.calc(**dict(row))
283
284    return result

記録されているゲーム結果を返す

Arguments:
  • ts (str): 検索するタイムスタンプ
Returns:

GameResult: スコアデータ

def read_memberslist() -> None:
287def read_memberslist() -> None:
288    """メンバー情報/チーム情報の再読み込み"""
289    # todo: 最後にアクセスしたDBのメンバーリストが返る
290    g.params.update_from_dict(
291        {
292            "database_file": g.cfg.setting.database_file,
293            "logging_verbose": g.args.verbose,
294        }
295    )
296
297    g.cfg.member.guest_name = get_guest()
298    g.cfg.member.info = g.cfg.member.get_info
299    g.cfg.team.info = g.cfg.team.get_info
300
301    logging.debug("guest_name: %s", g.cfg.member.guest_name)
302    logging.debug("member_list: %s", g.cfg.member.lists)
303    logging.debug("team_list: %s", g.cfg.team.lists)

メンバー情報/チーム情報の再読み込み

def enumeration_all_members() -> list[str]:
306def enumeration_all_members() -> list[str]:
307    """
308    メンバーとチームをすべて列挙する
309
310    Returns:
311        list[str]: メンバー名(別名含む)/チーム名のリスト
312
313    """
314    ret_list: list[str] = []
315
316    for member in g.cfg.member.info:
317        ret_list.append(member.get("name"))
318        ret_list.extend(member.get("alias"))
319    ret_list.extend([team.get("team") for team in g.cfg.team.info])
320
321    return list(set(ret_list))

メンバーとチームをすべて列挙する

Returns:

list[str]: メンバー名(別名含む)/チーム名のリスト