cooperate_accounts_monitor.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. import json
  2. from tqdm import tqdm
  3. from datetime import datetime, timedelta
  4. from urllib.parse import unquote, parse_qs, urlparse
  5. from applications.utils import fetch_from_odps
  6. from applications.crawler.wechat import get_article_list_from_account
  7. from applications.crawler.wechat import get_article_detail
  class CooperateAccountsMonitorTaskConst:
      """Status codes and limits shared by the cooperate-accounts monitor task."""

      # `status` values of content_platform_gzh_account rows; fetch_gh_id
      # filters on VALID_STATUS.
      INVALID_STATUS, VALID_STATUS = 0, 1

      # `fetch_status` lifecycle of cooperate_accounts_daily_detail rows:
      # INIT -> PROCESSING (lock taken) -> SUCCESS or FAIL, or back to INIT
      # when the detail fetch should be retried.
      INIT_STATUS, PROCESSING_STATUS, SUCCESS_STATUS, FAIL_STATUS = 0, 1, 2, 99

      # Values written to the `has_mini_program` column.
      HAS_MINI_PROGRAM, DONT_HAS_MINI_PROGRAM = 1, 0

      # SQL LIMIT for how many pending articles are picked up per run.
      ARTICLE_NUM = 100
  class CooperateAccountsMonitorTaskUtils(CooperateAccountsMonitorTaskConst):
      """Stateless helpers: the ODPS account query and URL parameter parsing."""

      @staticmethod
      def get_monitor_account_list():
          """Return the monitored accounts from ODPS, ranked by distinct users.

          Reads yesterday's ``dt`` partition of ``loghubods.opengid_base_data``,
          restricted to the cooperation channel named in the query,
          ``hotsencetype = 1058``, ``usersharedepth = 0`` and clicks within the
          last 7 days. Groups by account name / ghid, with ``count(DISTINCT mid)``
          as ``uv``, sorted by ``uv`` descending.

          :return: whatever ``fetch_from_odps`` returns; the task reads each row
              via attribute access (``公众号名``, ``ghid``).
          """
          # Derive both the partition date and the 7-day window from a single
          # instant; two separate today() calls can straddle midnight.
          now = datetime.today()
          dt = (now - timedelta(days=1)).strftime("%Y%m%d")
          week_ago = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
          query = f"""
              SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
              FROM loghubods.opengid_base_data
              WHERE dt = {dt}
              AND hotsencetype = 1058
              AND usersharedepth = 0
              AND channel = '公众号合作-即转-稳定'
              AND 点击时间 >= '{week_ago}'
              GROUP BY 公众号名, ghid
              ORDER BY uv DESC
              ;
          """
          return fetch_from_odps(query)

      @staticmethod
      def extract_page_path(page_path):
          """Extract ``(video_id, root_source_id)`` from a mini-program page path.

          The outer path carries a ``jumpPage`` query parameter whose value is
          itself a path with ``id`` and ``rootSourceId`` query parameters.

          :param page_path: outer page path; may be empty or None.
          :return: ``(None, None)`` when there is no ``jumpPage``; otherwise a
              tuple whose elements are None when the inner parameter is absent.
          """
          if not page_path:
              return None, None
          # Parse the outer URL and pull out jumpPage.
          outer_params = parse_qs(urlparse(page_path).query)
          jump_page = outer_params.get("jumpPage", [""])[0]
          if not jump_page:
              return None, None
          # parse_qs already percent-decodes once; the extra unquote presumably
          # handles an inner path that was encoded twice.
          decoded_jump_page = unquote(jump_page)
          # Parse the inner jumpPage query parameters.
          inner_params = parse_qs(urlparse(decoded_jump_page).query)
          video_id = inner_params.get("id", [None])[0]
          root_source_id = inner_params.get("rootSourceId", [None])[0]
          return video_id, root_source_id

      @staticmethod
      def extract_wx_sn(content_url):
          """Return the ``sn`` query parameter of an article URL.

          :return: the first ``sn`` value, or None when the URL is empty or has
              no ``sn`` parameter.
          """
          if not content_url:
              return None
          sn_values = parse_qs(urlparse(content_url).query).get("sn")
          return sn_values[0] if sn_values else None
  59. class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
  60. def __init__(self, pool, log_client):
  61. self.pool = pool
  62. self.log_client = log_client
  63. # 获取 gh_id 的兜底逻辑
  64. async def fetch_gh_id(self, account_name):
  65. query = """
  66. SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s;
  67. """
  68. fetch_response = await self.pool.async_fetch(
  69. query=query, db_name="growth", params=(account_name, self.VALID_STATUS)
  70. )
  71. return fetch_response[0].get("gh_id", None) if fetch_response else None
  72. # 修改 fetch 状态
  73. async def update_fetch_status(self, wx_sn, ori_status, new_status):
  74. query = """
  75. UPDATE cooperate_accounts_daily_detail SET fetch_status = %s WHERE wx_sn = %s AND fetch_status = %s;
  76. """
  77. return await self.pool.async_save(
  78. query=query, params=(new_status, wx_sn, ori_status)
  79. )
  80. # 更新文章详情
  81. async def set_article_detail(self, article):
  82. wx_sn = article["wx_sn"]
  83. article_link = article["article_link"]
  84. # acquire lock
  85. acquire_lock = await self.update_fetch_status(
  86. wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS
  87. )
  88. if not acquire_lock:
  89. print("锁抢占失败")
  90. return acquire_lock
  91. article_detail = await get_article_detail(
  92. article_link, is_count=True, is_cache=False
  93. )
  94. if not article_detail:
  95. return await self.update_fetch_status(
  96. wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS
  97. )
  98. # 更新文章信息
  99. code = article_detail.get("code", None)
  100. match code:
  101. case 0:
  102. try:
  103. article = article_detail.get("data", {}).get("data", {})
  104. body_text = article.get("body_text", None)
  105. images = article.get("image_url_list", [])
  106. mini_program = article.get("mini_program", [])
  107. has_mini_program = (
  108. self.HAS_MINI_PROGRAM
  109. if mini_program
  110. else self.DONT_HAS_MINI_PROGRAM
  111. )
  112. read_cnt = article.get("view_count", None)
  113. like_cnt = article.get("like_count", None)
  114. share_cnt = article.get("share_count", None)
  115. looking_cnt = article.get("looking_count", None)
  116. publish_timestamp = article.get("publish_timestamp", None)
  117. await self.store_mini_program(mini_program, wx_sn)
  118. query = """
  119. UPDATE cooperate_accounts_daily_detail SET
  120. article_text = %s,
  121. article_images = %s,
  122. read_cnt = %s,
  123. like_cnt = %s,
  124. share_cnt = %s,
  125. looking_cnt = %s,
  126. publish_timestamp = %s,
  127. fetch_status = %s,
  128. has_mini_program = %s
  129. WHERE wx_sn = %s AND fetch_status = %s;
  130. """
  131. return await self.pool.async_save(
  132. query=query,
  133. params=(
  134. body_text,
  135. json.dumps(images, ensure_ascii=False),
  136. read_cnt,
  137. like_cnt,
  138. share_cnt,
  139. looking_cnt,
  140. int(publish_timestamp / 1000),
  141. self.SUCCESS_STATUS,
  142. has_mini_program,
  143. wx_sn,
  144. self.PROCESSING_STATUS,
  145. ),
  146. )
  147. except Exception as e:
  148. print(f"更新文章详情失败-{article_link}-{e}")
  149. return await self.update_fetch_status(
  150. wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
  151. )
  152. case _:
  153. return await self.update_fetch_status(
  154. wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
  155. )
  156. # 存储小程序信息
  157. async def store_mini_program(self, mini_program, wx_sn):
  158. for card_index, i in enumerate(mini_program, 1):
  159. try:
  160. video_id, root_source_id = self.extract_page_path(i["path"])
  161. card_title = i["title"]
  162. card_cover = i["image_url"]
  163. mini_name = i["nike_name"]
  164. query = """
  165. INSERT INTO cooperate_accounts_daily_mini_info
  166. (wx_sn, card_title, card_cover, video_id, root_source_id, mini_program_name, card_index)
  167. VALUES
  168. (%s, %s, %s, %s, %s, %s, %s);
  169. """
  170. await self.pool.async_save(
  171. query=query,
  172. params=(
  173. wx_sn,
  174. card_title,
  175. card_cover,
  176. video_id,
  177. root_source_id,
  178. mini_name,
  179. card_index,
  180. ),
  181. )
  182. except Exception as e:
  183. print(e)
  184. continue
  185. # 存储文章
  186. async def store_articles(self, gh_id, account_name, article_list):
  187. params = []
  188. for group_article in article_list:
  189. base_info = group_article["AppMsg"]["BaseInfo"]
  190. detail_info = group_article["AppMsg"]["DetailInfo"]
  191. for single_article in detail_info:
  192. single_param = (
  193. gh_id,
  194. account_name,
  195. base_info["AppMsgId"],
  196. base_info["Type"],
  197. single_article["ItemIndex"],
  198. single_article["Title"],
  199. single_article["ContentUrl"],
  200. single_article["CoverImgUrl"],
  201. single_article["Digest"],
  202. single_article["send_time"],
  203. self.extract_wx_sn(single_article["ContentUrl"]),
  204. )
  205. params.append(single_param)
  206. query = """
  207. INSERT IGNORE INTO cooperate_accounts_daily_detail
  208. (gh_id, account_name, app_msg_id, publish_type, position, article_title, article_link, article_cover, article_desc, publish_timestamp, wx_sn)
  209. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  210. """
  211. await self.pool.async_save(query=query, params=params, batch=True)
  212. # 存储单个账号
  213. async def store_single_accounts(self, account):
  214. account_name = account.公众号名
  215. gh_id = account.ghid
  216. if not gh_id:
  217. gh_id = await self.fetch_gh_id(account_name)
  218. if not gh_id:
  219. return
  220. # 只抓最新的文章
  221. crawl_response = await get_article_list_from_account(gh_id)
  222. if not crawl_response:
  223. return
  224. code = crawl_response.get("code")
  225. match code:
  226. case 0:
  227. article_list = crawl_response.get("data", {} or {}).get("data", [])
  228. # 将文章存储到库中
  229. await self.store_articles(gh_id, account_name, article_list)
  230. case _:
  231. print(crawl_response["msg"])
  232. pass
  233. # 获取待处理的文章
  234. async def get_article_list(self):
  235. query = """
  236. SELECT wx_sn, article_link FROM cooperate_accounts_daily_detail WHERE fetch_status = %s LIMIT %s;
  237. """
  238. return await self.pool.async_fetch(
  239. query=query, params=(self.INIT_STATUS, self.ARTICLE_NUM)
  240. )
  241. # 入口函数
  242. async def deal(self, task_name):
  243. match task_name:
  244. case "save_articles":
  245. account_list = self.get_monitor_account_list()
  246. for account in tqdm(account_list):
  247. try:
  248. await self.store_single_accounts(account)
  249. except Exception as e:
  250. print(f"获取账号文章失败--{account.公众号名}--{e}")
  251. case "get_detail":
  252. article_list = await self.get_article_list()
  253. for article in tqdm(article_list, desc="处理文章详情"):
  254. try:
  255. await self.set_article_detail(article)
  256. except Exception as e:
  257. print(f"获取文章详情失败-{article['article_link']}-{e}")