libs.utils.dictutil

libs/utils/dictutil.py

  1"""
  2libs/utils/dictutil.py
  3"""
  4
  5import logging
  6from typing import TYPE_CHECKING, Any, Protocol
  7
  8import libs.global_value as g
  9from libs.domain.command import CommandParser
 10from libs.domain.placeholder import PlaceholderBuilder
 11from libs.functions import lookup, search
 12from libs.types import ChannelType
 13from libs.utils import formatter
 14from libs.utils.timekit import ExtendedDatetime as ExtDt
 15
 16if TYPE_CHECKING:
 17    from integrations.protocols import MessageParserProtocol
 18
 19
 20class SubCommandLike(Protocol):
 21    """placeholder生成に必要なサブコマンド設定の最小インターフェース"""
 22
 23    section: str
 24    always_argument: list[str]
 25    aggregation_range: str
 26
 27    def to_dict(self, drop_items: list[str] | None = None) -> dict[str, Any]: ...
 28
 29
 30def placeholder(subcom: "SubCommandLike", m: "MessageParserProtocol") -> PlaceholderBuilder:
 31    """
 32    プレースホルダに使用する辞書を生成
 33
 34    Args:
 35        subcom (SubCommandLike): サブコマンド設定
 36        m (MessageParserProtocol): メッセージデータ
 37
 38    Returns:
 39        PlaceholderBuilder: プレースホルダ用データ
 40
 41    """
 42    # 初期化
 43    parser: CommandParser = CommandParser()
 44    params: PlaceholderBuilder = PlaceholderBuilder()
 45    rule_version: str | None = None
 46    params.update_from_dict(
 47        {
 48            "service_type": g.adapter.interface_type,
 49            "command": subcom.section,
 50            "guest_name": g.cfg.member.guest_name,
 51            "logging_verbose": g.args.verbose,
 52            **g.cfg.setting.to_dict(),
 53            **subcom.to_dict(),  #  サブコマンドデフォルト値
 54        }
 55    )
 56
 57    # チャンネル個別設定読み込み
 58    if channel_config := g.cfg.read_channel_config(m.status.source, params.placeholder()):
 59        logging.debug("read channel config: %s", channel_config.absolute())
 60        params.channel_config = channel_config
 61        params.update_from_dict(subcom.to_dict())  # 更新
 62
 63    # メンバー情報更新 # ToDo: DB切り替え実装後
 64    # g.cfg.member.guest_name = lookup.get_guest()
 65    # g.cfg.member.info = g.cfg.member.get_info
 66    # g.cfg.team.info = g.cfg.team.get_info
 67
 68    params.source = g.cfg.resolve_channel_id(m.status.source)
 69
 70    # セパレートフラグ更新
 71    params.update_setting(main_config=g.cfg.config_file, key_name="separate", val_type=bool)
 72    if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}:
 73        params.separate = False  # DM / HomeApp(slack) はセパレートしない
 74
 75    # ルール識別子探索
 76    params.update_setting(main_config=g.cfg.config_file, key_name="default_rule", val_type=str)
 77    if (command_suffix := subcom.to_dict().get("command_suffix")) and isinstance(command_suffix, list):
 78        rule_version = lookup.get_current_rule_version(m, command_suffix)
 79    rule_version = rule_version if rule_version else params.default_rule
 80    params.update_from_dict(
 81        {
 82            **g.cfg.rule.to_dict(rule_version),
 83            "target_mode": g.cfg.rule.get_mode(rule_version),
 84        }
 85    )
 86
 87    # always_argumentの処理
 88    pre_param = parser.analysis_argument(subcom.always_argument)
 89    logging.debug("analysis_argument: %s", pre_param)
 90    params.update_from_dict(pre_param.flags)
 91
 92    # 引数の処理
 93    post_param = parser.analysis_argument(m.argument)
 94    logging.debug("argument: %s", post_param)
 95    params.update_from_dict(post_param.flags)  # 上書き
 96
 97    # 検索範囲取得
 98    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
 99    if post_param.search_range:
100        search_range = post_param.search_range
101    elif pre_param.search_range:
102        search_range = pre_param.search_range
103    else:
104        search_range = departure_time.range(subcom.aggregation_range)
105
106    params.starttime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start
107    params.endtime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end
108    params.onday = departure_time.range(search_range).end
109
110    # どのオプションにも該当しないキーワード
111    check_list: list[str] = post_param.unknown + pre_param.unknown
112
113    # 追加ルール識別子
114    rule_list: list[str] = []
115    for name in list(check_list):
116        if name in g.cfg.rule.keyword_mapping:  # マッピング済みルール識別子
117            rule_list.append(g.cfg.rule.keyword_mapping.get(name, rule_version))
118            check_list.remove(name)
119        if name in g.cfg.rule.keyword_mapping.values():  # マッピング済みルール識別子
120            rule_list.append(name)
121            check_list.remove(name)
122        if name in g.cfg.rule.rule_list:  # マッピングされていないルール識別子
123            rule_list.append(name)
124            if name in check_list:
125                check_list.remove(name)
126    if params.mixed:
127        for rule in g.cfg.rule.rule_list:  # 全ルール追加
128            if g.cfg.rule.get_mode(rule) == params.target_mode:
129                rule_list.append(rule)
130    if rule_list:
131        params.rule_list.extend(list(set(rule_list)))
132    else:
133        params.rule_list.append(rule_version)
134
135    # プレイヤー名
136    target_player: list[str] = []
137    if params.individual:
138        if params.all_player:
139            check_list.extend(g.cfg.member.lists)
140        for name in check_list:
141            if name in g.cfg.team.lists:  # チーム名がある場合は所属メンバーに展開
142                target_player.extend(g.cfg.team.member(name))
143            else:
144                target_player.append(formatter.name_replace(name, not_replace=True))
145    else:  # チーム名
146        if params.all_player:
147            check_list.extend(g.cfg.team.lists)
148        for team in check_list:
149            if team in g.cfg.member.lists:
150                if team_name := g.cfg.team.which(team):  # プレイヤー名がある場合は所属チームを追加
151                    target_player.append(team_name)
152            else:
153                target_player.append(team)
154
155    target_player = sorted(set(target_player), key=target_player.index)  # 順序を維持したまま重複排除
156
157    if target_player:
158        params.player_name = target_player[0]
159        params.target_player = target_player
160        params.player_list = target_player
161        params.competition_list = target_player[1:]
162
163    # 出力タイプ
164    if not params.format:
165        params.format = "default"
166
167    # 規定打数設定
168    if params.mixed and not params.stipulated:  # 横断集計&規定数制限なし
169        if target_player:
170            params.stipulated = 1  # 個人成績
171        else:
172            params.stipulated = 0
173    elif not params.stipulated:  # 通常集計&規定数制限なし
174        if subcom.section == "ranking":  # ランキングはレート計算
175            params.stipulated = 0
176        else:
177            params.stipulated = 1
178
179    if departure_time.range(search_range).start == ExtDt("1900-01-01 00:00:00.000000"):
180        params.starttime = search.first_record(
181            g.cfg.rule.get_version(
182                mode=params.mode,
183                mapping=not (params.mixed),
184            )
185        )
186
187    return params
188
189
def merge_dicts(dict1: dict[Any, Any], dict2: dict[Any, Any]) -> dict[Any, Any]:
    """
    Merge two dictionaries, combining the values of keys present in both.

    For each key the two values are combined according to their types:

    - both numbers (``int`` / ``float``; ``bool`` is an ``int`` subclass and
      is summed too): added together
    - both strings: concatenated, ``dict1`` value first
    - both lists: union of the elements, de-duplicated and sorted
    - anything else: the ``dict2`` value, unless it is ``None`` or the key is
      missing from ``dict2``, in which case the ``dict1`` value is used

    Keys appear in the result in a deterministic order: the keys of ``dict1``
    in their original order, followed by the keys found only in ``dict2``.
    Neither input dictionary is modified.

    Args:
        dict1 (dict[Any, Any]): First dictionary.
        dict2 (dict[Any, Any]): Second dictionary.

    Returns:
        dict: The merged dictionary.

    Raises:
        TypeError: If two lists being merged hold unhashable or mutually
            unorderable elements.

    """
    merged: dict[Any, Any] = {}

    # dict.fromkeys keeps first-seen order; a set union would make the key
    # order of the result depend on hash randomisation.
    for key in dict.fromkeys((*dict1, *dict2)):
        val1: Any = dict1.get(key)
        val2: Any = dict2.get(key)

        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
            merged[key] = val1 + val2
        elif isinstance(val1, str) and isinstance(val2, str):
            merged[key] = val1 + val2
        elif isinstance(val1, list) and isinstance(val2, list):
            merged[key] = sorted(set(val1 + val2))
        else:
            merged[key] = val1 if val2 is None else val2

    return merged
class SubCommandLike(typing.Protocol):
21class SubCommandLike(Protocol):
22    """placeholder生成に必要なサブコマンド設定の最小インターフェース"""
23
24    section: str
25    always_argument: list[str]
26    aggregation_range: str
27
28    def to_dict(self, drop_items: list[str] | None = None) -> dict[str, Any]: ...

placeholder生成に必要なサブコマンド設定の最小インターフェース

SubCommandLike(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that rejects protocols or installs the real ``__init__``.

    Raises ``TypeError`` when the instance's class has ``_is_protocol`` set.
    Otherwise, if the class still uses this function as ``__init__``, the MRO
    is searched for the first ``__init__`` that is not this function; that one
    is stored on the class and then called with the given arguments.
    """
    cls = type(self)

    # Protocol classes themselves must never be instantiated.
    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # Only an `__init__` defined directly on `base` counts; a missing entry
        # defaults to this function so the base is skipped.
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Run the initialiser that was just installed on the class.
    cls.__init__(self, *args, **kwargs)
section: str
always_argument: list[str]
aggregation_range: str
def to_dict(self, drop_items: list[str] | None = None) -> dict[str, typing.Any]:
    # Protocol stub: implementers return their settings as a dict keyed by str.
    # NOTE(review): drop_items presumably lists keys to omit -- confirm against implementers.
    def to_dict(self, drop_items: list[str] | None = None) -> dict[str, Any]: ...
 31def placeholder(subcom: "SubCommandLike", m: "MessageParserProtocol") -> PlaceholderBuilder:
 32    """
 33    プレースホルダに使用する辞書を生成
 34
 35    Args:
 36        subcom (SubCommandLike): サブコマンド設定
 37        m (MessageParserProtocol): メッセージデータ
 38
 39    Returns:
 40        PlaceholderBuilder: プレースホルダ用データ
 41
 42    """
 43    # 初期化
 44    parser: CommandParser = CommandParser()
 45    params: PlaceholderBuilder = PlaceholderBuilder()
 46    rule_version: str | None = None
 47    params.update_from_dict(
 48        {
 49            "service_type": g.adapter.interface_type,
 50            "command": subcom.section,
 51            "guest_name": g.cfg.member.guest_name,
 52            "logging_verbose": g.args.verbose,
 53            **g.cfg.setting.to_dict(),
 54            **subcom.to_dict(),  #  サブコマンドデフォルト値
 55        }
 56    )
 57
 58    # チャンネル個別設定読み込み
 59    if channel_config := g.cfg.read_channel_config(m.status.source, params.placeholder()):
 60        logging.debug("read channel config: %s", channel_config.absolute())
 61        params.channel_config = channel_config
 62        params.update_from_dict(subcom.to_dict())  # 更新
 63
 64    # メンバー情報更新 # ToDo: DB切り替え実装後
 65    # g.cfg.member.guest_name = lookup.get_guest()
 66    # g.cfg.member.info = g.cfg.member.get_info
 67    # g.cfg.team.info = g.cfg.team.get_info
 68
 69    params.source = g.cfg.resolve_channel_id(m.status.source)
 70
 71    # セパレートフラグ更新
 72    params.update_setting(main_config=g.cfg.config_file, key_name="separate", val_type=bool)
 73    if m.data.channel_type in {ChannelType.DIRECT_MESSAGE, ChannelType.HOME_APP}:
 74        params.separate = False  # DM / HomeApp(slack) はセパレートしない
 75
 76    # ルール識別子探索
 77    params.update_setting(main_config=g.cfg.config_file, key_name="default_rule", val_type=str)
 78    if (command_suffix := subcom.to_dict().get("command_suffix")) and isinstance(command_suffix, list):
 79        rule_version = lookup.get_current_rule_version(m, command_suffix)
 80    rule_version = rule_version if rule_version else params.default_rule
 81    params.update_from_dict(
 82        {
 83            **g.cfg.rule.to_dict(rule_version),
 84            "target_mode": g.cfg.rule.get_mode(rule_version),
 85        }
 86    )
 87
 88    # always_argumentの処理
 89    pre_param = parser.analysis_argument(subcom.always_argument)
 90    logging.debug("analysis_argument: %s", pre_param)
 91    params.update_from_dict(pre_param.flags)
 92
 93    # 引数の処理
 94    post_param = parser.analysis_argument(m.argument)
 95    logging.debug("argument: %s", post_param)
 96    params.update_from_dict(post_param.flags)  # 上書き
 97
 98    # 検索範囲取得
 99    departure_time = ExtDt(hours=-g.cfg.setting.time_adjust)
100    if post_param.search_range:
101        search_range = post_param.search_range
102    elif pre_param.search_range:
103        search_range = pre_param.search_range
104    else:
105        search_range = departure_time.range(subcom.aggregation_range)
106
107    params.starttime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).start
108    params.endtime = (departure_time.range(search_range) + {"hours": g.cfg.setting.time_adjust}).end
109    params.onday = departure_time.range(search_range).end
110
111    # どのオプションにも該当しないキーワード
112    check_list: list[str] = post_param.unknown + pre_param.unknown
113
114    # 追加ルール識別子
115    rule_list: list[str] = []
116    for name in list(check_list):
117        if name in g.cfg.rule.keyword_mapping:  # マッピング済みルール識別子
118            rule_list.append(g.cfg.rule.keyword_mapping.get(name, rule_version))
119            check_list.remove(name)
120        if name in g.cfg.rule.keyword_mapping.values():  # マッピング済みルール識別子
121            rule_list.append(name)
122            check_list.remove(name)
123        if name in g.cfg.rule.rule_list:  # マッピングされていないルール識別子
124            rule_list.append(name)
125            if name in check_list:
126                check_list.remove(name)
127    if params.mixed:
128        for rule in g.cfg.rule.rule_list:  # 全ルール追加
129            if g.cfg.rule.get_mode(rule) == params.target_mode:
130                rule_list.append(rule)
131    if rule_list:
132        params.rule_list.extend(list(set(rule_list)))
133    else:
134        params.rule_list.append(rule_version)
135
136    # プレイヤー名
137    target_player: list[str] = []
138    if params.individual:
139        if params.all_player:
140            check_list.extend(g.cfg.member.lists)
141        for name in check_list:
142            if name in g.cfg.team.lists:  # チーム名がある場合は所属メンバーに展開
143                target_player.extend(g.cfg.team.member(name))
144            else:
145                target_player.append(formatter.name_replace(name, not_replace=True))
146    else:  # チーム名
147        if params.all_player:
148            check_list.extend(g.cfg.team.lists)
149        for team in check_list:
150            if team in g.cfg.member.lists:
151                if team_name := g.cfg.team.which(team):  # プレイヤー名がある場合は所属チームを追加
152                    target_player.append(team_name)
153            else:
154                target_player.append(team)
155
156    target_player = sorted(set(target_player), key=target_player.index)  # 順序を維持したまま重複排除
157
158    if target_player:
159        params.player_name = target_player[0]
160        params.target_player = target_player
161        params.player_list = target_player
162        params.competition_list = target_player[1:]
163
164    # 出力タイプ
165    if not params.format:
166        params.format = "default"
167
168    # 規定打数設定
169    if params.mixed and not params.stipulated:  # 横断集計&規定数制限なし
170        if target_player:
171            params.stipulated = 1  # 個人成績
172        else:
173            params.stipulated = 0
174    elif not params.stipulated:  # 通常集計&規定数制限なし
175        if subcom.section == "ranking":  # ランキングはレート計算
176            params.stipulated = 0
177        else:
178            params.stipulated = 1
179
180    if departure_time.range(search_range).start == ExtDt("1900-01-01 00:00:00.000000"):
181        params.starttime = search.first_record(
182            g.cfg.rule.get_version(
183                mode=params.mode,
184                mapping=not (params.mixed),
185            )
186        )
187
188    return params

プレースホルダに使用する辞書を生成

Arguments:
  • subcom (SubCommandLike): サブコマンド設定
  • m (MessageParserProtocol): メッセージデータ
Returns:
  • PlaceholderBuilder: プレースホルダ用データ

def merge_dicts( dict1: dict[typing.Any, typing.Any], dict2: dict[typing.Any, typing.Any]) -> dict[typing.Any, typing.Any]:
def merge_dicts(dict1: dict[Any, Any], dict2: dict[Any, Any]) -> dict[Any, Any]:
    """
    Merge two dictionaries, combining the values of keys present in both.

    For each key the two values are combined according to their types:

    - both numbers (``int`` / ``float``; ``bool`` is an ``int`` subclass and
      is summed too): added together
    - both strings: concatenated, ``dict1`` value first
    - both lists: union of the elements, de-duplicated and sorted
    - anything else: the ``dict2`` value, unless it is ``None`` or the key is
      missing from ``dict2``, in which case the ``dict1`` value is used

    Keys appear in the result in a deterministic order: the keys of ``dict1``
    in their original order, followed by the keys found only in ``dict2``.
    Neither input dictionary is modified.

    Args:
        dict1 (dict[Any, Any]): First dictionary.
        dict2 (dict[Any, Any]): Second dictionary.

    Returns:
        dict: The merged dictionary.

    Raises:
        TypeError: If two lists being merged hold unhashable or mutually
            unorderable elements.

    """
    merged: dict[Any, Any] = {}

    # dict.fromkeys keeps first-seen order; a set union would make the key
    # order of the result depend on hash randomisation.
    for key in dict.fromkeys((*dict1, *dict2)):
        val1: Any = dict1.get(key)
        val2: Any = dict2.get(key)

        if isinstance(val1, (int, float)) and isinstance(val2, (int, float)):
            merged[key] = val1 + val2
        elif isinstance(val1, str) and isinstance(val2, str):
            merged[key] = val1 + val2
        elif isinstance(val1, list) and isinstance(val2, list):
            merged[key] = sorted(set(val1 + val2))
        else:
            merged[key] = val1 if val2 is None else val2

    return merged

辞書の内容をマージする

Arguments:
  • dict1 (dict[Any, Any]): 1つ目の辞書
  • dict2 (dict[Any, Any]): 2つ目の辞書
Returns:
  • dict: マージされた辞書