import json
import traceback
from datetime import timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.pipeline import insert_into_mini_program_detail_pool
from applications.utils import (
    get_beijing_date,
    handle_spider_exception,
    transform_to_beijing_date,
    extract_root_source_id,
)


class Const:
    ARTICLE_SUCCESS_CODE = 0
    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # the article has been deleted
    DELETE_STATUS = -2
    # no data returned, reason unknown
    UNKNOWN_STATUS = -3
    # the article violates platform rules
    ILLEGAL_STATUS = -4


class RecycleRootSourceIdDetail(Const):
    def __init__(self, pool, log_client, trace_id, run_date=None):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id
        # default to today's date in Beijing time when no run_date is given
        self.run_date = run_date or get_beijing_date()

    async def get_publish_articles_last_day(self):
        """Fetch all articles published on the day before run_date."""
        query = """
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(self.run_date, self.run_date),
        )
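
    # Example of the window above: with run_date = '2024-06-02' (dates are
    # illustrative), the BETWEEN clause resolves to
    #   '2024-06-01 00:00:00' .. '2024-06-01 23:59:59',
    # i.e. the full previous calendar day.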

    async def get_mini_program_info_by_root_source_id(self, root_source_id_list):
        """Look up the (video_id, root_source_id) pairs recorded at publish time."""
        query = """
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN %s;
        """
        # the id list is passed as a single tuple so the driver can expand it
        # into the IN (...) clause
        return await self.pool.async_fetch(
            query=query, params=(tuple(root_source_id_list),)
        )

    async def get_article_mini_program_detail(self, url, root_source_id_list):
        """Return the mini-program cards attached to an article.

        When root_source_id_list is empty the cards are scraped from the live
        article page; otherwise they are rebuilt from the database records.
        """
        if not root_source_id_list:
            try:
                article_detail = await get_article_detail(url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    return article_detail["data"]["data"]["mini_program"]
                return []
            except Exception as e:
                await handle_spider_exception(
                    log_client=self.log_client,
                    error=e,
                    traceback=traceback.format_exc(),
                    trace_id=self.trace_id,
                    task_name=self.__class__.__name__,
                )
                return []

        mini_program_info = await self.get_mini_program_info_by_root_source_id(
            root_source_id_list
        )
        if not mini_program_info:
            return []
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                # "nike_name" is the upstream field spelling (sic)
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"],
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in mini_program_info
        ]
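
    # The two code paths above differ in where the ids live: database-backed
    # cards carry explicit "root_source_id"/"video_id" fields, while cards
    # scraped from the live page may only embed them in the card's
    # mini-program `path`, which record_single_article recovers via
    # extract_root_source_id. A sketch of the assumed behaviour (the exact
    # path format is an assumption, not confirmed by this module):
    #
    #   >>> extract_root_source_id(
    #   ...     "pages/category/videoPlay?rootSourceId=abc123&videoId=456"
    #   ... )
    #   {"root_source_id": "abc123", "video_id": "456"}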

    async def record_single_article(self, article):
        """Persist one article's mini-program cards for recall dates T+0..T+2.

        Returns an empty dict on success and the original article on failure,
        so the caller can collect articles that need a retry.
        """
        url = article["ContentUrl"]
        wx_sn = article["wx_sn"].decode("utf-8")
        publish_timestamp = article["publish_timestamp"]
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )
        # get the article's mini-program cards
        article_mini_program_detail = await self.get_article_mini_program_detail(
            url, root_source_id_list
        )
        if not article_mini_program_detail:
            return {}
        try:
            publish_date = transform_to_beijing_date(publish_timestamp)
            # generate the T+0, T+1, T+2 date strings
            recall_dt_str_list = [
                (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                for i in range(3)
            ]
            for date_str in recall_dt_str_list:
                for video_index, mini_item in enumerate(
                    article_mini_program_detail, 1
                ):
                    image_url = mini_item["image_url"]
                    nick_name = mini_item["nike_name"]  # (sic) upstream spelling
                    # take the ids from the card when present, otherwise
                    # recover them from the mini-program path
                    if mini_item.get("root_source_id") and mini_item.get("video_id"):
                        root_source_id = mini_item["root_source_id"]
                        video_id = mini_item["video_id"]
                    else:
                        id_info = extract_root_source_id(mini_item["path"])
                        root_source_id = id_info["root_source_id"]
                        video_id = id_info["video_id"]
                    kimi_title = mini_item["title"]
                    await insert_into_mini_program_detail_pool(
                        self.pool,
                        raw={
                            "wx_sn": wx_sn,
                            "mini_title": kimi_title,
                            "root_source_id": root_source_id,
                            "video_id": video_id,
                            "mini_name": nick_name,
                            "cover_url": image_url,
                            "publish_dt": publish_date.strftime("%Y-%m-%d"),
                            "recall_dt": date_str,
                            "video_index": video_index,
                        },
                    )
            return {}
        except Exception as e:
            # log through the shared spider-exception handler instead of print
            await handle_spider_exception(
                log_client=self.log_client,
                error=e,
                traceback=traceback.format_exc(),
                trace_id=self.trace_id,
                task_name=self.__class__.__name__,
            )
            return article

    async def deal(self):
        """Entry point for the recycle task."""
        # step 1: record articles into the detail table
        publish_articles_list = await self.get_publish_articles_last_day()
        for article in tqdm(publish_articles_list, desc="updating articles"):
            await self.record_single_article(article)
        # step 2: update root_source_id detail info
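
# --- usage sketch (illustrative, not part of the original module) ---
# A minimal driver, assuming the application provides an async MySQL pool and
# a log client; make_pool / make_log_client below are hypothetical placeholder
# factories, not real project APIs.
#
#   import asyncio
#
#   async def main():
#       pool = make_pool()              # hypothetical
#       log_client = make_log_client()  # hypothetical
#       task = RecycleRootSourceIdDetail(
#           pool=pool,
#           log_client=log_client,
#           trace_id="recycle-root-source-id-detail",
#           run_date=None,  # None -> today's date in Beijing time
#       )
#       await task.deal()
#
#   asyncio.run(main())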