Prechádzať zdrojové kódy

更新
1. 每日更新小程序表现特征任务
2. 每日抓取当天发布文章任务,直接请求pw 接口,增加一轮校验
3. 将广告数据库替换为长文数据库

罗俊辉 1 rok pred
rodič
commit
5612dbddfa

+ 11 - 0
applications/pqMysql.py

@@ -39,4 +39,15 @@ class PQMySQL(object):
         result = cursor.fetchall()
         return result
 
+    def insertMany(self, sql, params):
+        """
+        Placeholder for a batch insert; not implemented yet.
+
+        The body currently ignores both arguments and performs no database work.
+
+        :param sql: SQL statement (unused for now; presumably an INSERT template - TODO confirm)
+        :param params: statement parameters (unused for now; presumably a sequence of rows - TODO confirm)
+        :return: always None
+        """
+        return None
+
+
+
 

+ 194 - 0
getOffVideosDaily.py

@@ -0,0 +1,194 @@
+"""
+@author: luojunhui
+@description: GetOffVideos Daily
+"""
+import json
+import time
+from concurrent.futures.thread import ThreadPoolExecutor
+
+import requests
+import schedule
+
+from applications import PQMySQL, Functions
+from applications.decoratorApi import retryOnTimeout
+
+
+@retryOnTimeout()
+def bot(account_list):
+    """
+    机器人
+    """
+    url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+    headers = {"Content-Type": "application/json"}
+    payload = {
+        "msg_type": "interactive",
+        "card": {
+            "elements": [
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": "存在视频下架失败<at id=all></at>\n",
+                        "tag": "lark_md",
+                    },
+                },
+                {
+                    "tag": "div",
+                    "text": {
+                        "content": json.dumps(
+                            account_list, ensure_ascii=False, indent=4
+                        ),
+                        "tag": "lark_md",
+                    },
+                },
+            ],
+            "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
+        },
+    }
+    requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10)
+
+
+class AutoGetOffVideos(object):
+    """
+    自动下架视频
+    """
+    pqMysql = PQMySQL()
+
+    @classmethod
+    def getLongArticlesVideos(cls, time_stamp):
+        """
+        获取待下架的视频
+        :return:
+        """
+        select_sql = f"""
+        SELECT video_id
+        FROM get_off_videos
+        WHERE video_status = 1 and publish_time < {time_stamp};
+        """
+        result = cls.pqMysql.select(sql=select_sql)
+        return result
+
+    @classmethod
+    def updateVideoIdStatus(cls, video_id):
+        """
+        修改数据库内视频状态
+        :param video_id:
+        :return:
+        """
+        time_stamp = int(time.time())
+        select_sql = f"""
+                UPDATE get_off_videos
+                SET video_status = 0, get_off_time = {time_stamp}
+                WHERE video_id = %s;
+                """
+        cls.pqMysql.update(
+            sql=select_sql,
+            params=video_id
+        )
+
+    @classmethod
+    def changeVideoIdStatus(cls, video_id):
+        """
+        修改视频规则
+        :return:
+        """
+        url = "https://admin.piaoquantv.com/manager/video/audit/v2/updateAuditStatus"
+        payload = "videoId={}&auditStatus=2&updateReasonJson=&rejectReasonJson=%5B%7B%22reason%22%3A%22%E9%95%BF%E6%96%87%E8%87%AA%E5%8A%A8%E4%B8%8B%E6%9E%B6%22%2C%22reasonId%22%3A-1%7D%5D&adminUid=206".format(
+            video_id)
+        headers = {
+            'accept': 'application/json',
+            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
+            'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
+            'cookie': 'SESSION=NTljNTg2YjktMTU0MC00YWQ5LWE4ZTktNDFhODY0NzM3NTcx',
+            'origin': 'https://admin.piaoquantv.com',
+            'priority': 'u=1, i',
+            'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
+            'sec-ch-ua-mobile': '?0',
+            'sec-ch-ua-platform': '"macOS"',
+            'sec-fetch-dest': 'empty',
+            'sec-fetch-mode': 'cors',
+            'sec-fetch-site': 'same-origin',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36'
+        }
+        response = requests.request(
+            "POST",
+            url,
+            headers=headers,
+            data=payload
+        )
+        if response.status_code == 2:
+            result = response.json()
+            if result.get("code", None) == 0:
+                cls.updateVideoIdStatus(video_id=video_id)
+
+    @classmethod
+    def task1(cls):
+        """
+        已经请求超过3天的视频全部下架
+        :return:
+        """
+        now_stamp = int(time.time())
+        three_days_before = now_stamp - 3 * 24 * 60 * 60
+        video_set = cls.getLongArticlesVideos(time_stamp=three_days_before)
+        vid_list = [i[0] for i in video_set]
+        with ThreadPoolExecutor(max_workers=8) as Pool1:
+            Pool1.map(cls.changeVideoIdStatus, vid_list)
+
+    @classmethod
+    def task2(cls):
+        """
+        校验 3 天前发布的视频是否已经下架
+        :return:
+        """
+        three_days_ago = int(time.time()) - 3 * 24 * 3600
+        sql = f"""
+        SELECT video_id
+        FROM get_off_videos
+        WHERE publish_time < {three_days_ago} and video_status = 1;
+        """
+        vid_tuple = cls.pqMysql.select(sql)
+        if vid_tuple:
+            vid_list = [i[0] for i in vid_tuple]
+            with ThreadPoolExecutor(max_workers=8) as Pool1:
+                Pool1.map(cls.changeVideoIdStatus, vid_list)
+
+            vid_tuple2 = cls.pqMysql.select(sql)
+            if vid_tuple2:
+                vid_list2 = [i[0] for i in vid_tuple2]
+                bot(vid_list2)
+            else:
+                return
+        else:
+            return
+
+
+def getOffJob():
+    """
+    下架任务
+    :return:
+    """
+    AGV = AutoGetOffVideos()
+    AGV.task1()
+
+
+def checkJob():
+    """
+    校验 3 天前发布的视频是否已经被下架
+    :return:
+    """
+    AGV = AutoGetOffVideos()
+    AGV.task2()
+
+
+if __name__ == '__main__':
+    schedule.every().day.at("09:30").do(Functions().job_with_thread, getOffJob)
+
+    schedule.every().day.at("10:30").do(Functions().job_with_thread, checkJob)
+
+    schedule.every().day.at("14:30").do(Functions().job_with_thread, getOffJob)
+
+    schedule.every().day.at("15:00").do(Functions().job_with_thread, checkJob)
+
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+        print("自动下架视频任务正常执行")

+ 83 - 0
long_articles_job.sh

@@ -0,0 +1,83 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/longArticles_$CURRENT_DATE.txt"
+exec >> "$LOG_FILE" 2>&1
+
+# 判断自动下架并且自动重启下架视频任务
+
+if pgrep -f "python3 getOffVideosDaily.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - getOffVideosDaily.py running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart getOffVideosDaily.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 getOffVideosDaily.py >> /dev/null 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restart getOffVideosDaily.py"
+fi
+
+
+# 判断更新 source_id任务
+if pgrep -f "python3 migrateRootSourceId.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - migrateRootSourceId.py running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart migrateRootSourceId.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 migrateRootSourceId.py >> /dev/null 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restart migrateRootSourceId.py"
+fi
+
+# 判断自动更新文章任务
+if pgrep -f "python3 updatePublishMsgDaily.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - updatePublishMsgDaily.py running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart updatePublishMsgDaily.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 updatePublishMsgDaily.py >> /dev/null 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restart updatePublishMsgDaily.py"
+fi
+
+
+# 自动更新小程序信息任务
+if pgrep -f "python3 updateMinigramInfoDaily.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - updateMinigramInfoDaily.py running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart updateMinigramInfoDaily.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 updateMinigramInfoDaily.py >> /dev/null 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restart updateMinigramInfoDaily.py"
+fi

+ 131 - 0
migrateRootSourceId.py

@@ -0,0 +1,131 @@
+"""
+@author: luojunhui
+迁移rootSourceId
+"""
+import json
+import time
+
+import pymysql
+import datetime
+import schedule
+from tqdm import tqdm
+from concurrent.futures.thread import ThreadPoolExecutor
+
+from applications import Functions, PQMySQL
+
+
+class UpdateRootSourceId(object):
+    """
+    更新 rootSourceId
+    """
+
+    db_client = PQMySQL()
+    source_id_list = {
+        'longArticles_2d311f88a9c1bd5a90ce88339ae93e78': 1,
+        'longArticles_8d9fd0553c988e7a6bf3a6198f78d890': 1,
+        'longArticles_99763b3ad92c781194dbd3eb3321542c': 1,
+        'longArticles_2a27f501ef0d758c35dd3b70cf3bbfa3': 1,
+        "touliu_tencentGzhArticle_cc284926a7d1c19f9a4e6abe5520468b": 1,
+        "touliu_tencentGzhArticle_2e4c21de3707f3b368b0cc4500d120f0": 1,
+        "touliu_tencentGzhArticle_a18c11dd294df014334f7db72830221a": 1,
+        "touliu_tencentGzhArticle_c2debdc233827497e24b633dea36c57c": 1,
+        "touliu_tencentGzhArticle_d66796826916665a23c667472ef4dd56": 1,
+        "touliu_tencentGzhArticle_f8e97355f3687f57fd4efeb635a7a3a2": 1,
+        "touliu_tencentGzhArticle_gh_68e7fdc09fe4_90bb12e53f6628fd5330310c7c3cc344": 1,
+        "touliu_tencentGzhArticle_gh_68e7fdc09fe4_cd602a61ea073e41404572fce51eb297": 1,
+        "touliu_tencentGzhArticle_gh_68e7fdc09fe4_d8fca9b2712f829d625d98bec37db228": 1,
+        "touliu_tencentGzhArticle_gh_77f36c109fb1_1401a97f6537f32b14496cd5fe6caa70": 1,
+        "touliu_tencentGzhArticle_gh_77f36c109fb1_926713998cd1513370b910ba20adda44": 1,
+        "touliu_tencentGzhArticle_gh_77f36c109fb1_4ca7c1c6223501ff4f80913f8363309f": 1
+    }
+
+    @classmethod
+    def getDataList(cls, request_time_stamp):
+        """
+
+        :param request_time_stamp:
+        :return:
+        """
+        start_dt = request_time_stamp - 24 * 3600
+        sql = f"""
+            select trace_id, gh_id, account_name, article_title, result1, result2, result3, request_time_stamp
+            from long_articles_video 
+            where request_time_stamp > {start_dt} 
+            and request_time_stamp < {request_time_stamp}
+            and content_status = 2;
+            """
+        result = cls.db_client.select(sql)
+        return result
+
+    @classmethod
+    def processEachData(cls, data_tuple):
+        """
+        处理数据
+        :param data_tuple:
+        :return:
+        """
+        trace_id = data_tuple[0]
+        gh_id = data_tuple[1]
+        account_name = data_tuple[2]
+        title = data_tuple[3]
+        result_1 = data_tuple[4]
+        result_2 = data_tuple[5]
+        result_3 = data_tuple[6]
+        request_time_stamp = data_tuple[7]
+        result_list = [result_1, result_2, result_3]
+        for result in result_list:
+            if result:
+                source_id = json.loads(result)['productionPath'].split("rootSourceId%3D")[1]
+                video_id = json.loads(result)['productionPath'].split("videos%3Fid%3D")[1].split("%26su%")[0]
+                sql = f"""
+                INSERT INTO long_articles_root_source_id
+                (rootSourceId, accountName, ghId, articleTitle, requestTime, trace_id, push_type, video_id)
+                values 
+                (%s, %s, %s, %s, %s, %s, %s, %s);
+                """
+                cls.db_client.update(
+                    sql=sql,
+                    params=(
+                        source_id,
+                        account_name,
+                        gh_id,
+                        title,
+                        request_time_stamp,
+                        trace_id,
+                        cls.source_id_list.get(source_id, 2),
+                        video_id
+                    )
+                )
+            else:
+                print("No result")
+
+    @classmethod
+    def sourceIdJob(cls):
+        """
+        执行代码
+        :return:
+        """
+        today_string = datetime.datetime.today().strftime("%Y-%m-%d")
+        time_stamp = datetime.datetime.strptime(today_string, '%Y-%m-%d').timestamp()
+        data_list = cls.getDataList(int(time_stamp))
+        for item in tqdm(data_list):
+            try:
+                cls.processEachData(item)
+            except Exception as e:
+                print(e)
+
+
+def source_id_job():
+    """
+    :return:
+    """
+    S = UpdateRootSourceId()
+    S.sourceIdJob()
+
+
+if __name__ == '__main__':
+    # Run the migration once and exit; the daily 01:00 schedule below is disabled.
+    # NOTE(review): long_articles_job.sh relaunches this script whenever no
+    # matching process is found, so a one-shot run is started again on every
+    # watchdog pass - confirm this is intended.
+    source_id_job()
+    # schedule.every().day.at("01:00").do(Functions().job_with_thread, source_id_job)
+    # while True:
+    #     schedule.run_pending()
+    #     time.sleep(1)

+ 6 - 1
updateMinigramInfoDaily.py

@@ -1,6 +1,6 @@
 """
 @author luojunhui
-@description Update Daily
+@description Update Minigram Info Daily
 """
 import time
 
@@ -65,6 +65,7 @@ class DailyDataManager(object):
                     root_source_id = item['path'].split("rootSourceId%3D")[-1]
                     video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                     kimi_title = item['title']
+                    # print(image_url, nick_name, root_source_id, video_id, kimi_title)
                     insert_sql = f"""
                             INSERT INTO long_articles_detail_info
                             (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
@@ -181,6 +182,7 @@ class DailyDataManager(object):
         # 获取昨天的日期
         yesterday = today - timedelta(days=3)
         yesterday_str = yesterday.__str__().split(" ")[0]
+        print(yesterday_str)
         sql = f"""
         select distinct root_source_id
         from long_articles_detail_info
@@ -197,6 +199,7 @@ class DailyDataManager(object):
                     fission_0 = result[key][1]
                     fission_1 = result[key][2]
                     fission_2 = result[key][3]
+                    # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2)
                     update_sql = f"""
                     UPDATE long_articles_detail_info
                     set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
@@ -239,6 +242,8 @@ def updateMinigramInfoJob():
 
 if __name__ == '__main__':
 
+    # updateArticlesJob()
+    # updateMinigramInfoJob()
     schedule.every().day.at("01:00").do(Functions().job_with_thread, updateArticlesJob)
 
     schedule.every().day.at("04:30").do(Functions().job_with_thread, updateMinigramInfoJob)