libs.utils.dictutil

libs/utils/dictutil.py

  1"""
  2libs/utils/dictutil.py
  3"""
  4
  5import logging
  6from typing import TYPE_CHECKING, Any, cast
  7
  8import libs.global_value as g
  9from cls.command import CommandParser
 10from cls.timekit import ExtendedDatetime as ExtDt
 11from libs.data import lookup
 12from libs.utils import formatter
 13
 14if TYPE_CHECKING:
 15    from cls.config import SubCommand
 16    from integrations.protocols import MessageParserProtocol
 17
 18
 19def placeholder(subcom: "SubCommand", m: "MessageParserProtocol") -> dict:
 20    """プレースホルダに使用する辞書を生成
 21
 22    Args:
 23        subcom (SubCommand): パラメータ
 24        m (MessageParserProtocol): メッセージデータ
 25
 26    Returns:
 27        dict: プレースホルダ用辞書
 28    """
 29
 30    parser = CommandParser()
 31    ret_dict: dict = {}
 32
 33    # 初期化
 34    g.params.clear()
 35
 36    # 設定周りのパラメータの取り込み
 37    ret_dict.update(command=subcom.section)
 38    ret_dict.update(g.cfg.mahjong.to_dict())
 39    ret_dict.update(guest_name=g.cfg.member.guest_name)
 40    ret_dict.update(undefined_word=g.cfg.undefined_word)
 41
 42    # デフォルト値の取り込み
 43    ret_dict.update(subcom.to_dict())
 44
 45    # always_argumentの処理
 46    pre_param = parser.analysis_argument(subcom.always_argument)
 47    logging.debug("analysis_argument: %s", pre_param)
 48    ret_dict.update(pre_param.flags)
 49
 50    # 引数の処理
 51    param = parser.analysis_argument(m.argument)
 52    logging.debug("argument: %s", param)
 53    ret_dict.update(param.flags)  # 上書き
 54
 55    # ルールバージョン先行評価
 56    if (rule_version := ret_dict.get("rule_version")):
 57        g.params.update(rule_version=rule_version)
 58    if (mixed := ret_dict.get("mixed")):
 59        g.params.update(mixed=mixed)
 60
 61    # 検索範囲取得
 62    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
 63    if param.search_range:
 64        search_range = param.search_range
 65    elif pre_param.search_range:
 66        search_range = pre_param.search_range
 67    else:
 68        search_range = departure_time.range(subcom.aggregation_range)
 69
 70    ret_dict.update(starttime=(departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start)
 71    ret_dict.update(endtime=(departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end)
 72    ret_dict.update(onday=departure_time.range(search_range).end)
 73
 74    # どのオプションにも該当しないキーワードはプレイヤー名 or チーム名
 75    player_name: str = str()
 76    target_player: list = []
 77
 78    check_list: list = param.unknown + pre_param.unknown
 79    if ret_dict.get("individual"):
 80        if ret_dict.get("all_player"):
 81            check_list.extend(lookup.internal.get_member())
 82        target_player = _collect_member(check_list)
 83    else:
 84        if ret_dict.get("all_player"):
 85            check_list.extend(lookup.internal.get_team())
 86        target_player = _collect_team(check_list)
 87
 88    if target_player:
 89        player_name = target_player[0]
 90
 91    # リスト生成
 92    player_list: dict = {}
 93    competition_list: dict = {}
 94
 95    for idx, name in enumerate(target_player):
 96        player_list[f"player_{idx}"] = name
 97        if name != player_name:
 98            competition_list[f"competition_{idx}"] = name
 99
100    ret_dict.update(player_name=player_name)
101    ret_dict.update(target_player=target_player)
102    ret_dict.update(player_list=player_list)
103    ret_dict.update(competition_list=competition_list)
104
105    # プレイヤーリスト/対戦相手リスト
106    if ret_dict["player_list"]:
107        for k, v in cast(dict, ret_dict["player_list"]).items():
108            ret_dict[k] = v
109    if ret_dict["competition_list"]:
110        for k, v in cast(dict, ret_dict["competition_list"]).items():
111            ret_dict[k] = v
112
113    # 規定打数設定
114    if ret_dict.get("mixed") and not ret_dict.get("stipulated"):  # 横断集計&規定数制限なし
115        if target_player:
116            ret_dict.update(stipulated=1)  # 個人成績
117        else:
118            ret_dict.update(stipulated=0)
119    elif not ret_dict.get("stipulated"):  # 通常集計&規定数制限なし
120        if subcom.section == "ranking":  # ランキングはレート計算
121            ret_dict.update(stipulated=0)
122        else:
123            ret_dict.update(stipulated=1)
124
125    return ret_dict
126
127
128def _collect_member(target_list: list) -> list:
129    ret_list: list = []
130    save_flg = g.params.get("individual")
131    g.params.update(individual=True)
132    for name in list(dict.fromkeys(target_list)):
133        if name in lookup.internal.get_team():
134            teammates = lookup.internal.get_teammates(name)
135            ret_list.extend(teammates)
136            continue
137        if g.params.get("unregistered_replace", True):
138            ret_list.append(name)
139        else:
140            ret_list.append(formatter.name_replace(name))
141
142    g.params.update(individual=save_flg)
143    return list(dict.fromkeys(ret_list))
144
145
146def _collect_team(target_list: list) -> list:
147    ret_list: list = []
148    for team in list(dict.fromkeys(target_list)):
149        if team in lookup.internal.get_member():
150            name = lookup.internal.which_team(team)
151            if name:
152                ret_list.append(name)
153        else:
154            ret_list.append(team)
155
156    return list(dict.fromkeys(ret_list))
157
158
159def merge_dicts(dict1: Any, dict2: Any) -> dict:
160    """辞書の内容をマージする
161
162    Args:
163        dict1 (Any): 1つ目の辞書
164        dict2 (Any): 2つ目の辞書
165
166    Returns:
167        dict: マージされた辞書
168    """
169
170    merged: dict = {}
171
172    for key in set(dict1) | set(dict2):
173        val1: Any = dict1.get(key)
174        val2: Any = dict2.get(key)
175
176        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
177            merged[key] = val1 + val2
178        elif isinstance(val1, str) and isinstance(val2, str):
179            merged[key] = val1 + val2
180        elif isinstance(val1, list) and isinstance(val2, list):
181            merged[key] = sorted(list(set(val1 + val2)))
182        else:
183            merged[key] = val1 if val2 is None else val2
184
185    return merged
def placeholder( subcom: cls.config.SubCommand, m: integrations.protocols.MessageParserProtocol) -> dict:
 20def placeholder(subcom: "SubCommand", m: "MessageParserProtocol") -> dict:
 21    """プレースホルダに使用する辞書を生成
 22
 23    Args:
 24        subcom (SubCommand): パラメータ
 25        m (MessageParserProtocol): メッセージデータ
 26
 27    Returns:
 28        dict: プレースホルダ用辞書
 29    """
 30
 31    parser = CommandParser()
 32    ret_dict: dict = {}
 33
 34    # 初期化
 35    g.params.clear()
 36
 37    # 設定周りのパラメータの取り込み
 38    ret_dict.update(command=subcom.section)
 39    ret_dict.update(g.cfg.mahjong.to_dict())
 40    ret_dict.update(guest_name=g.cfg.member.guest_name)
 41    ret_dict.update(undefined_word=g.cfg.undefined_word)
 42
 43    # デフォルト値の取り込み
 44    ret_dict.update(subcom.to_dict())
 45
 46    # always_argumentの処理
 47    pre_param = parser.analysis_argument(subcom.always_argument)
 48    logging.debug("analysis_argument: %s", pre_param)
 49    ret_dict.update(pre_param.flags)
 50
 51    # 引数の処理
 52    param = parser.analysis_argument(m.argument)
 53    logging.debug("argument: %s", param)
 54    ret_dict.update(param.flags)  # 上書き
 55
 56    # ルールバージョン先行評価
 57    if (rule_version := ret_dict.get("rule_version")):
 58        g.params.update(rule_version=rule_version)
 59    if (mixed := ret_dict.get("mixed")):
 60        g.params.update(mixed=mixed)
 61
 62    # 検索範囲取得
 63    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
 64    if param.search_range:
 65        search_range = param.search_range
 66    elif pre_param.search_range:
 67        search_range = pre_param.search_range
 68    else:
 69        search_range = departure_time.range(subcom.aggregation_range)
 70
 71    ret_dict.update(starttime=(departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start)
 72    ret_dict.update(endtime=(departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end)
 73    ret_dict.update(onday=departure_time.range(search_range).end)
 74
 75    # どのオプションにも該当しないキーワードはプレイヤー名 or チーム名
 76    player_name: str = str()
 77    target_player: list = []
 78
 79    check_list: list = param.unknown + pre_param.unknown
 80    if ret_dict.get("individual"):
 81        if ret_dict.get("all_player"):
 82            check_list.extend(lookup.internal.get_member())
 83        target_player = _collect_member(check_list)
 84    else:
 85        if ret_dict.get("all_player"):
 86            check_list.extend(lookup.internal.get_team())
 87        target_player = _collect_team(check_list)
 88
 89    if target_player:
 90        player_name = target_player[0]
 91
 92    # リスト生成
 93    player_list: dict = {}
 94    competition_list: dict = {}
 95
 96    for idx, name in enumerate(target_player):
 97        player_list[f"player_{idx}"] = name
 98        if name != player_name:
 99            competition_list[f"competition_{idx}"] = name
100
101    ret_dict.update(player_name=player_name)
102    ret_dict.update(target_player=target_player)
103    ret_dict.update(player_list=player_list)
104    ret_dict.update(competition_list=competition_list)
105
106    # プレイヤーリスト/対戦相手リスト
107    if ret_dict["player_list"]:
108        for k, v in cast(dict, ret_dict["player_list"]).items():
109            ret_dict[k] = v
110    if ret_dict["competition_list"]:
111        for k, v in cast(dict, ret_dict["competition_list"]).items():
112            ret_dict[k] = v
113
114    # 規定打数設定
115    if ret_dict.get("mixed") and not ret_dict.get("stipulated"):  # 横断集計&規定数制限なし
116        if target_player:
117            ret_dict.update(stipulated=1)  # 個人成績
118        else:
119            ret_dict.update(stipulated=0)
120    elif not ret_dict.get("stipulated"):  # 通常集計&規定数制限なし
121        if subcom.section == "ranking":  # ランキングはレート計算
122            ret_dict.update(stipulated=0)
123        else:
124            ret_dict.update(stipulated=1)
125
126    return ret_dict

プレースホルダに使用する辞書を生成

Arguments:
  • subcom (SubCommand): パラメータ
  • m (MessageParserProtocol): メッセージデータ
Returns:

dict: プレースホルダ用辞書

def merge_dicts(dict1: Any, dict2: Any) -> dict:
160def merge_dicts(dict1: Any, dict2: Any) -> dict:
161    """辞書の内容をマージする
162
163    Args:
164        dict1 (Any): 1つ目の辞書
165        dict2 (Any): 2つ目の辞書
166
167    Returns:
168        dict: マージされた辞書
169    """
170
171    merged: dict = {}
172
173    for key in set(dict1) | set(dict2):
174        val1: Any = dict1.get(key)
175        val2: Any = dict2.get(key)
176
177        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
178            merged[key] = val1 + val2
179        elif isinstance(val1, str) and isinstance(val2, str):
180            merged[key] = val1 + val2
181        elif isinstance(val1, list) and isinstance(val2, list):
182            merged[key] = sorted(list(set(val1 + val2)))
183        else:
184            merged[key] = val1 if val2 is None else val2
185
186    return merged

辞書の内容をマージする

Arguments:
  • dict1 (Any): 1つ目の辞書
  • dict2 (Any): 2つ目の辞書
Returns:

dict: マージされた辞書