libs.data.lookup.api
lib/data/lookup/api.py
1""" 2lib/data/lookup/api.py 3""" 4 5import logging 6 7from slack_sdk.errors import SlackApiError 8from slack_sdk.web import SlackResponse 9 10import libs.global_value as g 11 12 13def reactions_status(ch=None, ts=None): 14 """botが付けたリアクションの種類を返す 15 16 Args: 17 ch (str, optional): チャンネルID. Defaults to None. 18 ts (str, optional): メッセージのタイムスタンプ. Defaults to None. 19 20 Returns: 21 list: リアクション 22 """ 23 24 ch = ch if ch else g.msg.channel_id 25 ts = ts if ts else g.msg.event_ts 26 icon: list = [] 27 28 try: # 削除済みメッセージはエラーになるので潰す 29 res = g.app.client.reactions_get(channel=ch, timestamp=ts) 30 logging.trace(res.validate()) # type: ignore 31 except SlackApiError: 32 return icon 33 34 if "reactions" in res["message"]: 35 for reaction in res["message"]["reactions"]: 36 if g.bot_id in reaction["users"]: 37 icon.append(reaction["name"]) 38 39 logging.info("ch=%s, ts=%s, user=%s, icon=%s", ch, ts, g.bot_id, icon) 40 return icon 41 42 43def get_channel_id() -> str | None: 44 """チャンネルIDを取得する 45 46 Returns: 47 str: チャンネルID 48 """ 49 50 channel_id = None 51 52 try: 53 response = g.webclient.search_messages( 54 query=f"in:{g.cfg.search.channel}", 55 count=1, 56 ) 57 messages: dict = response.get("messages", {}) 58 if messages.get("matches"): 59 channel = messages["matches"][0]["channel"] 60 if isinstance(g.cfg.search.channel, str): 61 if channel["name"] in g.cfg.search.channel: 62 channel_id = channel["id"] 63 else: 64 channel_id = channel["id"] 65 except SlackApiError as e: 66 logging.error(e) 67 68 return channel_id 69 70 71def get_dm_channel_id(user_id: str) -> str | None: 72 """DMのチャンネルIDを取得する 73 74 Args: 75 user_id (str): DMの相手 76 77 Returns: 78 str: チャンネルID 79 """ 80 81 channel_id = None 82 83 try: 84 response = g.app.client.conversations_open(users=[user_id]) 85 channel_id = response["channel"]["id"] 86 except SlackApiError as e: 87 logging.error(e) 88 89 return channel_id 90 91 92def get_conversations(ch=None, ts=None): 93 """スレッド情報の取得 94 95 Args: 96 ch (str, optional): チャンネルID. Defaults to None. 97 ts (str, optional): メッセージのタイムスタンプ. Defaults to None. 98 99 Returns: 100 SlackResponse: API response 101 """ 102 103 ch = ch if ch else g.msg.channel_id 104 ts = ts if ts else g.msg.event_ts 105 res: SlackResponse 106 107 try: 108 res = g.app.client.conversations_replies(channel=ch, ts=ts) 109 logging.trace(res.validate()) # type: ignore 110 except SlackApiError as e: 111 logging.error(e) 112 113 return res
def
reactions_status(ch=None, ts=None):
14def reactions_status(ch=None, ts=None): 15 """botが付けたリアクションの種類を返す 16 17 Args: 18 ch (str, optional): チャンネルID. Defaults to None. 19 ts (str, optional): メッセージのタイムスタンプ. Defaults to None. 20 21 Returns: 22 list: リアクション 23 """ 24 25 ch = ch if ch else g.msg.channel_id 26 ts = ts if ts else g.msg.event_ts 27 icon: list = [] 28 29 try: # 削除済みメッセージはエラーになるので潰す 30 res = g.app.client.reactions_get(channel=ch, timestamp=ts) 31 logging.trace(res.validate()) # type: ignore 32 except SlackApiError: 33 return icon 34 35 if "reactions" in res["message"]: 36 for reaction in res["message"]["reactions"]: 37 if g.bot_id in reaction["users"]: 38 icon.append(reaction["name"]) 39 40 logging.info("ch=%s, ts=%s, user=%s, icon=%s", ch, ts, g.bot_id, icon) 41 return icon
botが付けたリアクションの種類を返す
Arguments:
- ch (str, optional): チャンネルID. Defaults to None.
- ts (str, optional): メッセージのタイムスタンプ. Defaults to None.
Returns:
list: リアクション
def
get_channel_id() -> str | None:
44def get_channel_id() -> str | None: 45 """チャンネルIDを取得する 46 47 Returns: 48 str: チャンネルID 49 """ 50 51 channel_id = None 52 53 try: 54 response = g.webclient.search_messages( 55 query=f"in:{g.cfg.search.channel}", 56 count=1, 57 ) 58 messages: dict = response.get("messages", {}) 59 if messages.get("matches"): 60 channel = messages["matches"][0]["channel"] 61 if isinstance(g.cfg.search.channel, str): 62 if channel["name"] in g.cfg.search.channel: 63 channel_id = channel["id"] 64 else: 65 channel_id = channel["id"] 66 except SlackApiError as e: 67 logging.error(e) 68 69 return channel_id
チャンネルIDを取得する
Returns:
str: チャンネルID
def
get_dm_channel_id(user_id: str) -> str | None:
72def get_dm_channel_id(user_id: str) -> str | None: 73 """DMのチャンネルIDを取得する 74 75 Args: 76 user_id (str): DMの相手 77 78 Returns: 79 str: チャンネルID 80 """ 81 82 channel_id = None 83 84 try: 85 response = g.app.client.conversations_open(users=[user_id]) 86 channel_id = response["channel"]["id"] 87 except SlackApiError as e: 88 logging.error(e) 89 90 return channel_id
DMのチャンネルIDを取得する
Arguments:
- user_id (str): DMの相手
Returns:
str: チャンネルID
def
get_conversations(ch=None, ts=None):
93def get_conversations(ch=None, ts=None): 94 """スレッド情報の取得 95 96 Args: 97 ch (str, optional): チャンネルID. Defaults to None. 98 ts (str, optional): メッセージのタイムスタンプ. Defaults to None. 99 100 Returns: 101 SlackResponse: API response 102 """ 103 104 ch = ch if ch else g.msg.channel_id 105 ts = ts if ts else g.msg.event_ts 106 res: SlackResponse 107 108 try: 109 res = g.app.client.conversations_replies(channel=ch, ts=ts) 110 logging.trace(res.validate()) # type: ignore 111 except SlackApiError as e: 112 logging.error(e) 113 114 return res
スレッド情報の取得
Arguments:
- ch (str, optional): チャンネルID. Defaults to None.
- ts (str, optional): メッセージのタイムスタンプ. Defaults to None.
Returns:
SlackResponse: API response