weixinCategoryCrawler.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
"""
@author: luojunhui
Crawl articles from WeChat official accounts across all global categories.
"""
  5. import json
  6. import time
  7. from tqdm import tqdm
  8. from pymysql.cursors import DictCursor
  9. from applications import WeixinSpider, Functions, log
  10. from coldStartTasks.filter import article_crawler_duplicate_filter
  11. from config import apolloConfig
  12. # 常量
  13. ACCOUNT_GOOD_STATUS = 1
  14. # 账号是否每日抓取
  15. ACCOUNT_DAILY_SCRAPE = 1
  16. ACCOUNT_NOT_DAILY_SCRAPE = 0
  17. # 默认值
  18. DEFAULT_VIEW_COUNT = 0
  19. DEFAULT_LIKE_COUNT = 0
  20. DEFAULT_ARTICLE_STATUS = 1
  21. DEFAULT_TIMESTAMP = 1717171200
  22. # 标题sensitivity
  23. TITLE_SENSITIVE = 1
  24. TITLE_NOT_SENSITIVE = 0
  25. config = apolloConfig()
  26. sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list"))
  27. def whether_title_sensitive(title: str) -> bool:
  28. """
  29. : param title:
  30. 判断视频是否的标题是否包含敏感词
  31. """
  32. for word in sensitive_word_list:
  33. if word in title:
  34. return True
  35. return False
  36. class weixinCategory(object):
  37. """
  38. 微信全局品类账号抓取
  39. """
  40. def __init__(self, db_client):
  41. self.db_client_lam = db_client
  42. self.spider = WeixinSpider()
  43. self.function = Functions()
  44. def get_account_list(self, account_category):
  45. """
  46. 获取账号
  47. :param account_category 品类
  48. :return:
  49. """
  50. sql = f"""
  51. select gh_id, account_source, account_name, account_category, latest_update_time
  52. from long_articles_accounts
  53. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
  54. """
  55. account_tuple = self.db_client_lam.select(sql)
  56. result = [
  57. {
  58. "gh_id": i[0],
  59. "platform": i[1],
  60. "account_name": i[2],
  61. "category": i[3],
  62. "latest_timestamp": i[4],
  63. }
  64. for i in account_tuple
  65. ]
  66. return result
  67. def get_association_account_list(self, date_str):
  68. """
  69. 获取账号联想的轮询账号
  70. """
  71. group_id = date_str[-1]
  72. sql = f"""
  73. select account_id, gh_id, account_name, latest_update_time
  74. from long_articles_accounts
  75. where account_category = 'account_association' and is_using = {ACCOUNT_DAILY_SCRAPE} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
  76. """
  77. account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
  78. today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
  79. return today_crawler_account_list
  80. def insert_data_into_db(self, gh_id, category, article_list):
  81. """
  82. 将数据更新到数据库
  83. :return:
  84. """
  85. success_records = []
  86. for article_obj in article_list:
  87. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  88. for obj in detail_article_list:
  89. try:
  90. # 判断文章是否存在相同的标题
  91. if article_crawler_duplicate_filter(
  92. new_article_title=obj["Title"], db_client=self.db_client_lam
  93. ):
  94. log(
  95. function="weixinCategory",
  96. task="weixinCategory",
  97. message="文章去重",
  98. data={"title": obj["Title"]}
  99. )
  100. continue
  101. # 判断标题是否包含敏感词
  102. title_sensitivity = TITLE_SENSITIVE if whether_title_sensitive(obj["Title"]) else TITLE_NOT_SENSITIVE
  103. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  104. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  105. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  106. unique_idx = self.function.generateGzhId(obj["ContentUrl"])
  107. insert_sql = f"""
  108. insert into crawler_meta_article
  109. (
  110. platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
  111. description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
  112. )
  113. VALUES
  114. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  115. """
  116. self.db_client_lam.update(
  117. sql=insert_sql,
  118. params=(
  119. "weixin",
  120. "account",
  121. category,
  122. gh_id,
  123. obj['ItemIndex'],
  124. obj["Title"],
  125. obj["ContentUrl"],
  126. show_view_count,
  127. show_like_count,
  128. obj["Digest"],
  129. obj["send_time"],
  130. int(time.time()),
  131. DEFAULT_ARTICLE_STATUS,
  132. unique_idx,
  133. obj.get("llm_sensitivity", -1),
  134. title_sensitivity
  135. ),
  136. )
  137. success_records.append({
  138. 'unique_index': unique_idx, 'title': obj['Title']
  139. })
  140. except Exception as e:
  141. print(e)
  142. return success_records
  143. def update_latest_account_timestamp(self, gh_id):
  144. """
  145. 更新账号的最新时间戳
  146. :return:
  147. """
  148. select_sql = f"""
  149. SELECT publish_time
  150. From crawler_meta_article
  151. WHERE out_account_id = '{gh_id}'
  152. ORDER BY publish_time DESC LIMIT 1;
  153. """
  154. result = self.db_client_lam.select(select_sql)
  155. time_stamp = result[0][0]
  156. dt_str = self.function.timestamp_to_str(time_stamp)
  157. update_sql = f"""
  158. update long_articles_accounts
  159. set latest_update_time = %s
  160. where gh_id = %s;
  161. """
  162. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  163. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  164. """
  165. 更新账号文章
  166. :return:
  167. """
  168. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  169. msg_list = response.get("data", {}).get("data")
  170. if msg_list:
  171. last_article_in_this_msg = msg_list[-1]
  172. success_records = self.insert_data_into_db(
  173. gh_id=gh_id, category=category, article_list=msg_list
  174. )
  175. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  176. if latest_time_stamp < last_time_stamp_in_this_msg:
  177. next_cursor = response["data"]["next_cursor"]
  178. return success_records + self.update_each_account(
  179. gh_id=gh_id,
  180. latest_time_stamp=latest_time_stamp,
  181. category=category,
  182. index=next_cursor,
  183. )
  184. else:
  185. # 更新最近抓取时间
  186. self.update_latest_account_timestamp(gh_id=gh_id)
  187. print("账号时间更新成功")
  188. return success_records
  189. else:
  190. print("No more data")
  191. return []
  192. def crawler_each_category(self, account_list, category):
  193. """
  194. 抓取每个品类
  195. :return:
  196. """
  197. success_records = []
  198. for account in tqdm(account_list, desc="crawler_each_category"):
  199. try:
  200. gh_id = account['gh_id']
  201. try:
  202. timestamp = int(account['latest_timestamp'].timestamp())
  203. except Exception as e:
  204. timestamp = DEFAULT_TIMESTAMP
  205. success_records += self.update_each_account(
  206. gh_id=gh_id,
  207. category=category,
  208. latest_time_stamp=timestamp
  209. )
  210. print("success")
  211. except Exception as e:
  212. print("fail because of {}".format(e))
  213. def deal(self, category_list, date_str):
  214. """
  215. :param category_list:
  216. :param date_str: YYYY-MM-DD
  217. :return:
  218. """
  219. # daily 品类账号抓取
  220. for category in category_list:
  221. account_list = self.get_account_list(category)
  222. self.crawler_each_category(account_list=account_list, category=category)
  223. # 账号联想账号轮询抓取
  224. association_account_list = self.get_association_account_list(date_str)
  225. self.crawler_each_category(account_list=association_account_list, category="account_association")
  226. def deal_accounts(self, account_list):
  227. """
  228. input account list
  229. :param account_list: 具体账号抓取,只抓一页
  230. :return:
  231. """
  232. account_tuple = tuple(account_list)
  233. sql = f"""
  234. SELECT gh_id, account_name, account_category, latest_update_time
  235. FROM long_articles_accounts
  236. WHERE account_name in {account_tuple};
  237. """
  238. response = self.db_client_lam.select(sql)
  239. for account in tqdm(response):
  240. try:
  241. gh_id = account[0]
  242. category = account[2]
  243. try:
  244. latest_timestamp = account[3].timestamp()
  245. except Exception as e:
  246. print(e)
  247. latest_timestamp = DEFAULT_TIMESTAMP
  248. self.update_each_account(
  249. gh_id=gh_id,
  250. category=category,
  251. latest_time_stamp=latest_timestamp
  252. )
  253. except Exception as e:
  254. print(e)