Browse Source

add other tasks

luojunhui 1 month ago
parent
commit
e5d43a95fd
3 changed files with 164 additions and 1 deletion
  1. 25 1
      long_articles_job.py
  2. 1 0
      tasks/safety_tasks/__init__.py
  3. 138 0
      tasks/safety_tasks/get_off_videos.py

+ 25 - 1
long_articles_job.py

@@ -1,7 +1,9 @@
+import time
 from argparse import ArgumentParser
 
 from cold_start.crawler.baidu import BaiduVideoCrawler
 from tasks.ai_tasks import run_title_similarity_task
+from tasks.safety_tasks import GetOffVideos
 from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import (
     CrawlerPiaoQuanVideos,
 )
@@ -20,8 +22,12 @@ from tasks.crawler_tasks.crawler_video.crawler_gzh_videos import CrawlerGzhMetaV
 from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
 from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
 from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishMonitor
+
 from tasks.monitor_tasks.kimi_balance_monitor import check_kimi_balance
-from tasks.monitor_tasks.outside_server_accounts_monitor import run_outside_server_accounts_monitor
+from tasks.monitor_tasks.outside_server_accounts_monitor import (
+    run_outside_server_accounts_monitor,
+)
+
 from tasks.publish_tasks.top_article_generalize import (
     TopArticleGeneralizeFromArticlePool,
 )
@@ -83,6 +89,10 @@ def run_top_article_generalize_from_article_pool():
     task.deal()
 
 
def run_get_off_videos():
    """Run the safety task that takes expired videos offline."""
    task = GetOffVideos()
    task.deal()
+
+
 def main():
     """
     run long_articles_job
@@ -102,26 +112,40 @@ def main():
         match task_name:
             case "run_piaoquan_video_crawler":
                 crawler.run_piaoquan_video_crawler()
+
             case "run_sohu_video_crawler":
                 crawler.run_sohu_video_crawler()
+
             case "run_sph_video_crawler":
                 crawler.run_sph_video_crawler()
+
             case "crawler_gzh_meta_videos":
                 crawler.crawler_gzh_meta_videos()
+
             case "run_toutiao_video_crawler":
                 crawler.run_toutiao_video_crawler()
+
             case "run_baidu_video_crawler":
                 crawler.run_baidu_video_crawler()
+
             case "run_check_kimi_balance":
                 check_kimi_balance()
+
             case "run_fwh_data_manager":
                 run_fwh_data_manager()
+
             case "run_title_similarity_task":
                 run_title_similarity_task()
+
             case "top_article_generalize":
                 run_top_article_generalize_from_article_pool()
+
+            case "run_get_off_videos":
+                run_get_off_videos()
+
             case "run_outside_server_accounts_monitor":
                 run_outside_server_accounts_monitor()
+
             case _:
                 print("task_name cannot be None")
 

+ 1 - 0
tasks/safety_tasks/__init__.py

@@ -0,0 +1 @@
+from .get_off_videos import GetOffVideos

+ 138 - 0
tasks/safety_tasks/get_off_videos.py

@@ -0,0 +1,138 @@
+"""
+@author: luojunhui
+@desc: auto get off videos in 3 days for safety
+"""
+
+import time
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+from applications import log, PQAPI
+from applications.db import DatabaseConnector
+from applications.api import FeishuBotApi
+from config import long_articles_config
+
+
class GetOffVideos:
    """
    Safety task: automatically take videos offline once they have been
    published for more than EXPIRE_TIME seconds (3 days).

    Entry point is :meth:`deal`, which runs the main take-down pass,
    waits, runs a retry/verification pass, and sends a Feishu notification.
    """

    # Videos are taken down this many seconds after their publish_time.
    EXPIRE_TIME = 3 * 24 * 60 * 60

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.pq_api = PQAPI()
        self.feishu_robot = FeishuBotApi()

    def get_long_articles_videos(self, timestamp):
        """
        Fetch ids of videos that are still online (video_status = 1) and
        were published before ``timestamp``.

        :param timestamp: unix-timestamp cutoff for publish_time
        :return: list of dict rows, each containing a "video_id" key
        """
        # Plain string: the query only uses %s placeholders, no f-string needed.
        fetch_query = """
            select video_id from get_off_videos where video_status = %s and publish_time < %s;
        """
        article_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor, params=(1, timestamp)
        )
        log(
            task="getOffVideosDaily",
            function="get_long_articles_videos",
            message="查找到视频 id_list,一共{}条视频".format(len(article_list)),
        )
        return article_list

    def update_video_status(self, video_id, ori_status, new_status):
        """
        Transition a row in get_off_videos from ``ori_status`` to
        ``new_status``, recording the take-down time.

        :param video_id: id of the video row to update
        :param ori_status: expected current video_status (guards the update)
        :param new_status: video_status value to set
        """
        now_timestamp = int(time.time())
        update_query = """
            update get_off_videos
            set video_status = %s, get_off_time = %s
            where video_id = %s and video_status = %s;
        """
        self.db_client.save(
            query=update_query, params=(new_status, now_timestamp, video_id, ori_status)
        )

    # Backward-compatible alias for the original misspelled method name.
    update_video_statsu = update_video_status

    def change_video_id_status(self, video_id):
        """
        Take a single video offline through the PiaoQuan API (status 2)
        and, on success, mark it as taken down (1 -> 0) in the database.
        Failures are logged, not raised.
        """
        response = self.pq_api.changeVideoStatus(video_id, 2)
        if response:
            self.update_video_status(video_id=video_id, ori_status=1, new_status=0)
        else:
            log(
                task="getOffVideosDaily",
                function="change_video_status",
                status="fail",
                message="请求票圈修改状态异常: ---video_id = {}".format(video_id),
            )

    def check_task(self):
        """
        Retry pass: take down any video still online past its expiry,
        then re-query and alert via Feishu if stragglers remain.
        """
        three_days_ago = int(time.time()) - self.EXPIRE_TIME
        fetch_query = """
            select video_id from get_off_videos where publish_time < %s and video_status = %s;
        """
        rows = self.db_client.fetch(
            fetch_query, cursor_type=DictCursor, params=(three_days_ago, 1)
        )
        if not rows:
            return

        for vid in [row["video_id"] for row in rows]:
            try:
                self.change_video_id_status(video_id=vid)
            except Exception as e:
                log(
                    task="getOffVideosDaily",
                    function="check_job",
                    status="fail",
                    message="task2下架单个视频失败,video_id={}, 报错信息={}".format(
                        vid, e
                    ),
                )
        # Give the remote status changes a moment to settle before re-checking.
        time.sleep(10)

        rest = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor, params=(three_days_ago, 1)
        )
        if rest:
            self.feishu_robot.bot(
                title="get_off_videos 下架视频异常",
                detail={"video_id": [row["video_id"] for row in rest]},
            )

    def get_off_job(self):
        """
        Main pass: take down every still-online video published more than
        EXPIRE_TIME seconds ago. Per-video failures are logged and skipped.
        """
        three_days_before = int(time.time()) - self.EXPIRE_TIME
        fetch_response = self.get_long_articles_videos(timestamp=three_days_before)
        video_id_list = [row["video_id"] for row in fetch_response]
        for video_id in tqdm(video_id_list):
            try:
                self.change_video_id_status(video_id=video_id)
            except Exception as e:
                log(
                    task="getOffVideosDaily",
                    function="get_off_job",
                    status="fail",
                    message="get_off_job下架单个视频失败,video_id={}, 报错信息={}".format(
                        video_id, e
                    ),
                )

    def deal(self):
        """Entry point: main pass, wait, retry pass, then Feishu notice."""
        self.get_off_job()
        time.sleep(60)
        self.check_task()
        self.feishu_robot.bot(
            title="get_off_videos 下架视频任务完成",
            detail={"message": "get_off_videos 下架视频任务完成"},
            env="long_articles_task",
            mention=False,
        )