Parcourir la source

Merge branch 'feature/20250711-luojuhui-gzh-video-improve' of luojunhui/LongArticlesJob into master

luojunhui il y a 3 mois
Parent
commit
577a72a0b2

+ 6 - 0
applications/const/__init__.py

@@ -227,6 +227,12 @@ class WeixinVideoCrawlerConst:
     # safe score
     TITLE_SAFE_SCORE_THRESHOLD = 7
 
+    # Task Status
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99
+
 
 class UpdateMiniProgramDetailConst(updatePublishedMsgTaskConst):
     """

+ 1 - 1
cold_start/crawler/wechat/__init__.py

@@ -4,4 +4,4 @@
 from .article_association import ArticleAssociationCrawler
 from .official_accounts_api import get_article_list_from_account
 from .official_accounts_api import get_article_detail
-from.official_accounts_api import get_source_account_from_article
+from .official_accounts_api import get_source_account_from_article

+ 7 - 1
long_articles_job.py

@@ -10,6 +10,7 @@ from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import (
 from tasks.crawler_tasks.crawler_video.crawler_sph_videos import (
     CrawlerChannelAccountVideos,
 )
+from tasks.crawler_tasks.crawler_video.crawler_gzh_videos import CrawlerGzhMetaVideos
 from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
 from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
 from tasks.monitor_tasks.kimi_balance_monitor import check_kimi_balance
@@ -41,7 +42,6 @@ def run_sph_video_crawler():
 def run_fwh_data_manager():
     fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
     fwh_group_publish_record_manager.deal()
-    fwh_group_publish_record_manager.monitor()
 
     # 2. 保存数据到数据库
     save_fwh_data_to_database = SaveFwhDataToDatabase()
@@ -52,6 +52,10 @@ def run_top_article_generalize_from_article_pool():
     task = TopArticleGeneralizeFromArticlePool()
     task.deal()
 
+def crawler_gzh_meta_videos():
+    task = CrawlerGzhMetaVideos()
+    task.deal()
+
 
 def main():
     """
@@ -81,6 +85,8 @@ def main():
                 run_sph_video_crawler()
             case "top_article_generalize":
                 run_top_article_generalize_from_article_pool()
+            case "crawler_gzh_meta_videos":
+                crawler_gzh_meta_videos()
             case _:
                 print("task_name cannot be None")
 

+ 397 - 0
tasks/crawler_tasks/crawler_video/crawler_gzh_videos.py

@@ -0,0 +1,397 @@
+"""
+@author: luojunhui
+@task: 抓取公众号视频
+"""
+
+import json, time
+import traceback
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.api import ApolloApi, FeishuBotApi
+from applications.const import WeixinVideoCrawlerConst
+from applications.db import DatabaseConnector
+from applications.pipeline import scrape_video_entities_process
+from applications.utils import (
+    generate_gzh_id,
+    download_gzh_video,
+    upload_to_oss,
+    show_desc_to_sta,
+    Item,
+    insert_into_single_video_source_table,
+)
+from config import long_articles_config
+from cold_start.crawler.wechat import get_article_list_from_account
+from cold_start.filter import video_crawler_duplicate_filter
+
+
+class CrawlerGzhVideos:
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+        self.apollo = ApolloApi(env="prod")
+        self.const = WeixinVideoCrawlerConst()
+        self.festival_list = json.loads(self.apollo.get_config_value("festival"))
+        self.feishu_bot = FeishuBotApi()
+
+    def is_festival(self, title: str) -> bool:
+        """
+        判断是否为节日
+        :param title:
+        :return:
+        """
+        for festival in self.festival_list:
+            if festival in title:
+                return True
+        return False
+
+    def set_status_for_title(self, title: str) -> int:
+        """
+        set title_status for each title
+        """
+        if self.is_festival(title):
+            return self.const.TITLE_FESTIVAL_STATUS
+        elif len(title) < self.const.TITLE_MIN_LENGTH:
+            return self.const.TITLE_SHORT_STATUS
+        else:
+            return self.const.TITLE_DEFAULT_STATUS
+
+    def is_video_downloaded(self, url_unique: str) -> bool:
+        """
+        check whether video has been downloaded
+        """
+        fetch_query = f"""
+            select id from publish_single_video_source where url_unique_md5 = %s;
+        """
+        return self.db_client.fetch(query=fetch_query, params=(url_unique,))
+
+    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
+        """
+        插入视频信息
+        :param gh_id:
+        :param account_name:
+        :param msg_list:
+        :return:
+        """
+        for info in msg_list:
+            create_time = (
+                info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            )
+            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in tqdm(
+                    detail_article_list,
+                    desc="{}: crawler_in_msg_list".format(account_name),
+                ):
+                    article_url = article.get("ContentUrl", None)
+                    url_unique = generate_gzh_id(article_url)
+                    # 判断该视频链接是否下载,若已经下载则直接跳过
+                    if self.is_video_downloaded(url_unique):
+                        print("url exists")
+                        continue
+
+                    title = article.get("Title", None)
+                    if not title:
+                        continue
+
+                    # 判断标题是否重复
+                    if video_crawler_duplicate_filter(title, self.db_client):
+                        log(
+                            task="weixin_video_crawler",
+                            function="insert_msg_list",
+                            message="标题去重",
+                            data={"url": article_url},
+                        )
+                        continue
+
+                    try:
+                        download_path = download_gzh_video(article_url)
+                        if download_path:
+                            oss_path = upload_to_oss(local_video_path=download_path)
+                            position = article.get("ItemIndex", None)
+                            cover_url = article.get("CoverImgUrl", None)
+                            show_desc = article.get("ShowDesc", None)
+                            show_stat = show_desc_to_sta(show_desc)
+                            read_cnt = show_stat.get("show_view_count", 0)
+                            like_cnt = show_stat.get("show_like_count", 0)
+                            title_status = self.set_status_for_title(title)
+                            insert_sql = f"""
+                                INSERT INTO publish_single_video_source
+                                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
+                                values
+                                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            """
+                            try:
+                                self.db_client.save(
+                                    query=insert_sql,
+                                    params=(
+                                        "video" + url_unique,
+                                        title,
+                                        gh_id,
+                                        account_name,
+                                        read_cnt,
+                                        like_cnt,
+                                        position,
+                                        publish_type,
+                                        article_url,
+                                        cover_url,
+                                        oss_path,
+                                        title_status,
+                                        create_time,
+                                        int(time.time()),
+                                        url_unique,
+                                    ),
+                                )
+                                log(
+                                    task="weixin_video_crawler",
+                                    function="insert_msg_list",
+                                    message="插入一条视频",
+                                    data={
+                                        "account_name": account_name,
+                                        "url": article_url,
+                                    },
+                                )
+                            except Exception as e:
+                                try:
+                                    update_sql = f"""
+                                        UPDATE publish_single_video_source
+                                        SET read_cnt = %s, like_cnt = %s
+                                        WHERE url_unique_md5 = %s;
+                                    """
+                                    self.db_client.save(
+                                        query=update_sql,
+                                        params=(
+                                            read_cnt,
+                                            like_cnt,
+                                            generate_gzh_id(article_url),
+                                        ),
+                                    )
+                                except Exception as e:
+                                    error_stack = traceback.format_exc()
+                                    log(
+                                        task="weixin_video_crawler",
+                                        function="update_msg_list",
+                                        status="fail",
+                                        message="更新内容失败",
+                                        data={
+                                            "error": str(e),
+                                            "error_stack": error_stack,
+                                            "url": article_url,
+                                        },
+                                    )
+                        else:
+                            continue
+                    except Exception as e:
+                        error_stack = traceback.format_exc()
+                        log(
+                            task="weixin_video_crawler",
+                            function="update_msg_list",
+                            status="fail",
+                            message="更新内容失败",
+                            data={
+                                "error": str(e),
+                                "error_stack": error_stack,
+                                "url": article_url,
+                            },
+                        )
+
+    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
+        """
+        抓取单个账号的文章列表,获取视频
+        :param cursor:
+        :param account_obj:
+        :return: 返回待下载的视频列表
+        """
+        gh_id = account_obj["gh_id"]
+        account_name = account_obj["account_name"]
+        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
+        if latest_crawler_timestamp is None:
+            latest_crawler_timestamp = self.const.DEFAULT_TIMESTAMP
+
+        # 调用爬虫接口
+        response = get_article_list_from_account(gh_id, index=cursor)
+        if response["code"] == self.const.REQUEST_SUCCESS:
+            # 一般返回最近10天的msg_list
+            msg_list = response.get("data", {}).get("data", [])
+            if msg_list:
+                last_msg = msg_list[-1]
+                last_msg_base_info = last_msg["AppMsg"]["BaseInfo"]
+                last_msg_create_timestamp = last_msg_base_info["CreateTime"]
+                self.insert_msg_list(
+                    account_name=account_name, gh_id=gh_id, msg_list=msg_list
+                )
+                if last_msg_create_timestamp > latest_crawler_timestamp:
+                    next_cursor = response["data"]["next_cursor"]
+                    return self.crawler_article_video_list(
+                        account_obj=account_obj, cursor=next_cursor
+                    )
+            else:
+                return []
+        else:
+            return []
+        return []
+
+
+class CrawlerGzhAccountVideos(CrawlerGzhVideos):
+
+    def get_crawler_accounts(self) -> List[Dict]:
+        """
+        获取微信公众号列表
+        :return:
+        """
+        select_sql = f"""
+            SELECT gh_id, account_name, latest_crawler_timestamp
+            FROM weixin_account_for_videos
+            WHERE status = {self.const.ACCOUNT_CRAWL_STATUS}
+            ORDER BY latest_crawler_timestamp;
+        """
+        response = self.db_client.fetch(select_sql, DictCursor)
+        return response
+
+    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
+        """
+        更新最新抓取时间戳
+        :param gh_id:
+        :return:
+        """
+        update_sql = f"""
+            UPDATE weixin_account_for_videos
+            SET latest_crawler_timestamp = (
+                SELECT max(publish_timestamp) 
+                FROM publish_single_video_source 
+                WHERE out_account_id = %s
+                )
+            WHERE gh_id = %s;
+        """
+        affected_rows = self.db_client.save(query=update_sql, params=(gh_id, gh_id))
+        return affected_rows
+
+    def deal(self):
+        account_list = self.get_crawler_accounts()
+        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
+            try:
+                self.crawler_article_video_list(account_obj)
+                self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
+                time.sleep(self.const.SLEEP_SECONDS)
+            except Exception as e:
+                error_stack = traceback.format_exc()
+                log(
+                    task="weixin_video_crawler",
+                    function="crawler_task",
+                    status="fail",
+                    message="抓取任务失败--单账号",
+                    data={
+                        "error": str(e),
+                        "error_stack": error_stack,
+                        "account_name": account_obj["account_name"],
+                    },
+                )
+
+
+class CrawlerGzhMetaVideos(CrawlerGzhVideos):
+    def get_meta_article_list(self, limit=100000):
+        fetch_query = f"""
+            select article_id, title, out_account_id, read_cnt, like_cnt, article_index, link, publish_time
+            from crawler_meta_article
+            where platform = 'weixin' and score > 0.5 and status = 1 and has_video = 0
+            order by article_id
+            desc limit 1000; 
+        """
+        return self.db_client.fetch(fetch_query, cursor_type=DictCursor)
+
+    def update_article_status(self, article_id, ori_status, new_status):
+        update_query = f"""
+            update crawler_meta_article
+            set has_video = %s 
+            where has_video = %s and article_id = %s;
+        """
+        return self.db_client.save(
+            query=update_query,
+            params=(new_status, ori_status, article_id)
+        )
+
+    def crawler_each_video(self, video_data):
+        """
+        crawler single video data
+        """
+        # lock
+        affected_rows = self.update_article_status(
+            article_id=video_data['article_id'],
+            ori_status=self.const.INIT_STATUS,
+            new_status=self.const.PROCESSING_STATUS
+        )
+        if not affected_rows:
+            return
+
+        video_item = Item()
+        unique_id = generate_gzh_id(video_data["link"])
+
+        # add info to item
+        video_item.add("content_trace_id", f"video{unique_id}")
+        video_item.add("url_unique_md5", unique_id)
+        video_item.add("article_title", video_data['title'])
+        video_item.add("out_account_id", video_data["out_account_id"])
+        video_item.add("out_account_name", "article_meta")
+        video_item.add("publish_timestamp", video_data["publish_time"])
+        video_item.add("read_cnt", video_data["read_cnt"])
+        video_item.add("like_cnt", video_data["like_cnt"])
+        video_item.add("article_index", video_data["article_index"])
+        video_item.add("platform", "gzh")
+        video_item.add("article_url", video_data["link"])
+        video_item.add("crawler_timestamp", int(time.time()))
+
+        # check item before insert
+        video_item.check(source="video")
+
+        try:
+            item_with_oss_path = scrape_video_entities_process(
+                video_item=video_item.item, db_client=self.db_client
+            )
+            if item_with_oss_path:
+                insert_into_single_video_source_table(
+                    db_client=self.db_client, video_item=item_with_oss_path
+                )
+                self.update_article_status(
+                    article_id=video_data['article_id'],
+                    ori_status=self.const.PROCESSING_STATUS,
+                    new_status=self.const.SUCCESS_STATUS
+                )
+            else:
+                self.update_article_status(
+                    article_id=video_data['article_id'],
+                    ori_status=self.const.PROCESSING_STATUS,
+                    new_status=self.const.FAIL_STATUS
+                )
+
+
+        except Exception as e:
+            detail = {
+                "video_item": video_item.item,
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+            }
+            log(
+                task="crawler_gzh_videos",
+                function="crawler_each_video",
+                message="crawler_gzh_videos failed",
+                status="failed",
+                data=detail,
+            )
+            self.update_article_status(
+                article_id=video_data['article_id'],
+                ori_status=self.const.PROCESSING_STATUS,
+                new_status=self.const.FAIL_STATUS
+            )
+
+    def deal(self):
+        meta_article_list = self.get_meta_article_list()
+        for article in tqdm(meta_article_list):
+            self.crawler_each_video(article)
+
+
+