Browse Source

Merge branch '2024-11-12-luojunhui-add-log-to-history-task' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 5 months ago
parent
commit
559d11cb32
3 changed files with 153 additions and 13 deletions
  1. 40 0
      applications/feishu/__init__.py
  2. 48 2
      historyTask.py
  3. 65 11
      tasks/history_task.py

+ 40 - 0
applications/feishu/__init__.py

@@ -0,0 +1,40 @@
+"""
+@author: luojunhui
+"""
+import json
+import requests
+
+
+def bot(title, detail, mention=True):
+    """
+    机器人
+    """
+    title_obj = {
+        "content": "{}<at id=all></at>\n".format(title) if mention else "{}\n".format(title),
+        "tag": "lark_md",
+    }
+    head_title = "【重点关注】" if mention else "【普通通知】"
+    url = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+    headers = {"Content-Type": "application/json"}
+    payload = {
+        "msg_type": "interactive",
+        "card": {
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": title_obj,
+                },
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": json.dumps(
+                            detail, ensure_ascii=False, indent=4
+                        ),
+                        "tag": "lark_md",
+                    },
+                },
+            ],
+            "header": {"title": {"content": head_title, "tag": "plain_text"}},
+        },
+    }
+    requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)

+ 48 - 2
historyTask.py

@@ -4,8 +4,12 @@
 import time
 import asyncio
 import datetime
+import traceback
+
 from tasks.history_task import historyContentIdTask
 from applications.db import AsyncMySQLClient
+from applications.log import logging
+from applications.feishu import bot
 
 
 async def main():
@@ -14,8 +18,46 @@ async def main():
     :return:
     """
     async_mysql_pool = AsyncMySQLClient()
-    await async_mysql_pool.init_pool()
-    history_content_id_task = historyContentIdTask(async_mysql_pool)
+    try:
+        await async_mysql_pool.init_pool()
+        logging(
+            code="history0001",
+            info="Init MySQL pool successfully",
+            alg="historyContentIdTask",
+            function="main"
+        )
+    except Exception as e:
+        logging(
+            code="history0002",
+            info="Init MySQL pool failed",
+            alg="historyContentIdTask",
+            function="main",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+        bot(
+            title="historyContentIdTask INIT MYSQL FAILS",
+            detail={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+    try:
+        history_content_id_task = historyContentIdTask(async_mysql_pool)
+    except Exception as e:
+        logging(
+            code="history0003",
+            info="Init historyContentIdTask failed",
+            alg="historyContentIdTask",
+            function="main",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc()
+            }
+        )
+        return
     await history_content_id_task.deal()
 
 
@@ -24,4 +66,8 @@ if __name__ == '__main__':
         asyncio.run(main())
         now_str = datetime.datetime.now().__str__()
         print("{}    请求执行完成, 等待60s".format(now_str))
+        logging(
+            code="history0004",
+            info="History task finished"
+        )
         time.sleep(60)

+ 65 - 11
tasks/history_task.py

@@ -4,7 +4,9 @@
 import json
 import time
 import asyncio
+import traceback
 
+from applications.feishu import bot
 from applications.config import Config
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
@@ -188,6 +190,12 @@ class historyContentIdTask(object):
                 "videoPath": response['data'][0]['videoPath'],
                 "videoOss": video_obj['video_oss_path']
             }
+            logging(
+                code="history1006",
+                info="视频已经发布到 pq",
+                trace_id=trace_id,
+                data=obj
+            )
             L.append(obj)
         update_sql = f"""
            UPDATE {self.article_match_video_table}
@@ -205,9 +213,10 @@ class historyContentIdTask(object):
             )
         )
         logging(
-            code="9002",
-            info="已经从历史文章更新",
-            trace_id=trace_id
+            code="history1007",
+            info="已经更文章状态为已发布",
+            trace_id=trace_id,
+            data=L
         )
 
     async def roll_back_content_status_when_fails(self, process_times, trace_id):
@@ -259,15 +268,22 @@ class historyContentIdTask(object):
         else:
             return False
 
-    async def check_title_category(self, content_id, gh_id) -> bool:
+    async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
         """
         判断该文章的品类是否属于该账号的品类
+        :param trace_id:
         :param content_id:
         :param gh_id:
         :return:
         """
-        bad_category_set = set(self.account_negative_category.get(gh_id, []))
-        if bad_category_set:
+        bad_category_list = self.account_negative_category.get(gh_id, [])
+        logging(
+            code="history1101",
+            info="该账号的 negative 类型列表",
+            trace_id=trace_id,
+            data=bad_category_list
+        )
+        if bad_category_list:
             sql = f"""
                 SELECT category
                 FROM article_category
@@ -276,7 +292,12 @@ class historyContentIdTask(object):
             result = await self.mysql_client.async_select(sql)
             if result:
                 category = result[0][0]
-                if category in bad_category_set:
+                logging(
+                    code="history1102",
+                    info="文章的品类-{}".format(category),
+                    trace_id=trace_id
+                )
+                if category in bad_category_list:
                     return True
         return False
 
@@ -292,14 +313,24 @@ class historyContentIdTask(object):
         gh_id = params['gh_id']
         if flow_pool_level == "autoArticlePoolLevel4":
             # 校验文章是否属于该账号的negative 类型
-            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id)
+            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id, trace_id=trace_id)
             if negative_category_status:
                 # 修改状态为品类不匹配状态
+                logging(
+                    code="history1002",
+                    info="文章属于该账号的negative 类型",
+                    trace_id=trace_id
+                )
                 affected_rows = await self.update_content_status(
                     trace_id=trace_id,
                     new_content_status=self.MISMATCH_STATUS,
                     ori_content_status=self.TASK_INIT_STATUS
                 )
+                logging(
+                    code="history1003",
+                    info="已经修改该文章状态为 品类不匹配状态",
+                    trace_id=trace_id
+                )
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
@@ -307,11 +338,21 @@ class historyContentIdTask(object):
             exit_status = await self.check_title_whether_exit(content_id)
             if exit_status:
                 # 修改状态为退出状态
+                logging(
+                    code="history1004",
+                    info="文章已经晋升 or 退场",
+                    trace_id=trace_id
+                )
                 affected_rows = await self.update_content_status(
                     trace_id=trace_id,
                     new_content_status=self.EXIT_STATUS,
                     ori_content_status=self.TASK_INIT_STATUS
                 )
+                logging(
+                    code="history1005",
+                    info="已经修改该文章状态为 退出状态",
+                    trace_id=trace_id
+                )
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
@@ -341,9 +382,22 @@ class historyContentIdTask(object):
                 )
             except Exception as e:
                 logging(
-                    code="5003",
+                    code="history1008",
                     info="history task 在发布的时候出现异常, error = {}".format(e),
-                    trace_id=trace_id
+                    trace_id=trace_id,
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+                )
+                bot(
+                    title="history task failed",
+                    detail={
+                        "trace_id": trace_id,
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    },
+                    mention=False
                 )
             await self.roll_back_content_status_when_fails(
                 trace_id=trace_id,
@@ -359,7 +413,7 @@ class historyContentIdTask(object):
         """
         task_list = await self.get_tasks()
         logging(
-            code="5002",
+            code="history1001",
             info="History content_task Task Got {} this time".format(len(task_list)),
             function="History Contents Task"
         )