integrations.standard_io.functions
integrations/standard_io/functions.py
1""" 2integrations/standard_io/functions.py 3""" 4 5from typing import TYPE_CHECKING, Any 6 7from integrations.base.interface import FunctionsInterface 8from libs.utils.timekit import ExtendedDatetime as ExtDt 9 10if TYPE_CHECKING: 11 from integrations.protocols import MessageParserProtocol 12 13 14class SvcFunctions(FunctionsInterface): 15 """標準入出力専用関数""" 16 17 def post_processing(self, m: "MessageParserProtocol") -> None: 18 """ 19 後処理 20 21 Args: 22 m (MessageParserProtocol): メッセージデータ 23 24 """ 25 print(ExtDt(float(m.data.event_ts)), m.status.message) 26 27 def get_conversations(self, m: "MessageParserProtocol") -> dict[str, Any]: 28 """Abstractmethod dummy""" 29 _ = m 30 return {}
15class SvcFunctions(FunctionsInterface): 16 """標準入出力専用関数""" 17 18 def post_processing(self, m: "MessageParserProtocol") -> None: 19 """ 20 後処理 21 22 Args: 23 m (MessageParserProtocol): メッセージデータ 24 25 """ 26 print(ExtDt(float(m.data.event_ts)), m.status.message) 27 28 def get_conversations(self, m: "MessageParserProtocol") -> dict[str, Any]: 29 """Abstractmethod dummy""" 30 _ = m 31 return {}
標準入出力専用関数
18 def post_processing(self, m: "MessageParserProtocol") -> None: 19 """ 20 後処理 21 22 Args: 23 m (MessageParserProtocol): メッセージデータ 24 25 """ 26 print(ExtDt(float(m.data.event_ts)), m.status.message)
後処理
Arguments:
- m (MessageParserProtocol): メッセージデータ
def
get_conversations( self, m: integrations.protocols.MessageParserProtocol) -> dict[str, typing.Any]:
28 def get_conversations(self, m: "MessageParserProtocol") -> dict[str, Any]: 29 """Abstractmethod dummy""" 30 _ = m 31 return {}
Abstractmethod dummy