Kaynağa Gözat

Merge branch '2024-10-25-luojunhui-add-mention-while-finish-task-job' of luojunhui/LongArticlesJob into master

luojunhui 1 yıl önce
ebeveyn
işleme
4884ce213a

+ 8 - 6
applications/feishuBotApi.py

@@ -8,10 +8,15 @@ from applications.decoratorApi import retryOnTimeout
 
 
 @retryOnTimeout()
-def bot(title, detail):
+def bot(title, detail, mention=True):
     """
     机器人
     """
+    title_obj = {
+        "content": "{}<at id=all></at>\n".format(title) if mention else "{}\n".format(title),
+        "tag": "lark_md",
+    }
+    head_title = "【重点关注】" if mention else "【普通通知】"
     url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
     headers = {"Content-Type": "application/json"}
     payload = {
@@ -20,10 +25,7 @@ def bot(title, detail):
             "elements": [
                 {
                     "tag": "div",
-                    "text": {
-                        "content": "{}<at id=all></at>\n".format(title),
-                        "tag": "lark_md",
-                    },
+                    "text": title_obj,
                 },
                 {
                     "tag": "div",
@@ -35,7 +37,7 @@ def bot(title, detail):
                     },
                 },
             ],
-            "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
+            "header": {"title": {"content": head_title, "tag": "plain_text"}},
         },
     }
     requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)

+ 103 - 103
getOffVideosDaily.py

@@ -2,210 +2,210 @@
 @author: luojunhui
 @description: GetOffVideos Daily
 """
-import json
 import time
+import datetime
+import traceback
 
-import requests
-import schedule
 from tqdm import tqdm
 
-from applications import PQMySQL, Functions, PQAPI, log
-from applications.decoratorApi import retryOnTimeout
+from applications import PQMySQL, PQAPI, log, bot
 
-
-@retryOnTimeout()
-def bot(account_list):
-    """
-    机器人
-    """
-    url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
-    headers = {"Content-Type": "application/json"}
-    payload = {
-        "msg_type": "interactive",
-        "card": {
-            "elements": [
-                {
-                    "tag": "div",
-                    "text": {
-                        "content": "存在视频下架失败<at id=all></at>\n",
-                        "tag": "lark_md",
-                    },
-                },
-                {
-                    "tag": "div",
-                    "text": {
-                        "content": json.dumps(
-                            account_list, ensure_ascii=False, indent=4
-                        ),
-                        "tag": "lark_md",
-                    },
-                },
-            ],
-            "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
-        },
-    }
-    requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
+EXPIRE_TIME = 3 * 24 * 60 * 60
 
 
 class AutoGetOffVideos(object):
     """
     自动下架视频
     """
-    pqMysql = PQMySQL()
-    pqAPI = PQAPI()
 
-    @classmethod
-    def getLongArticlesVideos(cls, time_stamp):
+    def __init__(self):
+        self.db_client = None
+        self.pq_api = None
+
+    def base_init(self):
+        """
+        初始化数据库和票圈公共方法
+        :return:
+        """
+        try:
+            self.db_client = PQMySQL()
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            bot(
+                title="自动下架视频任务,数据库初始化失败",
+                detail={
+                    "error": str(e),
+                    "error_msg": error_msg
+                }
+            )
+            return False
+
+        try:
+            self.pq_api = PQAPI()
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            bot(
+                title="自动下架视频任务,pq公共方法连接失败",
+                detail={
+                    "error": str(e),
+                    "error_msg": error_msg
+                }
+            )
+            return False
+        return True
+
+    def get_long_articles_videos(self, timestamp):
         """
         获取待下架的视频
         :return:
         """
         select_sql = f"""
-        SELECT video_id
-        FROM get_off_videos
-        WHERE video_status = 1 and publish_time < {time_stamp};
+            SELECT video_id
+            FROM get_off_videos
+            WHERE video_status = 1 and publish_time < {timestamp};
         """
-        result = cls.pqMysql.select(sql=select_sql)
+        result = self.db_client.select(sql=select_sql)
         log(
             task="getOffVideosDaily",
-            function="getLongArticlesVideos",
+            function="get_long_articles_videos",
             message="查找到视频 id_list,一共{}条视频".format(len(result))
         )
         return result
 
-    @classmethod
-    def updateVideoIdStatus(cls, video_id):
+    def update_video_id_status(self, video_id):
         """
         修改数据库内视频状态
         :param video_id:
         :return:
         """
-        time_stamp = int(time.time())
+        timestamp = int(time.time())
         select_sql = f"""
                 UPDATE get_off_videos
-                SET video_status = 0, get_off_time = {time_stamp}
+                SET video_status = 0, get_off_time = {timestamp}
                 WHERE video_id = %s;
                 """
         try:
-            cls.pqMysql.update(
+            self.db_client.update(
                 sql=select_sql,
                 params=video_id
             )
             log(
                 task="getOffVideosDaily",
-                function="updateVideoIdStatus",
+                function="update_video_id_status",
                 message="成功修改视频状态",
                 data={"video_id": video_id}
             )
         except Exception as e:
             log(
                 task="getOffVideosDaily",
-                function="updateVideoIdStatus",
+                function="update_video_id_status",
                 message="修改视频状态失败--- 推测 sql 问题,报错信息:{}".format(e),
-                status="fail"
+                status="fail",
+                data={"video_id": video_id}
             )
 
-    @classmethod
-    def changeVideoIdStatus(cls, video_id):
+    def change_video_id_status(self, video_id):
         """
         修改视频id状态
         :return:
         """
-        response = cls.pqAPI.changeVideoStatus(video_id, 2)
+        response = self.pq_api.changeVideoStatus(video_id, 2)
         if response:
-            cls.updateVideoIdStatus(video_id=video_id)
+            self.update_video_id_status(video_id=video_id)
         else:
             log(
                 task="getOffVideosDaily",
-                function="changeVideoIdStatus",
+                function="change_video_id_status",
                 status="fail",
                 message="请求票圈修改状态异常: ---video_id = {}".format(video_id),
             )
+            bot(
+                title="get_off_videos 下架视频异常",
+                detail={
+                    "video_id": video_id
+                },
+                mention=False
+            )
 
-    @classmethod
-    def task1(cls):
+    def get_off_job(self):
         """
         已经请求超过3天的视频全部下架
         :return:
         """
         now_stamp = int(time.time())
-        three_days_before = now_stamp - 3 * 24 * 60 * 60
-        video_set = cls.getLongArticlesVideos(time_stamp=three_days_before)
-        vid_list = [i[0] for i in video_set]
+        three_days_before = now_stamp - EXPIRE_TIME
+        video_tuple = self.get_long_articles_videos(timestamp=three_days_before)
+        vid_list = [i[0] for i in video_tuple]
         for video_id in tqdm(vid_list):
             try:
-                cls.changeVideoIdStatus(video_id=video_id)
+                self.change_video_id_status(video_id=video_id)
             except Exception as e:
                 log(
                     task="getOffVideosDaily",
-                    function="task1",
+                    function="get_off_job",
                     status="fail",
-                    message="task1下架单个视频失败,video_id={}, 报错信息={}".format(video_id, e),
+                    message="get_off_job下架单个视频失败,video_id={}, 报错信息={}".format(video_id, e),
                 )
 
-    @classmethod
-    def task2(cls):
+    def check_job(self):
         """
         校验 3 天前发布的视频是否已经下架
         :return:
         """
-        three_days_ago = int(time.time()) - 3 * 24 * 3600
+        three_days_ago = int(time.time()) - EXPIRE_TIME
         sql = f"""
-        SELECT video_id
-        FROM get_off_videos
-        WHERE publish_time < {three_days_ago} and video_status = 1;
+            SELECT video_id
+            FROM get_off_videos
+            WHERE publish_time < {three_days_ago}
+            AND video_status = 1;
         """
-        vid_tuple = cls.pqMysql.select(sql)
+        vid_tuple = self.db_client.select(sql)
         if vid_tuple:
             vid_list = [i[0] for i in vid_tuple]
             for vid in vid_list:
                 try:
-                    cls.changeVideoIdStatus(video_id=vid)
+                    self.change_video_id_status(video_id=vid)
                 except Exception as e:
                     log(
                         task="getOffVideosDaily",
-                        function="task2",
+                        function="check_job",
                         status="fail",
                         message="task2下架单个视频失败,video_id={}, 报错信息={}".format(vid, e),
                     )
             time.sleep(10)
-            vid_tuple2 = cls.pqMysql.select(sql)
+            vid_tuple2 = self.db_client.select(sql)
             if vid_tuple2:
                 vid_list2 = [i[0] for i in vid_tuple2]
-                bot(vid_list2)
+                bot(
+                    title="getOffVideosDaily_check_job",
+                    detail={
+                        "video_list": vid_list2
+                    }
+                )
             else:
                 return
         else:
             return
 
 
-def getOffJob():
-    """
-    下架任务
-    :return:
-    """
-    AGV = AutoGetOffVideos()
-    AGV.task1()
-
-
-def checkJob():
+def main():
     """
-    校验 3 天前发布的视频是否已经被下架
+    main function
     :return:
     """
-    AGV = AutoGetOffVideos()
-    AGV.task2()
+    auto_get_off_job = AutoGetOffVideos()
+    if auto_get_off_job.base_init():
+        auto_get_off_job.get_off_job()
+        time.sleep(60)
+        auto_get_off_job.check_job()
+        bot(
+            title="get_off_jobs任务执行完成通知",
+            mention=False,
+            detail={
+                "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            }
+        )
 
 
 if __name__ == '__main__':
-    schedule.every().day.at("09:30").do(Functions().job_with_thread, getOffJob)
-
-    schedule.every().day.at("10:30").do(Functions().job_with_thread, checkJob)
-
-    schedule.every().day.at("14:30").do(Functions().job_with_thread, getOffJob)
-
-    schedule.every().day.at("15:00").do(Functions().job_with_thread, checkJob)
-
-    while True:
-        schedule.run_pending()
-        time.sleep(1)
+    main()

+ 26 - 0
sh/run_get_off_videos_three_times_per_day.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/get_off_videos_task_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 getOffVideosDaily.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - getOffVideosDaily.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart getOffVideosDaily.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 getOffVideosDaily.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted getOffVideosDaily.py"
+fi

+ 26 - 0
sh/run_update_published_articles_daily.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/update_published_articles_task_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 updatePublishedMsgDaily.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - updatePublishedMsgDaily.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart updatePublishedMsgDaily.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 updatePublishedMsgDaily.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted updatePublishedMsgDaily.py"
+fi

+ 18 - 18
threadAliveBot.py

@@ -24,7 +24,7 @@ def threadMonitor():
     output = result.stdout
 
     # filter
-    get_off_job = [line for line in output.splitlines() if 'python3 getOffVideosDaily.py' in line]
+    # get_off_job = [line for line in output.splitlines() if 'python3 getOffVideosDaily.py' in line]
 
     # migrate_source_id_job = [line for line in output.splitlines() if 'python3 migrateRootSourceId.py' in line]
 
@@ -32,18 +32,18 @@ def threadMonitor():
 
     updateMinigramInfoDaily = [line for line in output.splitlines() if 'python3 updateMinigramInfoDaily.py' in line]
 
-    updatePublishedMsgDaily = [line for line in output.splitlines() if 'python3 updatePublishedMsgDaily.py' in line]
+    # updatePublishedMsgDaily = [line for line in output.splitlines() if 'python3 updatePublishedMsgDaily.py' in line]
 
     # checkVideoStatusDaily = [line for line in output.splitlines() if 'python3 checkVideoStatusDaily.py' in line]
 
-    if not get_off_job:
-        bot(
-            title="定时任务进程异常挂掉",
-            detail={
-                "Job": "GetOffVideosJob",
-                "Time": datetime.datetime.now().__str__()
-            }
-        )
+    # if not get_off_job:
+    #     bot(
+    #         title="定时任务进程异常挂掉",
+    #         detail={
+    #             "Job": "GetOffVideosJob",
+    #             "Time": datetime.datetime.now().__str__()
+    #         }
+    #     )
 
     # if not updateAccountAvgDaily:
     #     bot(
@@ -63,14 +63,14 @@ def threadMonitor():
             }
         )
 
-    if not updatePublishedMsgDaily:
-        bot(
-            title="定时任务进程异常挂掉",
-            detail={
-                "Job": "updatePublishedMsgDaily",
-                "Time": datetime.datetime.now().__str__()
-            }
-        )
+    # if not updatePublishedMsgDaily:
+    #     bot(
+    #         title="定时任务进程异常挂掉",
+    #         detail={
+    #             "Job": "updatePublishedMsgDaily",
+    #             "Time": datetime.datetime.now().__str__()
+    #         }
+    #     )
 
     # if not checkVideoStatusDaily:
     #     bot(

+ 14 - 1
updateMinigramInfoDaily.py

@@ -309,6 +309,13 @@ def updateArticlesJob(biz_date=None):
         function="updateArticlesJob",
         message="文章更新完成---{}".format(biz_date.__str__())
     )
+    bot(
+        title="更新文章任务完成",
+        detail={
+            "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        },
+        mention=False
+    )
 
 
 def updateMinigramInfoJob(biz_date=None):
@@ -333,6 +340,13 @@ def updateMinigramInfoJob(biz_date=None):
             status="fail",
             message="小程序更新失败---{}, 报错信息是: {}".format(biz_date.__str__(), e)
         )
+    bot(
+        title="更新小程序信息任务完成",
+        detail={
+            "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        },
+        mention=False
+    )
 
 
 def main():
@@ -352,7 +366,6 @@ def main():
         print("Run in daily mode.")
         schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
         schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
-
         while True:
             schedule.run_pending()
             time.sleep(1)

+ 57 - 17
updatePublishedMsgDaily.py

@@ -5,9 +5,8 @@
 
 import time
 import json
-import threading
+import traceback
 
-import schedule
 from tqdm import tqdm
 from datetime import datetime
 
@@ -325,7 +324,18 @@ def update_job():
     更新任务
     :return:
     """
-    db_client = PQMySQL()
+    try:
+        db_client = PQMySQL()
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title="更新文章任务连接数据库失败",
+            detail={
+                "error": e,
+                "msg": error_msg
+            }
+        )
+        return
     sub_accounts, server_accounts = get_accounts()
     s_count = 0
     f_count = 0
@@ -361,13 +371,28 @@ def update_job():
                 "failRate": f_count / (s_count + f_count)
             }
         )
-
+    bot(
+        title="更新每日发布文章任务完成通知",
+        detail={
+            "msg": "订阅号更新完成",
+            "finish_time": datetime.today().__str__()
+        },
+        mention=False
+    )
     for sub_item in tqdm(server_accounts):
         try:
             update_single_account(db_client, sub_item)
             time.sleep(5)
         except Exception as e:
             print(e)
+    bot(
+        title="更新每日发布文章任务完成通知",
+        detail={
+            "msg": "服务号更新完成",
+            "finish_time": datetime.today().__str__()
+        },
+        mention=False
+    )
 
 
 def check_job():
@@ -375,7 +400,19 @@ def check_job():
     校验任务
     :return:
     """
-    db_client = PQMySQL()
+    try:
+        db_client = PQMySQL()
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title="校验更新文章任务连接数据库失败",
+            detail={
+                "job": "check_job",
+                "error": e,
+                "msg": error_msg
+            }
+        )
+        return
     sub_accounts, server_accounts = get_accounts()
     fail_list = []
     # account_list = sub_accounts + server_accounts
@@ -398,23 +435,26 @@ def check_job():
             )
         except Exception as e:
             print("Timeout Error: {}".format(e))
+    else:
+        bot(
+            title="校验完成通知",
+            mention=False,
+            detail={
+                "msg": "校验任务完成",
+                "finish_time": datetime.today().__str__()
+            }
+        )
 
 
-def job_with_thread(job_func):
+def main():
     """
-    每个任务放到单个线程中
-    :param job_func:
+    main
     :return:
     """
-    job_thread = threading.Thread(target=job_func)
-    job_thread.start()
+    update_job()
+    time.sleep(60)
+    check_job()
 
 
 if __name__ == '__main__':
-    schedule.every().day.at("20:50").do(job_with_thread, update_job)
-
-    schedule.every().day.at("21:45").do(job_with_thread, check_job)
-
-    while True:
-        schedule.run_pending()
-        time.sleep(1)
+    main()