							- """
 
- @author: luojunhui
 
- 抓取视频
 
- """
 
- import json
 
- import time
 
- import traceback
 
- from typing import List, Dict
 
- from tqdm import tqdm
 
- from pymysql.cursors import DictCursor
 
- from config import apolloConfig
 
- from applications import bot
 
- from applications import log
 
- from applications import Functions
 
- from applications import WeixinSpider
 
- from applications import longArticlesMySQL
 
- from applications.const import WeixinVideoCrawlerConst
 
- from coldStartTasks.filter import video_crawler_duplicate_filter
 
- spider = WeixinSpider()
 
- functions = Functions()
 
- config = apolloConfig(env="prod")
 
- const = WeixinVideoCrawlerConst()
 


class WeixinVideoCrawler(object):
    """
    WeChat official-account video crawler
    """

    def __init__(self):
        self.db_client = longArticlesMySQL()
        self.festival_list = json.loads(config.getConfigValue("festival"))

    def is_festival(self, title: str) -> bool:
        """
        Check whether the title mentions a festival or holiday.
        :param title:
        :return:
        """
        for festival in self.festival_list:
            if festival in title:
                return True
        return False

    def get_title_status(self, title: str) -> int:
        """
        Classify an article by its title.
        :param title:
        :return: one of the TITLE_*_STATUS constants
        """
        if self.is_festival(title):
            return const.TITLE_FESTIVAL_STATUS
        elif len(title) < const.TITLE_MIN_LENGTH:
            return const.TITLE_SHORT_STATUS
        else:
            return const.TITLE_DEFAULT_STATUS

    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
        """
        Update the account's latest crawl timestamp to the newest
        publish_timestamp among its crawled videos.
        :param gh_id:
        :return: number of affected rows
        """
        update_sql = """
            UPDATE weixin_account_for_videos
            SET latest_crawler_timestamp = (
                SELECT max(publish_timestamp)
                FROM publish_single_video_source
                WHERE out_account_id = %s
            )
            WHERE gh_id = %s;
        """
        affected_rows = self.db_client.update(
            sql=update_sql,
            params=(gh_id, gh_id)
        )
        return affected_rows

    def get_crawler_accounts(self) -> List[Dict]:
        """
        Get the list of WeChat official accounts to crawl.
        :return:
        """
        select_sql = f"""
            SELECT gh_id, account_name, latest_crawler_timestamp
            FROM weixin_account_for_videos
            WHERE status = {const.ACCOUNT_CRAWL_STATUS}
            ORDER BY latest_crawler_timestamp;
        """
        response = self.db_client.select(select_sql, DictCursor)
        return response

    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
        """
        Crawl a single account's article list and collect its videos.
        :param cursor: pagination cursor of the message list
        :param account_obj:
        :return: list of videos awaiting download
        """
        gh_id = account_obj["gh_id"]
        account_name = account_obj["account_name"]
        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
        if latest_crawler_timestamp is None:
            latest_crawler_timestamp = const.DEFAULT_TIMESTAMP

        # call the spider interface
        response = spider.update_msg_list(gh_id, index=cursor)
        if response['code'] == const.REQUEST_SUCCESS:
            # the interface usually returns roughly the last 10 days of msg_list
            msg_list = response.get('data', {}).get("data", [])
            if msg_list:
                last_msg = msg_list[-1]
                last_msg_base_info = last_msg['AppMsg']['BaseInfo']
                last_msg_create_timestamp = last_msg_base_info['CreateTime']
                self.insert_msg_list(account_name=account_name, gh_id=gh_id, msg_list=msg_list)
                # keep paging backwards until we reach messages crawled previously
                if last_msg_create_timestamp > latest_crawler_timestamp:
                    next_cursor = response['data']['next_cursor']
                    return self.crawler_article_video_list(account_obj=account_obj, cursor=next_cursor)
        return []

    def is_downloaded(self, url_unique: str) -> bool:
        """
        Check whether the video behind this url has already been downloaded.
        :param url_unique:
        :return:
        """
        select_sql = f"""
            SELECT id
            FROM publish_single_video_source
            WHERE url_unique_md5 = '{url_unique}';
        """
        response = self.db_client.select(select_sql)
        return bool(response)

    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
        """
        Download the videos in the message list and insert their metadata into the database.
        :param gh_id:
        :param account_name:
        :param msg_list:
        :return:
        """
        for info in msg_list:
            create_time = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
            if not detail_article_list:
                continue
            for article in tqdm(detail_article_list, desc="{}: crawler_in_msg_list".format(account_name)):
                article_url = article.get("ContentUrl", None)
                url_unique = functions.generateGzhId(article_url)
                # skip the video if it has already been downloaded
                if self.is_downloaded(url_unique):
                    print("url exists")
                    continue
                title = article.get("Title", None)
                if not title:
                    continue
                # skip duplicated titles
                if video_crawler_duplicate_filter(title, self.db_client):
                    log(
                        task='weixin_video_crawler',
                        function="insert_msg_list",
                        message="标题去重",
                        data={"url": article_url}
                    )
                    continue
                try:
                    download_path = functions.download_gzh_video(article_url)
                    if not download_path:
                        continue
                    oss_path = functions.upload_to_oss(local_video_path=download_path)
                    position = article.get("ItemIndex", None)
                    cover_url = article.get("CoverImgUrl", None)
                    show_desc = article.get("ShowDesc", None)
                    show_stat = functions.show_desc_to_sta(show_desc)
                    read_cnt = show_stat.get("show_view_count", 0)
                    like_cnt = show_stat.get("show_like_count", 0)
                    title_status = self.get_title_status(title)
                    insert_sql = """
                        INSERT INTO publish_single_video_source
                        (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
                        values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    try:
                        self.db_client.update(
                            sql=insert_sql,
                            params=(
                                "video" + url_unique,
                                title,
                                gh_id,
                                account_name,
                                read_cnt,
                                like_cnt,
                                position,
                                publish_type,
                                article_url,
                                cover_url,
                                oss_path,
                                title_status,
                                create_time,
                                int(time.time()),
                                url_unique
                            )
                        )
                        log(
                            task='weixin_video_crawler',
                            function="insert_msg_list",
                            message="插入一条视频",
                            data={"account_name": account_name, "url": article_url}
                        )
                    except Exception as e:
                        # insert failed (e.g. the record already exists): refresh read / like counts instead
                        try:
                            update_sql = """
                                UPDATE publish_single_video_source
                                SET read_cnt = %s, like_cnt = %s
                                WHERE url_unique_md5 = %s;
                            """
                            self.db_client.update(
                                sql=update_sql,
                                params=(read_cnt, like_cnt, url_unique)
                            )
                        except Exception as e:
                            error_stack = traceback.format_exc()
                            log(
                                task='weixin_video_crawler',
                                function="update_msg_list",
                                status="fail",
                                message="更新内容失败",
                                data={"error": str(e), "error_stack": error_stack, "url": article_url}
                            )
                except Exception as e:
                    error_stack = traceback.format_exc()
                    log(
                        task='weixin_video_crawler',
                        function="update_msg_list",
                        status="fail",
                        message="更新内容失败",
                        data={"error": str(e), "error_stack": error_stack, "url": article_url}
                    )

    def crawler_task(self):
        """
        Crawl task: iterate over all accounts and crawl each one.
        :return:
        """
        account_list = self.get_crawler_accounts()
        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
            try:
                self.crawler_article_video_list(account_obj)
                self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
                time.sleep(const.SLEEP_SECONDS)
            except Exception as e:
                error_stack = traceback.format_exc()
                log(
                    task='weixin_video_crawler',
                    function="crawler_task",
                    status="fail",
                    message="抓取任务失败--单账号",
                    data={"error": str(e), "error_stack": error_stack, "account_name": account_obj["account_name"]}
                )

    def mention(self, start_timestamp):
        """
        Send a summary message to Feishu.
        :param start_timestamp:
        :return:
        """
        sql = f"""select count(1) from publish_single_video_source where crawler_timestamp >= {start_timestamp};"""
        response = self.db_client.select(sql)
        new_articles_count = response[0][0]
        bot(
            title='微信抓取任务执行完成',
            detail={
                "新增视频数量": new_articles_count
            }
        )

    def run(self):
        """
        Entry point: crawl all accounts, then send the Feishu summary.
        :return:
        """
        start_timestamp = int(time.time())
        self.crawler_task()
        self.mention(start_timestamp)
 
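

# Minimal usage sketch (an assumption, not part of the original module): the crawler is
# presumably triggered by an external scheduler, but running this file directly would
# perform one full crawl pass over all configured accounts and then push the Feishu summary.
if __name__ == "__main__":
    weixin_video_crawler = WeixinVideoCrawler()
    weixin_video_crawler.run()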