Ver código fonte

视频抓取v1提交

luojunhui 7 meses atrás
pai
commit
27f6a0cc49
1 arquivos alterados com 216 adições e 0 exclusões
  1. 216 0
      coldStartTasks/crawler/weixin_video_crawler.py

+ 216 - 0
coldStartTasks/crawler/weixin_video_crawler.py

@@ -0,0 +1,216 @@
+"""
+@author: luojunhui
+抓取视频
+"""
+import time
+import traceback
+from typing import List, Dict
+
+from tqdm import tqdm
+
+from applications import bot
+from applications import log
+from applications import Functions
+from applications import WeixinSpider
+from applications import longArticlesMySQL
+from applications.const import WeixinVideoCrawlerConst
+
+spider = WeixinSpider()
+const = WeixinVideoCrawlerConst()
+functions = Functions()
+
+
+class WeixinVideoCrawler(object):
+    """
+    微信视频抓取
+    """
+
+    def __init__(self):
+        self.db_client = longArticlesMySQL()
+
+    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
+        """
+        更新最新抓取时间戳
+        :param gh_id:
+        :return:
+        """
+        update_sql = f"""
+            UPDATE weixin_account_for_videos
+            SET latest_crawler_timestamp = (
+                SELECT max(publish_timestamp) 
+                FROM publish_single_video_source 
+                WHERE out_account_id = %s
+                )
+            WHERE gh_id = %s;
+        """
+        affected_rows = self.db_client.update(
+            sql=update_sql,
+            params=(gh_id, gh_id)
+        )
+        return affected_rows
+
+    def get_crawler_accounts(self) -> List[Dict]:
+        """
+        获取微信公众号列表
+        :return:
+        """
+        select_sql = f"""
+            SELECT gh_id, account_name, latest_crawler_timestamp
+            FROM weixin_account_for_videos
+            WHERE status = {const.ACCOUNT_CRAWL_STATUS};
+        """
+        response = self.db_client.select_json(select_sql)
+        return response
+
+    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
+        """
+        抓取单个账号的文章列表,获取视频
+        :param cursor:
+        :param account_obj:
+        :return: 返回待下载的视频列表
+        """
+        gh_id = account_obj["gh_id"]
+        account_name = account_obj["account_name"]
+        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
+        if latest_crawler_timestamp is None:
+            latest_crawler_timestamp = const.DEFAULT_TIMESTAMP
+        # 调用爬虫接口
+        response = spider.update_msg_list(gh_id, index=cursor)
+        if response['code'] == 0:
+            # 一般返回最近10天的msg_list
+            msg_list = response.get('data', {}).get("data", [])
+            if msg_list:
+                last_msg = msg_list[-1]
+                last_msg_base_info = last_msg['AppMsg']['BaseInfo']
+                last_msg_create_timestamp = last_msg_base_info['CreateTime']
+                self.insert_msg_list(account_name=account_name, gh_id=gh_id, msg_list=msg_list)
+                if last_msg_create_timestamp > latest_crawler_timestamp:
+                    next_cursor = response['data']['next_cursor']
+                    return self.crawler_article_video_list(account_obj=account_obj, cursor=next_cursor)
+            else:
+                return []
+        else:
+            return []
+        return []
+
+    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
+        """
+        插入视频信息
+        :param gh_id:
+        :param account_name:
+        :param msg_list:
+        :return:
+        """
+        for info in msg_list:
+            create_time = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in detail_article_list:
+                    article_url = article.get("ContentUrl", None)
+                    download_path = functions.download_gzh_video(article_url)
+                    if download_path:
+                        oss_path = functions.upload_to_oss(local_video_path=download_path)
+                        title = article.get("Title", None)
+                        position = article.get("ItemIndex", None)
+                        cover_url = article.get("CoverImgUrl", None)
+                        show_desc = article.get("ShowDesc", None)
+                        show_stat = functions.show_desc_to_sta(show_desc)
+                        read_cnt = show_stat.get("show_view_count", 0)
+                        like_cnt = show_stat.get("show_like_count", 0)
+                        url_unique = functions.generateGzhId(article_url)
+                        insert_sql = f"""
+                            INSERT INTO publish_single_video_source
+                            (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                        """
+                        try:
+                            self.db_client.update(
+                                sql=insert_sql,
+                                params=(
+                                    "video" + url_unique,
+                                    title,
+                                    gh_id,
+                                    account_name,
+                                    read_cnt,
+                                    like_cnt,
+                                    position,
+                                    publish_type,
+                                    article_url,
+                                    cover_url,
+                                    oss_path,
+                                    create_time,
+                                    int(time.time()),
+                                    url_unique
+                                )
+                            )
+                            log(
+                                task='weixin_video_crawler',
+                                function="insert_msg_list",
+                                message="插入一条视频",
+                                data={"account_name": account_name, "url": article_url}
+                            )
+                        except Exception as e:
+                            print(str(e))
+                            try:
+                                update_sql = f"""
+                                    UPDATE publish_single_video_source
+                                    SET read_cnt = %s, like_cnt = %s
+                                    WHERE url_unique_md5 = %s;
+                                """
+                                self.db_client.update(
+                                    sql=update_sql,
+                                    params=(read_cnt, like_cnt, functions.generateGzhId(article_url))
+                                )
+                            except Exception as e:
+                                error_stack = traceback.format_exc()
+                                log(
+                                    task='weixin_video_crawler',
+                                    function="update_msg_list",
+                                    status="fail",
+                                    message="更新内容失败",
+                                    data={"error": str(e), "error_stack": error_stack, "url": article_url}
+
+                                )
+                    else:
+                        continue
+
+    def crawler_task(self):
+        """
+        抓取任务
+        :return:
+        """
+        account_list = self.get_crawler_accounts()
+        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
+            self.crawler_article_video_list(account_obj)
+            self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
+            time.sleep(1)
+
+    def mention(self, start_timestamp):
+        """
+        飞书发送消息
+        :param start_timestamp:
+        :return:
+        """
+        sql = f"""select count(1) from publish_single_video_source where crawler_timestamp >= {start_timestamp};"""
+        response = self.db_client.select(sql)
+        new_articles_count = response[0][0]
+        bot(
+            title='微信抓取任务执行完成',
+            detail={
+                "新增视频数量": new_articles_count
+            }
+        )
+
+    def run(self):
+        """
+        执行任务
+        :return:
+        """
+        start_timestamp = int(time.time())
+        self.crawler_task()
+        self.mention(start_timestamp)
+
+
+