- """
- @author: luojunhui
- @task: crawl videos from WeChat official accounts (gzh)
- """
- import json
- import time
- import traceback
- from typing import List, Dict
- from pymysql.cursors import DictCursor
- from tqdm import tqdm
- from applications import log
- from applications.api import ApolloApi, FeishuBotApi
- from applications.const import WeixinVideoCrawlerConst
- from applications.db import DatabaseConnector
- from applications.pipeline import scrape_video_entities_process
- from applications.utils import (
- generate_gzh_id,
- download_gzh_video,
- upload_to_oss,
- show_desc_to_sta,
- Item,
- insert_into_single_video_source_table,
- )
- from config import long_articles_config
- from cold_start.crawler.wechat import get_article_list_from_account
- from cold_start.filter import video_crawler_duplicate_filter
- class CrawlerGzhVideos:
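- """Shared helpers for crawling WeChat official account (gzh) videos and writing them into publish_single_video_source."""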
- def __init__(self):
- self.db_client = DatabaseConnector(long_articles_config)
- self.db_client.connect()
- self.apollo = ApolloApi(env="prod")
- self.const = WeixinVideoCrawlerConst()
- self.festival_list = json.loads(self.apollo.get_config_value("festival"))
- self.feishu_bot = FeishuBotApi()
- def is_festival(self, title: str) -> bool:
- """
- Check whether the title contains a configured festival keyword.
- :param title: article title
- :return: True if any festival keyword appears in the title
- """
- for festival in self.festival_list:
- if festival in title:
- return True
- return False
- def set_status_for_title(self, title: str) -> int:
- """
- set title_status for each title
- """
- if self.is_festival(title):
- return self.const.TITLE_FESTIVAL_STATUS
- elif len(title) < self.const.TITLE_MIN_LENGTH:
- return self.const.TITLE_SHORT_STATUS
- else:
- return self.const.TITLE_DEFAULT_STATUS
- def is_video_downloaded(self, url_unique: str) -> bool:
- """
- check whether video has been downloaded
- """
- fetch_query = """
- select id from publish_single_video_source where url_unique_md5 = %s;
- """
- return bool(self.db_client.fetch(query=fetch_query, params=(url_unique,)))
- def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
- """
- Download the videos in a message list and insert their metadata.
- :param gh_id: gh_id of the official account
- :param account_name: name of the official account
- :param msg_list: message list returned by the article-list API
- :return:
- """
- for info in msg_list:
- create_time = (
- info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
- )
- publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
- detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
- if detail_article_list:
- for article in tqdm(
- detail_article_list,
- desc="{}: crawler_in_msg_list".format(account_name),
- ):
- article_url = article.get("ContentUrl", None)
- url_unique = generate_gzh_id(article_url)
- # skip this link if the video has already been downloaded
- if self.is_video_downloaded(url_unique):
- print("url exists")
- continue
- title = article.get("Title", None)
- if not title:
- continue
- # skip duplicate titles
- if video_crawler_duplicate_filter(title, self.db_client):
- log(
- task="weixin_video_crawler",
- function="insert_msg_list",
- message="标题去重",
- data={"url": article_url},
- )
- continue
- try:
- download_path = download_gzh_video(article_url)
- if download_path:
- oss_path = upload_to_oss(local_video_path=download_path)
- position = article.get("ItemIndex", None)
- cover_url = article.get("CoverImgUrl", None)
- show_desc = article.get("ShowDesc", None)
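- # ShowDesc carries engagement figures; show_desc_to_sta parses it into a dict with show_view_count / show_like_count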
- show_stat = show_desc_to_sta(show_desc)
- read_cnt = show_stat.get("show_view_count", 0)
- like_cnt = show_stat.get("show_like_count", 0)
- title_status = self.set_status_for_title(title)
- insert_sql = f"""
- INSERT INTO publish_single_video_source
- (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
- values
- (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- try:
- self.db_client.save(
- query=insert_sql,
- params=(
- "video" + url_unique,
- title,
- gh_id,
- account_name,
- read_cnt,
- like_cnt,
- position,
- publish_type,
- article_url,
- cover_url,
- oss_path,
- title_status,
- create_time,
- int(time.time()),
- url_unique,
- ),
- )
- log(
- task="weixin_video_crawler",
- function="insert_msg_list",
- message="插入一条视频",
- data={
- "account_name": account_name,
- "url": article_url,
- },
- )
- except Exception as e:
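- # the insert failed, most likely because url_unique_md5 already exists: refresh its counters instead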
- try:
- update_sql = f"""
- UPDATE publish_single_video_source
- SET read_cnt = %s, like_cnt = %s
- WHERE url_unique_md5 = %s;
- """
- self.db_client.save(
- query=update_sql,
- params=(
- read_cnt,
- like_cnt,
- generate_gzh_id(article_url),
- ),
- )
- except Exception as e:
- error_stack = traceback.format_exc()
- log(
- task="weixin_video_crawler",
- function="update_msg_list",
- status="fail",
- message="更新内容失败",
- data={
- "error": str(e),
- "error_stack": error_stack,
- "url": article_url,
- },
- )
- else:
- continue
- except Exception as e:
- error_stack = traceback.format_exc()
- log(
- task="weixin_video_crawler",
- function="update_msg_list",
- status="fail",
- message="更新内容失败",
- data={
- "error": str(e),
- "error_stack": error_stack,
- "url": article_url,
- },
- )
- def crawler_article_video_list(self, account_obj: Dict, cursor=None):
- """
- Crawl the article list of a single account and collect its videos.
- :param cursor: pagination cursor for the article-list API
- :param account_obj: account record containing gh_id, account_name and latest_crawler_timestamp
- :return: an empty list once crawling for this account is finished
- """
- gh_id = account_obj["gh_id"]
- account_name = account_obj["account_name"]
- latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
- if latest_crawler_timestamp is None:
- latest_crawler_timestamp = self.const.DEFAULT_TIMESTAMP
- # call the article-list crawler API
- response = get_article_list_from_account(gh_id, index=cursor)
- if response["code"] == self.const.REQUEST_SUCCESS:
- # the API usually returns the msg_list of roughly the last 10 days
- msg_list = response.get("data", {}).get("data", [])
- if msg_list:
- last_msg = msg_list[-1]
- last_msg_base_info = last_msg["AppMsg"]["BaseInfo"]
- last_msg_create_timestamp = last_msg_base_info["CreateTime"]
- self.insert_msg_list(
- account_name=account_name, gh_id=gh_id, msg_list=msg_list
- )
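- # if the last message in this batch is still newer than the previous crawl, fetch the next page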
- if last_msg_create_timestamp > latest_crawler_timestamp:
- next_cursor = response["data"]["next_cursor"]
- return self.crawler_article_video_list(
- account_obj=account_obj, cursor=next_cursor
- )
- else:
- return []
- else:
- return []
- return []
- class CrawlerGzhAccountVideos(CrawlerGzhVideos):
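- """Account-driven crawler: walks the accounts in weixin_account_for_videos and crawls their recent video posts."""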
- def get_crawler_accounts(self) -> List[Dict]:
- """
- Fetch the list of WeChat official accounts to crawl.
- :return:
- """
- select_sql = f"""
- SELECT gh_id, account_name, latest_crawler_timestamp
- FROM weixin_account_for_videos
- WHERE status = {self.const.ACCOUNT_CRAWL_STATUS}
- ORDER BY latest_crawler_timestamp;
- """
- response = self.db_client.fetch(select_sql, DictCursor)
- return response
- def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
- """
- Update the account's latest crawl timestamp from its newest crawled video.
- :param gh_id:
- :return:
- """
- update_sql = f"""
- UPDATE weixin_account_for_videos
- SET latest_crawler_timestamp = (
- SELECT max(publish_timestamp)
- FROM publish_single_video_source
- WHERE out_account_id = %s
- )
- WHERE gh_id = %s;
- """
- affected_rows = self.db_client.save(query=update_sql, params=(gh_id, gh_id))
- return affected_rows
- def deal(self):
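- """Crawl every configured account in turn and advance its latest_crawler_timestamp."""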
- account_list = self.get_crawler_accounts()
- for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
- try:
- self.crawler_article_video_list(account_obj)
- self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
- time.sleep(self.const.SLEEP_SECONDS)
- except Exception as e:
- error_stack = traceback.format_exc()
- log(
- task="weixin_video_crawler",
- function="crawler_task",
- status="fail",
- message="抓取任务失败--单账号",
- data={
- "error": str(e),
- "error_stack": error_stack,
- "account_name": account_obj["account_name"],
- },
- )
- class CrawlerGzhMetaVideos(CrawlerGzhVideos):
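- """Meta-article-driven crawler: picks weixin articles from crawler_meta_article and downloads their videos."""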
- def get_meta_article_list(self, limit=1000):
- fetch_query = f"""
- select article_id, title, out_account_id, read_cnt, like_cnt, article_index, link, publish_time
- from crawler_meta_article
- where platform = 'weixin' and score > 0.5 and status = 1 and has_video = 0
- order by article_id desc
- limit {limit};
- """
- return self.db_client.fetch(fetch_query, cursor_type=DictCursor)
- def update_article_status(self, article_id, ori_status, new_status):
- update_query = f"""
- update crawler_meta_article
- set has_video = %s
- where has_video = %s and article_id = %s;
- """
- return self.db_client.save(
- query=update_query,
- params=(new_status, ori_status, article_id)
- )
- def crawler_each_video(self, video_data):
- """
- crawler single video data
- """
- # lock: flip has_video from INIT to PROCESSING so the same article is not picked up twice
- affected_rows = self.update_article_status(
- article_id=video_data['article_id'],
- ori_status=self.const.INIT_STATUS,
- new_status=self.const.PROCESSING_STATUS
- )
- if not affected_rows:
- return
- video_item = Item()
- unique_id = generate_gzh_id(video_data["link"])
- # add info to item
- video_item.add("content_trace_id", f"video{unique_id}")
- video_item.add("url_unique_md5", unique_id)
- video_item.add("article_title", video_data['title'])
- video_item.add("out_account_id", video_data["out_account_id"])
- video_item.add("out_account_name", "article_meta")
- video_item.add("publish_timestamp", video_data["publish_time"])
- video_item.add("read_cnt", video_data["read_cnt"])
- video_item.add("like_cnt", video_data["like_cnt"])
- video_item.add("article_index", video_data["article_index"])
- video_item.add("platform", "gzh")
- video_item.add("article_url", video_data["link"])
- video_item.add("crawler_timestamp", int(time.time()))
- # check item before insert
- video_item.check(source="video")
- try:
- item_with_oss_path = scrape_video_entities_process(
- video_item=video_item.item, db_client=self.db_client
- )
- if item_with_oss_path:
- insert_into_single_video_source_table(
- db_client=self.db_client, video_item=item_with_oss_path
- )
- self.update_article_status(
- article_id=video_data['article_id'],
- ori_status=self.const.PROCESSING_STATUS,
- new_status=self.const.SUCCESS_STATUS
- )
- else:
- self.update_article_status(
- article_id=video_data['article_id'],
- ori_status=self.const.PROCESSING_STATUS,
- new_status=self.const.FAIL_STATUS
- )
- except Exception as e:
- detail = {
- "video_item": video_item.item,
- "error": str(e),
- "traceback": traceback.format_exc(),
- }
- log(
- task="crawler_gzh_videos",
- function="crawler_each_video",
- message="crawler_gzh_videos failed",
- status="failed",
- data=detail,
- )
- self.update_article_status(
- article_id=video_data['article_id'],
- ori_status=self.const.PROCESSING_STATUS,
- new_status=self.const.FAIL_STATUS
- )
- def deal(self):
- meta_article_list = self.get_meta_article_list()
- for article in tqdm(meta_article_list):
- self.crawler_each_video(article)
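- # A minimal manual-run sketch (an assumption; the original module defines no entry point and is
- # presumably driven by an external scheduler):
- #
- # if __name__ == "__main__":
- #     CrawlerGzhAccountVideos().deal()  # crawl videos account by account
- #     CrawlerGzhMetaVideos().deal()     # crawl videos for already-collected meta articles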