Jelajahi Sumber

Merge branch '2024-11-29-luojunhui-developing-video-crawler' of luojunhui/LongArticlesJob into master

luojunhui 10 bulan lalu
induk
melakukan
18f935f477

+ 4 - 0
applications/api/__init__.py

@@ -0,0 +1,4 @@
+"""
+@author: luojunhui
+"""
+from .moon_shot_api import generate_mini_program_title

+ 45 - 0
applications/api/moon_shot_api.py

@@ -0,0 +1,45 @@
+"""
+@author: luojunhui
+"""
+from openai import OpenAI
+
+mini_program_title_generate_prompt = """
+    请将以上标题改写成适合小程序点击和传播的小程序标题,小程序标题的写作规范如下,请学习后进行小程序标题的编写。直接输出最终的小程序标题
+    小程序标题写作规范:
+    1.要点前置:将最重要的信息放在标题的最前面,以快速吸引读者的注意力。例如,“5月一辈子同学,三辈子亲,送给我的老同学,听哭无数人!”中的“5月”和“一辈子同学,三辈子亲”都是重要的信息点。
+    2.激发情绪:使用能够触动人心的语言,激发读者的情感共鸣。如“只剩两人同学聚会,看后感动落泪。”使用“感动落泪”激发读者的同情和怀旧情绪。
+    3.使用数字和特殊符号:数字可以提供具体性,而特殊符号如“🔴”、“😄”、“🔥”等可以吸引视觉注意力,增加点击率。
+    4.悬念和好奇心:创建悬念或提出问题,激发读者的好奇心。例如,“太神奇了!长江水位下降,重庆出现惊奇一幕!”中的“惊奇一幕”就是一个悬念。
+    5.名人效应:如果内容与知名人士相关,提及他们的名字可以增加标题的吸引力。
+    6.社会价值观:触及读者的文化和社会价值观,如家庭、友情、国家荣誉等。
+    7.标点符号的运用:使用感叹号、问号等标点来增强语气和情感表达。
+    8.直接的语言:使用直白、口语化的语言,易于理解,如“狗屁股,笑死我了!”。
+    9.热点人物或事件:提及当前的热点人物或事件,利用热点效应吸引读者。
+    10.字数适中:保持标题在10-20个字之间,既不过长也不过短,确保信息的完整性和吸引力。
+    11.适当的紧迫感:使用“最新”、“首次”、“紧急”等词汇,创造一种紧迫感,促使读者立即行动。
+    12.情感或价值诉求:使用如“感动”、“泪目”、“经典”等词汇,直接与读者的情感或价值观产生共鸣。
+    避免误导:确保标题准确反映内容,避免夸大或误导读者。
+    """
+
+
+def generate_mini_program_title(ori_title):
+    """
+    prompt + kimi + ori_title generate new title
+    :param ori_title:
+    :return:
+    """
+    client = OpenAI(
+        api_key='sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q',
+        base_url="https://api.moonshot.cn/v1"
+    )
+    chat_completion = client.chat.completions.create(
+        messages=[
+            {
+                "role": "user",
+                "content": ori_title + "\n" + mini_program_title_generate_prompt
+            }
+        ],
+        model="moonshot-v1-32k",
+    )
+    response = chat_completion.choices[0].message.content
+    return response.split("\n")[0]

+ 70 - 1
applications/const.py

@@ -78,4 +78,73 @@ class updateAccountReadAvgTaskConst:
 
     # 发文模式
     ARTICLES_DAILY = 1
-    TOULIU = 2
+    TOULIU = 2
+
+
+class WeixinVideoCrawlerConst:
+    """
+    微信视频抓取常量配置
+    """
+    # 账号抓取状态
+    ACCOUNT_CRAWL_STATUS = 1
+    ACCOUNT_DO_NOT_CRAWL_STATUS = 0
+
+    # 默认最早抓取时间戳(2024-01-01)
+    DEFAULT_TIMESTAMP = 1704038400
+
+    # 搜索爬虫最大页数
+    MAX_SEARCH_PAGE_NUM = 10
+
+    # 抓取每一页的等待时间
+    SLEEP_SECONDS = 5
+
+    # 种子标题最低阅读均值倍数
+    READ_AVG_MULTIPLE = 1.3
+
+    # 种子标题最低阅读量
+    MIN_READ_COUNT = 2000
+
+    # 获取种子标题的统计周期
+    STAT_PERIOD = 7 * 24 * 60 * 60
+
+    # 接口请求成功code
+    REQUEST_SUCCESS = 0
+    PUBLISHED_ILLEGAL_TITLE_CODE = 1015
+
+    # 是否需要扫描查询源账号
+    NEED_SCAN_SOURCE_ACCOUNT = 1
+    DO_NOT_NEED_SOURCE_ACCOUNT = 0
+
+    # 视频审核状态长文库
+    VIDEO_AUDIT_INIT_STATUS = 0
+    VIDEO_AUDIT_SUCCESS_STATUS = 1
+    VIDEO_AUDIT_FAIL_STATUS = 2
+    VIDEO_TITLE_GENERATE_FAIL_STATUS = 4
+    VIDEO_AUDIT_PROCESSING_STATUS = -1
+
+    # 票圈视频审核状态, 1 审核中,2 不通过 3 待修改,4 自己可见 5 通过
+    PQ_AUDIT_PROCESSING_STATUS = 1
+    PQ_AUDIT_FAIL_STATUS = 2
+    PQ_AUDIT_WAIT_STATUS = 3
+    PQ_AUDIT_SELF_VISIBLE_STATUS = 4
+    PQ_AUDIT_SUCCESS_STATUS = 5
+
+    # 默认账号
+    DEFAULT_ACCOUNT_UID = 76862180
+
+    # 每天发送的审核视频数量
+    MAX_VIDEO_NUM = 500
+
+    # 标题状态
+    TITLE_DEFAULT_STATUS = 0
+    TITLE_EXIT_STATUS = 1
+    TITLE_FESTIVAL_STATUS = 2
+    TITLE_SHORT_STATUS = 3
+
+    # 标题最短长度
+    TITLE_MIN_LENGTH = 15
+
+
+
+
+

+ 131 - 2
applications/functions.py

@@ -1,10 +1,17 @@
 """
 @author: luojunhui
 """
-import threading
+import os
+import re
+import html
 import hashlib
+import threading
 
+import oss2
+import requests
+from uuid import uuid4
 from datetime import datetime, timezone
+from fake_useragent import FakeUserAgent
 
 
 class Functions(object):
@@ -137,4 +144,126 @@ class Functions(object):
         """
         dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
         date_string = dt_object.strftime(string_format)
-        return date_string
+        return date_string
+
+    @classmethod
+    def proxy(cls):
+        """
+        快代理
+        """
+        # 隧道域名:端口号
+        tunnel = "l901.kdltps.com:15818"
+
+        # 用户名密码方式
+        username = "t11983523373311"
+        password = "mtuhdr2z"
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
+        }
+        return proxies
+
+    @classmethod
+    def get_video_url(cls, article_url):
+        """
+        :param article_url:
+        :return:
+        """
+        response = requests.get(
+            url=article_url,
+            headers={'User-Agent': FakeUserAgent().random},
+            # proxies=cls.proxy()
+        )
+        html_text = response.text
+        w = re.search(
+            r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
+        ).group(1)
+        url = html.unescape(
+            re.sub(
+                r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
+            )
+        )
+        return url
+
+    @classmethod
+    def get_source_account(cls, article_url: str) -> dict:
+        """
+        获取公众号名称和头像
+        :param article_url:
+        :return:
+        """
+        response = requests.get(
+            url=article_url,
+            headers={'User-Agent': FakeUserAgent().random},
+            # proxies=cls.proxy()
+        )
+        html_text = response.text
+        # 正则表达式用于提取 hit_nickname 和 hit_username
+        regex_nickname = r"hit_nickname:\s*'([^']+)'"
+        regex_username = r"hit_username:\s*'([^']+)'"
+
+        # 提取 hit_nickname 和 hit_username
+        nickname = re.search(regex_nickname, html_text)
+        username = re.search(regex_username, html_text)
+
+        # 输出提取的结果
+        if nickname and username:
+            return {
+                'name': nickname.group(1),
+                'gh_id': username.group(1)
+            }
+        else:
+            return {}
+
+    @classmethod
+    def download_gzh_video(cls, article_url):
+        """
+        下载公众号视频
+        :param article_url:
+        :return:
+        """
+        try:
+            video_url = cls.get_video_url(article_url)
+        except Exception as e:
+            return
+        save_path = "static/{}.mp4".format(cls.str_to_md5(video_url))
+        headers = {
+            'Accept': '*/*',
+            'Accept-Language': 'zh,zh-CN;q=0.9',
+            'Connection': 'keep-alive',
+            'Origin': 'https://mp.weixin.qq.com',
+            'Referer': 'https://mp.weixin.qq.com/',
+            'Sec-Fetch-Dest': 'video',
+            'Sec-Fetch-Mode': 'cors',
+            'Sec-Fetch-Site': 'cross-site',
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
+            'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
+            'sec-ch-ua-mobile': '?0',
+            'sec-ch-ua-platform': '"macOS"'
+        }
+        res = requests.get(video_url, headers=headers)
+        with open(save_path, "wb") as f:
+            f.write(res.content)
+
+        TEN_KB = 1024 * 10
+        if os.path.getsize(save_path) > TEN_KB:
+            return save_path
+        else:
+            return None
+
+    @classmethod
+    def upload_to_oss(cls, local_video_path):
+        """
+        把视频上传到 oss
+        :return:
+        """
+        oss_video_key = "long_articles/video/" + str(uuid4())
+        access_key_id = "LTAIP6x1l3DXfSxm"
+        access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
+        endpoint = "oss-cn-hangzhou.aliyuncs.com"
+        bucket_name = "art-pubbucket"
+        bucket = oss2.Bucket(
+            oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
+        )
+        bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
+        return oss_video_key

+ 4 - 2
applications/longArticlesMysql.py

@@ -2,6 +2,7 @@
 @author: luojunhui
 """
 import pymysql
+from pymysql.cursors import DictCursor
 
 
 class longArticlesMySQL(object):
@@ -30,13 +31,14 @@ class longArticlesMySQL(object):
         return affected_rows
 
     @classmethod
-    def select(cls, sql):
+    def select(cls, sql, cursor_type=None):
         """
         查询
+        :param cursor_type: 查询出结果的类型,默认None
         :param sql:
         :return:
         """
-        cursor = cls.connection.cursor()
+        cursor = cls.connection.cursor(cursor_type)
         cursor.execute(sql)
         result = cursor.fetchall()
         return result

+ 42 - 0
applications/pqFunctionApi.py

@@ -74,3 +74,45 @@ class PQAPI(object):
                 return False
         else:
             return False
+
+    @classmethod
+    def publish_to_pq(cls, oss_path, uid, title):
+        """
+        :return:
+        """
+        url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
+        headers = {
+            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
+            "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
+            "referer": "http://appspeed.piaoquantv.com",
+            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+            "accept-language": "zh-CN,zh-Hans;q=0.9",
+            "Content-Type": "application/x-www-form-urlencoded",
+        }
+        payload = {
+            "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
+            "fileExtensions": "MP4",
+            "loginUid": uid,
+            "networkType": "Wi-Fi",
+            "platform": "iOS",
+            "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
+            "sessionId": "362290597725ce1fa870d7be4f46dcc2",
+            "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
+            "title": title,
+            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
+            "uid": uid,
+            "versionCode": "486",
+            "versionName": "3.4.12",
+            "videoFromScene": "1",
+            "videoPath": oss_path,
+            "viewStatus": "1",
+            "appType": 888888,
+            "repeatStatus": 1
+        }
+        response = requests.post(
+            url=url,
+            headers=headers,
+            data=payload
+        )
+        return response
+

+ 3 - 2
applications/wxSpiderApi.py

@@ -21,7 +21,7 @@ class WeixinSpider(object):
 
     @classmethod
     @retryOnNone()
-    def search_articles(cls, title) -> dict:
+    def search_articles(cls, title, page="1") -> dict:
         """
         search articles in wx
         :return:
@@ -29,12 +29,13 @@ class WeixinSpider(object):
         url = "{}/keyword".format(cls.base_url)
         payload = json.dumps({
             "keyword": title,
-            "cursor": "1"
+            "cursor": page
         })
         response = requests.request("POST", url, headers=cls.headers, data=payload, timeout=120)
         return response.json()
 
     @classmethod
+    @retryOnNone()
     def get_article_text(cls, content_link, is_count=False, is_cache=True) -> dict:
         """
         获取文章

+ 2 - 0
coldStartTasks/crawler/__init__.py

@@ -1,3 +1,5 @@
 """
 @author: luojunhui
 """
+from .weixin_account_crawler import WeixinAccountCrawler
+from .weixin_video_crawler import WeixinVideoCrawler

+ 263 - 0
coldStartTasks/crawler/weixin_account_crawler.py

@@ -0,0 +1,263 @@
+"""
+@author: luojunhui
+"""
+import time
+import traceback
+from typing import List, Set, Dict, Tuple
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import WeixinSpider, longArticlesMySQL, log, bot, aiditApi
+from applications.const import WeixinVideoCrawlerConst
+from applications.functions import Functions
+
+const = WeixinVideoCrawlerConst()
+function = Functions()
+
+
+def get_inner_account_gh_id() -> Set[str]:
+    """
+    获取内部账号名称
+    :return:
+    """
+    accounts = aiditApi.get_publish_account_from_aigc()
+    gh_id_list = [i['ghId'] for i in accounts]
+    return set(gh_id_list)
+
+
+class WeixinAccountCrawler(object):
+    """
+    账号抓取
+    """
+
+    def __init__(self):
+        self.db_client = longArticlesMySQL()
+        self.spider = WeixinSpider()
+        self.crawler_account_count = 0
+
+    def get_crawler_articles(self) -> List[Dict]:
+        """
+        获取已经抓取到的文章,判断其是否有链接账号,若有则继续抓账号
+        :return:
+        """
+        sql = f"""
+            SELECT id, article_url
+            FROM publish_single_video_source
+            WHERE source_account = {const.NEED_SCAN_SOURCE_ACCOUNT};
+        """
+        article_url_list = self.db_client.select(sql, cursor_type=DictCursor)
+        return article_url_list
+
+    def update_crawler_article_status(self, article_id_tuple: Tuple[int, ...]) -> int:
+        """
+        :param article_id_tuple:
+        :return:
+        """
+        sql = """
+            UPDATE publish_single_video_source
+            SET source_account = %s
+            WHERE id IN %s;
+            """
+        affected_rows = self.db_client.update(sql, (const.DO_NOT_NEED_SOURCE_ACCOUNT, article_id_tuple))
+        return affected_rows
+
+    def get_seed_titles(self, run_date) -> List[str]:
+        """
+        :return:
+        """
+        publish_timestamp_threshold = int(run_date.timestamp()) - const.STAT_PERIOD
+        sql = f"""
+            SELECT distinct title
+            FROM datastat_sort_strategy
+            WHERE read_rate > {const.READ_AVG_MULTIPLE} and view_count > {const.MIN_READ_COUNT} and publish_timestamp > {publish_timestamp_threshold}
+            ORDER BY read_rate DESC;
+        """
+        title_list = self.db_client.select(sql, cursor_type=DictCursor)
+        title_list = [i['title'] for i in title_list]
+        return title_list
+
+    def is_original(self, article_url: str) -> bool:
+        """
+        判断视频是否是原创
+        :return:
+        """
+        response = self.spider.get_article_text(article_url)
+        data = response['data']['data']
+        return data['is_original']
+
+    def insert_account(self, gh_id: str, account_name: str) -> int:
+        """
+        插入账号
+        :param account_name:
+        :param gh_id:
+        :return:
+        """
+        init_date = time.strftime("%Y-%m-%d", time.localtime())
+        sql = """
+            INSERT IGNORE INTO weixin_account_for_videos
+            (gh_id, account_name, account_init_date)
+            VALUES 
+            (%s, %s, %s);
+        """
+        insert_rows = self.db_client.update(sql, (gh_id, account_name, init_date))
+        return insert_rows
+
+    def process_search_result(self, response: Dict, inner_account_set: Set[str]):
+        """
+        处理搜索结果
+        :param response:
+        :param inner_account_set:
+        :return:
+        """
+        if response['code'] != const.REQUEST_SUCCESS:
+            return
+
+        article_list = response['data']['data']
+        if article_list:
+            for article in article_list:
+                try:
+                    # 先判断账号是否内部账号
+                    article_url = article['url']
+                    account_detail = self.spider.get_account_by_url(article_url)
+                    account_detail = account_detail['data']['data']
+                    account_name = account_detail['account_name']
+                    gh_id = account_detail['wx_gh']
+                    if gh_id in inner_account_set:
+                        continue
+                    # 判断搜索结果是否原创
+                    if self.is_original(article_url):
+                        continue
+
+                    # 判断是否有视频链接
+                    try:
+                        video_url = function.get_video_url(article_url)
+                    except Exception as e:
+                        continue
+
+                    if not video_url:
+                        continue
+
+                    # 将账号抓取进来
+                    insert_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
+                    if insert_rows:
+                        log(
+                            task="account_crawler_v1",
+                            function="process_search_result",
+                            message="insert account success",
+                            data={
+                                "gh_id": gh_id,
+                                "account_name": account_name
+                            }
+                        )
+                        self.crawler_account_count += 1
+                except Exception as e:
+                    log(
+                        task="account_crawler_v1",
+                        function="process_search_result",
+                        message="insert account error",
+                        data={
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                            "data": article
+                        }
+                    )
+
+    def search_title_in_weixin(self, title: str, inner_account_set: Set[str]) -> None:
+        """
+        调用搜索接口,在微信搜索
+        :param inner_account_set:
+        :param title:
+        :return:
+        """
+        for page_index in tqdm(range(1, const.MAX_SEARCH_PAGE_NUM + 1), desc='searching: {}'.format(title)):
+            try:
+                response = self.spider.search_articles(title, page=str(page_index))
+                self.process_search_result(response, inner_account_set)
+                time.sleep(const.SLEEP_SECONDS)
+            except Exception as e:
+                log(
+                    task="account_crawler_v1",
+                    function="search_title_in_weixin",
+                    message="search title error",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "title": title
+                    }
+                )
+
+    def run(self, run_date) -> None:
+        """
+        入口函数
+        :return:
+        """
+        # get seed titles
+        title_list = self.get_seed_titles(run_date)
+        # get inner accounts set
+        inner_account_gh_id_set = get_inner_account_gh_id()
+
+        start_time = time.time()
+        for title in tqdm(title_list, desc="search each title"):
+            self.search_title_in_weixin(title, inner_account_gh_id_set)
+
+        # 通知
+        bot(
+            title="微信账号抓取V1完成",
+            detail={
+                "总更新账号数量": self.crawler_account_count,
+                "总耗时": time.time() - start_time,
+                "种子标题数量": len(title_list)
+            },
+            mention=False
+        )
+
+    def run_v2(self) -> None:
+        """
+        入口函数
+        :return:
+        """
+        # get article list
+        crawler_article_list = self.get_crawler_articles()
+        article_id_list = []
+        insert_account_count = 0
+        for crawler_article_obj in tqdm(crawler_article_list, desc="crawler article list"):
+            try:
+                article_id = crawler_article_obj['id']
+                # 记录处理过的id
+                article_id_list.append(int(article_id))
+
+                article_url = crawler_article_obj['article_url']
+                # 判断文章是否原创
+                if self.is_original(article_url):
+                    continue
+                try:
+                    source_account_info = function.get_source_account(article_url)
+                except Exception as e:
+                    continue
+                if not source_account_info:
+                    continue
+                if source_account_info:
+                    account_name = source_account_info['name']
+                    gh_id = source_account_info['gh_id']
+                    affected_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
+                    insert_account_count += affected_rows
+                else:
+                    continue
+
+            except Exception as e:
+                print(e)
+                print(traceback.format_exc())
+
+        if article_id_list:
+            article_id_tuple = tuple(article_id_list)
+            affected_rows = self.update_crawler_article_status(article_id_tuple)
+
+        bot(
+            title="微信账号抓取V2完成",
+            detail={
+                "扫描文章数量": len(crawler_article_list),
+                "新增账号数量": insert_account_count
+            },
+            mention=False
+        )

+ 287 - 0
coldStartTasks/crawler/weixin_video_crawler.py

@@ -0,0 +1,287 @@
+"""
+@author: luojunhui
+抓取视频
+"""
+import json
+import time
+import traceback
+from typing import List, Dict
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from config import apolloConfig
+from applications import bot
+from applications import log
+from applications import Functions
+from applications import WeixinSpider
+from applications import longArticlesMySQL
+from applications.const import WeixinVideoCrawlerConst
+
+spider = WeixinSpider()
+functions = Functions()
+config = apolloConfig(env="prod")
+const = WeixinVideoCrawlerConst()
+
+
+class WeixinVideoCrawler(object):
+    """
+    微信视频抓取
+    """
+
+    def __init__(self):
+        self.db_client = longArticlesMySQL()
+        self.festival_list = json.loads(config.getConfigValue("festival"))
+
+    def is_festival(self, title: str) -> bool:
+        """
+        判断是否为节假日
+        :param title:
+        :return:
+        """
+        for festival in self.festival_list:
+            if festival in title:
+                return True
+        return False
+
+    def get_title_status(self, title: str) -> int:
+        """
+        通过标题获取文章状态
+        :param title:
+        :return:
+        """
+        if self.is_festival(title):
+            return const.TITLE_FESTIVAL_STATUS
+        elif len(title) < const.TITLE_MIN_LENGTH:
+            return const.TITLE_SHORT_STATUS
+        else:
+            return const.TITLE_DEFAULT_STATUS
+
+    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
+        """
+        更新最新抓取时间戳
+        :param gh_id:
+        :return:
+        """
+        update_sql = f"""
+            UPDATE weixin_account_for_videos
+            SET latest_crawler_timestamp = (
+                SELECT max(publish_timestamp) 
+                FROM publish_single_video_source 
+                WHERE out_account_id = %s
+                )
+            WHERE gh_id = %s;
+        """
+        affected_rows = self.db_client.update(
+            sql=update_sql,
+            params=(gh_id, gh_id)
+        )
+        return affected_rows
+
+    def get_crawler_accounts(self) -> List[Dict]:
+        """
+        获取微信公众号列表
+        :return:
+        """
+        select_sql = f"""
+            SELECT gh_id, account_name, latest_crawler_timestamp
+            FROM weixin_account_for_videos
+            WHERE status = {const.ACCOUNT_CRAWL_STATUS};
+        """
+        response = self.db_client.select(select_sql, DictCursor)
+        return response
+
+    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
+        """
+        抓取单个账号的文章列表,获取视频
+        :param cursor:
+        :param account_obj:
+        :return: 返回待下载的视频列表
+        """
+        gh_id = account_obj["gh_id"]
+        account_name = account_obj["account_name"]
+        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
+        if latest_crawler_timestamp is None:
+            latest_crawler_timestamp = const.DEFAULT_TIMESTAMP
+        # 调用爬虫接口
+        response = spider.update_msg_list(gh_id, index=cursor)
+        if response['code'] == const.REQUEST_SUCCESS:
+            # 一般返回最近10天的msg_list
+            msg_list = response.get('data', {}).get("data", [])
+            if msg_list:
+                last_msg = msg_list[-1]
+                last_msg_base_info = last_msg['AppMsg']['BaseInfo']
+                last_msg_create_timestamp = last_msg_base_info['CreateTime']
+                self.insert_msg_list(account_name=account_name, gh_id=gh_id, msg_list=msg_list)
+                if last_msg_create_timestamp > latest_crawler_timestamp:
+                    next_cursor = response['data']['next_cursor']
+                    return self.crawler_article_video_list(account_obj=account_obj, cursor=next_cursor)
+            else:
+                return []
+        else:
+            return []
+        return []
+
+    def is_downloaded(self, url_unique: str) -> bool:
+        """
+        判断该视频是否已经下载
+        :param url_unique:
+        :return:
+        """
+        select_sql = f"""
+            SELECT id
+            FROM publish_single_video_source
+            WHERE url_unique_md5 = '{url_unique}';
+        """
+        response = self.db_client.select(select_sql)
+        if response:
+            return True
+        else:
+            return False
+
+    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
+        """
+        插入视频信息
+        :param gh_id:
+        :param account_name:
+        :param msg_list:
+        :return:
+        """
+        for info in msg_list:
+            create_time = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in tqdm(detail_article_list, desc="{}: crawler_in_msg_list".format(account_name)):
+                    article_url = article.get("ContentUrl", None)
+                    url_unique = functions.generateGzhId(article_url)
+                    # 判断该视频链接是否下载,若已经下载则直接跳过
+                    if self.is_downloaded(url_unique):
+                        continue
+                    try:
+                        download_path = functions.download_gzh_video(article_url)
+                        if download_path:
+                            oss_path = functions.upload_to_oss(local_video_path=download_path)
+                            title = article.get("Title", None)
+                            position = article.get("ItemIndex", None)
+                            cover_url = article.get("CoverImgUrl", None)
+                            show_desc = article.get("ShowDesc", None)
+                            show_stat = functions.show_desc_to_sta(show_desc)
+                            read_cnt = show_stat.get("show_view_count", 0)
+                            like_cnt = show_stat.get("show_like_count", 0)
+                            title_status = self.get_title_status(title)
+                            insert_sql = f"""
+                                INSERT INTO publish_single_video_source
+                                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
+                                values
+                                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            """
+                            try:
+                                self.db_client.update(
+                                    sql=insert_sql,
+                                    params=(
+                                        "video" + url_unique,
+                                        title,
+                                        gh_id,
+                                        account_name,
+                                        read_cnt,
+                                        like_cnt,
+                                        position,
+                                        publish_type,
+                                        article_url,
+                                        cover_url,
+                                        oss_path,
+                                        title_status,
+                                        create_time,
+                                        int(time.time()),
+                                        url_unique
+                                    )
+                                )
+                                log(
+                                    task='weixin_video_crawler',
+                                    function="insert_msg_list",
+                                    message="插入一条视频",
+                                    data={"account_name": account_name, "url": article_url}
+                                )
+                            except Exception as e:
+                                try:
+                                    update_sql = f"""
+                                        UPDATE publish_single_video_source
+                                        SET read_cnt = %s, like_cnt = %s
+                                        WHERE url_unique_md5 = %s;
+                                    """
+                                    self.db_client.update(
+                                        sql=update_sql,
+                                        params=(read_cnt, like_cnt, functions.generateGzhId(article_url))
+                                    )
+                                except Exception as e:
+                                    error_stack = traceback.format_exc()
+                                    log(
+                                        task='weixin_video_crawler',
+                                        function="update_msg_list",
+                                        status="fail",
+                                        message="更新内容失败",
+                                        data={"error": str(e), "error_stack": error_stack, "url": article_url}
+
+                                    )
+                        else:
+                            continue
+                    except Exception as e:
+                        error_stack = traceback.format_exc()
+                        log(
+                            task='weixin_video_crawler',
+                            function="update_msg_list",
+                            status="fail",
+                            message="更新内容失败",
+                            data={"error": str(e), "error_stack": error_stack, "url": article_url}
+
+                        )
+
+    def crawler_task(self):
+        """
+        抓取任务
+        :return:
+        """
+        account_list = self.get_crawler_accounts()
+        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
+            try:
+                self.crawler_article_video_list(account_obj)
+                self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
+                time.sleep(const.SLEEP_SECONDS)
+            except Exception as e:
+                error_stack = traceback.format_exc()
+                log(
+                    task='weixin_video_crawler',
+                    function="crawler_task",
+                    status="fail",
+                    message="抓取任务失败--单账号",
+                    data={"error": str(e), "error_stack": error_stack, "account_name": account_obj["account_name"]}
+                )
+
+    def mention(self, start_timestamp):
+        """
+        飞书发送消息
+        :param start_timestamp:
+        :return:
+        """
+        sql = f"""select count(1) from publish_single_video_source where crawler_timestamp >= {start_timestamp};"""
+        response = self.db_client.select(sql)
+        new_articles_count = response[0][0]
+        bot(
+            title='微信抓取任务执行完成',
+            detail={
+                "新增视频数量": new_articles_count
+            }
+        )
+
+    def run(self):
+        """
+        执行任务
+        :return:
+        """
+        start_timestamp = int(time.time())
+        self.crawler_task()
+        self.mention(start_timestamp)
+
+
+

+ 1 - 0
coldStartTasks/publish/__init__.py

@@ -1,3 +1,4 @@
 """
 @author: luojunhui
 """
+from .publish_video_to_pq_for_audit import PublishVideosForAudit

+ 307 - 0
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -0,0 +1,307 @@
+"""
+@author: luojunhui
+将抓取的视频发送至pq获取视频的审核结果
+"""
+import time
+import traceback
+from typing import List, Dict
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import log
+from applications import PQAPI
+from applications import longArticlesMySQL
+from applications.const import WeixinVideoCrawlerConst
+from applications.api import generate_mini_program_title
+
+const = WeixinVideoCrawlerConst()
+pq_functions = PQAPI()
+
+
+class PublishVideosForAudit(object):
+    """
+    发布视频到pq,获取video_id,并且轮询获取视频id状态
+    """
+
+    def __init__(self):
+        self.db = longArticlesMySQL()
+
+    def get_publish_video_list(self) -> List[Dict]:
+        """
+        获取视频的信息
+        :return:
+        """
+        already_published_count = self.get_published_articles_today()
+        rest_count = const.MAX_VIDEO_NUM - already_published_count
+        sql = f"""
+            SELECT id, article_title, video_oss_path 
+            FROM publish_single_video_source 
+            WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS} and bad_status = {const.TITLE_DEFAULT_STATUS}
+            LIMIT {rest_count};
+            """
+        response = self.db.select(sql, cursor_type=DictCursor)
+        return response
+
+    def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
+        """
+        更新视频的审核状态
+        :param new_audit_status:
+        :param ori_audit_status:
+        :param video_id:
+        :param
+        :return:
+        """
+        update_sql = f"""
+            UPDATE publish_single_video_source
+            SET audit_status = %s 
+            WHERE audit_video_id = %s and audit_status = %s;
+        """
+        affected_rows = self.db.update(
+            sql=update_sql,
+            params=(new_audit_status, video_id, ori_audit_status)
+        )
+        return affected_rows
+
+    def get_published_articles_today(self):
+        """
+        获取今天发布的视频数量总量
+        :return:
+        """
+        select_sql = f"""
+            SELECT COUNT(1) as total_count
+            FROM publish_single_video_source
+            WHERE audit_status != {const.VIDEO_AUDIT_INIT_STATUS}
+            AND DATE(FROM_UNIXTIME(audit_timestamp)) = CURDATE();
+        """
+        response = self.db.select(select_sql, cursor_type=DictCursor)
+        return response[0]['total_count']
+
+    def publish_each_video(self, video_obj: Dict) -> Dict:
+        """
+        发布视频到pq
+        :param video_obj:
+        :return:
+        """
+        response = pq_functions.publish_to_pq(
+            oss_path=video_obj.get("video_oss_path"),
+            uid=const.DEFAULT_ACCOUNT_UID,
+            title=video_obj.get("article_title")
+        )
+        response_json = response.json()
+        if response_json.get("code") == const.REQUEST_SUCCESS:
+            video_id = response_json['data']['id']
+            update_sql = f"""
+                UPDATE publish_single_video_source
+                SET audit_status = %s, audit_video_id = %s, audit_timestamp = %s
+                WHERE id = %s;
+            """
+            affected_rows = self.db.update(
+                sql=update_sql,
+                params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
+            )
+            if affected_rows:
+                result = {
+                    "status": "success",
+                    "video_id": video_id
+                }
+                return result
+            else:
+                result = {
+                    "status": "fail",
+                    "video_id": video_id,
+                    "error_msg": "抢占锁失败,update执行操作修改0行"
+                }
+                return result
+        else:
+            if response_json.get("code") == const.PUBLISHED_ILLEGAL_TITLE_CODE:
+                # 发布了标题违规的视频,发布失败, 修改审核状态从0-->2
+                update_sql = f"""
+                    UPDATE publish_single_video_source
+                    SET audit_status = %s 
+                    WHERE id = %s and audit_status = %s;
+                """
+                self.db.update(update_sql, params=(const.VIDEO_AUDIT_FAIL_STATUS, video_obj['id'], const.VIDEO_AUDIT_INIT_STATUS))
+
+            result = {
+                "status": "fail",
+                "error_msg": "发布到pq失败",
+                "title": video_obj.get("article_title"),
+                "oss_path": video_obj.get("video_oss_path"),
+                "response": response_json
+            }
+            return result
+
+    def get_check_article_list(self) -> List[Dict]:
+        """
+        获取需要检查的视频列表
+        :return:
+        """
+        sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
+        response = self.db.select(sql, cursor_type=DictCursor)
+        return response
+
+    def update_mini_program_title(self, video_id: int) -> bool:
+        """
+        :param video_id:
+        """
+        select_sql = f"""
+            SELECT article_title FROM publish_single_video_source WHERE audit_video_id = {video_id};
+        """
+        title = self.db.select(select_sql, cursor_type=DictCursor)[0]['article_title']
+
+        try:
+            mini_program_title = generate_mini_program_title(title)
+            update_sql = f"""
+            UPDATE publish_single_video_source SET mini_program_title = %s WHERE audit_video_id = %s;
+            """
+            self.db.update(update_sql, params=(mini_program_title, video_id))
+            log(
+                task="publish_video_for_audit",
+                function="update_mini_program_title",
+                message="修改小程序标题成功",
+                data={
+                    "video_id": video_id,
+                    "title": title,
+                    "mini_program_title": mini_program_title
+                }
+            )
+            return True
+        except Exception as e:
+            log(
+                task="publish_video_for_audit",
+                function="update_mini_program_title",
+                status="fail",
+                data={
+                    "video_id": video_id,
+                    "title": title,
+                    "error": str(e),
+                    "error_stack": traceback.format_exc()
+                }
+            )
+            return False
+
+    def check_video_status(self, video_id: int) -> Dict:
+        """
+        检查视频的状态,若视频审核通过or不通过,修改记录状态
+        :param video_id:
+        :return:
+        """
+        response = pq_functions.getPQVideoListDetail([video_id])
+        audit_status = response.get("data")[0].get("auditStatus")
+        # 请求成功
+        if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
+            # 更新小程序标题字段
+            mini_program_title_flag = self.update_mini_program_title(video_id)
+            if mini_program_title_flag:
+                # 处理成功,修改审核状态为1
+                affected_rows = self.update_audit_status(
+                    video_id=video_id,
+                    ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
+                    new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
+                )
+            else:
+                # 修改小程序标题失败,修改审核状态为4
+                affected_rows = self.update_audit_status(
+                    video_id=video_id,
+                    ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
+                    new_audit_status=const.VIDEO_TITLE_GENERATE_FAIL_STATUS
+                )
+        elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
+            # 视频审核失败,修改审核状态为2
+            affected_rows = self.update_audit_status(
+                video_id=video_id,
+                ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
+                new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS
+            )
+        elif audit_status == const.PQ_AUDIT_PROCESSING_STATUS:
+            # 视频正在审核中,不做处理
+            affected_rows = 0
+        else:
+            # 其他情况,暂时不做处理
+            affected_rows = 0
+        result = {
+            "affected_rows": affected_rows,
+            "video_id": video_id,
+            "audit_status": audit_status
+        }
+        return result
+
+    def publish_job(self):
+        """
+        发布视频到pq
+        :return:
+        """
+        video_list = self.get_publish_video_list()
+        for video_obj in tqdm(video_list, desc="视频发布"):
+            try:
+                response = self.publish_each_video(video_obj)
+                if response.get("status") == "success":
+                    log(
+                        task="publish_video_for_audit",
+                        message="发送至PQ成功",
+                        function="publish_each_video",
+                        data={
+                            "video_id": response.get("video_id")
+                        }
+                    )
+                else:
+                    log(
+                        task="publish_video_for_audit",
+                        message=response.get('error_msg'),
+                        function="publish_each_video",
+                        status="fail",
+                        data={
+                            "response": response,
+                            "video_obj": video_obj
+                        }
+                    )
+            except Exception as e:
+                error_msg = traceback.format_exc()
+                log(
+                    task="publish_video_for_audit",
+                    message="发送至PQ代码执行失败",
+                    function="publish_each_video",
+                    status="fail",
+                    data={
+                        "error_msg": error_msg,
+                        "video_obj": video_obj,
+                        "error": str(e)
+                    }
+                )
+
+    def check_job(self):
+        """
+        检查视频的状态
+        :return:
+        """
+        video_list = self.get_check_article_list()
+        for video_obj in tqdm(video_list, desc="视频检查"):
+            video_id = video_obj.get("audit_video_id")
+            try:
+                response = self.check_video_status(video_id)
+                if response.get("affected_rows"):
+                    continue
+                else:
+                    log(
+                        task="publish_video_for_audit",
+                        function="check_each_video",
+                        message="修改行数为0",
+                        data={
+                            "video_id": video_id,
+                            "audit_status": response['audit_status']
+                        }
+                    )
+            except Exception as e:
+                error_msg = traceback.format_exc()
+                log(
+                    task="publish_video_for_audit",
+                    message="查询状态执行失败",
+                    function="check_each_video",
+                    status="fail",
+                    data={
+                        "error_msg": error_msg,
+                        "video_obj": video_obj,
+                        "error": str(e)
+                    }
+                )

+ 72 - 0
run_video_account_crawler.py

@@ -0,0 +1,72 @@
+"""
+@author: luojunhui
+执行视频&&账号的抓取
+"""
+import traceback
+
+from datetime import datetime
+from argparse import ArgumentParser
+
+from applications import bot
+from coldStartTasks.crawler import WeixinAccountCrawler, WeixinVideoCrawler
+
+account_crawler = WeixinAccountCrawler()
+video_crawler = WeixinVideoCrawler()
+
+
+def main():
+    """
+    主函数
+    :return:
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--run-date",
+                        help="Run only once for date in format of %Y%m%d. \
+                            If no specified, run as daily jobs.")
+    args = parser.parse_args()
+
+    if args.run_date:
+        run_date = datetime.strptime(args.run_date, "%Y-%m-%d")
+        print("Run in manual mode. Date: {}".format(args.run_date))
+    else:
+        run_date = datetime.today()
+    # 先执行账号抓取
+    try:
+        account_crawler.run(run_date)
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title='账号抓取v1执行失败',
+            detail={
+                "error": str(e),
+                "traceback": error_msg
+            }
+        )
+    # 再执行文章抓取
+    try:
+        video_crawler.run()
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title='视频抓取执行失败',
+            detail={
+                "error": str(e),
+                "traceback": error_msg
+            }
+        )
+    # 再执行账号抓取v2
+    try:
+        account_crawler.run_v2()
+    except Exception as e:
+        error_msg = traceback.format_exc()
+        bot(
+            title='账号抓取V2执行失败',
+            detail={
+                "error": str(e),
+                "traceback": error_msg
+            }
+        )
+
+
+if __name__ == '__main__':
+    main()

+ 34 - 0
run_video_publish_and_audit.py

@@ -0,0 +1,34 @@
+"""
+@author: luojunhui
+"""
+from argparse import ArgumentParser
+
+from coldStartTasks.publish import PublishVideosForAudit
+
+pub = PublishVideosForAudit()
+
+
+def main():
+    """
+    主函数
+    :return:
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--run_task", type=str, help="run task, input publish or check")
+    args = parser.parse_args()
+
+    if args.run_task:
+        task = args.run_task
+        if task == "publish":
+            pub.publish_job()
+        elif task == "check":
+            pub.check_job()
+        else:
+            print("run_task input ERROR,please input publish or check")
+    else:
+        pub.publish_job()
+        pub.check_job()
+
+
+if __name__ == '__main__':
+    main()

+ 26 - 0
sh/run_account_video_crawler.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/account_video_crawler_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 run_video_account_crawler.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_video_account_crawler.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_video_account_crawler.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_video_account_crawler.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_account_crawler.py"
+fi

+ 26 - 0
sh/run_video_publish_and_audit.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/video_publish_audit_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 run_video_publish_and_audit.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_video_publish_and_audit.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_video_publish_and_audit.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_video_publish_and_audit.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_publish_and_audit.py"
+fi