@@ -0,0 +1,397 @@
+"""
|
|
|
+@author: luojunhui
|
|
|
+@task: 抓取公众号视频
|
|
|
+"""
|
|
|
+
|
|
|
+import json, time
|
|
|
+import traceback
|
|
|
+from typing import List, Dict
|
|
|
+
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
+from tqdm import tqdm
|
|
|
+
|
|
|
+from applications import log
|
|
|
+from applications.api import ApolloApi, FeishuBotApi
|
|
|
+from applications.const import WeixinVideoCrawlerConst
|
|
|
+from applications.db import DatabaseConnector
|
|
|
+from applications.pipeline import scrape_video_entities_process
|
|
|
+from applications.utils import (
|
|
|
+ generate_gzh_id,
|
|
|
+ download_gzh_video,
|
|
|
+ upload_to_oss,
|
|
|
+ show_desc_to_sta,
|
|
|
+ Item,
|
|
|
+ insert_into_single_video_source_table,
|
|
|
+)
|
|
|
+from config import long_articles_config
|
|
|
+from cold_start.crawler.wechat import get_article_list_from_account
|
|
|
+from cold_start.filter import video_crawler_duplicate_filter
|
|
|
+
|
|
|
+
|
|
|
+class CrawlerGzhVideos:
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+        self.apollo = ApolloApi(env="prod")
+        self.const = WeixinVideoCrawlerConst()
+        self.festival_list = json.loads(self.apollo.get_config_value("festival"))
+        self.feishu_bot = FeishuBotApi()
+
+    def is_festival(self, title: str) -> bool:
+        """
+        check whether the title contains a festival keyword
+        :param title:
+        :return:
+        """
+        for festival in self.festival_list:
+            if festival in title:
+                return True
+        return False
+
+    def set_status_for_title(self, title: str) -> int:
+        """
+        set title_status for each title
+        """
+        if self.is_festival(title):
+            return self.const.TITLE_FESTIVAL_STATUS
+        elif len(title) < self.const.TITLE_MIN_LENGTH:
+            return self.const.TITLE_SHORT_STATUS
+        else:
+            return self.const.TITLE_DEFAULT_STATUS
+
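+    # Illustrative mapping for set_status_for_title (a sketch, not part of the original
+    # code; it assumes "春节" appears in the Apollo "festival" config and that
+    # TITLE_MIN_LENGTH is larger than 1):
+    #   "春节快乐,阖家团圆"        -> TITLE_FESTIVAL_STATUS
+    #   "好"                      -> TITLE_SHORT_STATUS
+    #   any other ordinary title  -> TITLE_DEFAULT_STATUS
+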
+    def is_video_downloaded(self, url_unique: str) -> bool:
+        """
+        check whether the video has already been downloaded
+        """
+        fetch_query = """
+            select id from publish_single_video_source where url_unique_md5 = %s;
+        """
+        return bool(self.db_client.fetch(query=fetch_query, params=(url_unique,)))
+
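+    # Note: deduplication relies on generate_gzh_id(article_url) producing a stable
+    # identifier per article URL, which is stored in url_unique_md5 at insert time.
+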
+    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
+        """
+        insert video info crawled from an account's message list
+        :param gh_id:
+        :param account_name:
+        :param msg_list:
+        :return:
+        """
+        for info in msg_list:
+            create_time = (
+                info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            )
+            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in tqdm(
+                    detail_article_list,
+                    desc="{}: crawler_in_msg_list".format(account_name),
+                ):
+                    article_url = article.get("ContentUrl", None)
+                    url_unique = generate_gzh_id(article_url)
+                    # skip this video if it has already been downloaded
+                    if self.is_video_downloaded(url_unique):
+                        print("url exists")
+                        continue
+
+                    title = article.get("Title", None)
+                    if not title:
+                        continue
+
+                    # skip duplicate titles
+                    if video_crawler_duplicate_filter(title, self.db_client):
+                        log(
+                            task="weixin_video_crawler",
+                            function="insert_msg_list",
+                            message="duplicate title, skipped",
+                            data={"url": article_url},
+                        )
+                        continue
+
+                    try:
+                        download_path = download_gzh_video(article_url)
+                        if download_path:
+                            oss_path = upload_to_oss(local_video_path=download_path)
+                            position = article.get("ItemIndex", None)
+                            cover_url = article.get("CoverImgUrl", None)
+                            show_desc = article.get("ShowDesc", None)
+                            show_stat = show_desc_to_sta(show_desc)
+                            read_cnt = show_stat.get("show_view_count", 0)
+                            like_cnt = show_stat.get("show_like_count", 0)
+                            title_status = self.set_status_for_title(title)
+                            insert_sql = """
+                                INSERT INTO publish_single_video_source
+                                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
+                                values
+                                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            """
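+                            # If the INSERT below fails (most likely a duplicate
+                            # url_unique_md5 from a re-crawl), fall back to refreshing
+                            # the read/like counts of the existing row instead.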
+                            try:
+                                self.db_client.save(
+                                    query=insert_sql,
+                                    params=(
+                                        "video" + url_unique,
+                                        title,
+                                        gh_id,
+                                        account_name,
+                                        read_cnt,
+                                        like_cnt,
+                                        position,
+                                        publish_type,
+                                        article_url,
+                                        cover_url,
+                                        oss_path,
+                                        title_status,
+                                        create_time,
+                                        int(time.time()),
+                                        url_unique,
+                                    ),
+                                )
+                                log(
+                                    task="weixin_video_crawler",
+                                    function="insert_msg_list",
+                                    message="inserted one video",
+                                    data={
+                                        "account_name": account_name,
+                                        "url": article_url,
+                                    },
+                                )
+                            except Exception as e:
+                                try:
+                                    update_sql = """
+                                        UPDATE publish_single_video_source
+                                        SET read_cnt = %s, like_cnt = %s
+                                        WHERE url_unique_md5 = %s;
+                                    """
+                                    self.db_client.save(
+                                        query=update_sql,
+                                        params=(
+                                            read_cnt,
+                                            like_cnt,
+                                            generate_gzh_id(article_url),
+                                        ),
+                                    )
+                                except Exception as e:
+                                    error_stack = traceback.format_exc()
+                                    log(
+                                        task="weixin_video_crawler",
+                                        function="update_msg_list",
+                                        status="fail",
+                                        message="failed to update read/like counts",
+                                        data={
+                                            "error": str(e),
+                                            "error_stack": error_stack,
+                                            "url": article_url,
+                                        },
+                                    )
+                        else:
+                            continue
+                    except Exception as e:
+                        error_stack = traceback.format_exc()
+                        log(
+                            task="weixin_video_crawler",
+                            function="insert_msg_list",
+                            status="fail",
+                            message="failed to download or insert video",
+                            data={
+                                "error": str(e),
+                                "error_stack": error_stack,
+                                "url": article_url,
+                            },
+                        )
+
+    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
+        """
+        crawl a single account's article list and collect its videos
+        :param cursor:
+        :param account_obj:
+        :return: the list of videos waiting to be downloaded
+        """
+        gh_id = account_obj["gh_id"]
+        account_name = account_obj["account_name"]
+        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
+        if latest_crawler_timestamp is None:
+            latest_crawler_timestamp = self.const.DEFAULT_TIMESTAMP
+
+        # call the crawler api
+        response = get_article_list_from_account(gh_id, index=cursor)
+        if response["code"] == self.const.REQUEST_SUCCESS:
+            # the api usually returns the msg_list of roughly the last 10 days
+            msg_list = response.get("data", {}).get("data", [])
+            if msg_list:
+                last_msg = msg_list[-1]
+                last_msg_base_info = last_msg["AppMsg"]["BaseInfo"]
+                last_msg_create_timestamp = last_msg_base_info["CreateTime"]
+                self.insert_msg_list(
+                    account_name=account_name, gh_id=gh_id, msg_list=msg_list
+                )
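+                # Paginate backwards only while the oldest message on this page is
+                # still newer than the account's previous crawl watermark; once we
+                # reach already-crawled territory, stop recursing.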
+                if last_msg_create_timestamp > latest_crawler_timestamp:
+                    next_cursor = response["data"]["next_cursor"]
+                    return self.crawler_article_video_list(
+                        account_obj=account_obj, cursor=next_cursor
+                    )
+                else:
+                    return []
+            else:
+                return []
+        return []
+
+
+class CrawlerGzhAccountVideos(CrawlerGzhVideos):
+
+    def get_crawler_accounts(self) -> List[Dict]:
+        """
+        get the list of WeChat official accounts to crawl
+        :return:
+        """
+        select_sql = f"""
+            SELECT gh_id, account_name, latest_crawler_timestamp
+            FROM weixin_account_for_videos
+            WHERE status = {self.const.ACCOUNT_CRAWL_STATUS}
+            ORDER BY latest_crawler_timestamp;
+        """
+        response = self.db_client.fetch(select_sql, cursor_type=DictCursor)
+        return response
+
+    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
+        """
+        update the account's latest crawl timestamp
+        :param gh_id:
+        :return:
+        """
+        update_sql = """
+            UPDATE weixin_account_for_videos
+            SET latest_crawler_timestamp = (
+                SELECT max(publish_timestamp)
+                FROM publish_single_video_source
+                WHERE out_account_id = %s
+            )
+            WHERE gh_id = %s;
+        """
+        affected_rows = self.db_client.save(query=update_sql, params=(gh_id, gh_id))
+        return affected_rows
+
+    def deal(self):
+        account_list = self.get_crawler_accounts()
+        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
+            try:
+                self.crawler_article_video_list(account_obj)
+                self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
+                time.sleep(self.const.SLEEP_SECONDS)
+            except Exception as e:
+                error_stack = traceback.format_exc()
+                log(
+                    task="weixin_video_crawler",
+                    function="crawler_task",
+                    status="fail",
+                    message="crawl task failed for a single account",
+                    data={
+                        "error": str(e),
+                        "error_stack": error_stack,
+                        "account_name": account_obj["account_name"],
+                    },
+                )
+
+
+class CrawlerGzhMetaVideos(CrawlerGzhVideos):
+    def get_meta_article_list(self, limit: int = 1000):
+        fetch_query = f"""
+            select article_id, title, out_account_id, read_cnt, like_cnt, article_index, link, publish_time
+            from crawler_meta_article
+            where platform = 'weixin' and score > 0.5 and status = 1 and has_video = 0
+            order by article_id desc
+            limit {limit};
+        """
+        return self.db_client.fetch(fetch_query, cursor_type=DictCursor)
+
+    def update_article_status(self, article_id, ori_status, new_status):
+        update_query = """
+            update crawler_meta_article
+            set has_video = %s
+            where has_video = %s and article_id = %s;
+        """
+        return self.db_client.save(
+            query=update_query,
+            params=(new_status, ori_status, article_id),
+        )
+
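+    # Status flow inferred from the constant names (the concrete values live in
+    # WeixinVideoCrawlerConst): INIT_STATUS -> PROCESSING_STATUS -> SUCCESS_STATUS /
+    # FAIL_STATUS. Because update_article_status only matches rows whose has_video
+    # still equals ori_status, the first transition also acts as an optimistic lock.
+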
+    def crawler_each_video(self, video_data):
+        """
+        crawl a single video entry
+        """
+        # lock the article row so that concurrent workers do not process it twice
+        affected_rows = self.update_article_status(
+            article_id=video_data["article_id"],
+            ori_status=self.const.INIT_STATUS,
+            new_status=self.const.PROCESSING_STATUS,
+        )
+        if not affected_rows:
+            return
+
+        video_item = Item()
+        unique_id = generate_gzh_id(video_data["link"])
+
+        # add info to item
+        video_item.add("content_trace_id", f"video{unique_id}")
+        video_item.add("url_unique_md5", unique_id)
+        video_item.add("article_title", video_data["title"])
+        video_item.add("out_account_id", video_data["out_account_id"])
+        video_item.add("out_account_name", "article_meta")
+        video_item.add("publish_timestamp", video_data["publish_time"])
+        video_item.add("read_cnt", video_data["read_cnt"])
+        video_item.add("like_cnt", video_data["like_cnt"])
+        video_item.add("article_index", video_data["article_index"])
+        video_item.add("platform", "gzh")
+        video_item.add("article_url", video_data["link"])
+        video_item.add("crawler_timestamp", int(time.time()))
+
+        # check item before insert
+        video_item.check(source="video")
+
+        try:
+            item_with_oss_path = scrape_video_entities_process(
+                video_item=video_item.item, db_client=self.db_client
+            )
+            if item_with_oss_path:
+                insert_into_single_video_source_table(
+                    db_client=self.db_client, video_item=item_with_oss_path
+                )
+                self.update_article_status(
+                    article_id=video_data["article_id"],
+                    ori_status=self.const.PROCESSING_STATUS,
+                    new_status=self.const.SUCCESS_STATUS,
+                )
+            else:
+                self.update_article_status(
+                    article_id=video_data["article_id"],
+                    ori_status=self.const.PROCESSING_STATUS,
+                    new_status=self.const.FAIL_STATUS,
+                )
+        except Exception as e:
+            detail = {
+                "video_item": video_item.item,
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+            }
+            log(
+                task="crawler_gzh_videos",
+                function="crawler_each_video",
+                message="crawler_gzh_videos failed",
+                status="fail",
+                data=detail,
+            )
+            self.update_article_status(
+                article_id=video_data["article_id"],
+                ori_status=self.const.PROCESSING_STATUS,
+                new_status=self.const.FAIL_STATUS,
+            )
+
+    def deal(self):
+        meta_article_list = self.get_meta_article_list()
+        for article in tqdm(meta_article_list):
+            self.crawler_each_video(article)
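+
+# Usage sketch (not part of the original diff; the entry point below is a hypothetical
+# illustration of how these classes appear to be meant to run, e.g. from a scheduler):
+#
+# if __name__ == "__main__":
+#     # crawl videos account by account from weixin_account_for_videos
+#     CrawlerGzhAccountVideos().deal()
+#     # crawl videos referenced by high-scoring crawler_meta_article rows
+#     CrawlerGzhMetaVideos().deal()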