libs.utils.dictutil
libs/utils/dictutil.py
"""
libs/utils/dictutil.py

Dictionary helpers: builds the placeholder parameter set for a sub-command
invocation and merges two dictionaries value-by-value.
"""

import logging
from typing import TYPE_CHECKING, Any, Protocol

import libs.global_value as g
from libs.domain.command import CommandParser
from libs.domain.placeholder import PlaceholderBuilder
from libs.functions import lookup, search
from libs.types import ChannelType
from libs.utils import formatter
from libs.utils.timekit import ExtendedDatetime as ExtDt

if TYPE_CHECKING:
    from integrations.protocols import MessageParserProtocol


class SubCommandLike(Protocol):
    """Minimal sub-command configuration interface needed to build placeholders."""

    # Attributes read by placeholder():
    section: str  # stored as the "command" value; compared against "ranking"
    always_argument: list[str]  # arguments parsed before the message's own arguments
    aggregation_range: str  # fallback search range when no argument supplies one

    def to_dict(self, drop_items: list[str] | None = None) -> dict[str, Any]:
        """
        Return the sub-command settings as a dictionary.

        Args:
            drop_items (list[str] | None): presumably names of items to leave
                out of the result -- confirm against the implementations.

        Returns:
            dict[str, Any]: sub-command settings

        """


def placeholder(subcom: "SubCommandLike", m: "MessageParserProtocol") -> PlaceholderBuilder:
    """
    Build the parameter set used for placeholders.

    Values are layered in this order, later ones overwriting earlier ones:
    global settings and sub-command defaults, channel-specific configuration,
    rule settings, ``subcom.always_argument`` flags, then the flags parsed from
    the message arguments.

    Args:
        subcom (SubCommandLike): sub-command configuration
        m (MessageParserProtocol): message data

    Returns:
        PlaceholderBuilder: populated placeholder data

    """
    # Initialisation
    parser: CommandParser = CommandParser()
    params: PlaceholderBuilder = PlaceholderBuilder()
    rule_version: str | None = None
    params.update_from_dict(
        {
            "service_type": g.adapter.interface_type,
            "command": subcom.section,
            "guest_name": g.cfg.member.guest_name,
            "logging_verbose": g.args.verbose,
            **g.cfg.setting.to_dict(),
            **subcom.to_dict(),  # sub-command default values
        }
    )

    # Load channel-specific configuration
    if channel_config := g.cfg.read_channel_config(m.status.source, params.placeholder()):
        logging.debug("read channel config: %s", channel_config.absolute())
        params.channel_config = channel_config
        # NOTE(review): presumably to_dict() now reflects the channel config -- confirm
        params.update_from_dict(subcom.to_dict())  # refresh

    # Refresh member information  # TODO: after the DB switch-over is implemented
    # g.cfg.member.guest_name = lookup.get_guest()
    # g.cfg.member.info = g.cfg.member.get_info
    # g.cfg.team.info = g.cfg.team.get_info

    params.source = g.cfg.resolve_channel_id(m.status.source)

    # Update the separate flag
    params.update_setting(main_config=g.cfg.config_file, key_name="separate", val_type=bool)
    if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}:
        params.separate = False  # DM / HomeApp (Slack) are never separated

    # Look up the rule identifier
    params.update_setting(main_config=g.cfg.config_file, key_name="default_rule", val_type=str)
    if (command_suffix := subcom.to_dict().get("command_suffix")) and isinstance(command_suffix, list):
        rule_version = lookup.get_current_rule_version(m, command_suffix)
    rule_version = rule_version if rule_version else params.default_rule
    params.update_from_dict(
        {
            **g.cfg.rule.to_dict(rule_version),
            "target_mode": g.cfg.rule.get_mode(rule_version),
        }
    )

    # Process always_argument
    pre_param = parser.analysis_argument(subcom.always_argument)
    logging.debug("analysis_argument: %s", pre_param)
    params.update_from_dict(pre_param.flags)

    # Process the message arguments
    post_param = parser.analysis_argument(m.argument)
    logging.debug("argument: %s", post_param)
    params.update_from_dict(post_param.flags)  # overwrite

    # Determine the search range: message arguments win over always_argument,
    # which wins over the sub-command's aggregation range.
    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
    if post_param.search_range:
        search_range = post_param.search_range
    elif pre_param.search_range:
        search_range = pre_param.search_range
    else:
        search_range = departure_time.range(subcom.aggregation_range)

    params.starttime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start
    params.endtime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end
    params.onday = departure_time.range(search_range).end

    # Keywords that matched none of the options (a fresh list, safe to mutate)
    check_list: list[str] = post_param.unknown + pre_param.unknown

    # Additional rule identifiers
    rule_list: list[str] = []
    for name in list(check_list):
        is_rule_keyword = False
        if name in g.cfg.rule.keyword_mapping:  # keyword mapped to a rule identifier
            rule_list.append(g.cfg.rule.keyword_mapping.get(name, rule_version))
            is_rule_keyword = True
        if name in g.cfg.rule.keyword_mapping.values():  # rule identifier that is a mapping target
            rule_list.append(name)
            is_rule_keyword = True
        if name in g.cfg.rule.rule_list:  # rule identifier without a mapping
            rule_list.append(name)
            is_rule_keyword = True
        # Remove once per iteration and only if still present: a keyword can
        # satisfy several of the checks above, and an unguarded second
        # remove() would raise ValueError.
        if is_rule_keyword and name in check_list:
            check_list.remove(name)
    if params.mixed:
        for rule in g.cfg.rule.rule_list:  # add every rule sharing the target mode
            if g.cfg.rule.get_mode(rule) == params.target_mode:
                rule_list.append(rule)
    if rule_list:
        # De-duplicate in first-seen order so the result is reproducible
        params.rule_list.extend(list(dict.fromkeys(rule_list)))
    else:
        params.rule_list.append(rule_version)

    # Player names
    target_player: list[str] = []
    if params.individual:
        if params.all_player:
            check_list.extend(g.cfg.member.lists)
        for name in check_list:
            if name in g.cfg.team.lists:  # a team name is expanded into its members
                target_player.extend(g.cfg.team.member(name))
            else:
                target_player.append(formatter.name_replace(name, not_replace=True))
    else:  # team names
        if params.all_player:
            check_list.extend(g.cfg.team.lists)
        for team in check_list:
            if team in g.cfg.member.lists:
                if team_name := g.cfg.team.which(team):  # a player name adds the team it belongs to
                    target_player.append(team_name)
            else:
                target_player.append(team)

    target_player = list(dict.fromkeys(target_player))  # de-duplicate, keeping order

    if target_player:
        params.player_name = target_player[0]
        params.target_player = target_player
        params.player_list = target_player
        params.competition_list = target_player[1:]

    # Output type
    if not params.format:
        params.format = "default"

    # Stipulated game count
    if params.mixed and not params.stipulated:  # cross-rule aggregation, no stipulated limit
        if target_player:
            params.stipulated = 1  # individual results
        else:
            params.stipulated = 0
    elif not params.stipulated:  # normal aggregation, no stipulated limit
        if subcom.section == "ranking":  # ranking uses rate calculation
            params.stipulated = 0
        else:
            params.stipulated = 1

    # A range starting at 1900-01-01 is replaced by the first stored record.
    if departure_time.range(search_range).start == ExtDt("1900-01-01 00:00:00.000000"):
        params.starttime = search.first_record(
            g.cfg.rule.get_version(
                mode=params.mode,
                mapping=not (params.mixed),
            )
        )

    return params


def merge_dicts(dict1: dict[Any, Any], dict2: dict[Any, Any]) -> dict[Any, Any]:
    """
    Merge the contents of two dictionaries.

    For a key whose two values are both numbers (int/float; ``bool`` counts as
    ``int``) or both strings, the values are added/concatenated. Two lists are
    combined into a sorted list of unique elements. In every other case the
    value from ``dict2`` is used unless it is ``None`` (or missing), in which
    case the value from ``dict1`` is kept.

    Keys appear in the result in first-seen order: the keys of ``dict1``
    followed by the keys found only in ``dict2``.

    Args:
        dict1 (dict[Any, Any]): first dictionary
        dict2 (dict[Any, Any]): second dictionary

    Returns:
        dict: merged dictionary

    Raises:
        TypeError: when two lists hold unhashable or mutually unorderable items.

    """
    merged: dict[Any, Any] = {}

    # dict.fromkeys keeps first-seen order; iterating a set union would make
    # the key order depend on hashing.
    for key in dict.fromkeys((*dict1, *dict2)):
        val1: Any = dict1.get(key)
        val2: Any = dict2.get(key)

        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
            merged[key] = val1 + val2
        elif isinstance(val1, str) and isinstance(val2, str):
            merged[key] = val1 + val2
        elif isinstance(val1, list) and isinstance(val2, list):
            merged[key] = sorted(set(val1 + val2))
        else:
            merged[key] = val1 if val2 is None else val2

    return merged
class
SubCommandLike(typing.Protocol):
21class SubCommandLike(Protocol): 22 """placeholder生成に必要なサブコマンド設定の最小インターフェース""" 23 24 section: str 25 always_argument: list[str] 26 aggregation_range: str 27 28 def to_dict(self, drop_items: list[str] | None = None) -> dict[str, Any]: ...
placeholder生成に必要なサブコマンド設定の最小インターフェース
SubCommandLike(*args, **kwargs)
1866def _no_init_or_replace_init(self, *args, **kwargs): 1867 cls = type(self) 1868 1869 if cls._is_protocol: 1870 raise TypeError('Protocols cannot be instantiated') 1871 1872 # Already using a custom `__init__`. No need to calculate correct 1873 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1874 if cls.__init__ is not _no_init_or_replace_init: 1875 return 1876 1877 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1878 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1879 # searches for a proper new `__init__` in the MRO. The new `__init__` 1880 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1881 # instantiation of the protocol subclass will thus use the new 1882 # `__init__` and no longer call `_no_init_or_replace_init`. 1883 for base in cls.__mro__: 1884 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1885 if init is not _no_init_or_replace_init: 1886 cls.__init__ = init 1887 break 1888 else: 1889 # should not happen 1890 cls.__init__ = object.__init__ 1891 1892 cls.__init__(self, *args, **kwargs)
def
to_dict(self, drop_items: list[str] | None = None) -> dict[str, typing.Any]:
28 def to_dict(self, drop_items: list[str] | None = None) -> dict[str, Any]: ...
def
placeholder( subcom: SubCommandLike, m: integrations.protocols.MessageParserProtocol) -> libs.domain.placeholder.PlaceholderBuilder:
def placeholder(subcom: "SubCommandLike", m: "MessageParserProtocol") -> PlaceholderBuilder:
    """
    Build the parameter set used for placeholders.

    Values are layered in this order, later ones overwriting earlier ones:
    global settings and sub-command defaults, channel-specific configuration,
    rule settings, ``subcom.always_argument`` flags, then the flags parsed from
    the message arguments.

    Args:
        subcom (SubCommandLike): sub-command configuration
        m (MessageParserProtocol): message data

    Returns:
        PlaceholderBuilder: populated placeholder data

    """
    # Initialisation
    parser: CommandParser = CommandParser()
    params: PlaceholderBuilder = PlaceholderBuilder()
    rule_version: str | None = None
    params.update_from_dict(
        {
            "service_type": g.adapter.interface_type,
            "command": subcom.section,
            "guest_name": g.cfg.member.guest_name,
            "logging_verbose": g.args.verbose,
            **g.cfg.setting.to_dict(),
            **subcom.to_dict(),  # sub-command default values
        }
    )

    # Load channel-specific configuration
    if channel_config := g.cfg.read_channel_config(m.status.source, params.placeholder()):
        logging.debug("read channel config: %s", channel_config.absolute())
        params.channel_config = channel_config
        # NOTE(review): presumably to_dict() now reflects the channel config -- confirm
        params.update_from_dict(subcom.to_dict())  # refresh

    # Refresh member information  # TODO: after the DB switch-over is implemented
    # g.cfg.member.guest_name = lookup.get_guest()
    # g.cfg.member.info = g.cfg.member.get_info
    # g.cfg.team.info = g.cfg.team.get_info

    params.source = g.cfg.resolve_channel_id(m.status.source)

    # Update the separate flag
    params.update_setting(main_config=g.cfg.config_file, key_name="separate", val_type=bool)
    if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}:
        params.separate = False  # DM / HomeApp (Slack) are never separated

    # Look up the rule identifier
    params.update_setting(main_config=g.cfg.config_file, key_name="default_rule", val_type=str)
    if (command_suffix := subcom.to_dict().get("command_suffix")) and isinstance(command_suffix, list):
        rule_version = lookup.get_current_rule_version(m, command_suffix)
    rule_version = rule_version if rule_version else params.default_rule
    params.update_from_dict(
        {
            **g.cfg.rule.to_dict(rule_version),
            "target_mode": g.cfg.rule.get_mode(rule_version),
        }
    )

    # Process always_argument
    pre_param = parser.analysis_argument(subcom.always_argument)
    logging.debug("analysis_argument: %s", pre_param)
    params.update_from_dict(pre_param.flags)

    # Process the message arguments
    post_param = parser.analysis_argument(m.argument)
    logging.debug("argument: %s", post_param)
    params.update_from_dict(post_param.flags)  # overwrite

    # Determine the search range: message arguments win over always_argument,
    # which wins over the sub-command's aggregation range.
    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
    if post_param.search_range:
        search_range = post_param.search_range
    elif pre_param.search_range:
        search_range = pre_param.search_range
    else:
        search_range = departure_time.range(subcom.aggregation_range)

    params.starttime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start
    params.endtime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end
    params.onday = departure_time.range(search_range).end

    # Keywords that matched none of the options (a fresh list, safe to mutate)
    check_list: list[str] = post_param.unknown + pre_param.unknown

    # Additional rule identifiers
    rule_list: list[str] = []
    for name in list(check_list):
        is_rule_keyword = False
        if name in g.cfg.rule.keyword_mapping:  # keyword mapped to a rule identifier
            rule_list.append(g.cfg.rule.keyword_mapping.get(name, rule_version))
            is_rule_keyword = True
        if name in g.cfg.rule.keyword_mapping.values():  # rule identifier that is a mapping target
            rule_list.append(name)
            is_rule_keyword = True
        if name in g.cfg.rule.rule_list:  # rule identifier without a mapping
            rule_list.append(name)
            is_rule_keyword = True
        # Remove once per iteration and only if still present: a keyword can
        # satisfy several of the checks above, and an unguarded second
        # remove() would raise ValueError.
        if is_rule_keyword and name in check_list:
            check_list.remove(name)
    if params.mixed:
        for rule in g.cfg.rule.rule_list:  # add every rule sharing the target mode
            if g.cfg.rule.get_mode(rule) == params.target_mode:
                rule_list.append(rule)
    if rule_list:
        # De-duplicate in first-seen order so the result is reproducible
        params.rule_list.extend(list(dict.fromkeys(rule_list)))
    else:
        params.rule_list.append(rule_version)

    # Player names
    target_player: list[str] = []
    if params.individual:
        if params.all_player:
            check_list.extend(g.cfg.member.lists)
        for name in check_list:
            if name in g.cfg.team.lists:  # a team name is expanded into its members
                target_player.extend(g.cfg.team.member(name))
            else:
                target_player.append(formatter.name_replace(name, not_replace=True))
    else:  # team names
        if params.all_player:
            check_list.extend(g.cfg.team.lists)
        for team in check_list:
            if team in g.cfg.member.lists:
                if team_name := g.cfg.team.which(team):  # a player name adds the team it belongs to
                    target_player.append(team_name)
            else:
                target_player.append(team)

    target_player = list(dict.fromkeys(target_player))  # de-duplicate, keeping order

    if target_player:
        params.player_name = target_player[0]
        params.target_player = target_player
        params.player_list = target_player
        params.competition_list = target_player[1:]

    # Output type
    if not params.format:
        params.format = "default"

    # Stipulated game count
    if params.mixed and not params.stipulated:  # cross-rule aggregation, no stipulated limit
        if target_player:
            params.stipulated = 1  # individual results
        else:
            params.stipulated = 0
    elif not params.stipulated:  # normal aggregation, no stipulated limit
        if subcom.section == "ranking":  # ranking uses rate calculation
            params.stipulated = 0
        else:
            params.stipulated = 1

    # A range starting at 1900-01-01 is replaced by the first stored record.
    if departure_time.range(search_range).start == ExtDt("1900-01-01 00:00:00.000000"):
        params.starttime = search.first_record(
            g.cfg.rule.get_version(
                mode=params.mode,
                mapping=not (params.mixed),
            )
        )

    return params
プレースホルダに使用する辞書を生成
Arguments:
- subcom (SubCommandLike): サブコマンド設定
- m (MessageParserProtocol): メッセージデータ
Returns:
PlaceholderBuilder: プレースホルダ用データ
def
merge_dicts( dict1: dict[typing.Any, typing.Any], dict2: dict[typing.Any, typing.Any]) -> dict[typing.Any, typing.Any]:
def merge_dicts(dict1: dict[Any, Any], dict2: dict[Any, Any]) -> dict[Any, Any]:
    """
    Merge the contents of two dictionaries.

    For a key whose two values are both numbers (int/float; ``bool`` counts as
    ``int``) or both strings, the values are added/concatenated. Two lists are
    combined into a sorted list of unique elements. In every other case the
    value from ``dict2`` is used unless it is ``None`` (or missing), in which
    case the value from ``dict1`` is kept.

    Keys appear in the result in first-seen order: the keys of ``dict1``
    followed by the keys found only in ``dict2``.

    Args:
        dict1 (dict[Any, Any]): first dictionary
        dict2 (dict[Any, Any]): second dictionary

    Returns:
        dict: merged dictionary

    Raises:
        TypeError: when two lists hold unhashable or mutually unorderable items.

    """
    merged: dict[Any, Any] = {}

    # dict.fromkeys keeps first-seen order; iterating a set union would make
    # the key order depend on hashing.
    for key in dict.fromkeys((*dict1, *dict2)):
        val1: Any = dict1.get(key)
        val2: Any = dict2.get(key)

        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
            merged[key] = val1 + val2
        elif isinstance(val1, str) and isinstance(val2, str):
            merged[key] = val1 + val2
        elif isinstance(val1, list) and isinstance(val2, list):
            merged[key] = sorted(set(val1 + val2))
        else:
            merged[key] = val1 if val2 is None else val2

    return merged
辞書の内容をマージする
Arguments:
- dict1 (dict[Any, Any]): 1つ目の辞書
- dict2 (dict[Any, Any]): 2つ目の辞書
Returns:
dict: マージされた辞書