# crawler_gzh_videos.py
  1. """
  2. @author: luojunhui
  3. @task: 抓取公众号视频
  4. """
  5. import json, time
  6. import traceback
  7. from typing import List, Dict
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.api import ApolloApi, FeishuBotApi
  12. from applications.const import WeixinVideoCrawlerConst
  13. from applications.db import DatabaseConnector
  14. from applications.pipeline import scrape_video_entities_process
  15. from applications.utils import (
  16. generate_gzh_id,
  17. download_gzh_video,
  18. upload_to_oss,
  19. show_desc_to_sta,
  20. Item,
  21. insert_into_single_video_source_table,
  22. )
  23. from config import long_articles_config
  24. from cold_start.crawler.wechat import get_article_list_from_account
  25. from cold_start.filter import video_crawler_duplicate_filter


class CrawlerGzhVideos:
    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()
        self.apollo = ApolloApi(env="prod")
        self.const = WeixinVideoCrawlerConst()
        self.festival_list = json.loads(self.apollo.get_config_value("festival"))
        self.feishu_bot = FeishuBotApi()

    def is_festival(self, title: str) -> bool:
        """
        Check whether the title mentions a festival
        :param title:
        :return:
        """
        for festival in self.festival_list:
            if festival in title:
                return True
        return False

    def set_status_for_title(self, title: str) -> int:
        """
        set title_status for each title
        """
        if self.is_festival(title):
            return self.const.TITLE_FESTIVAL_STATUS
        elif len(title) < self.const.TITLE_MIN_LENGTH:
            return self.const.TITLE_SHORT_STATUS
        else:
            return self.const.TITLE_DEFAULT_STATUS

    def is_video_downloaded(self, url_unique: str) -> bool:
        """
        check whether video has been downloaded
        """
        fetch_query = f"""
            select id from publish_single_video_source where url_unique_md5 = %s;
        """
        return bool(self.db_client.fetch(query=fetch_query, params=(url_unique,)))

    def insert_msg_list(self, account_name, gh_id, msg_list: List[Dict]) -> None:
        """
        Insert video info extracted from a message list
        :param gh_id:
        :param account_name:
        :param msg_list:
        :return:
        """
        for info in msg_list:
            create_time = (
                info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
            )
            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
            if detail_article_list:
                for article in tqdm(
                    detail_article_list,
                    desc="{}: crawler_in_msg_list".format(account_name),
                ):
                    article_url = article.get("ContentUrl", None)
                    url_unique = generate_gzh_id(article_url)
                    # skip the video if its url has already been downloaded
                    if self.is_video_downloaded(url_unique):
                        print("url exists")
                        continue

                    title = article.get("Title", None)
                    if not title:
                        continue

                    # skip duplicated titles
                    if video_crawler_duplicate_filter(title, self.db_client):
                        log(
                            task="weixin_video_crawler",
                            function="insert_msg_list",
                            message="标题去重",
                            data={"url": article_url},
                        )
                        continue

                    try:
                        download_path = download_gzh_video(article_url)
                        if download_path:
                            oss_path = upload_to_oss(local_video_path=download_path)
                            position = article.get("ItemIndex", None)
                            cover_url = article.get("CoverImgUrl", None)
                            show_desc = article.get("ShowDesc", None)
                            show_stat = show_desc_to_sta(show_desc)
                            read_cnt = show_stat.get("show_view_count", 0)
                            like_cnt = show_stat.get("show_like_count", 0)
                            title_status = self.set_status_for_title(title)
                            insert_sql = f"""
                                INSERT INTO publish_single_video_source
                                (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_index, article_publish_type, article_url, cover_url, video_oss_path, bad_status, publish_timestamp, crawler_timestamp, url_unique_md5)
                                values
                                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                            """
                            try:
                                self.db_client.save(
                                    query=insert_sql,
                                    params=(
                                        "video" + url_unique,
                                        title,
                                        gh_id,
                                        account_name,
                                        read_cnt,
                                        like_cnt,
                                        position,
                                        publish_type,
                                        article_url,
                                        cover_url,
                                        oss_path,
                                        title_status,
                                        create_time,
                                        int(time.time()),
                                        url_unique,
                                    ),
                                )
                                log(
                                    task="weixin_video_crawler",
                                    function="insert_msg_list",
                                    message="插入一条视频",
                                    data={
                                        "account_name": account_name,
                                        "url": article_url,
                                    },
                                )
                            except Exception as e:
                                # insert failed (most likely an existing row): refresh its stats instead
                                try:
                                    update_sql = f"""
                                        UPDATE publish_single_video_source
                                        SET read_cnt = %s, like_cnt = %s
                                        WHERE url_unique_md5 = %s;
                                    """
                                    self.db_client.save(
                                        query=update_sql,
                                        params=(
                                            read_cnt,
                                            like_cnt,
                                            generate_gzh_id(article_url),
                                        ),
                                    )
                                except Exception as e:
                                    error_stack = traceback.format_exc()
                                    log(
                                        task="weixin_video_crawler",
                                        function="update_msg_list",
                                        status="fail",
                                        message="更新内容失败",
                                        data={
                                            "error": str(e),
                                            "error_stack": error_stack,
                                            "url": article_url,
                                        },
                                    )
                        else:
                            continue
                    except Exception as e:
                        error_stack = traceback.format_exc()
                        log(
                            task="weixin_video_crawler",
                            function="update_msg_list",
                            status="fail",
                            message="更新内容失败",
                            data={
                                "error": str(e),
                                "error_stack": error_stack,
                                "url": article_url,
                            },
                        )

    def crawler_article_video_list(self, account_obj: Dict, cursor=None):
        """
        Crawl one account's article list and collect its videos
        :param cursor:
        :param account_obj:
        :return: list of videos waiting to be downloaded
        """
        gh_id = account_obj["gh_id"]
        account_name = account_obj["account_name"]
        latest_crawler_timestamp = account_obj["latest_crawler_timestamp"]
        if latest_crawler_timestamp is None:
            latest_crawler_timestamp = self.const.DEFAULT_TIMESTAMP

        # call the crawler api
        response = get_article_list_from_account(gh_id, index=cursor)
        if response["code"] == self.const.REQUEST_SUCCESS:
            # usually returns the msg_list of roughly the last 10 days
            msg_list = response.get("data", {}).get("data", [])
            if msg_list:
                last_msg = msg_list[-1]
                last_msg_base_info = last_msg["AppMsg"]["BaseInfo"]
                last_msg_create_timestamp = last_msg_base_info["CreateTime"]
                self.insert_msg_list(
                    account_name=account_name, gh_id=gh_id, msg_list=msg_list
                )
                if last_msg_create_timestamp > latest_crawler_timestamp:
                    next_cursor = response["data"]["next_cursor"]
                    return self.crawler_article_video_list(
                        account_obj=account_obj, cursor=next_cursor
                    )
                else:
                    return []
            else:
                return []
        return []


class CrawlerGzhAccountVideos(CrawlerGzhVideos):
    def get_crawler_accounts(self) -> List[Dict]:
        """
        Get the list of WeChat Official Accounts to crawl
        :return:
        """
        select_sql = f"""
            SELECT gh_id, account_name, latest_crawler_timestamp
            FROM weixin_account_for_videos
            WHERE status = {self.const.ACCOUNT_CRAWL_STATUS}
            ORDER BY latest_crawler_timestamp;
        """
        response = self.db_client.fetch(select_sql, DictCursor)
        return response

    def update_account_latest_crawler_timestamp(self, gh_id: str) -> int:
        """
        Update the account's latest crawl timestamp
        :param gh_id:
        :return:
        """
        update_sql = f"""
            UPDATE weixin_account_for_videos
            SET latest_crawler_timestamp = (
                SELECT max(publish_timestamp)
                FROM publish_single_video_source
                WHERE out_account_id = %s
            )
            WHERE gh_id = %s;
        """
        affected_rows = self.db_client.save(query=update_sql, params=(gh_id, gh_id))
        return affected_rows

    def deal(self):
        account_list = self.get_crawler_accounts()
        for account_obj in tqdm(account_list, desc="crawler_video_for_each_account"):
            try:
                self.crawler_article_video_list(account_obj)
                self.update_account_latest_crawler_timestamp(gh_id=account_obj["gh_id"])
                time.sleep(self.const.SLEEP_SECONDS)
            except Exception as e:
                error_stack = traceback.format_exc()
                log(
                    task="weixin_video_crawler",
                    function="crawler_task",
                    status="fail",
                    message="抓取任务失败--单账号",
                    data={
                        "error": str(e),
                        "error_stack": error_stack,
                        "account_name": account_obj["account_name"],
                    },
                )


class CrawlerGzhMetaVideos(CrawlerGzhVideos):
    def get_meta_article_list(self, limit=100000):
        """
        Fetch crawled meta articles that have not been matched to a video yet
        """
        fetch_query = f"""
            select article_id, title, out_account_id, read_cnt, like_cnt, article_index, link, publish_time
            from crawler_meta_article
            where platform = 'weixin' and score > 0.5 and status = 1 and has_video = 0
            order by article_id desc
            limit {limit};
        """
        return self.db_client.fetch(fetch_query, cursor_type=DictCursor)

    def update_article_status(self, article_id, ori_status, new_status):
        """
        Move an article from ori_status to new_status (also serves as an optimistic lock)
        """
        update_query = f"""
            update crawler_meta_article
            set has_video = %s
            where has_video = %s and article_id = %s;
        """
        return self.db_client.save(
            query=update_query,
            params=(new_status, ori_status, article_id),
        )

    def crawler_each_video(self, video_data):
        """
        crawler single video data
        """
        # lock
        affected_rows = self.update_article_status(
            article_id=video_data["article_id"],
            ori_status=self.const.INIT_STATUS,
            new_status=self.const.PROCESSING_STATUS,
        )
        if not affected_rows:
            return

        video_item = Item()
        unique_id = generate_gzh_id(video_data["link"])

        # add info to item
        video_item.add("content_trace_id", f"video{unique_id}")
        video_item.add("url_unique_md5", unique_id)
        video_item.add("article_title", video_data["title"])
        video_item.add("out_account_id", video_data["out_account_id"])
        video_item.add("out_account_name", "article_meta")
        video_item.add("publish_timestamp", video_data["publish_time"])
        video_item.add("read_cnt", video_data["read_cnt"])
        video_item.add("like_cnt", video_data["like_cnt"])
        video_item.add("article_index", video_data["article_index"])
        video_item.add("platform", "gzh")
        video_item.add("article_url", video_data["link"])
        video_item.add("crawler_timestamp", int(time.time()))

        # check item before insert
        video_item.check(source="video")

        try:
            item_with_oss_path = scrape_video_entities_process(
                video_item=video_item.item, db_client=self.db_client
            )
            if item_with_oss_path:
                insert_into_single_video_source_table(
                    db_client=self.db_client, video_item=item_with_oss_path
                )
                self.update_article_status(
                    article_id=video_data["article_id"],
                    ori_status=self.const.PROCESSING_STATUS,
                    new_status=self.const.SUCCESS_STATUS,
                )
            else:
                self.update_article_status(
                    article_id=video_data["article_id"],
                    ori_status=self.const.PROCESSING_STATUS,
                    new_status=self.const.FAIL_STATUS,
                )
        except Exception as e:
            detail = {
                "video_item": video_item.item,
                "error": str(e),
                "traceback": traceback.format_exc(),
            }
            log(
                task="crawler_gzh_videos",
                function="crawler_each_video",
                message="crawler_gzh_videos failed",
                status="failed",
                data=detail,
            )
            self.update_article_status(
                article_id=video_data["article_id"],
                ori_status=self.const.PROCESSING_STATUS,
                new_status=self.const.FAIL_STATUS,
            )

    def deal(self):
        meta_article_list = self.get_meta_article_list()
        for article in tqdm(meta_article_list):
            self.crawler_each_video(article)
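

# Usage sketch (assumption, not part of the original file): both crawlers expose a
# `deal()` entry point, so a scheduler or cron wrapper could invoke them roughly as
# below. The run order and the presence of a __main__ guard are illustrative only.
if __name__ == "__main__":
    # crawl videos from the configured account list
    CrawlerGzhAccountVideos().deal()
    # crawl videos referenced by scored meta articles
    CrawlerGzhMetaVideos().deal()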