"""
@author: luojunhui
@task: crawl videos published by WeChat official accounts (gzh)
"""
import json
import time
import traceback
from typing import List, Dict

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import log
from applications.api import ApolloApi, FeishuBotApi
from applications.const import WeixinVideoCrawlerConst
from applications.db import DatabaseConnector
from applications.pipeline import scrape_video_entities_process
from applications.utils import (
    generate_gzh_id,
    download_gzh_video,
    upload_to_oss,
    show_desc_to_sta,
    Item,
    insert_into_single_video_source_table,
)
from config import long_articles_config
from cold_start.crawler.wechat import get_article_list_from_account
from cold_start.filter import video_crawler_duplicate_filter


class CrawlerGzhVideos:
    """Shared state and helpers for the official-account video crawlers.

    Holds the database client, the crawler constants, the festival keyword
    list loaded from the Apollo ``festival`` config value, and a Feishu bot
    client.
    """

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.apollo = ApolloApi(env="prod")
        self.const = WeixinVideoCrawlerConst()
        self.festival_list = json.loads(self.apollo.get_config_value("festival"))
        self.feishu_bot = FeishuBotApi()

    def is_festival(self, title: str) -> bool:
        """
        Tell whether the title mentions a festival.

        :param title: article title
        :return: True if any entry of ``self.festival_list`` occurs in the title
        """
        return any(festival in title for festival in self.festival_list)

    def set_status_for_title(self, title: str) -> int:
        """
        Pick the title status for a title.

        The festival check takes precedence over the minimum-length check;
        titles matching neither get the default status.

        :param title: article title
        :return: one of the ``TITLE_*_STATUS`` constants
        """
        if self.is_festival(title):
            return self.const.TITLE_FESTIVAL_STATUS
        if len(title) < self.const.TITLE_MIN_LENGTH:
            return self.const.TITLE_SHORT_STATUS
        return self.const.TITLE_DEFAULT_STATUS

    def is_video_downloaded(self, url_unique: str) -> bool:
        """
        Check whether a video has already been stored.

        :param url_unique: value matched against ``url_unique_md5``
        :return: True if ``publish_single_video_source`` has a matching row
        """
        fetch_query = """
            select id from publish_single_video_source where url_unique_md5 = %s;
        """
        # Coerce the fetch result so the return value matches the annotation.
        return bool(self.db_client.fetch(query=fetch_query, params=(url_unique,)))

    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
        """
        Download, upload and store the videos found in a list of messages.

        For every article in each message's ``DetailInfo`` the article is
        skipped when it has no URL, is already stored, has no title, or is
        rejected by the title duplicate filter. Otherwise the video is
        downloaded, uploaded to OSS and inserted into
        ``publish_single_video_source``; when the insert fails, the read/like
        counters of the existing row are updated instead.

        :param account_name: display name of the account (used for logging
            and stored as ``out_account_name``)
        :param gh_id: account id stored as ``out_account_id``
        :param msg_list: messages as returned by the article-list API
        :return: None
        """
        for info in msg_list:
            base_info = info.get("AppMsg", {}).get("BaseInfo", {})
            create_time = base_info.get("CreateTime", None)
            publish_type = base_info.get("Type", None)
            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
            if detail_article_list:
                for article in tqdm(
                    detail_article_list,
                    desc=f"{account_name}: crawler_in_msg_list",
                ):
                    article_url = article.get("ContentUrl", None)
                    # Without a URL there is nothing to fingerprint or download.
                    if not article_url:
                        continue

                    url_unique = generate_gzh_id(article_url)
                    # Skip links whose video has already been downloaded.
                    if self.is_video_downloaded(url_unique):
                        print("url exists")
                        continue

                    title = article.get("Title", None)
                    if not title:
                        continue

                    # Skip titles that the duplicate filter rejects.
                    if video_crawler_duplicate_filter(title, self.db_client):
                        log(
                            task="weixin_video_crawler",
                            function="insert_msg_list",
                            message="标题去重",
                            data={"url": article_url},
                        )
                        continue

                    try:
                        download_path = download_gzh_video(article_url)
                        if download_path:
                            oss_path = upload_to_oss(local_video_path=download_path)
                            position = article.get("ItemIndex", None)
                            cover_url = article.get("CoverImgUrl", None)
                            show_desc = article.get("ShowDesc", None)
                            show_stat = show_desc_to_sta(show_desc)
                            read_cnt = show_stat.get("show_view_count", 0)
                            like_cnt = show_stat.get("show_like_count", 0)
                            title_status = self.set_status_for_title(title)
                            insert_sql = """
                                INSERT INTO publish_single_video_source
                                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
                                values
                                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                            """
                            try:
                                self.db_client.save(
                                    query=insert_sql,
                                    params=(
                                        "video" + url_unique,
                                        title,
                                        gh_id,
                                        account_name,
                                        read_cnt,
                                        like_cnt,
                                        position,
                                        publish_type,
                                        article_url,
                                        cover_url,
                                        oss_path,
                                        title_status,
                                        create_time,
                                        int(time.time()),
                                        url_unique,
                                    ),
                                )
                                log(
                                    task="weixin_video_crawler",
                                    function="insert_msg_list",
                                    message="插入一条视频",
                                    data={
                                        "account_name": account_name,
                                        "url": article_url,
                                    },
                                )
                            except Exception:
                                # The insert failed (presumably because the row
                                # already exists -- TODO confirm); fall back to
                                # refreshing the counters of the stored row.
                                try:
                                    update_sql = """
                                        UPDATE publish_single_video_source
                                        SET read_cnt = %s, like_cnt = %s
                                        WHERE url_unique_md5 = %s;
                                    """
                                    self.db_client.save(
                                        query=update_sql,
                                        params=(
                                            read_cnt,
                                            like_cnt,
                                            url_unique,
                                        ),
                                    )
                                except Exception as e:
                                    error_stack = traceback.format_exc()
                                    log(
                                        task="weixin_video_crawler",
                                        function="update_msg_list",
                                        status="fail",
                                        message="更新内容失败",
                                        data={
                                            "error": str(e),
                                            "error_stack": error_stack,
                                            "url": article_url,
                                        },
                                    )
                    except Exception as e:
                        # One failing article must not stop the rest of the batch.
                        error_stack = traceback.format_exc()
                        log(
                            task="weixin_video_crawler",
                            function="update_msg_list",
                            status="fail",
                            message="更新内容失败",
                            data={
                                "error": str(e),
                                "error_stack": error_stack,
                                "url": article_url,
                            },
                        )

    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
        """
        Crawl the article list of one account and store its videos.

        Pages are fetched one after another starting at ``cursor``. Paging
        stops when the request does not succeed, a page is empty, the last
        message of a page is not newer than the account's
        ``latest_crawler_timestamp``, or the API gives no new cursor.

        :param account_obj: mapping with ``gh_id``, ``account_name`` and
            ``latest_crawler_timestamp``
        :param cursor: page cursor to start from; None requests the first page
        :return: always an empty list
        """
        gh_id = account_obj["gh_id"]
        account_name = account_obj["account_name"]
        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
        if latest_crawler_timestamp is None:
            latest_crawler_timestamp = self.const.DEFAULT_TIMESTAMP

        # Iterate instead of recursing so a long history cannot exhaust the
        # interpreter's recursion limit.
        while True:
            # Call the crawler API.
            response = get_article_list_from_account(gh_id, index=cursor)
            if response["code"] != self.const.REQUEST_SUCCESS:
                break

            # Per the original author's note, a page usually holds the
            # msg_list of the most recent 10 days.
            msg_list = response.get("data", {}).get("data", [])
            if not msg_list:
                break

            last_msg_create_timestamp = msg_list[-1]["AppMsg"]["BaseInfo"]["CreateTime"]
            self.insert_msg_list(
                account_name=account_name, gh_id=gh_id, msg_list=msg_list
            )
            if last_msg_create_timestamp <= latest_crawler_timestamp:
                break

            next_cursor = response["data"].get("next_cursor")
            # A missing or unchanged cursor would request the same page again
            # forever; treat it as the end of the list.
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

        return []


class CrawlerGzhAccountVideos(CrawlerGzhVideos):
    """Crawl videos account by account from ``weixin_account_for_videos``."""

    def get_crawler_accounts(self) -> List[Dict]:
        """
        Fetch the official accounts to crawl.

        :return: rows with ``gh_id``, ``account_name`` and
            ``latest_crawler_timestamp``, ordered by ``latest_crawler_timestamp``
        """
        select_sql = f"""
            SELECT gh_id, account_name, latest_crawler_timestamp
            FROM weixin_account_for_videos
            WHERE status = {self.const.ACCOUNT_CRAWL_STATUS}
            ORDER BY latest_crawler_timestamp;
        """
        return self.db_client.fetch(select_sql, cursor_type=DictCursor)

    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
        """
        Set the account's latest crawl timestamp to its newest stored video.

        :param gh_id: account id
        :return: the value returned by ``db_client.save`` for the update
        """
        update_sql = """
            UPDATE weixin_account_for_videos
            SET latest_crawler_timestamp = (
                SELECT max(publish_timestamp) 
                FROM publish_single_video_source 
                WHERE out_account_id = %s
                )
            WHERE gh_id = %s;
        """
        affected_rows = self.db_client.save(query=update_sql, params=(gh_id, gh_id))
        return affected_rows

    def deal(self):
        """Crawl every account in turn, logging failures per account."""
        account_list = self.get_crawler_accounts()
        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
            try:
                self.crawler_article_video_list(account_obj)
                self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
                time.sleep(self.const.SLEEP_SECONDS)
            except Exception as e:
                # Keep going with the remaining accounts.
                error_stack = traceback.format_exc()
                log(
                    task="weixin_video_crawler",
                    function="crawler_task",
                    status="fail",
                    message="抓取任务失败--单账号",
                    data={
                        "error": str(e),
                        "error_stack": error_stack,
                        "account_name": account_obj["account_name"],
                    },
                )


class CrawlerGzhMetaVideos(CrawlerGzhVideos):
    """Crawl videos for articles listed in ``crawler_meta_article``."""

    def get_meta_article_list(self, limit=100000):
        """
        Fetch candidate articles that have not been processed for video yet.

        NOTE(review): ``limit`` is accepted but not applied -- the query below
        hard-codes ``limit 1000``. Left as is to keep current behaviour;
        confirm which value is intended.

        :param limit: currently unused
        :return: rows from ``crawler_meta_article`` as dicts
        """
        fetch_query = """
            select article_id, title, out_account_id, read_cnt, like_cnt, article_index, link, publish_time
            from crawler_meta_article
            where platform = 'weixin' and score > 0.5 and status = 1 and has_video = 0
            order by article_id
            desc limit 1000; 
        """
        return self.db_client.fetch(fetch_query, cursor_type=DictCursor)

    def update_article_status(self, article_id, ori_status, new_status):
        """
        Move an article's ``has_video`` from ``ori_status`` to ``new_status``.

        The update only matches rows whose ``has_video`` still equals
        ``ori_status``.

        :param article_id: id of the article row
        :param ori_status: value ``has_video`` must currently hold
        :param new_status: value to write
        :return: the value returned by ``db_client.save`` for the update
        """
        update_query = """
            update crawler_meta_article
            set has_video = %s 
            where has_video = %s and article_id = %s;
        """
        return self.db_client.save(
            query=update_query,
            params=(new_status, ori_status, article_id)
        )

    def crawler_each_video(self, video_data):
        """
        Crawl the video of a single article row.

        The article is first moved from INIT to PROCESSING; if that update
        reports nothing changed, the method returns without doing any work.
        Afterwards the article ends in SUCCESS when the pipeline yields an
        item and it is stored, and in FAIL otherwise -- including when
        building or validating the item raises.

        :param video_data: row from ``crawler_meta_article``
        :return: None
        """
        # lock
        affected_rows = self.update_article_status(
            article_id=video_data['article_id'],
            ori_status=self.const.INIT_STATUS,
            new_status=self.const.PROCESSING_STATUS
        )
        if not affected_rows:
            return

        video_item = Item()
        try:
            # Everything after taking the lock runs inside the try block so
            # that a failure releases the article to FAIL instead of leaving
            # it in PROCESSING.
            unique_id = generate_gzh_id(video_data["link"])

            # add info to item
            video_item.add("content_trace_id", f"video{unique_id}")
            video_item.add("url_unique_md5", unique_id)
            video_item.add("article_title", video_data['title'])
            video_item.add("out_account_id", video_data["out_account_id"])
            video_item.add("out_account_name", "article_meta")
            video_item.add("publish_timestamp", video_data["publish_time"])
            video_item.add("read_cnt", video_data["read_cnt"])
            video_item.add("like_cnt", video_data["like_cnt"])
            video_item.add("article_index", video_data["article_index"])
            video_item.add("platform", "gzh")
            video_item.add("article_url", video_data["link"])
            video_item.add("crawler_timestamp", int(time.time()))

            # check item before insert
            video_item.check(source="video")

            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
                final_status = self.const.SUCCESS_STATUS
            else:
                final_status = self.const.FAIL_STATUS

            self.update_article_status(
                article_id=video_data['article_id'],
                ori_status=self.const.PROCESSING_STATUS,
                new_status=final_status
            )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            log(
                task="crawler_gzh_videos",
                function="crawler_each_video",
                message="crawler_gzh_videos failed",
                status="failed",
                data=detail,
            )
            self.update_article_status(
                article_id=video_data['article_id'],
                ori_status=self.const.PROCESSING_STATUS,
                new_status=self.const.FAIL_STATUS
            )

    def deal(self):
        """Crawl the video of every candidate article."""
        meta_article_list = self.get_meta_article_list()
        for article in tqdm(meta_article_list):
            self.crawler_each_video(article)