libs.functions.tools.comparison

libs/functions/tools/comparison.py

 1"""
 2libs/functions/tools/comparison.py
 3"""
 4
 5import os
 6from typing import TYPE_CHECKING, cast
 7
 8import libs.global_value as g
 9from integrations import factory
10from integrations.slack.events import comparison
11from libs.functions import lookup
12from libs.types import ServiceType
13
14if TYPE_CHECKING:
15    from integrations.discord.adapter import ServiceAdapter as discord_adapter
16    from integrations.protocols import MessageParserProtocol
17    from integrations.slack.adapter import ServiceAdapter as slack_adapter
18    from libs.domain.datamodels import ComparisonResults
19
20
21def main() -> None:
22    """データ突合処理"""
23    g.cfg.initialization()
24
25    # 結果の出力先(standard_io)
26    adapter_std = factory.select_adapter(ServiceType.STANDARD_IO, g.cfg)
27    m = adapter_std.parser()
28
29    # 連携サービス切替
30    match g.adapter.interface_type:
31        case ServiceType.SLACK:
32            g.adapter = factory.select_adapter(g.adapter.interface_type, g.cfg)
33            slack_comparison(m)
34        case ServiceType.DISCORD:
35            g.adapter = factory.select_adapter(g.adapter.interface_type, g.cfg)
36            discord_comparison(m)
37        case _:
38            return
39
40    if isinstance(m.status.message, str):
41        print(m.status.message)
42    else:
43        print(cast("ComparisonResults", m.status.message).output("summary"))
44
45
46def slack_comparison(m: "MessageParserProtocol") -> None:
47    """
48    突合処理(slack)
49
50    Args:
51        m (MessageParserProtocol): メッセージデータ
52
53    Raises:
54        ModuleNotFoundError: ライブラリ未インストール
55        RuntimeError: 接続エラー
56
57    """
58    try:
59        from slack_bolt import App
60        from slack_sdk import WebClient
61    except ModuleNotFoundError as err:
62        raise ModuleNotFoundError(err.msg) from None
63
64    g.adapter = cast("slack_adapter", g.adapter)
65
66    try:
67        app = App(token=os.environ["SLACK_BOT_TOKEN"])
68        g.adapter.api.webclient = WebClient(token=os.environ["SLACK_WEB_TOKEN"])
69        g.adapter.api.appclient = app.client
70    except Exception as err:
71        raise RuntimeError(err) from err
72
73    g.adapter.conf.bot_id = app.client.auth_test()["user_id"]
74    m.data.channel_id = g.adapter.functions.get_channel_id()
75
76    lookup.read_memberslist()
77    comparison.main(m)
78
79
80def discord_comparison(m: "MessageParserProtocol") -> None:
81    """
82    突合処理(discord)
83
84    Args:
85        m (MessageParserProtocol): メッセージデータ
86
87    """
88    g.adapter = cast("discord_adapter", g.adapter)
89
90    m.status.message = "未実装"
def main() -> None:
22def main() -> None:
23    """データ突合処理"""
24    g.cfg.initialization()
25
26    # 結果の出力先(standard_io)
27    adapter_std = factory.select_adapter(ServiceType.STANDARD_IO, g.cfg)
28    m = adapter_std.parser()
29
30    # 連携サービス切替
31    match g.adapter.interface_type:
32        case ServiceType.SLACK:
33            g.adapter = factory.select_adapter(g.adapter.interface_type, g.cfg)
34            slack_comparison(m)
35        case ServiceType.DISCORD:
36            g.adapter = factory.select_adapter(g.adapter.interface_type, g.cfg)
37            discord_comparison(m)
38        case _:
39            return
40
41    if isinstance(m.status.message, str):
42        print(m.status.message)
43    else:
44        print(cast("ComparisonResults", m.status.message).output("summary"))

データ突合処理

def slack_comparison(m: integrations.protocols.MessageParserProtocol) -> None:
47def slack_comparison(m: "MessageParserProtocol") -> None:
48    """
49    突合処理(slack)
50
51    Args:
52        m (MessageParserProtocol): メッセージデータ
53
54    Raises:
55        ModuleNotFoundError: ライブラリ未インストール
56        RuntimeError: 接続エラー
57
58    """
59    try:
60        from slack_bolt import App
61        from slack_sdk import WebClient
62    except ModuleNotFoundError as err:
63        raise ModuleNotFoundError(err.msg) from None
64
65    g.adapter = cast("slack_adapter", g.adapter)
66
67    try:
68        app = App(token=os.environ["SLACK_BOT_TOKEN"])
69        g.adapter.api.webclient = WebClient(token=os.environ["SLACK_WEB_TOKEN"])
70        g.adapter.api.appclient = app.client
71    except Exception as err:
72        raise RuntimeError(err) from err
73
74    g.adapter.conf.bot_id = app.client.auth_test()["user_id"]
75    m.data.channel_id = g.adapter.functions.get_channel_id()
76
77    lookup.read_memberslist()
78    comparison.main(m)

突合処理(slack)

Arguments:
  • m (MessageParserProtocol): メッセージデータ
Raises:
  • ModuleNotFoundError: ライブラリ未インストール
  • RuntimeError: 接続エラー
def discord_comparison(m: integrations.protocols.MessageParserProtocol) -> None:
81def discord_comparison(m: "MessageParserProtocol") -> None:
82    """
83    突合処理(discord)
84
85    Args:
86        m (MessageParserProtocol): メッセージデータ
87
88    """
89    g.adapter = cast("discord_adapter", g.adapter)
90
91    m.status.message = "未実装"

突合処理(discord)

Arguments:
  • m (MessageParserProtocol): メッセージデータ