|
|
@@ -0,0 +1,258 @@
|
|
|
+import traceback
|
|
|
+
|
|
|
+from tqdm import tqdm
|
|
|
+
|
|
|
+from app.core.database import DatabaseManager
|
|
|
+from app.core.observability import LogService
|
|
|
+
|
|
|
+from app.infra.shared.tools import show_desc_to_sta
|
|
|
+from app.infra.crawler.wechat import get_article_list_from_account
|
|
|
+from app.infra.crawler.wechat import get_article_detail
|
|
|
+
|
|
|
+from ._const import AdPlatformAccountsMonitorTaskConst
|
|
|
+from ._mapper import AdPlatformAccountsMonitorMapper
|
|
|
+from ._utils import AdPlatformAccountsMonitorTaskUtils
|
|
|
+
|
|
|
+
|
|
|
class AdPlatformAccountsMonitorTask(AdPlatformAccountsMonitorTaskConst):
    """Monitoring task for ad-platform WeChat official accounts.

    Runs one of three sub-tasks (dispatched in :meth:`deal`):

    * ``crawl_articles``   – page through each monitored account's article
      list and store new articles;
    * ``get_detail``       – fetch body/stats detail for head articles;
    * ``cal_read_median``  – compute each account's head-article read median
      and per-article median multipliers.

    Status codes and thresholds (``INIT_STATUS``, ``CRAWL_SUCCESS_STATUS``,
    ``READ_MEDIAN_RATE_THRESHOLD``, ...) are inherited from
    ``AdPlatformAccountsMonitorTaskConst``.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Wire up logging, stateless helpers and the DB mapper.

        :param pool: database connection manager handed to the mapper.
        :param log_service: structured logger (used for crawler-crash reports).
        """
        self.log_service = log_service
        # Stateless utilities: json_dumps, extract_wx_sn, timestamps, median, sleep.
        self.tool = AdPlatformAccountsMonitorTaskUtils()
        # All SQL goes through the mapper; it shares the log service.
        self.mapper = AdPlatformAccountsMonitorMapper(pool, self.log_service)

    # Update one article's detail (body text, images, counters).
    async def set_article_detail(self, article):
        """Fetch and persist detail/stats for a single article row.

        ``article`` is a mapping with at least ``wx_sn`` and ``article_link``
        keys (presumably a row from ``get_head_article_list`` — confirm).

        A fetch-status compare-and-swap acts as a row lock so concurrent
        workers never process the same article: INIT -> PROCESSING, then
        PROCESSING -> SUCCESS / FAIL, or PROCESSING -> INIT when the crawler
        returned nothing (leaving the row eligible for retry).  Returns the
        result of the final mapper call, or the falsy lock result when the
        lock was not acquired.
        """
        wx_sn = article["wx_sn"]
        article_link = article["article_link"]
        # Acquire the row lock: the CAS only succeeds while the row is INIT.
        acquire_lock = await self.mapper.update_fetch_status(
            wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS
        )
        if not acquire_lock:
            # Another worker already owns this row; bail out.
            print("锁抢占失败")
            return acquire_lock

        article_detail = await get_article_detail(
            article_link, is_count=True, is_cache=False
        )
        if not article_detail:
            # Empty crawler response: release the lock back to INIT for retry.
            return await self.mapper.update_fetch_status(
                wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS
            )

        # Update article info according to the crawler response code.
        code = article_detail.get("code", None)

        match code:
            case self.CRAWL_SUCCESS_STATUS:
                try:
                    # NOTE(review): rebinding shadows the *article* parameter
                    # from here on; wx_sn/article_link were captured above.
                    article = article_detail.get("data", {}).get("data", {})
                    images = article.get("image_url_list", [])
                    params = (
                        article.get("body_text", None),
                        self.tool.json_dumps(images),
                        article.get("view_count", None),
                        article.get("like_count", None),
                        article.get("share_count", None),
                        article.get("looking_count", None),
                        # publish_timestamp appears to be in milliseconds;
                        # stored as whole seconds — TODO confirm unit.
                        int(article.get("publish_timestamp", 0) / 1000),
                        self.SUCCESS_STATUS,
                        wx_sn,
                        self.PROCESSING_STATUS,
                    )
                    return await self.mapper.update_article(params)

                except Exception as e:
                    # Malformed payload (missing keys, bad types): mark FAIL.
                    print(f"更新文章详情失败-{article_link}-{e}")
                    return await self.mapper.update_fetch_status(
                        wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
                    )

            case _:
                # Any non-success crawler code: mark the row as failed.
                return await self.mapper.update_fetch_status(
                    wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
                )

    # Persist a crawled article list for one account.
    async def store_articles(self, gh_id, account_name, article_list):
        """Flatten the crawler's grouped payload and batch-insert articles.

        Each element of ``article_list`` is a message group: group-level
        fields live under ``AppMsg.BaseInfo`` and the individual articles
        under ``AppMsg.DetailInfo``.  A row that cannot be built (missing
        key, unparsable ShowDesc) is skipped with a printed warning rather
        than aborting the whole batch.  Returns the batch-save result.
        """
        params = []
        for group_article in article_list:
            base_info = group_article["AppMsg"]["BaseInfo"]
            detail_info = group_article["AppMsg"]["DetailInfo"]
            for single_article in detail_info:
                try:
                    # ShowDesc packs display counters; parse into a dict.
                    show_stat = show_desc_to_sta(single_article.get("ShowDesc", None))
                    single_param = (
                        gh_id,
                        account_name,
                        base_info["AppMsgId"],
                        base_info["Type"],
                        single_article["ItemIndex"],
                        single_article["Title"],
                        single_article["ContentUrl"],
                        single_article["CoverImgUrl"],
                        single_article["Digest"],
                        single_article["send_time"],
                        # wx_sn (article identifier) is derived from the URL.
                        self.tool.extract_wx_sn(single_article["ContentUrl"]),
                        show_stat.get("show_view_count", 0),
                        show_stat.get("show_like_count", 0),
                    )
                    params.append(single_param)
                except Exception as e:
                    print(f"存储文章失败-{single_article['ContentUrl']}-{e}")

        return await self.mapper.save_articles_batch(article_tuple_list=params)

    # Crawl one account's article list, page by page, until caught up.
    async def crawl_single_account(self, account):
        """Page through an account's published articles and store them.

        The loop terminates when: the crawler returns an empty/None
        response, an empty page arrives, the page reaches articles no newer
        than the resume timestamp, the cursor runs out, the account is
        reported forbidden (it is then marked invalid), or the crawler
        reports a crash (logged via ``log_service``).
        """
        account_name = account["account_name"]
        gh_id = account["gh_id"]

        # Resume point: the newest publish timestamp already stored, falling
        # back to "now - ACCOUNT_CRAWL_DURATION" when unknown or NULL.
        publish_time_detail = await self.mapper.fetch_max_publish_timestamp(gh_id)
        if publish_time_detail:
            max_publish_timestamp = publish_time_detail[0]["publish_timestamp"]
            if max_publish_timestamp is None:
                max_publish_timestamp = self.tool.get_now_timestamp() - self.ACCOUNT_CRAWL_DURATION
        else:
            max_publish_timestamp = self.tool.get_now_timestamp() - self.ACCOUNT_CRAWL_DURATION

        cursor = None
        while True:
            crawl_response = await get_article_list_from_account(
                account_id=gh_id, index=cursor
            )
            # Rate-limit between consecutive crawler requests.
            await self.tool.sleep_between_each_request()
            if not crawl_response:
                return

            code = crawl_response.get("code")
            match code:
                case self.CRAWL_SUCCESS_STATUS:
                    article_list = crawl_response.get("data", {}).get("data", [])
                    if not article_list:
                        return

                    # save articles
                    await self.store_articles(gh_id, account_name, article_list)

                    # update cursor — stop once this page reaches articles we
                    # already have (UpdateTime of the oldest item on the page).
                    last_article = article_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = crawl_response["data"].get("next_cursor")
                    if not cursor:
                        return

                case self.ACCOUNT_FORBIDDEN_STATUS:
                    # Account banned/forbidden: flip it to invalid, keep the reason.
                    msg = crawl_response.get("msg")
                    await self.mapper.update_account_status(
                        gh_id, self.VALID_STATUS, self.INVALID_STATUS, msg
                    )
                    return

                case self.CRAWL_CRASH_CODE:
                    # Crawler service itself crashed: emit a structured failure log.
                    await self.log_service.log(
                        contents={
                            "task": "ad_platform_accounts_monitor",
                            "data": {"gh_id": gh_id},
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                    return

                case _:
                    # Unknown code: surface the message and give up on this account.
                    print(crawl_response.get("msg", crawl_response))
                    return

    # Compute the account's head-article read median and update each
    # article's read-median multiplier.
    async def calculate_read_median(self, account):
        """Compute and persist read-median statistics for one account.

        With no head articles the account's median is recorded as zero.
        Otherwise: the median of head-article read counts is stored; the
        account is flagged for extraction when median / fans_level clears
        ``READ_MEDIAN_RATE_THRESHOLD``; and every head article gets its
        read_cnt / median multiplier written back.
        """
        gh_id = account["gh_id"]
        fans_level = account["fans_level"]

        head_articles = await self.mapper.fetch_head_articles(gh_id)
        head_article_num = len(head_articles)
        execute_time = self.tool.get_now_dt()

        if not head_article_num:
            print(
                f"No article for account to calculate read median--{account['account_name']}"
            )
            remark = f"{execute_time}--计算阅读头条阅读中位数无文章"
            await self.mapper.update_account_read_median(
                gh_id=gh_id,
                read_median=self.ZERO_FLOAT,
                head_article_num=head_article_num,
                remark=remark,
            )

        else:
            # Missing/NULL read counts are treated as zero in the median.
            read_list = [
                (i.get("read_cnt") or self.INT_ZERO) for i in head_articles
            ]
            read_median = self.tool.get_median(read_list)
            remark = f"{execute_time}--计算阅读头条阅读中位数"
            await self.mapper.update_account_read_median(
                gh_id=gh_id,
                read_median=read_median,
                head_article_num=head_article_num,
                remark=remark,
            )

            # Update the account's extraction status: a high median-to-fans
            # ratio marks the account as worth extracting.
            read_median_rate = read_median / fans_level if fans_level else self.ZERO_FLOAT
            if read_median_rate >= self.READ_MEDIAN_RATE_THRESHOLD:
                await self.mapper.update_account_extract_status(gh_id, self.NEED_EXTRACT)

            # Update each article's read-median multiplier (0 when either the
            # article's reads or the median itself is zero).
            for article in head_articles:
                wx_sn: str = article["wx_sn"]
                read_cnt = article.get("read_cnt") or self.INT_ZERO

                if read_cnt > self.INT_ZERO:
                    multiplier = read_cnt / read_median if read_median else self.ZERO_FLOAT
                else:
                    multiplier = self.ZERO_FLOAT
                article_remark = f"{execute_time}--更新文章阅读中位数倍数"

                await self.mapper.set_read_median_multiplier(
                    wx_sn=wx_sn, multiplier=multiplier, remark=article_remark
                )

    # Entry point: dispatch one sub-task by name.
    async def deal(self, task_name):
        """Run the sub-task named by ``task_name``.

        Recognized names: ``crawl_articles``, ``get_detail``,
        ``cal_read_median``; any other name is silently a no-op.  Per-item
        failures are printed (with traceback where applicable) and do not
        stop the surrounding loop.
        """
        match task_name:
            case "crawl_articles":
                account_list = await self.mapper.fetch_monitor_accounts()
                for account in tqdm(account_list):
                    print(f"开始处理账号:{account['account_name']}")
                    try:
                        await self.crawl_single_account(account)

                    except Exception as e:
                        print(f"获取账号文章失败--{account['account_name']}--{e}")
                        print(traceback.format_exc())

            case "get_detail":
                article_list = await self.mapper.get_head_article_list()
                for article in tqdm(article_list, desc="处理文章详情"):
                    try:
                        await self.set_article_detail(article)

                    except Exception as e:
                        print(f"获取文章详情失败-{article['article_link']}-{e}")

            case "cal_read_median":
                account_list = await self.mapper.fetch_monitor_accounts()
                for account in tqdm(account_list):
                    print(f"开始处理账号:{account['account_name']}")
                    try:
                        await self.calculate_read_median(account)

                    except Exception as e:
                        print(f"计算阅读中位数失败--{account['account_name']}--{e}")
                        print(traceback.format_exc())