| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- import asyncio
- import json
- import random
- import traceback
- from tqdm import tqdm
- from datetime import datetime, timedelta
- from urllib.parse import unquote, parse_qs, urlparse
- from applications.utils import fetch_from_odps, show_desc_to_sta
- from applications.crawler.wechat import get_article_list_from_account
- from applications.crawler.wechat import get_article_detail
class CooperateAccountsMonitorTaskConst:
    """Numeric constants shared by the cooperate-accounts monitor task."""

    # Account `status` values: the mapper filters on VALID_STATUS and the
    # task flips an account from VALID_STATUS to INVALID_STATUS.
    INVALID_STATUS = 0
    VALID_STATUS = 1

    # Article `fetch_status` values written to cooperate_accounts_daily_detail
    # while an article's details are being fetched.
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99

    # Values stored in the `has_mini_program` column.
    HAS_MINI_PROGRAM = 1
    DONT_HAS_MINI_PROGRAM = 0

    # Used as the LIMIT when selecting articles that still need details.
    ARTICLE_NUM = 100
class CooperateAccountsMonitorTaskUtils(CooperateAccountsMonitorTaskConst):
    """Stateless helpers: the ODPS UV query and URL-parsing utilities."""

    @staticmethod
    def get_uv_account_list():
        """Query ODPS for accounts and their distinct-`mid` counts (uv).

        The query reads the latest partition of
        ``loghubods.opengid_base_data`` and keeps rows whose click time is
        within the last seven days, grouped by account name and ghid and
        ordered by uv descending.

        Returns:
            Whatever ``fetch_from_odps`` returns for the query; the caller in
            this file iterates it and reads the ``公众号名`` attribute of
            each record.
        """
        cutoff = datetime.today() - timedelta(days=7)
        week_ago = cutoff.strftime("%Y-%m-%d %H:%M:%S")
        query = f"""
            SELECT 公众号名, ghid, count(DISTINCT mid) AS uv
            FROM loghubods.opengid_base_data
            WHERE dt = MAX_PT('loghubods.opengid_base_data')
            AND hotsencetype = 1058
            AND usersharedepth = 0
            AND channel = '公众号合作-即转-稳定'
            AND 点击时间 >= '{week_ago}'
            GROUP BY 公众号名, ghid
            ORDER BY uv DESC
            ;
        """
        return fetch_from_odps(query)

    @staticmethod
    def extract_page_path(page_path):
        """Pull the video id and root source id out of a mini-program path.

        The outer URL's ``jumpPage`` query parameter holds an inner URL; the
        inner URL's ``id`` and ``rootSourceId`` query parameters are returned.

        Args:
            page_path: Mini-program page path carrying a ``jumpPage`` parameter.

        Returns:
            ``(video_id, root_source_id)``. Both are ``None`` when
            ``jumpPage`` is absent or empty; either is ``None`` when the
            corresponding inner parameter is missing.
        """
        outer_query = parse_qs(urlparse(page_path).query)
        encoded_target = outer_query.get("jumpPage", [""])[0]
        if not encoded_target:
            return None, None

        # parse_qs already percent-decodes once; unquote handles a value that
        # was encoded a second time.
        target_query = parse_qs(urlparse(unquote(encoded_target)).query)
        return (
            target_query.get("id", [None])[0],
            target_query.get("rootSourceId", [None])[0],
        )

    @staticmethod
    def extract_wx_sn(content_url):
        """Return the ``sn`` query parameter of an article URL, or ``None``.

        Args:
            content_url: Article link; may be empty or ``None``.

        Returns:
            The first ``sn`` value, or ``None`` when the URL is falsy or has
            no ``sn`` parameter.
        """
        if not content_url:
            return None
        sn_values = parse_qs(urlparse(content_url).query).get("sn", [None])
        return sn_values[0]
class CooperateAccountsMonitorMapper(CooperateAccountsMonitorTaskUtils):
    """Database access layer for the cooperate-accounts monitor.

    All statements go through ``self.pool`` (``async_fetch`` / ``async_save``).
    NOTE(review): the return value of ``async_save`` is used as a truthy
    "something changed" flag by callers in this file; presumably it is the
    affected-row count — confirm against the pool implementation.
    """

    def __init__(self, pool, log_client):
        self.pool = pool
        self.log_client = log_client

    # Fetch accounts from the growth database
    async def fetch_monitor_accounts(self):
        """Return active accounts joined with their partner.

        Each row exposes ``partner_name``, ``partner_id``, ``account_name``
        and ``gh_id``; only accounts with ``status = 1`` are selected.
        """
        sql = """
            SELECT t2.name AS partner_name, t2.channel AS partner_id,
            t1.name AS account_name, t1.gh_id
            FROM content_platform_gzh_account t1 JOIN content_platform_account t2
            ON t1.create_account_id = t2.id
            WHERE t1.status = 1;
        """
        return await self.pool.async_fetch(query=sql, db_name="growth")

    # Fallback lookup of gh_id
    async def fetch_gh_id(self, account_name):
        """Look up the ``gh_id`` of a valid account by its name.

        Returns:
            The ``gh_id`` of the first matching row, or ``None`` when the
            query yields no rows.
        """
        sql = """
            SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s;
        """
        rows = await self.pool.async_fetch(
            query=sql, db_name="growth", params=(account_name, self.VALID_STATUS)
        )
        if not rows:
            return None
        return rows[0].get("gh_id", None)

    # Change the fetch status
    async def update_fetch_status(self, wx_sn, ori_status, new_status):
        """Move an article's ``fetch_status`` from ``ori_status`` to ``new_status``.

        The WHERE clause requires the current status to equal ``ori_status``,
        so nothing is changed when the row is in any other state.
        """
        sql = """
            UPDATE cooperate_accounts_daily_detail SET fetch_status = %s WHERE wx_sn = %s AND fetch_status = %s;
        """
        status_params = (new_status, wx_sn, ori_status)
        return await self.pool.async_save(query=sql, params=status_params)

    # Store the account and maintain its status
    async def save_account(self, account):
        """Insert an account into ``cooperate_accounts`` (``INSERT IGNORE``).

        Args:
            account: Mapping with ``partner_name``, ``partner_id``,
                ``account_name`` and ``gh_id`` keys.
        """
        sql = """
            INSERT IGNORE INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
            VALUES (%s, %s, %s, %s);
        """
        columns = ("partner_name", "partner_id", "account_name", "gh_id")
        return await self.pool.async_save(
            query=sql,
            params=tuple(account[column] for column in columns),
        )

    # Change the account status
    async def update_account_status(self, gh_id, ori_status, new_status, remark):
        """Set an account's ``status`` and ``remark`` if it is still in ``ori_status``."""
        sql = """
            UPDATE cooperate_accounts SET status = %s, remark = %s WHERE gh_id = %s AND status = %s;
        """
        status_params = (new_status, remark, gh_id, ori_status)
        return await self.pool.async_save(query=sql, params=status_params)

    # Read the account status
    async def get_account_status(self, gh_id):
        """Return the rows holding ``status`` for ``gh_id`` in ``cooperate_accounts``."""
        sql = """
            SELECT status FROM cooperate_accounts WHERE gh_id = %s;
        """
        return await self.pool.async_fetch(query=sql, params=(gh_id,))
- class CooperateAccountsMonitorTask(CooperateAccountsMonitorMapper):
- def __init__(self, pool, log_client):
- super().__init__(pool, log_client)
- # 更新文章详情
- async def set_article_detail(self, article):
- wx_sn = article["wx_sn"]
- article_link = article["article_link"]
- # acquire lock
- acquire_lock = await self.update_fetch_status(
- wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS
- )
- if not acquire_lock:
- print("锁抢占失败")
- return acquire_lock
- article_detail = await get_article_detail(
- article_link, is_count=True, is_cache=False
- )
- if not article_detail:
- return await self.update_fetch_status(
- wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS
- )
- # 更新文章信息
- code = article_detail.get("code", None)
- match code:
- case 0:
- try:
- article = article_detail.get("data", {}).get("data", {})
- body_text = article.get("body_text", None)
- images = article.get("image_url_list", [])
- mini_program = article.get("mini_program", [])
- has_mini_program = (
- self.HAS_MINI_PROGRAM
- if mini_program
- else self.DONT_HAS_MINI_PROGRAM
- )
- read_cnt = article.get("view_count", None)
- like_cnt = article.get("like_count", None)
- share_cnt = article.get("share_count", None)
- looking_cnt = article.get("looking_count", None)
- publish_timestamp = article.get("publish_timestamp", None)
- await self.store_mini_program(mini_program, wx_sn)
- query = """
- UPDATE cooperate_accounts_daily_detail SET
- article_text = %s,
- article_images = %s,
- read_cnt = %s,
- like_cnt = %s,
- share_cnt = %s,
- looking_cnt = %s,
- publish_timestamp = %s,
- fetch_status = %s,
- has_mini_program = %s
- WHERE wx_sn = %s AND fetch_status = %s;
- """
- return await self.pool.async_save(
- query=query,
- params=(
- body_text,
- json.dumps(images, ensure_ascii=False),
- read_cnt,
- like_cnt,
- share_cnt,
- looking_cnt,
- int(publish_timestamp / 1000),
- self.SUCCESS_STATUS,
- has_mini_program,
- wx_sn,
- self.PROCESSING_STATUS,
- ),
- )
- except Exception as e:
- print(f"更新文章详情失败-{article_link}-{e}")
- return await self.update_fetch_status(
- wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
- )
- case _:
- return await self.update_fetch_status(
- wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
- )
- # 存储小程序信息
- async def store_mini_program(self, mini_program, wx_sn):
- for card_index, i in enumerate(mini_program, 1):
- try:
- video_id, root_source_id = self.extract_page_path(i["path"])
- card_title = i["title"]
- card_cover = i["image_url"]
- mini_name = i["nike_name"]
- query = """
- INSERT INTO cooperate_accounts_daily_mini_info
- (wx_sn, card_title, card_cover, video_id, root_source_id, mini_program_name, card_index)
- VALUES
- (%s, %s, %s, %s, %s, %s, %s);
- """
- await self.pool.async_save(
- query=query,
- params=(
- wx_sn,
- card_title,
- card_cover,
- video_id,
- root_source_id,
- mini_name,
- card_index,
- ),
- )
- except Exception as e:
- print(e)
- continue
- # 存储文章
- async def store_articles(self, gh_id, account_name, article_list):
- params = []
- for group_article in article_list:
- base_info = group_article["AppMsg"]["BaseInfo"]
- detail_info = group_article["AppMsg"]["DetailInfo"]
- for single_article in detail_info:
- show_stat = show_desc_to_sta(single_article.get("ShowDesc", None))
- single_param = (
- gh_id,
- account_name,
- base_info["AppMsgId"],
- base_info["Type"],
- single_article["ItemIndex"],
- single_article["Title"],
- single_article["ContentUrl"],
- single_article["CoverImgUrl"],
- single_article["Digest"],
- single_article["send_time"],
- self.extract_wx_sn(single_article["ContentUrl"]),
- show_stat.get("show_view_count", 0),
- show_stat.get("show_like_count", 0)
- )
- params.append(single_param)
- query = """
- INSERT INTO cooperate_accounts_daily_detail
- (gh_id, account_name, app_msg_id, publish_type, position, article_title, article_link, article_cover, article_desc, publish_timestamp, wx_sn, read_cnt, like_cnt)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- read_cnt = VALUES(read_cnt), like_cnt = VALUES(like_cnt);
- """
- await self.pool.async_save(query=query, params=params, batch=True)
- # 存储单个账号
- async def store_single_accounts(self, account):
- account_name = account["account_name"]
- gh_id = account["gh_id"]
- account_status = await self.get_account_status(gh_id)
- if not account_status:
- # 账号没存在长文数据库
- affected_row = await self.save_account(account)
- print(f"账号{account_name}存储到数据库")
- if not affected_row:
- return
- account_status = await self.get_account_status(gh_id)
- account_using_status = account_status[0].get("status", None)
- if not account_using_status:
- print("账号违规")
- return
- # 只抓最新的文章
- crawl_response = await get_article_list_from_account(gh_id)
- await asyncio.sleep(random.randint(1, 3))
- if not crawl_response:
- return
- code = crawl_response.get("code")
- match code:
- case 0:
- article_list = crawl_response.get("data", {} or {}).get("data", [])
- # 将文章存储到库中
- await self.store_articles(gh_id, account_name, article_list)
- case 25013:
- msg = crawl_response.get("msg")
- await self.update_account_status(
- gh_id, self.VALID_STATUS, self.INVALID_STATUS, msg
- )
- case _:
- print(crawl_response["msg"])
- pass
- # 获取待处理的文章
- async def get_article_list(self, account_name_tuple):
- query = """
- SELECT wx_sn, article_link FROM cooperate_accounts_daily_detail
- WHERE fetch_status = %s AND account_name IN %s ORDER BY position LIMIT %s;
- """
- return await self.pool.async_fetch(
- query=query, params=(self.INIT_STATUS, account_name_tuple, self.ARTICLE_NUM)
- )
- # 入口函数
- async def deal(self, task_name):
- match task_name:
- case "save_articles":
- account_list = await self.fetch_monitor_accounts()
- for account in tqdm(account_list):
- print(f"开始处理账号:{account['account_name']}")
- try:
- await self.store_single_accounts(account)
- except Exception as e:
- print(f"获取账号文章失败--{account['account_name']}--{e}")
- print(traceback.format_exc())
- case "get_detail":
- has_uv_accounts = self.get_uv_account_list()
- has_uv_name_list = []
- for i in has_uv_accounts:
- account_name = i.公众号名
- if account_name:
- has_uv_name_list.append(account_name)
- if has_uv_name_list:
- account_name_tuple = tuple(has_uv_name_list)
- article_list = await self.get_article_list(account_name_tuple)
- for article in tqdm(article_list, desc="处理文章详情"):
- try:
- await self.set_article_detail(article)
- except Exception as e:
- print(f"获取文章详情失败-{article['article_link']}-{e}")
- else:
- print("没有需要处理详情的账号")
- return
|