libs.data.modify

libs/data/modify.py

  1"""
  2libs/data/modify.py
  3"""
  4
  5import logging
  6import os
  7import re
  8import shutil
  9import sqlite3
 10from contextlib import closing
 11from typing import TYPE_CHECKING, cast
 12
 13import libs.global_value as g
 14from cls.timekit import ExtendedDatetime as ExtDt
 15from libs.data import lookup
 16from libs.functions import message
 17from libs.types import StyleOptions
 18from libs.utils import dbutil, formatter
 19
 20if TYPE_CHECKING:
 21    from cls.score import GameResult
 22    from integrations.protocols import MessageParserProtocol
 23    from libs.types import RemarkDict
 24
 25
 26def db_insert(detection: "GameResult", m: "MessageParserProtocol") -> int:
 27    """スコアデータをDBに追加する
 28
 29    Args:
 30        detection (GameResult): スコアデータ
 31        m (MessageParserProtocol): メッセージデータ
 32
 33    Returns:
 34        int: _description_
 35    """
 36
 37    changes: int = 0
 38
 39    if m.check_updatable:
 40        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
 41            try:
 42                cur.execute(dbutil.query("RESULT_INSERT"), {
 43                    "playtime": ExtDt(float(detection.ts)).format("sql"),
 44                    "rpoint_sum": detection.rpoint_sum(),
 45                    **detection.to_dict(),
 46                })
 47                changes = cur.total_changes
 48                cur.commit()
 49            except sqlite3.IntegrityError as err:
 50                logging.error(err)
 51        logging.info("%s", detection.to_text("logging"))
 52        _score_check(detection, m)
 53    else:
 54        logging.warning("A post to a restricted channel was detected.")
 55        m.set_data("0", message.random_reply(m, "restricted_channel"), StyleOptions(key_title=False))
 56        m.post.ts = m.data.event_ts
 57
 58    return changes
 59
 60
 61def db_update(detection: "GameResult", m: "MessageParserProtocol") -> None:
 62    """スコアデータを変更する
 63
 64    Args:
 65        detection (GameResult): スコアデータ
 66        m (MessageParserProtocol): メッセージデータ
 67    """
 68
 69    detection.calc()
 70    if m.check_updatable:
 71        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
 72            cur.execute(dbutil.query("RESULT_UPDATE"), {
 73                "playtime": ExtDt(float(detection.ts)).format("sql"),
 74                "rpoint_sum": detection.rpoint_sum(),
 75                **detection.to_dict(),
 76            })
 77            cur.commit()
 78        logging.info("%s", detection.to_text("logging"))
 79        _score_check(detection, m)
 80    else:
 81        logging.warning("A post to a restricted channel was detected.")
 82        m.set_data("0", message.random_reply(m, "restricted_channel"), StyleOptions(key_title=False))
 83        m.post.ts = m.data.event_ts
 84
 85
 86def db_delete(m: "MessageParserProtocol"):
 87    """スコアデータを削除する
 88
 89    Args:
 90        m (MessageParserProtocol): メッセージデータ
 91    """
 92
 93    if m.check_updatable:
 94        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
 95            # ゲーム結果の削除
 96            cur.execute(dbutil.query("RESULT_DELETE"), (m.data.event_ts,))
 97            if (delete_result := cur.execute("select changes();").fetchone()[0]):
 98                m.status.target_ts.append(m.data.event_ts)
 99                logging.info("result: ts=%s, count=%s", m.data.event_ts, delete_result)
100            # メモの削除
101            if (remark_list := cur.execute("select event_ts from remarks where thread_ts=?", (m.data.event_ts,)).fetchall()):
102                cur.execute(dbutil.query("REMARKS_DELETE_ALL"), (m.data.event_ts,))
103                if (delete_remark := cur.execute("select changes();").fetchone()[0]):
104                    m.status.target_ts.extend([x.get("event_ts") for x in list(map(dict, remark_list))])
105                    logging.info("remark: ts=%s, count=%s", m.data.event_ts, delete_remark)
106            cur.commit()
107        m.status.action = "delete"
108    else:
109        logging.warning("A post to a restricted channel was detected.")
110
111
112def db_backup() -> str:
113    """データベースのバックアップ
114
115    Returns:
116        str: 動作結果メッセージ
117    """
118
119    if not g.cfg.setting.backup_dir:  # バックアップ設定がされていない場合は何もしない
120        return ""
121
122    fname = os.path.splitext(g.cfg.setting.database_file)[0]
123    fext = os.path.splitext(g.cfg.setting.database_file)[1]
124    bktime = ExtDt().format("ext")
125    bkfname = os.path.join(g.cfg.setting.backup_dir, os.path.basename(f"{fname}_{bktime}{fext}"))
126
127    if not os.path.isdir(g.cfg.setting.backup_dir):  # バックアップディレクトリ作成
128        try:
129            os.mkdir(g.cfg.setting.backup_dir)
130        except OSError as e:
131            logging.error(e, exc_info=True)
132            logging.error("Database backup directory creation failed !!!")
133            return "\nバックアップ用ディレクトリ作成の作成に失敗しました。"
134
135    # バックアップディレクトリにコピー
136    try:
137        shutil.copyfile(g.cfg.setting.database_file, bkfname)
138        logging.info("database backup: %s", bkfname)
139        return "\nデータベースをバックアップしました。"
140    except OSError as e:
141        logging.error(e, exc_info=True)
142        logging.error("Database backup failed !!!")
143        return "\nデータベースのバックアップに失敗しました。"
144
145
146def remarks_append(m: "MessageParserProtocol", remarks: list["RemarkDict"]) -> None:
147    """メモをDBに記録する
148
149    Args:
150        m (MessageParserProtocol): メッセージデータ
151        remarks (list[RemarkDict]): メモに残す内容
152    """
153
154    if m.check_updatable:
155        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
156            for para in remarks:
157                # 親スレッドの情報
158                row = cur.execute("select * from result where ts=:thread_ts", para).fetchone()
159                if row:
160                    if para["name"] in [v for k, v in dict(row).items() if str(k).endswith("_name")]:
161                        cur.execute(dbutil.query("REMARKS_INSERT"), para)
162                        m.status.target_ts.append(para["event_ts"])
163                        m.data.channel_id = para["source"].replace(f"{g.adapter.interface_type}_", "")
164                        logging.info("insert: %s", para)
165            cur.commit()
166            count = cur.execute("select changes();").fetchone()[0]
167
168        # 後処理
169        if count:
170            m.status.action = "change"
171            m.status.reaction = True
172        else:
173            m.status.action = "nothing"
174
175        g.adapter.functions.post_processing(m)
176
177
178def remarks_delete(m: "MessageParserProtocol"):
179    """DBからメモを削除する
180
181    Args:
182        m (MessageParserProtocol): メッセージデータ
183    """
184
185    if m.check_updatable:
186        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
187            cur.execute(dbutil.query("REMARKS_DELETE_ONE"), (m.data.event_ts,))
188            cur.commit()
189            if (count := cur.execute("select changes();").fetchone()[0]):
190                m.status.target_ts.append(m.data.event_ts)
191                logging.info("ts=%s, count=%s", m.data.event_ts, count)
192        # 後処理
193        m.status.action = "delete"
194        g.adapter.functions.post_processing(m)
195
196
197def remarks_delete_compar(para: "RemarkDict", m: "MessageParserProtocol") -> None:
198    """DBからメモを削除する(突合)
199
200    Args:
201        para (dict): パラメータ
202        m (MessageParserProtocol): メッセージデータ
203    """
204
205    with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
206        cur.execute(dbutil.query("REMARKS_DELETE_COMPAR"), para)
207        cur.commit()
208
209        left = cur.execute("select count() from remarks where event_ts=:event_ts;", para).fetchone()[0]
210
211    # 後処理
212    m.status.action = "delete"
213    m.status.target_ts.append(para["event_ts"])
214    m.data.channel_id = para["source"].replace(f"{g.adapter.interface_type}_", "")
215    if left > 0:  # メモデータが残っているなら
216        m.status.action = "change"
217    g.adapter.functions.post_processing(m)
218
219
220def check_remarks(m: "MessageParserProtocol") -> None:
221    """メモの内容を拾ってDBに格納する
222
223    Args:
224        m (MessageParserProtocol): メッセージデータ
225
226    """
227
228    game_result = lookup.db.exsist_record(m.data.thread_ts)
229    if game_result.has_valid_data():  # ゲーム結果のスレッドになっているか
230        remarks: list["RemarkDict"] = []
231        for name, matter in zip(m.argument[0::2], m.argument[1::2]):
232            remark: "RemarkDict" = {
233                "thread_ts": m.data.thread_ts,
234                "event_ts": m.data.event_ts,
235                "name": formatter.name_replace(name, not_replace=True),
236                "matter": matter,
237            }
238            if remark["name"] in game_result.to_list() and remark not in remarks:
239                remarks.append(remark)
240
241        match m.data.status:
242            case "message_append":
243                remarks_append(m, remarks)
244            case "message_changed":
245                remarks_delete(m)
246                remarks_append(m, remarks)
247            case "message_deleted":
248                remarks_delete(m)
249
250
251def reprocessing_remarks(m: "MessageParserProtocol") -> None:
252    """スレッドの内容を再処理
253
254    Args:
255        m (MessageParserProtocol): メッセージデータ
256    """
257
258    res = g.adapter.functions.get_conversations(m)
259    msg = cast(dict, res.get("messages"))
260
261    if msg:
262        reply_count = cast(dict, msg[0]).get("reply_count", 0)
263        m.data.thread_ts = str(cast(dict, msg[0]).get("ts"))
264
265        for x in range(1, reply_count + 1):
266            m.data.text = str(cast(dict, msg[x]).get("text", ""))
267            if m.data.text:
268                m.data.event_ts = str(cast(dict, msg[x]).get("ts"))
269                logging.debug("(%s/%s) thread_ts=%s, event_ts=%s, %s", x, reply_count, m.data.thread_ts, m.data.event_ts, m.data.text)
270                if re.match(rf"^{g.cfg.setting.remarks_word}", m.keyword):
271                    check_remarks(m)
272
273
274def _score_check(detection: "GameResult", m: "MessageParserProtocol"):
275    """スコアデータ格納状態を記録する
276
277    Args:
278        detection (GameResult): スコアデータ
279        m (MessageParserProtocol): メッセージデータ
280    """
281
282    # 結果
283    m.status.action = "change"
284    m.status.target_ts.append(m.data.event_ts)
285    m.status.reaction = True
286    m.status.message = detection.to_text("detail")
287    m.post.message.clear()
288
289    # 素点合計チェック
290    if detection.deposit:
291        m.status.reaction = False
292        m.status.rpoint_sum = detection.rpoint_sum()
293        m.set_data("0", message.random_reply(m, "invalid_score"), StyleOptions(key_title=False))
294        m.post.ts = m.data.event_ts
295
296    # プレイヤー名重複チェック
297    if len(set(detection.to_list())) != 4:
298        m.status.reaction = False
299        m.set_data("1", message.random_reply(m, "same_player"), StyleOptions(key_title=False))
300        m.post.ts = m.data.event_ts
def db_insert( detection: cls.score.GameResult, m: integrations.protocols.MessageParserProtocol) -> int:
27def db_insert(detection: "GameResult", m: "MessageParserProtocol") -> int:
28    """スコアデータをDBに追加する
29
30    Args:
31        detection (GameResult): スコアデータ
32        m (MessageParserProtocol): メッセージデータ
33
34    Returns:
35        int: _description_
36    """
37
38    changes: int = 0
39
40    if m.check_updatable:
41        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
42            try:
43                cur.execute(dbutil.query("RESULT_INSERT"), {
44                    "playtime": ExtDt(float(detection.ts)).format("sql"),
45                    "rpoint_sum": detection.rpoint_sum(),
46                    **detection.to_dict(),
47                })
48                changes = cur.total_changes
49                cur.commit()
50            except sqlite3.IntegrityError as err:
51                logging.error(err)
52        logging.info("%s", detection.to_text("logging"))
53        _score_check(detection, m)
54    else:
55        logging.warning("A post to a restricted channel was detected.")
56        m.set_data("0", message.random_reply(m, "restricted_channel"), StyleOptions(key_title=False))
57        m.post.ts = m.data.event_ts
58
59    return changes

スコアデータをDBに追加する

Arguments:
  • detection (GameResult): スコアデータ
  • m (MessageParserProtocol): メッセージデータ
Returns:

int: _description_

def db_update( detection: cls.score.GameResult, m: integrations.protocols.MessageParserProtocol) -> None:
62def db_update(detection: "GameResult", m: "MessageParserProtocol") -> None:
63    """スコアデータを変更する
64
65    Args:
66        detection (GameResult): スコアデータ
67        m (MessageParserProtocol): メッセージデータ
68    """
69
70    detection.calc()
71    if m.check_updatable:
72        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
73            cur.execute(dbutil.query("RESULT_UPDATE"), {
74                "playtime": ExtDt(float(detection.ts)).format("sql"),
75                "rpoint_sum": detection.rpoint_sum(),
76                **detection.to_dict(),
77            })
78            cur.commit()
79        logging.info("%s", detection.to_text("logging"))
80        _score_check(detection, m)
81    else:
82        logging.warning("A post to a restricted channel was detected.")
83        m.set_data("0", message.random_reply(m, "restricted_channel"), StyleOptions(key_title=False))
84        m.post.ts = m.data.event_ts

スコアデータを変更する

Arguments:
  • detection (GameResult): スコアデータ
  • m (MessageParserProtocol): メッセージデータ
def db_delete(m: integrations.protocols.MessageParserProtocol):
 87def db_delete(m: "MessageParserProtocol"):
 88    """スコアデータを削除する
 89
 90    Args:
 91        m (MessageParserProtocol): メッセージデータ
 92    """
 93
 94    if m.check_updatable:
 95        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
 96            # ゲーム結果の削除
 97            cur.execute(dbutil.query("RESULT_DELETE"), (m.data.event_ts,))
 98            if (delete_result := cur.execute("select changes();").fetchone()[0]):
 99                m.status.target_ts.append(m.data.event_ts)
100                logging.info("result: ts=%s, count=%s", m.data.event_ts, delete_result)
101            # メモの削除
102            if (remark_list := cur.execute("select event_ts from remarks where thread_ts=?", (m.data.event_ts,)).fetchall()):
103                cur.execute(dbutil.query("REMARKS_DELETE_ALL"), (m.data.event_ts,))
104                if (delete_remark := cur.execute("select changes();").fetchone()[0]):
105                    m.status.target_ts.extend([x.get("event_ts") for x in list(map(dict, remark_list))])
106                    logging.info("remark: ts=%s, count=%s", m.data.event_ts, delete_remark)
107            cur.commit()
108        m.status.action = "delete"
109    else:
110        logging.warning("A post to a restricted channel was detected.")

スコアデータを削除する

Arguments:
  • m (MessageParserProtocol): メッセージデータ
def db_backup() -> str:
113def db_backup() -> str:
114    """データベースのバックアップ
115
116    Returns:
117        str: 動作結果メッセージ
118    """
119
120    if not g.cfg.setting.backup_dir:  # バックアップ設定がされていない場合は何もしない
121        return ""
122
123    fname = os.path.splitext(g.cfg.setting.database_file)[0]
124    fext = os.path.splitext(g.cfg.setting.database_file)[1]
125    bktime = ExtDt().format("ext")
126    bkfname = os.path.join(g.cfg.setting.backup_dir, os.path.basename(f"{fname}_{bktime}{fext}"))
127
128    if not os.path.isdir(g.cfg.setting.backup_dir):  # バックアップディレクトリ作成
129        try:
130            os.mkdir(g.cfg.setting.backup_dir)
131        except OSError as e:
132            logging.error(e, exc_info=True)
133            logging.error("Database backup directory creation failed !!!")
134            return "\nバックアップ用ディレクトリ作成の作成に失敗しました。"
135
136    # バックアップディレクトリにコピー
137    try:
138        shutil.copyfile(g.cfg.setting.database_file, bkfname)
139        logging.info("database backup: %s", bkfname)
140        return "\nデータベースをバックアップしました。"
141    except OSError as e:
142        logging.error(e, exc_info=True)
143        logging.error("Database backup failed !!!")
144        return "\nデータベースのバックアップに失敗しました。"

データベースのバックアップ

Returns:

str: 動作結果メッセージ

def remarks_append( m: integrations.protocols.MessageParserProtocol, remarks: list[libs.types.RemarkDict]) -> None:
147def remarks_append(m: "MessageParserProtocol", remarks: list["RemarkDict"]) -> None:
148    """メモをDBに記録する
149
150    Args:
151        m (MessageParserProtocol): メッセージデータ
152        remarks (list[RemarkDict]): メモに残す内容
153    """
154
155    if m.check_updatable:
156        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
157            for para in remarks:
158                # 親スレッドの情報
159                row = cur.execute("select * from result where ts=:thread_ts", para).fetchone()
160                if row:
161                    if para["name"] in [v for k, v in dict(row).items() if str(k).endswith("_name")]:
162                        cur.execute(dbutil.query("REMARKS_INSERT"), para)
163                        m.status.target_ts.append(para["event_ts"])
164                        m.data.channel_id = para["source"].replace(f"{g.adapter.interface_type}_", "")
165                        logging.info("insert: %s", para)
166            cur.commit()
167            count = cur.execute("select changes();").fetchone()[0]
168
169        # 後処理
170        if count:
171            m.status.action = "change"
172            m.status.reaction = True
173        else:
174            m.status.action = "nothing"
175
176        g.adapter.functions.post_processing(m)

メモをDBに記録する

Arguments:
  • m (MessageParserProtocol): メッセージデータ
  • remarks (list[RemarkDict]): メモに残す内容
def remarks_delete(m: integrations.protocols.MessageParserProtocol):
179def remarks_delete(m: "MessageParserProtocol"):
180    """DBからメモを削除する
181
182    Args:
183        m (MessageParserProtocol): メッセージデータ
184    """
185
186    if m.check_updatable:
187        with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
188            cur.execute(dbutil.query("REMARKS_DELETE_ONE"), (m.data.event_ts,))
189            cur.commit()
190            if (count := cur.execute("select changes();").fetchone()[0]):
191                m.status.target_ts.append(m.data.event_ts)
192                logging.info("ts=%s, count=%s", m.data.event_ts, count)
193        # 後処理
194        m.status.action = "delete"
195        g.adapter.functions.post_processing(m)

DBからメモを削除する

Arguments:
  • m (MessageParserProtocol): メッセージデータ
def remarks_delete_compar( para: libs.types.RemarkDict, m: integrations.protocols.MessageParserProtocol) -> None:
198def remarks_delete_compar(para: "RemarkDict", m: "MessageParserProtocol") -> None:
199    """DBからメモを削除する(突合)
200
201    Args:
202        para (dict): パラメータ
203        m (MessageParserProtocol): メッセージデータ
204    """
205
206    with closing(dbutil.connection(g.cfg.setting.database_file)) as cur:
207        cur.execute(dbutil.query("REMARKS_DELETE_COMPAR"), para)
208        cur.commit()
209
210        left = cur.execute("select count() from remarks where event_ts=:event_ts;", para).fetchone()[0]
211
212    # 後処理
213    m.status.action = "delete"
214    m.status.target_ts.append(para["event_ts"])
215    m.data.channel_id = para["source"].replace(f"{g.adapter.interface_type}_", "")
216    if left > 0:  # メモデータが残っているなら
217        m.status.action = "change"
218    g.adapter.functions.post_processing(m)

DBからメモを削除する(突合)

Arguments:
  • para (dict): パラメータ
  • m (MessageParserProtocol): メッセージデータ
def check_remarks(m: integrations.protocols.MessageParserProtocol) -> None:
221def check_remarks(m: "MessageParserProtocol") -> None:
222    """メモの内容を拾ってDBに格納する
223
224    Args:
225        m (MessageParserProtocol): メッセージデータ
226
227    """
228
229    game_result = lookup.db.exsist_record(m.data.thread_ts)
230    if game_result.has_valid_data():  # ゲーム結果のスレッドになっているか
231        remarks: list["RemarkDict"] = []
232        for name, matter in zip(m.argument[0::2], m.argument[1::2]):
233            remark: "RemarkDict" = {
234                "thread_ts": m.data.thread_ts,
235                "event_ts": m.data.event_ts,
236                "name": formatter.name_replace(name, not_replace=True),
237                "matter": matter,
238            }
239            if remark["name"] in game_result.to_list() and remark not in remarks:
240                remarks.append(remark)
241
242        match m.data.status:
243            case "message_append":
244                remarks_append(m, remarks)
245            case "message_changed":
246                remarks_delete(m)
247                remarks_append(m, remarks)
248            case "message_deleted":
249                remarks_delete(m)

メモの内容を拾ってDBに格納する

Arguments:
  • m (MessageParserProtocol): メッセージデータ
def reprocessing_remarks(m: integrations.protocols.MessageParserProtocol) -> None:
252def reprocessing_remarks(m: "MessageParserProtocol") -> None:
253    """スレッドの内容を再処理
254
255    Args:
256        m (MessageParserProtocol): メッセージデータ
257    """
258
259    res = g.adapter.functions.get_conversations(m)
260    msg = cast(dict, res.get("messages"))
261
262    if msg:
263        reply_count = cast(dict, msg[0]).get("reply_count", 0)
264        m.data.thread_ts = str(cast(dict, msg[0]).get("ts"))
265
266        for x in range(1, reply_count + 1):
267            m.data.text = str(cast(dict, msg[x]).get("text", ""))
268            if m.data.text:
269                m.data.event_ts = str(cast(dict, msg[x]).get("ts"))
270                logging.debug("(%s/%s) thread_ts=%s, event_ts=%s, %s", x, reply_count, m.data.thread_ts, m.data.event_ts, m.data.text)
271                if re.match(rf"^{g.cfg.setting.remarks_word}", m.keyword):
272                    check_remarks(m)

スレッドの内容を再処理

Arguments:
  • m (MessageParserProtocol): メッセージデータ