Kaynağa Gözat

新增迁移账号

luojunhui 2 ay önce
ebeveyn
işleme
ed155718ab

+ 120 - 0
applications/tasks/monitor_tasks/auto_reply_cards_monitor.py

@@ -0,0 +1,120 @@
+import json
+import xml.etree.ElementTree as ET
+from urllib.parse import unquote, parse_qs
+from pandas import DataFrame
+
+
def parse_xml(xml_text):
    """Extract mini-program card information from a card XML payload.

    The payload must contain ``pagepath``, ``title`` and ``sourcedisplayname``
    elements (looked up anywhere in the tree).

    Args:
        xml_text: The XML document as a string (or bytes).

    Returns:
        A dict with the keys ``page_path`` (raw text of ``pagepath``),
        ``title``, ``mini_program`` (text of ``sourcedisplayname``) and
        ``params`` (the page path's query string, URL-decoded and parsed by
        ``parse_qs`` into ``{name: [values]}``; empty when the path has no
        query string). ``None`` when the payload cannot be parsed or a
        required element is missing.
    """
    # NOTE(review): xml.etree.ElementTree is not hardened against maliciously
    # constructed XML (see the Python docs) - confirm the payloads are trusted.
    try:
        root = ET.fromstring(xml_text)
        page_path = root.find(".//pagepath").text
        # partition() tolerates a path without "?"; such a card is still
        # reported, just with empty params, instead of being discarded.
        _, _, query = page_path.partition("?")
        # The query is unquoted as a whole before parsing, so parameters that
        # were percent-encoded inside another value are flattened into params.
        params = parse_qs(unquote(query))
        return {
            "page_path": page_path,
            "title": root.find(".//title").text,
            "mini_program": root.find(".//sourcedisplayname").text,
            "params": params,
        }
    except (ET.ParseError, AttributeError, TypeError, ValueError):
        # ParseError: malformed XML; AttributeError: a required element is
        # missing or pagepath is empty; TypeError: xml_text is not str/bytes;
        # ValueError: the query string is rejected by parse_qs.
        return None
+
+
class AutoReplyCardsMonitorConst:
    """Status codes shared by the auto-reply cards monitor."""

    # Values compared against the `task_status` column of a fetched record.
    (
        FETCH_INIT_STATUS,
        FETCH_PROCESSING_STATUS,
        FETCH_SUCCESS_STATUS,
        FETCH_FAIL_STATUS,
    ) = (0, 1, 2, 3)

    # Task status codes; note the failure code is 99, not 3.
    INIT_STATUS, PROCESSING_STATUS, SUCCESS_STATUS = (0, 1, 2)
    FAIL_STATUS = 99
+
+
class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
    """Export the mini-program cards found in fetched auto-reply messages.

    For each task id the matching ``gzh_msg_record`` row is read from the
    ``aigc`` database; for successfully fetched rows every card XML in
    ``task_result`` is parsed with ``parse_xml`` and written as one CSV row.
    """

    # Header of the exported CSV; the order matches the rows from _build_rows.
    CSV_COLUMNS = ['账号名称', '标题', '小程序', '视频 id', 'rootSourceId', 'rootShareId', 'index', 'create_time', 'update_time', 'page_path']

    # Task ids scanned when deal() is called without explicit ids.
    # TODO: derive these from get_tasks() once the task table is wired in.
    DEFAULT_TASK_IDS = tuple(range(1, 9))

    def __init__(self, pool, log_client):
        """Store the database pool and the log client (the latter is not used yet)."""
        self.pool = pool
        self.log_client = log_client

    async def get_tasks(self):
        """Return every row of the ``auto_reply_tasks`` table."""
        query = """
            SELECT * FROM auto_reply_tasks;
        """
        return await self.pool.async_fetch(query)

    async def get_result_from_aigc(self, task_id):
        """Return the ``gzh_msg_record`` rows of ``task_id`` from the ``aigc`` database.

        The millisecond timestamps are converted to datetimes by the query.
        """
        query = """
            SELECT gzh_name, task_status, task_result, from_unixtime(create_timestamp / 1000) as create_time,
            from_unixtime(update_timestamp / 1000) as update_time,
             err_msg
            FROM gzh_msg_record
            WHERE task_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(task_id,), db_name="aigc")

    def _build_rows(self, row):
        """Turn one successfully fetched record into CSV rows, one per parsable card."""
        try:
            card_xml_list = json.loads(row['task_result'])
        except (TypeError, ValueError) as exc:
            # A NULL or malformed result must not abort the whole export.
            print(f"skip {row['gzh_name']}: invalid task_result ({exc})")
            return []
        if not isinstance(card_xml_list, list):
            print(f"skip {row['gzh_name']}: task_result is not a list")
            return []

        rows = []
        # `index` is the 1-based position of the card within the reply.
        for index, xml_txt in enumerate(card_xml_list, 1):
            extract_info = parse_xml(xml_txt)
            if not extract_info:
                continue
            params = extract_info.get('params', {})
            # The video id is carried as either `id` or `video_id`; `id` wins.
            vid = (params.get('id') or params.get('video_id') or [''])[0]
            rows.append([
                row['gzh_name'],
                extract_info.get('title'),
                extract_info.get('mini_program'),
                vid,
                params.get('rootSourceId', [''])[0],
                params.get('rootShareId', [''])[0],
                index,
                row['create_time'],
                row['update_time'],
                extract_info['page_path'],
            ])
        return rows

    async def deal(self, task_ids=None, output_path="local_data.csv"):
        """Collect the cards of the given tasks and write them to a CSV file.

        Args:
            task_ids: Iterable of task ids to inspect; defaults to
                ``DEFAULT_TASK_IDS`` (1 through 8).
            output_path: Destination of the CSV file.

        Only the first row returned for a task is considered. Rows that are
        still pending or processing are skipped; for failed rows the stored
        error message is printed.
        """
        if task_ids is None:
            task_ids = self.DEFAULT_TASK_IDS

        rows = []
        for task_id in task_ids:
            record = await self.get_result_from_aigc(task_id)
            if not record:
                continue
            row = record[0]
            task_status = row['task_status']
            if task_status == self.FETCH_SUCCESS_STATUS:
                rows.extend(self._build_rows(row))
            elif task_status == self.FETCH_FAIL_STATUS:
                # The query result is a list of rows, so the message must be
                # read from the row itself rather than from the list.
                print(row['err_msg'])

        df = DataFrame(rows, columns=self.CSV_COLUMNS)
        df.to_csv(output_path, index=False)