libs.utils.dictutil
libs/utils/dictutil.py
"""
libs/utils/dictutil.py
"""

import logging
from typing import TYPE_CHECKING, Any, cast

import libs.global_value as g
from cls.command import CommandParser
from cls.timekit import ExtendedDatetime as ExtDt
from libs.data import lookup
from libs.utils import formatter

if TYPE_CHECKING:
    from cls.config import SubCommand
    from integrations.protocols import MessageParserProtocol


def placeholder(subcom: "SubCommand", m: "MessageParserProtocol") -> dict:
    """Build the dictionary used for placeholders.

    Values are layered in order (later ones overwrite earlier ones):
    configuration values, the sub-command defaults, the flags parsed from
    ``subcom.always_argument`` and the flags parsed from ``m.argument``.
    The search period, the target player/team names and the ``stipulated``
    value are then derived and added.

    Side effect: ``g.params`` is cleared, and ``rule_version`` / ``mixed``
    are written back to it when they are set.

    Args:
        subcom (SubCommand): Parameters
        m (MessageParserProtocol): Message data

    Returns:
        dict: Dictionary for placeholders
    """

    parser = CommandParser()
    ret_dict: dict = {}

    # Initialization
    g.params.clear()

    # Import configuration-related parameters
    ret_dict.update(command=subcom.section)
    ret_dict.update(g.cfg.mahjong.to_dict())
    ret_dict.update(guest_name=g.cfg.member.guest_name)
    ret_dict.update(undefined_word=g.cfg.undefined_word)

    # Import default values
    ret_dict.update(subcom.to_dict())

    # Process always_argument
    pre_param = parser.analysis_argument(subcom.always_argument)
    logging.debug("analysis_argument: %s", pre_param)
    ret_dict.update(pre_param.flags)

    # Process the message arguments
    param = parser.analysis_argument(m.argument)
    logging.debug("argument: %s", param)
    ret_dict.update(param.flags)  # overwrite

    # Evaluate the rule version (and mixed flag) ahead of the later lookups
    if (rule_version := ret_dict.get("rule_version")):
        g.params.update(rule_version=rule_version)
    if (mixed := ret_dict.get("mixed")):
        g.params.update(mixed=mixed)

    # Get the search range: message argument first, then always_argument,
    # otherwise the sub-command's aggregation_range
    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
    if param.search_range:
        search_range = param.search_range
    elif pre_param.search_range:
        search_range = pre_param.search_range
    else:
        search_range = departure_time.range(subcom.aggregation_range)

    ret_dict.update(starttime=(departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start)
    ret_dict.update(endtime=(departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end)
    ret_dict.update(onday=departure_time.range(search_range).end)

    # Keywords that match no option are player names or team names
    player_name: str = str()
    target_player: list = []

    check_list: list = param.unknown + pre_param.unknown
    if ret_dict.get("individual"):
        if ret_dict.get("all_player"):
            check_list.extend(lookup.internal.get_member())
        target_player = _collect_member(check_list)
    else:
        if ret_dict.get("all_player"):
            check_list.extend(lookup.internal.get_team())
        target_player = _collect_team(check_list)

    if target_player:
        # The first collected name is the main target
        player_name = target_player[0]

        # Build the lists
        player_list: dict = {}
        competition_list: dict = {}

        for idx, name in enumerate(target_player):
            player_list[f"player_{idx}"] = name
            if name != player_name:
                competition_list[f"competition_{idx}"] = name

        ret_dict.update(player_name=player_name)
        ret_dict.update(target_player=target_player)
        ret_dict.update(player_list=player_list)
        ret_dict.update(competition_list=competition_list)

        # Player list / opponent list: also expose each entry as a top-level key
        if ret_dict["player_list"]:
            for k, v in cast(dict, ret_dict["player_list"]).items():
                ret_dict[k] = v
        if ret_dict["competition_list"]:
            for k, v in cast(dict, ret_dict["competition_list"]).items():
                ret_dict[k] = v

    # Set the stipulated count (only when no truthy value was given)
    if ret_dict.get("mixed") and not ret_dict.get("stipulated"):  # cross aggregation & no stipulated limit
        if target_player:
            ret_dict.update(stipulated=1)  # individual results
        else:
            ret_dict.update(stipulated=0)
    elif not ret_dict.get("stipulated"):  # normal aggregation & no stipulated limit
        if subcom.section == "ranking":  # ranking uses rate calculation
            ret_dict.update(stipulated=0)
        else:
            ret_dict.update(stipulated=1)

    return ret_dict


def _collect_member(target_list: list) -> list:
    """Resolve a list of keywords into a de-duplicated list of member names.

    A keyword found in ``lookup.internal.get_team()`` is expanded into the
    result of ``lookup.internal.get_teammates``. Any other keyword is
    appended as is when ``g.params["unregistered_replace"]`` is truthy
    (default ``True``), otherwise after ``formatter.name_replace``.

    ``g.params["individual"]`` is set to ``True`` during the loop and then
    set back to the value read beforehand (``None`` if it was absent).

    Args:
        target_list (list): Keywords to resolve

    Returns:
        list: Names in first-seen order without duplicates
    """
    ret_list: list = []
    save_flg = g.params.get("individual")
    g.params.update(individual=True)
    # dict.fromkeys drops duplicates while keeping the original order
    for name in list(dict.fromkeys(target_list)):
        if name in lookup.internal.get_team():
            teammates = lookup.internal.get_teammates(name)
            ret_list.extend(teammates)
            continue
        if g.params.get("unregistered_replace", True):
            ret_list.append(name)
        else:
            ret_list.append(formatter.name_replace(name))

    g.params.update(individual=save_flg)
    return list(dict.fromkeys(ret_list))


def _collect_team(target_list: list) -> list:
    """Resolve a list of keywords into a de-duplicated list of team names.

    A keyword found in ``lookup.internal.get_member()`` is replaced by the
    result of ``lookup.internal.which_team`` and is dropped when that result
    is falsy. Any other keyword is kept as is.

    Args:
        target_list (list): Keywords to resolve

    Returns:
        list: Names in first-seen order without duplicates
    """
    ret_list: list = []
    # dict.fromkeys drops duplicates while keeping the original order
    for team in list(dict.fromkeys(target_list)):
        if team in lookup.internal.get_member():
            name = lookup.internal.which_team(team)
            if name:
                ret_list.append(name)
        else:
            ret_list.append(team)

    return list(dict.fromkeys(ret_list))


def merge_dicts(dict1: Any, dict2: Any) -> dict:
    """Merge the contents of two dictionaries.

    For every key found in either input: two int/float values are added,
    two strings are concatenated, two lists become a sorted list of their
    unique elements, and in every other case the value of ``dict2`` is used
    unless it is ``None``, in which case the value of ``dict1`` is used.

    Args:
        dict1 (Any): First dictionary
        dict2 (Any): Second dictionary

    Returns:
        dict: Merged dictionary
    """

    merged: dict = {}

    for key in set(dict1) | set(dict2):
        # .get() yields None for a key that exists on one side only
        val1: Any = dict1.get(key)
        val2: Any = dict2.get(key)

        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
            merged[key] = val1 + val2
        elif isinstance(val1, str) and isinstance(val2, str):
            merged[key] = val1 + val2
        elif isinstance(val1, list) and isinstance(val2, list):
            merged[key] = sorted(list(set(val1 + val2)))
        else:
            merged[key] = val1 if val2 is None else val2

    return merged
def
placeholder( subcom: cls.config.SubCommand, m: integrations.protocols.MessageParserProtocol) -> dict:
def placeholder(subcom: "SubCommand", m: "MessageParserProtocol") -> dict:
    """Assemble the placeholder dictionary for one sub-command invocation.

    Values are layered so that later layers overwrite earlier ones:
    configuration values, the sub-command defaults, the flags parsed from
    ``subcom.always_argument`` and finally the flags parsed from
    ``m.argument``. The search period, the target player/team names and the
    ``stipulated`` value are derived afterwards.

    Side effect: ``g.params`` is cleared, then ``rule_version`` and ``mixed``
    are stored in it when they are set.

    Args:
        subcom (SubCommand): Sub-command settings.
        m (MessageParserProtocol): Message data.

    Returns:
        dict: Dictionary used to fill placeholders.
    """
    cmd_parser = CommandParser()

    # Reset the shared parameter store before anything is derived.
    g.params.clear()

    # Layer 1: configuration values.
    result: dict = {"command": subcom.section}
    result.update(g.cfg.mahjong.to_dict())
    result["guest_name"] = g.cfg.member.guest_name
    result["undefined_word"] = g.cfg.undefined_word

    # Layer 2: sub-command defaults.
    result.update(subcom.to_dict())

    # Layer 3: flags from the always-applied arguments.
    always_args = cmd_parser.analysis_argument(subcom.always_argument)
    logging.debug("analysis_argument: %s", always_args)
    result.update(always_args.flags)

    # Layer 4: flags from the message itself (highest priority).
    message_args = cmd_parser.analysis_argument(m.argument)
    logging.debug("argument: %s", message_args)
    result.update(message_args.flags)

    # Publish the rule version and the mixed flag before the name lookups.
    rule_version = result.get("rule_version")
    if rule_version:
        g.params.update(rule_version=rule_version)
    mixed = result.get("mixed")
    if mixed:
        g.params.update(mixed=mixed)

    # Search period: message argument, then always_argument, then the
    # sub-command's aggregation range.
    base_time = ExtDt(hours=-g.cfg.setting.time_adjust)
    search_range = message_args.search_range or always_args.search_range
    if not search_range:
        search_range = base_time.range(subcom.aggregation_range)

    def _shifted_range():
        # Evaluated afresh on every call, with the time adjustment added.
        return base_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}

    result["starttime"] = _shifted_range().start
    result["endtime"] = _shifted_range().end
    result["onday"] = base_time.range(search_range).end

    # Keywords that matched no option are player names or team names.
    keywords: list = message_args.unknown + always_args.unknown
    individual = result.get("individual")
    if result.get("all_player"):
        keywords.extend(lookup.internal.get_member() if individual else lookup.internal.get_team())
    targets: list = _collect_member(keywords) if individual else _collect_team(keywords)

    # The first collected name is the main target; the rest are opponents.
    primary: str = targets[0] if targets else ""
    player_list: dict = {f"player_{idx}": name for idx, name in enumerate(targets)}
    competition_list: dict = {
        f"competition_{idx}": name
        for idx, name in enumerate(targets)
        if name != primary
    }

    result.update(
        player_name=primary,
        target_player=targets,
        player_list=player_list,
        competition_list=competition_list,
    )
    # Each list entry is also exposed as a top-level key.
    result.update(player_list)
    result.update(competition_list)

    # Fill in the stipulated count only when no truthy value was supplied.
    if not result.get("stipulated"):
        if result.get("mixed"):
            # Cross aggregation: 1 when specific targets exist, else 0.
            stipulated = 1 if targets else 0
        else:
            # Normal aggregation: the ranking section uses 0, everything else 1.
            stipulated = 0 if subcom.section == "ranking" else 1
        result["stipulated"] = stipulated

    return result
プレースホルダに使用する辞書を生成
Arguments:
- subcom (SubCommand): パラメータ
- m (MessageParserProtocol): メッセージデータ
Returns:
dict: プレースホルダ用辞書
def
merge_dicts(dict1: Any, dict2: Any) -> dict:
def merge_dicts(dict1: Any, dict2: Any) -> dict:
    """Merge the contents of two dictionaries key by key.

    For every key found in either input:

    - two numbers (``int``/``float``; ``bool`` counts as ``int``) are added;
    - two strings are concatenated;
    - two lists are combined into a sorted list of their unique elements.
      When the elements are unhashable or cannot be ordered against each
      other, duplicates are removed by equality and first-seen order is kept;
    - otherwise the value of ``dict2`` is used unless it is ``None``, in
      which case the value of ``dict1`` is used.

    Args:
        dict1 (Any): First dictionary. ``None`` is treated as empty.
        dict2 (Any): Second dictionary. ``None`` is treated as empty.

    Returns:
        dict: Merged dictionary.
    """
    left = {} if dict1 is None else dict1
    right = {} if dict2 is None else dict2
    merged: dict = {}

    for key in set(left) | set(right):
        # .get() yields None for a key that exists on one side only.
        val1: Any = left.get(key)
        val2: Any = right.get(key)

        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
            merged[key] = val1 + val2
        elif isinstance(val1, str) and isinstance(val2, str):
            merged[key] = val1 + val2
        elif isinstance(val1, list) and isinstance(val2, list):
            combined = val1 + val2
            try:
                merged[key] = sorted(set(combined))
            except TypeError:
                # Unhashable or mutually unorderable elements: fall back to
                # an order-preserving, equality-based de-duplication.
                unique: list = []
                for item in combined:
                    if item not in unique:
                        unique.append(item)
                merged[key] = unique
        else:
            merged[key] = val1 if val2 is None else val2

    return merged
辞書の内容をマージする
Arguments:
- dict1 (Any): 1つ目の辞書
- dict2 (Any): 2つ目の辞書
Returns:
dict: マージされた辞書