import asyncio import json import random import traceback from tqdm import tqdm from datetime import datetime, timedelta from urllib.parse import unquote, parse_qs, urlparse from applications.utils import fetch_from_odps from applications.crawler.wechat import get_article_list_from_account from applications.crawler.wechat import get_article_detail class CooperateAccountsMonitorTaskConst: INVALID_STATUS = 0 VALID_STATUS = 1 INIT_STATUS = 0 PROCESSING_STATUS = 1 SUCCESS_STATUS = 2 FAIL_STATUS = 99 HAS_MINI_PROGRAM = 1 DONT_HAS_MINI_PROGRAM = 0 ARTICLE_NUM = 200 class CooperateAccountsMonitorTaskUtils(CooperateAccountsMonitorTaskConst): @staticmethod def get_monitor_account_list(): # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d") week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S") query = f""" SELECT 公众号名, ghid, count(DISTINCT mid) AS uv FROM loghubods.opengid_base_data WHERE dt = MAX_PT('loghubods.opengid_base_data') AND hotsencetype = 1058 AND usersharedepth = 0 AND channel = '公众号合作-即转-稳定' AND 点击时间 >= '{week_ago}' GROUP BY 公众号名, ghid ORDER BY uv DESC ; """ result = fetch_from_odps(query) return result @staticmethod def extract_page_path(page_path): # 解析外层 URL parsed_url = urlparse(page_path) outer_params = parse_qs(parsed_url.query) # 取出并解码 jumpPage jump_page = outer_params.get("jumpPage", [""])[0] if not jump_page: return None, None decoded_jump_page = unquote(jump_page) # 解析 jumpPage 内层参数 inner_query = urlparse(decoded_jump_page).query inner_params = parse_qs(inner_query) video_id = inner_params.get("id", [None])[0] root_source_id = inner_params.get("rootSourceId", [None])[0] return video_id, root_source_id @staticmethod def extract_wx_sn(content_url): if not content_url: return None query = urlparse(content_url).query return parse_qs(query).get("sn", [None])[0] class CooperateAccountsMonitorMapper(CooperateAccountsMonitorTaskUtils): def __init__(self, pool, log_client): self.pool = pool self.log_client = log_client # 从 growth 数据库获取账号 async def fetch_monitor_accounts(self): query = """ SELECT t2.name AS partner_name, t2.channel AS partner_id, t1.name AS account_name, t1.gh_id FROM content_platform_gzh_account t1 JOIN content_platform_account t2 ON t1.create_account_id = t2.id WHERE t1.status = 1; """ return await self.pool.async_fetch(query=query, db_name="growth") # 获取 gh_id 的兜底逻辑 async def fetch_gh_id(self, account_name): query = """ SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s; """ fetch_response = await self.pool.async_fetch( query=query, db_name="growth", params=(account_name, self.VALID_STATUS) ) return fetch_response[0].get("gh_id", None) if fetch_response else None # 修改 fetch 状态 async def update_fetch_status(self, wx_sn, ori_status, new_status): query = """ UPDATE cooperate_accounts_daily_detail SET fetch_status = %s WHERE wx_sn = %s AND fetch_status = %s; """ return await self.pool.async_save( query=query, params=(new_status, wx_sn, ori_status) ) # 存储账号并且维护账号状态 async def save_account(self, account): query = """ INSERT IGNORE INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id) VALUES (%s, %s, %s, %s); """ return await self.pool.async_save( query=query, params=( account["partner_name"], account["partner_id"], account["account_name"], account["gh_id"], ), ) # 修改账号状态 async def update_account_status(self, gh_id, ori_status, new_status, remark): query = """ UPDATE cooperate_accounts SET status = %s, remark = %s WHERE gh_id = %s AND status = %s; """ return await self.pool.async_save( query=query, params=(new_status, remark, gh_id, ori_status) ) # 获取账号状态 async def get_account_status(self, gh_id): query = """ SELECT status FROM cooperate_accounts WHERE gh_id = %s; """ fetch_response = await self.pool.async_fetch(query=query, params=(gh_id,)) return fetch_response class CooperateAccountsMonitorTask(CooperateAccountsMonitorMapper): def __init__(self, pool, log_client): super().__init__(pool, log_client) # 更新文章详情 async def set_article_detail(self, article): wx_sn = article["wx_sn"] article_link = article["article_link"] # acquire lock acquire_lock = await self.update_fetch_status( wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS ) if not acquire_lock: print("锁抢占失败") return acquire_lock article_detail = await get_article_detail( article_link, is_count=True, is_cache=False ) if not article_detail: return await self.update_fetch_status( wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS ) # 更新文章信息 code = article_detail.get("code", None) match code: case 0: try: article = article_detail.get("data", {}).get("data", {}) body_text = article.get("body_text", None) images = article.get("image_url_list", []) mini_program = article.get("mini_program", []) has_mini_program = ( self.HAS_MINI_PROGRAM if mini_program else self.DONT_HAS_MINI_PROGRAM ) read_cnt = article.get("view_count", None) like_cnt = article.get("like_count", None) share_cnt = article.get("share_count", None) looking_cnt = article.get("looking_count", None) publish_timestamp = article.get("publish_timestamp", None) await self.store_mini_program(mini_program, wx_sn) query = """ UPDATE cooperate_accounts_daily_detail SET article_text = %s, article_images = %s, read_cnt = %s, like_cnt = %s, share_cnt = %s, looking_cnt = %s, publish_timestamp = %s, fetch_status = %s, has_mini_program = %s WHERE wx_sn = %s AND fetch_status = %s; """ return await self.pool.async_save( query=query, params=( body_text, json.dumps(images, ensure_ascii=False), read_cnt, like_cnt, share_cnt, looking_cnt, int(publish_timestamp / 1000), self.SUCCESS_STATUS, has_mini_program, wx_sn, self.PROCESSING_STATUS, ), ) except Exception as e: print(f"更新文章详情失败-{article_link}-{e}") return await self.update_fetch_status( wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS ) case _: return await self.update_fetch_status( wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS ) # 存储小程序信息 async def store_mini_program(self, mini_program, wx_sn): for card_index, i in enumerate(mini_program, 1): try: video_id, root_source_id = self.extract_page_path(i["path"]) card_title = i["title"] card_cover = i["image_url"] mini_name = i["nike_name"] query = """ INSERT INTO cooperate_accounts_daily_mini_info (wx_sn, card_title, card_cover, video_id, root_source_id, mini_program_name, card_index) VALUES (%s, %s, %s, %s, %s, %s, %s); """ await self.pool.async_save( query=query, params=( wx_sn, card_title, card_cover, video_id, root_source_id, mini_name, card_index, ), ) except Exception as e: print(e) continue # 存储文章 async def store_articles(self, gh_id, account_name, article_list): params = [] for group_article in article_list: base_info = group_article["AppMsg"]["BaseInfo"] detail_info = group_article["AppMsg"]["DetailInfo"] for single_article in detail_info: single_param = ( gh_id, account_name, base_info["AppMsgId"], base_info["Type"], single_article["ItemIndex"], single_article["Title"], single_article["ContentUrl"], single_article["CoverImgUrl"], single_article["Digest"], single_article["send_time"], self.extract_wx_sn(single_article["ContentUrl"]), ) params.append(single_param) query = """ INSERT IGNORE INTO cooperate_accounts_daily_detail (gh_id, account_name, app_msg_id, publish_type, position, article_title, article_link, article_cover, article_desc, publish_timestamp, wx_sn) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ await self.pool.async_save(query=query, params=params, batch=True) # 存储单个账号 async def store_single_accounts(self, account): account_name = account["account_name"] gh_id = account["gh_id"] account_status = await self.get_account_status(gh_id) if not account_status: # 账号没存在长文数据库 affected_row = await self.save_account(account) print(f"账号{account_name}存储到数据库") if not affected_row: return account_status = await self.get_account_status(gh_id) account_using_status = account_status[0].get("status", None) if not account_using_status: print("账号违规") return # 只抓最新的文章 crawl_response = await get_article_list_from_account(gh_id) await asyncio.sleep(random.randint(1, 3)) if not crawl_response: return code = crawl_response.get("code") match code: case 0: article_list = crawl_response.get("data", {} or {}).get("data", []) # 将文章存储到库中 await self.store_articles(gh_id, account_name, article_list) case 25013: msg = crawl_response.get("msg") await self.update_account_status( gh_id, self.VALID_STATUS, self.INVALID_STATUS, msg ) case _: print(crawl_response["msg"]) pass # 获取待处理的文章 async def get_article_list(self): query = """ SELECT wx_sn, article_link FROM cooperate_accounts_daily_detail WHERE fetch_status = %s ORDER BY position LIMIT %s; """ return await self.pool.async_fetch( query=query, params=(self.INIT_STATUS, self.ARTICLE_NUM) ) # 入口函数 async def deal(self, task_name): match task_name: case "save_articles": account_list = await self.fetch_monitor_accounts() for account in tqdm(account_list): print(f"开始处理账号:{account['account_name']}") try: await self.store_single_accounts(account) except Exception as e: print(f"获取账号文章失败--{account['account_name']}--{e}") print(traceback.format_exc()) case "get_detail": article_list = await self.get_article_list() for article in tqdm(article_list, desc="处理文章详情"): try: await self.set_article_detail(article) except Exception as e: print(f"获取文章详情失败-{article['article_link']}-{e}")