integrations.standard_io.functions

integrations/standard_io/functions.py

 1"""
 2integrations/standard_io/functions.py
 3"""
 4
 5from typing import TYPE_CHECKING
 6
 7from cls.timekit import ExtendedDatetime as ExtDt
 8from integrations.base.interface import FunctionsInterface
 9
10if TYPE_CHECKING:
11    from integrations.protocols import MessageParserProtocol
12
13
class SvcFunctions(FunctionsInterface):
    """Functions dedicated to standard input/output."""

    def post_processing(self, m: "MessageParserProtocol"):
        """Post-processing: print the event timestamp and the status message.

        Args:
            m (MessageParserProtocol): Message data. ``m.data.event_ts`` and
                ``m.status.message`` are read.
        """

        # The raw event timestamp is converted to float before being wrapped.
        event_time = ExtDt(float(m.data.event_ts))
        status_text = m.status.message
        print(event_time, status_text)

    def get_conversations(self, m: "MessageParserProtocol") -> dict:
        """Dummy for the abstract method; always returns an empty dict.

        Args:
            m (MessageParserProtocol): Message data (unused).
        """

        del m  # intentionally unused
        return {}
class SvcFunctions(integrations.base.interface.FunctionsInterface):
class SvcFunctions(FunctionsInterface):
    """Functions dedicated to standard input/output."""

    def post_processing(self, m: "MessageParserProtocol"):
        """Post-processing: print the event timestamp and the status message.

        Args:
            m (MessageParserProtocol): Message data. ``m.data.event_ts`` and
                ``m.status.message`` are read.
        """

        # The raw event timestamp is converted to float before being wrapped.
        event_time = ExtDt(float(m.data.event_ts))
        status_text = m.status.message
        print(event_time, status_text)

    def get_conversations(self, m: "MessageParserProtocol") -> dict:
        """Dummy for the abstract method; always returns an empty dict.

        Args:
            m (MessageParserProtocol): Message data (unused).
        """

        del m  # intentionally unused
        return {}

標準入出力専用関数

def post_processing(self, m: integrations.protocols.MessageParserProtocol):
18    def post_processing(self, m: "MessageParserProtocol"):
19        """後処理
20
21        Args:
22            m (MessageParserProtocol): メッセージデータ
23        """
24
25        print(ExtDt(float(m.data.event_ts)), m.status.message)

後処理

Arguments:
  • m (MessageParserProtocol): メッセージデータ
def get_conversations(self, m: integrations.protocols.MessageParserProtocol) -> dict:
27    def get_conversations(self, m: "MessageParserProtocol") -> dict:
28        """abstractmethod dummy"""
29
30        _ = m
31        return {}

abstractmethod dummy