# weixin_video_crawler.py
  1. """
  2. @author: luojunhui
  3. 抓取视频
  4. """
  5. import json
  6. import time
  7. import traceback
  8. from typing import List, Dict
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from config import apolloConfig
  12. from applications import bot
  13. from applications import log
  14. from applications import Functions
  15. from applications import WeixinSpider
  16. from applications import longArticlesMySQL
  17. from applications.const import WeixinVideoCrawlerConst
  18. from coldStartTasks.filter import video_crawler_duplicate_filter
# Module-level singletons shared by the crawler class below.
spider = WeixinSpider()                # WeChat article/message spider client
functions = Functions()                # helpers: video download, OSS upload, unique-id generation
config = apolloConfig(env="prod")      # Apollo config client (production namespace)
const = WeixinVideoCrawlerConst()      # status codes, timestamps and sleep constants for this task
  23. class WeixinVideoCrawler(object):
  24. """
  25. 微信视频抓取
  26. """
  27. def __init__(self):
  28. self.db_client = longArticlesMySQL()
  29. self.festival_list = json.loads(config.getConfigValue("festival"))
  30. def is_festival(self, title: str) -> bool:
  31. """
  32. 判断是否为节假日
  33. :param title:
  34. :return:
  35. """
  36. for festival in self.festival_list:
  37. if festival in title:
  38. return True
  39. return False
  40. def get_title_status(self, title: str) -> int:
  41. """
  42. 通过标题获取文章状态
  43. :param title:
  44. :return:
  45. """
  46. if self.is_festival(title):
  47. return const.TITLE_FESTIVAL_STATUS
  48. elif len(title) < const.TITLE_MIN_LENGTH:
  49. return const.TITLE_SHORT_STATUS
  50. else:
  51. return const.TITLE_DEFAULT_STATUS
  52. def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
  53. """
  54. 更新最新抓取时间戳
  55. :param gh_id:
  56. :return:
  57. """
  58. update_sql = f"""
  59. UPDATE weixin_account_for_videos
  60. SET latest_crawler_timestamp = (
  61. SELECT max(publish_timestamp)
  62. FROM publish_single_video_source
  63. WHERE out_account_id = %s
  64. )
  65. WHERE gh_id = %s;
  66. """
  67. affected_rows = self.db_client.update(
  68. sql=update_sql,
  69. params=(gh_id, gh_id)
  70. )
  71. return affected_rows
  72. def get_crawler_accounts(self) -> List[Dict]:
  73. """
  74. 获取微信公众号列表
  75. :return:
  76. """
  77. select_sql = f"""
  78. SELECT gh_id, account_name, latest_crawler_timestamp
  79. FROM weixin_account_for_videos
  80. WHERE status = {const.ACCOUNT_CRAWL_STATUS}
  81. ORDER BY latest_crawler_timestamp;
  82. """
  83. response = self.db_client.select(select_sql, DictCursor)
  84. return response
  85. def crawler_article_video_list(self, account_obj: Dict, cursor=None):
  86. """
  87. 抓取单个账号的文章列表,获取视频
  88. :param cursor:
  89. :param account_obj:
  90. :return: 返回待下载的视频列表
  91. """
  92. gh_id = account_obj["gh_id"]
  93. account_name = account_obj["account_name"]
  94. latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
  95. if latest_crawler_timestamp is None:
  96. latest_crawler_timestamp = const.DEFAULT_TIMESTAMP
  97. # 调用爬虫接口
  98. response = spider.update_msg_list(gh_id, index=cursor)
  99. if response['code'] == const.REQUEST_SUCCESS:
  100. # 一般返回最近10天的msg_list
  101. msg_list = response.get('data', {}).get("data", [])
  102. if msg_list:
  103. last_msg = msg_list[-1]
  104. last_msg_base_info = last_msg['AppMsg']['BaseInfo']
  105. last_msg_create_timestamp = last_msg_base_info['CreateTime']
  106. self.insert_msg_list(account_name=account_name, gh_id=gh_id, msg_list=msg_list)
  107. if last_msg_create_timestamp > latest_crawler_timestamp:
  108. next_cursor = response['data']['next_cursor']
  109. return self.crawler_article_video_list(account_obj=account_obj, cursor=next_cursor)
  110. else:
  111. return []
  112. else:
  113. return []
  114. return []
  115. def is_downloaded(self, url_unique: str) -> bool:
  116. """
  117. 判断该视频是否已经下载
  118. :param url_unique:
  119. :return:
  120. """
  121. select_sql = f"""
  122. SELECT id
  123. FROM publish_single_video_source
  124. WHERE url_unique_md5 = '{url_unique}';
  125. """
  126. response = self.db_client.select(select_sql)
  127. if response:
  128. return True
  129. else:
  130. return False
  131. def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
  132. """
  133. 插入视频信息
  134. :param gh_id:
  135. :param account_name:
  136. :param msg_list:
  137. :return:
  138. """
  139. for info in msg_list:
  140. create_time = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  141. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  142. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  143. if detail_article_list:
  144. for article in tqdm(detail_article_list, desc="{}: crawler_in_msg_list".format(account_name)):
  145. article_url = article.get("ContentUrl", None)
  146. url_unique = functions.generateGzhId(article_url)
  147. # 判断该视频链接是否下载,若已经下载则直接跳过
  148. if self.is_downloaded(url_unique):
  149. print("url exists")
  150. continue
  151. title = article.get("Title", None)
  152. if not title:
  153. continue
  154. # 判断标题是否重复
  155. if video_crawler_duplicate_filter(title, self.db_client):
  156. log(
  157. task='weixin_video_crawler',
  158. function="insert_msg_list",
  159. message="标题去重",
  160. data={"url": article_url}
  161. )
  162. continue
  163. try:
  164. download_path = functions.download_gzh_video(article_url)
  165. if download_path:
  166. oss_path = functions.upload_to_oss(local_video_path=download_path)
  167. position = article.get("ItemIndex", None)
  168. cover_url = article.get("CoverImgUrl", None)
  169. show_desc = article.get("ShowDesc", None)
  170. show_stat = functions.show_desc_to_sta(show_desc)
  171. read_cnt = show_stat.get("show_view_count", 0)
  172. like_cnt = show_stat.get("show_like_count", 0)
  173. title_status = self.get_title_status(title)
  174. insert_sql = f"""
  175. INSERT INTO publish_single_video_source
  176. (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
  177. values
  178. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  179. """
  180. try:
  181. self.db_client.update(
  182. sql=insert_sql,
  183. params=(
  184. "video" + url_unique,
  185. title,
  186. gh_id,
  187. account_name,
  188. read_cnt,
  189. like_cnt,
  190. position,
  191. publish_type,
  192. article_url,
  193. cover_url,
  194. oss_path,
  195. title_status,
  196. create_time,
  197. int(time.time()),
  198. url_unique
  199. )
  200. )
  201. log(
  202. task='weixin_video_crawler',
  203. function="insert_msg_list",
  204. message="插入一条视频",
  205. data={"account_name": account_name, "url": article_url}
  206. )
  207. except Exception as e:
  208. try:
  209. update_sql = f"""
  210. UPDATE publish_single_video_source
  211. SET read_cnt = %s, like_cnt = %s
  212. WHERE url_unique_md5 = %s;
  213. """
  214. self.db_client.update(
  215. sql=update_sql,
  216. params=(read_cnt, like_cnt, functions.generateGzhId(article_url))
  217. )
  218. except Exception as e:
  219. error_stack = traceback.format_exc()
  220. log(
  221. task='weixin_video_crawler',
  222. function="update_msg_list",
  223. status="fail",
  224. message="更新内容失败",
  225. data={"error": str(e), "error_stack": error_stack, "url": article_url}
  226. )
  227. else:
  228. continue
  229. except Exception as e:
  230. error_stack = traceback.format_exc()
  231. log(
  232. task='weixin_video_crawler',
  233. function="update_msg_list",
  234. status="fail",
  235. message="更新内容失败",
  236. data={"error": str(e), "error_stack": error_stack, "url": article_url}
  237. )
  238. def crawler_task(self):
  239. """
  240. 抓取任务
  241. :return:
  242. """
  243. account_list = self.get_crawler_accounts()
  244. for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
  245. try:
  246. self.crawler_article_video_list(account_obj)
  247. self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
  248. time.sleep(const.SLEEP_SECONDS)
  249. except Exception as e:
  250. error_stack = traceback.format_exc()
  251. log(
  252. task='weixin_video_crawler',
  253. function="crawler_task",
  254. status="fail",
  255. message="抓取任务失败--单账号",
  256. data={"error": str(e), "error_stack": error_stack, "account_name": account_obj["account_name"]}
  257. )
  258. def mention(self, start_timestamp):
  259. """
  260. 飞书发送消息
  261. :param start_timestamp:
  262. :return:
  263. """
  264. sql = f"""select count(1) from publish_single_video_source where crawler_timestamp >= {start_timestamp};"""
  265. response = self.db_client.select(sql)
  266. new_articles_count = response[0][0]
  267. bot(
  268. title='微信抓取任务执行完成',
  269. detail={
  270. "新增视频数量": new_articles_count
  271. }
  272. )
  273. def run(self):
  274. """
  275. 执行任务
  276. :return:
  277. """
  278. start_timestamp = int(time.time())
  279. self.crawler_task()
  280. self.mention(start_timestamp)