weixin_account_crawler.py

  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import traceback
  6. from typing import List, Set, Dict, Tuple
  7. from tqdm import tqdm
  8. from datetime import datetime
  9. from pymysql.cursors import DictCursor
  10. from applications import WeixinSpider, longArticlesMySQL, log, bot, aiditApi
  11. from applications.const import WeixinVideoCrawlerConst
  12. from applications.functions import Functions
  13. const = WeixinVideoCrawlerConst()
  14. function = Functions()


def get_inner_account_gh_id() -> Set[str]:
    """
    Get the gh_id set of internal accounts
    :return:
    """
    accounts = aiditApi.get_publish_account_from_aigc()
    gh_id_list = [i['ghId'] for i in accounts]
    return set(gh_id_list)


class WeixinAccountCrawler(object):
    """
    Account crawler
    """

    def __init__(self):
        self.db_client = longArticlesMySQL()
        self.spider = WeixinSpider()
        self.crawler_account_count = 0

    def get_crawler_articles(self) -> List[Dict]:
        """
        Get already-crawled articles whose linked source accounts still need to be scanned
        :return:
        """
        sql = f"""
            SELECT id, article_url
            FROM publish_single_video_source
            WHERE source_account = {const.NEED_SCAN_SOURCE_ACCOUNT};
        """
        article_url_list = self.db_client.select(sql, cursor_type=DictCursor)
        return article_url_list

    def update_crawler_article_status(self, article_id_tuple: Tuple[int, ...]) -> int:
        """
        Mark the given articles as no longer needing a source-account scan
        :param article_id_tuple:
        :return:
        """
        sql = """
            UPDATE publish_single_video_source
            SET source_account = %s
            WHERE id IN %s;
        """
        affected_rows = self.db_client.update(sql, (const.DO_NOT_NEED_SOURCE_ACCOUNT, article_id_tuple))
        return affected_rows

    def get_seed_titles(self, run_date: str) -> List[str]:
        """
        Get seed titles: well-performing articles published within the stat period
        :param run_date:
        :return:
        """
        publish_timestamp_threshold = int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.STAT_PERIOD
        sql = f"""
            SELECT DISTINCT title
            FROM datastat_sort_strategy
            WHERE read_rate > {const.READ_AVG_MULTIPLE}
              AND view_count > {const.MIN_READ_COUNT}
              AND publish_timestamp > {publish_timestamp_threshold}
            ORDER BY read_rate DESC;
        """
        title_list = self.db_client.select(sql, cursor_type=DictCursor)
        title_list = [i['title'] for i in title_list]
        return title_list

    def is_original(self, article_url: str) -> bool:
        """
        Check whether the article is marked as original
        :param article_url:
        :return:
        """
        response = self.spider.get_article_text(article_url)
        data = response['data']['data']
        return data['is_original']

    def insert_account(self, gh_id: str, account_name: str) -> int:
        """
        Insert an account
        :param account_name:
        :param gh_id:
        :return:
        """
        init_date = time.strftime("%Y-%m-%d", time.localtime())
        sql = """
            INSERT IGNORE INTO weixin_account_for_videos
            (gh_id, account_name, account_init_date)
            VALUES
            (%s, %s, %s);
        """
        insert_rows = self.db_client.update(sql, (gh_id, account_name, init_date))
        return insert_rows

    def process_search_result(self, response: Dict, inner_account_set: Set[str]):
        """
        Process one page of search results
        :param response:
        :param inner_account_set:
        :return:
        """
        if response['code'] != const.REQUEST_SUCCESS:
            return
        article_list = response['data']['data']
        if article_list:
            for article in article_list:
                try:
                    # first check whether the account is an internal account
                    article_url = article['url']
                    account_detail = self.spider.get_account_by_url(article_url)
                    account_detail = account_detail['data']['data']
                    account_name = account_detail['account_name']
                    gh_id = account_detail['wx_gh']
                    if gh_id in inner_account_set:
                        continue
                    # skip original search results
                    if self.is_original(article_url):
                        continue
                    # skip articles without a video link
                    try:
                        video_url = function.get_video_url(article_url)
                    except Exception:
                        continue
                    if not video_url:
                        continue
                    # record the account
                    insert_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
                    if insert_rows:
                        log(
                            task="account_crawler_v1",
                            function="process_search_result",
                            message="insert account success",
                            data={
                                "gh_id": gh_id,
                                "account_name": account_name
                            }
                        )
                        self.crawler_account_count += 1
                except Exception as e:
                    log(
                        task="account_crawler_v1",
                        function="process_search_result",
                        message="insert account error",
                        data={
                            "error": str(e),
                            "traceback": traceback.format_exc(),
                            "data": article
                        }
                    )

    def search_title_in_weixin(self, title: str, inner_account_set: Set[str]) -> None:
        """
        Search the title on WeChat via the search API, page by page
        :param inner_account_set:
        :param title:
        :return:
        """
        for page_index in tqdm(range(1, const.MAX_SEARCH_PAGE_NUM + 1), desc='searching: {}'.format(title)):
            try:
                response = self.spider.search_articles(title, page=str(page_index))
                self.process_search_result(response, inner_account_set)
                time.sleep(const.SLEEP_SECONDS)
            except Exception as e:
                log(
                    task="account_crawler_v1",
                    function="search_title_in_weixin",
                    message="search title error",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "title": title
                    }
                )

    def run(self, run_date=None) -> None:
        """
        Entry point (V1): search seed titles on WeChat and collect new accounts
        :param run_date:
        :return:
        """
        if not run_date:
            run_date = time.strftime("%Y-%m-%d", time.localtime())
        # get seed titles
        title_list = self.get_seed_titles(run_date)
        # get inner account set
        inner_account_gh_id_set = get_inner_account_gh_id()
        start_time = time.time()
        for title in tqdm(title_list, desc="search each title"):
            self.search_title_in_weixin(title, inner_account_gh_id_set)
        # notify
        bot(
            title="微信账号抓取V1完成",
            detail={
                "总更新账号数量": self.crawler_account_count,
                "总耗时": time.time() - start_time,
                "种子标题数量": len(title_list)
            },
            mention=False
        )

    def run_v2(self) -> None:
        """
        Entry point (V2): trace source accounts from already-crawled articles
        :return:
        """
        # get article list
        crawler_article_list = self.get_crawler_articles()
        article_id_list = []
        insert_account_count = 0
        for crawler_article_obj in tqdm(crawler_article_list, desc="crawler article list"):
            try:
                article_id = crawler_article_obj['id']
                # record processed ids
                article_id_list.append(int(article_id))
                article_url = crawler_article_obj['article_url']
                # skip original articles
                if self.is_original(article_url):
                    continue
                try:
                    source_account_info = function.get_source_account(article_url)
                except Exception:
                    continue
                if not source_account_info:
                    continue
                account_name = source_account_info['name']
                gh_id = source_account_info['gh_id']
                affected_rows = self.insert_account(gh_id=gh_id, account_name=account_name)
                insert_account_count += affected_rows
            except Exception as e:
                print(e)
                print(traceback.format_exc())
        # mark scanned articles as processed; skip when nothing was scanned to avoid an empty IN () clause
        if article_id_list:
            self.update_crawler_article_status(tuple(article_id_list))
        bot(
            title="微信账号抓取V2完成",
            detail={
                "扫描文章数量": len(crawler_article_list),
                "新增账号数量": insert_account_count
            },
            mention=False
        )
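

# Illustrative entry point (an assumption, not part of the original module): a minimal
# sketch of how the two crawler passes would typically be driven by a scheduler.
if __name__ == "__main__":
    crawler = WeixinAccountCrawler()
    # V1: search WeChat for seed titles (run_date defaults to today)
    crawler.run()
    # V2: trace source accounts from articles that were already crawled
    crawler.run_v2()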