libs.functions.tools.comparison
libs/functions/tools/comparison.py
"""
libs/functions/tools/comparison.py

Entry point for the data reconciliation tool: it selects the configured
service integration, runs the reconciliation and prints the outcome.
"""

import os
from typing import TYPE_CHECKING, cast

import libs.global_value as g
from integrations import factory
from integrations.slack.events import comparison
from libs.functions import lookup
from libs.types import ServiceType

if TYPE_CHECKING:
    from integrations.discord.adapter import ServiceAdapter as discord_adapter
    from integrations.protocols import MessageParserProtocol
    from integrations.slack.adapter import ServiceAdapter as slack_adapter
    from libs.domain.datamodels import ComparisonResults


def main() -> None:
    """Run the data reconciliation and print its outcome."""
    g.cfg.initialization()

    # The outcome is collected in a message object built by the standard-I/O adapter.
    message = factory.select_adapter(ServiceType.STANDARD_IO, g.cfg).parser()

    # Pick the routine that matches the configured service.
    service = g.adapter.interface_type
    if service == ServiceType.SLACK:
        run_comparison = slack_comparison
    elif service == ServiceType.DISCORD:
        run_comparison = discord_comparison
    else:
        # Any other service type: nothing is run and nothing is printed.
        return

    g.adapter = factory.select_adapter(g.adapter.interface_type, g.cfg)
    run_comparison(message)

    outcome = message.status.message
    if not isinstance(outcome, str):
        # Non-string outcomes are rendered through their "summary" output.
        outcome = cast("ComparisonResults", outcome).output("summary")
    print(outcome)


def slack_comparison(m: "MessageParserProtocol") -> None:
    """
    Run the reconciliation against Slack.

    Args:
        m (MessageParserProtocol): Message data; its channel ID is filled in
            before it is passed to the comparison routine.

    Raises:
        ModuleNotFoundError: slack_bolt or slack_sdk cannot be imported.
        RuntimeError: Reading the token environment variables or building
            the Slack clients failed.

    """
    try:
        from slack_bolt import App
        from slack_sdk import WebClient
    except ModuleNotFoundError as exc:
        # Re-raise with the original message only; the import traceback is suppressed.
        raise ModuleNotFoundError(exc.msg) from None

    # Type narrowing only; the object itself is unchanged.
    g.adapter = cast("slack_adapter", g.adapter)

    try:
        bolt_app = App(token=os.environ["SLACK_BOT_TOKEN"])
        web_client = WebClient(token=os.environ["SLACK_WEB_TOKEN"])
        g.adapter.api.webclient = web_client
        g.adapter.api.appclient = bolt_app.client
    except Exception as exc:
        raise RuntimeError(exc) from exc

    # Not wrapped: errors from the calls below propagate unchanged.
    auth_info = bolt_app.client.auth_test()
    g.adapter.conf.bot_id = auth_info["user_id"]
    m.data.channel_id = g.adapter.functions.get_channel_id()

    lookup.read_memberslist()
    comparison.main(m)


def discord_comparison(m: "MessageParserProtocol") -> None:
    """
    Run the reconciliation against Discord (placeholder).

    Args:
        m (MessageParserProtocol): Message data; its status message is set to
            a fixed notice.

    """
    # Type narrowing only; the object itself is unchanged.
    narrowed = cast("discord_adapter", g.adapter)
    g.adapter = narrowed

    m.status.message = "未実装"
def main() -> None:
def main() -> None:
    """Run the data reconciliation and print its outcome."""
    g.cfg.initialization()

    # The outcome is collected in a message object built by the standard-I/O adapter.
    message = factory.select_adapter(ServiceType.STANDARD_IO, g.cfg).parser()

    # Pick the routine that matches the configured service.
    service = g.adapter.interface_type
    if service == ServiceType.SLACK:
        run_comparison = slack_comparison
    elif service == ServiceType.DISCORD:
        run_comparison = discord_comparison
    else:
        # Any other service type: nothing is run and nothing is printed.
        return

    g.adapter = factory.select_adapter(g.adapter.interface_type, g.cfg)
    run_comparison(message)

    outcome = message.status.message
    if not isinstance(outcome, str):
        # Non-string outcomes are rendered through their "summary" output.
        outcome = cast("ComparisonResults", outcome).output("summary")
    print(outcome)
データ突合処理
def slack_comparison(m: "MessageParserProtocol") -> None:
    """
    Run the reconciliation against Slack.

    Args:
        m (MessageParserProtocol): Message data; its channel ID is filled in
            before it is passed to the comparison routine.

    Raises:
        ModuleNotFoundError: slack_bolt or slack_sdk cannot be imported.
        RuntimeError: Reading the token environment variables or building
            the Slack clients failed.

    """
    try:
        from slack_bolt import App
        from slack_sdk import WebClient
    except ModuleNotFoundError as exc:
        # Re-raise with the original message only; the import traceback is suppressed.
        raise ModuleNotFoundError(exc.msg) from None

    # Type narrowing only; the object itself is unchanged.
    g.adapter = cast("slack_adapter", g.adapter)

    try:
        bolt_app = App(token=os.environ["SLACK_BOT_TOKEN"])
        web_client = WebClient(token=os.environ["SLACK_WEB_TOKEN"])
        g.adapter.api.webclient = web_client
        g.adapter.api.appclient = bolt_app.client
    except Exception as exc:
        raise RuntimeError(exc) from exc

    # Not wrapped: errors from the calls below propagate unchanged.
    auth_info = bolt_app.client.auth_test()
    g.adapter.conf.bot_id = auth_info["user_id"]
    m.data.channel_id = g.adapter.functions.get_channel_id()

    lookup.read_memberslist()
    comparison.main(m)
突合処理(slack)
Arguments:
- m (MessageParserProtocol): メッセージデータ
Raises:
- ModuleNotFoundError: ライブラリ未インストール
- RuntimeError: 接続エラー
def discord_comparison(m: "MessageParserProtocol") -> None:
    """
    Run the reconciliation against Discord (placeholder).

    Args:
        m (MessageParserProtocol): Message data; its status message is set to
            a fixed notice.

    """
    # Type narrowing only; the object itself is unchanged.
    narrowed = cast("discord_adapter", g.adapter)
    g.adapter = narrowed

    m.status.message = "未実装"
突合処理(discord)
Arguments:
- m (MessageParserProtocol): メッセージデータ