weixinCategoryCrawler.py

  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
import time
from tqdm import tqdm
from pymysql.cursors import DictCursor
from applications import WeixinSpider, Functions, llm_sensitivity, log
from coldStartTasks.filter import article_crawler_duplicate_filter

# Constants
ACCOUNT_GOOD_STATUS = 1
# Whether an account is crawled daily
ACCOUNT_DAILY_SCRAPE = 1
ACCOUNT_NOT_DAILY_SCRAPE = 0
# Default values
DEFAULT_VIEW_COUNT = 0
DEFAULT_LIKE_COUNT = 0
DEFAULT_ARTICLE_STATUS = 1
DEFAULT_TIMESTAMP = 1704038400  # 2024-01-01 00:00:00 (UTC+8)


class weixinCategory(object):
    """
    Crawler for global category WeChat accounts
    """

    def __init__(self, db_client):
        self.db_client_lam = db_client
        self.spider = WeixinSpider()
        self.function = Functions()

    def get_account_list(self, account_category):
        """
        Get the accounts of one category
        :param account_category: category name
        :return:
        """
        sql = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts
            where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
            """
        account_tuple = self.db_client_lam.select(sql)
        result = [
            {
                "gh_id": i[0],
                "platform": i[1],
                "account_name": i[2],
                "category": i[3],
                "latest_timestamp": i[4],
            }
            for i in account_tuple
        ]
        return result

    def get_association_account_list(self, date_str):
        """
        Get today's rotating batch of account-association accounts
        """
        group_id = date_str[-1]
        sql = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
            """
        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
        # Rotate through the pool: only keep accounts whose account_id ends with
        # the same digit as the date string
        today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
        return today_crawler_account_list

    def insert_data_into_db(self, gh_id, category, article_list):
        """
        Write the crawled articles into the database
        :return:
        """
        success_records = []
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    # Skip articles whose title already exists (deduplication)
                    if article_crawler_duplicate_filter(
                        new_article_title=obj["Title"], db_client=self.db_client_lam
                    ):
                        log(
                            function="weixinCategory",
                            task="weixinCategory",
                            message="duplicate article title, skipping",
                            data={"title": obj["Title"]}
                        )
                        continue

                    show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
                    show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
                    unique_idx = self.function.generateGzhId(obj["ContentUrl"])
                    insert_sql = f"""
                        insert into crawler_meta_article
                        (
                            platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
                            description, publish_time, crawler_time, status, unique_index, llm_sensitivity
                        )
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                    self.db_client_lam.update(
                        sql=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            category,
                            gh_id,
                            obj['ItemIndex'],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1)
                        ),
                    )
                    success_records.append({
                        'unique_index': unique_idx, 'title': obj['Title']
                    })
                except Exception as e:
                    print(e)
        return success_records

    def update_article_sensitive_status(self, category, unique_index, status):
        """
        Update an article's llm_sensitivity status
        :return:
        """
        update_sql = f"""
            update crawler_meta_article
            set llm_sensitivity = %s
            where category = %s and unique_index = %s;
            """
        self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))

    def update_latest_account_timestamp(self, gh_id):
        """
        Update the account's latest publish timestamp
        :return:
        """
        select_sql = f"""
            SELECT publish_time
            FROM crawler_meta_article
            WHERE out_account_id = '{gh_id}'
            ORDER BY publish_time DESC LIMIT 1;
            """
        result = self.db_client_lam.select(select_sql)
        time_stamp = result[0][0]
        dt_str = self.function.timestamp_to_str(time_stamp)
        update_sql = f"""
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
            """
        self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))

    def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
        """
        Crawl one account's article list, paging backwards until articles older
        than latest_time_stamp are reached
        :return:
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=index)
        msg_list = response.get("data", {}).get("data")
        if msg_list:
            last_article_in_this_msg = msg_list[-1]
            success_records = self.insert_data_into_db(
                gh_id=gh_id, category=category, article_list=msg_list
            )
            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
            if latest_time_stamp < last_time_stamp_in_this_msg:
                # The oldest article on this page is still newer than the last crawl,
                # so keep paging with the next cursor
                next_cursor = response["data"]["next_cursor"]
                return success_records + self.update_each_account(
                    gh_id=gh_id,
                    latest_time_stamp=latest_time_stamp,
                    category=category,
                    index=next_cursor,
                )
            else:
                # Caught up: record the latest crawl time for this account
                self.update_latest_account_timestamp(gh_id=gh_id)
                print("account timestamp updated successfully")
            return success_records
        else:
            print("No more data")
            return []

    def crawler_each_category(self, account_list, category):
        """
        Crawl every account in one category, then run the LLM sensitivity check
        on the newly inserted titles
        :return:
        """
        success_records = []
        for account in tqdm(account_list, desc="crawler_each_category"):
            try:
                gh_id = account['gh_id']
                try:
                    timestamp = int(account['latest_timestamp'].timestamp())
                except Exception as e:
                    # Missing or non-datetime latest_timestamp falls back to the default
                    timestamp = DEFAULT_TIMESTAMP
                success_records += self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=timestamp
                )
                print("success")
            except Exception as e:
                print("fail because of {}".format(e))
        success_titles = [x['title'] for x in success_records]
        if success_titles:
            try:
                sensitive_results = llm_sensitivity.check_titles(success_titles)
                for record, sensitive_result in zip(success_records, sensitive_results):
                    self.update_article_sensitive_status(
                        category=category,
                        unique_index=record['unique_index'],
                        status=sensitive_result['hit_rule']
                    )
            except Exception as e:
                print("failed to update sensitive status: {}".format(e))

    def deal(self, category_list, date_str):
        """
        :param category_list:
        :param date_str: YYYY-MM-DD
        :return:
        """
        # Daily crawl of category accounts
        for category in category_list:
            account_list = self.get_account_list(category)
            self.crawler_each_category(account_list=account_list, category=category)

        # Rotating crawl of account-association accounts
        association_account_list = self.get_association_account_list(date_str)
        self.crawler_each_category(account_list=association_account_list, category="association")

        # After crawling, run the similarity-scoring task
        return

    def deal_accounts(self, account_list):
        """
        Crawl the given accounts, looked up by account name
        :param account_list:
        :return:
        """
        account_tuple = tuple(account_list)
        sql = f"""
            SELECT gh_id, account_name, account_category, latest_update_time
            FROM long_articles_accounts
            WHERE account_name in {account_tuple};
            """
        response = self.db_client_lam.select(sql)
        for account in tqdm(response):
            try:
                gh_id = account[0]
                category = account[2]
                try:
                    latest_timestamp = account[3].timestamp()
                except Exception as e:
                    print(e)
                    latest_timestamp = DEFAULT_TIMESTAMP
                self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=latest_timestamp
                )
            except Exception as e:
                print(e)
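

# ------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original file).
# This module only requires a db_client object exposing select() / update();
# how that client is built is an assumption here, so build_db_client() below
# is a hypothetical placeholder rather than a helper that exists in the project,
# and the category names are example values.
#
# if __name__ == "__main__":
#     from datetime import datetime
#
#     db_client = build_db_client()  # hypothetical factory for the project's DB client
#     crawler = weixinCategory(db_client=db_client)
#     crawler.deal(
#         category_list=["history", "health"],  # example categories, assumed values
#         date_str=datetime.today().strftime("%Y-%m-%d"),
#     )
# ------------------------------------------------------------------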