import json
import traceback
from datetime import timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.utils import (
    get_beijing_date,
    handle_spider_exception,
    transform_to_beijing_date,
    extract_root_source_id,
)


class Const:
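    """Status constants used when fetching and recording article details."""
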
    # response code returned by the article detail crawler on success
    ARTICLE_SUCCESS_CODE = 0
    # default status for a newly recorded row
    DEFAULT_STATUS = 0
    # the detail API request failed
    REQUEST_FAIL_STATUS = -1
    # the article has been deleted
    DELETE_STATUS = -2
    # no information returned, reason unknown
    UNKNOWN_STATUS = -3
    # the article was flagged as violating platform rules
    ILLEGAL_STATUS = -4


class RecycleRootSourceIdDetail(Const):
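    """Recycle mini-program (root_source_id) details for articles published the previous day."""
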
    def __init__(self, pool, log_client, trace_id, run_date):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id
        self.run_date = run_date
        if not self.run_date:
            self.run_date = get_beijing_date()

    async def get_publish_articles_last_day(self):
        """Fetch all articles published on the day before run_date."""
        query = """
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM official_articles_v2
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(self.run_date, self.run_date),
        )

    async def get_mini_program_info_by_root_source_id(self, root_source_id_list):
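        """Look up (video_id, root_source_id) pairs for the given root_source_ids."""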
- query = """
- select video_id, root_source_id
- from long_articles_root_source_id
- where root_source_id in %s;
- """
- return await self.pool.async_fetch(query=query, params=(tuple(root_source_id_list),))
    async def get_article_mini_program_detail(self, url, root_source_id_list):
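        """Return the article's mini-program card list.

        With no recorded root_source_ids, crawl the article page directly;
        otherwise rebuild the cards from long_articles_root_source_id rows.
        """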
        if not root_source_id_list:
            # no recorded root_source_ids, so crawl the article page for its cards
            try:
                article_detail = await get_article_detail(url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    return article_detail["data"]["data"]["mini_program"]
                return []
            except Exception as e:
                await handle_spider_exception(
                    log_client=self.log_client,
                    error=e,
                    traceback=traceback.format_exc(),
                    trace_id=self.trace_id,
                    task_name=self.__class__.__name__,
                )
                return []
        mini_program_info = await self.get_mini_program_info_by_root_source_id(
            root_source_id_list
        )
        if not mini_program_info:
            return []
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                # "nike_name" (sic) is the key spelling used in the crawled payload
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"],
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in mini_program_info
        ]

    async def record_single_article(self, article):
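        """Record one article's mini-program details for recall dates T+0 through T+2.

        Returns {} when done (or when there is nothing to record) and the
        original article dict when recording fails.
        """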
- url = article["ContentUrl"]
- wx_sn = article["wx_sn"].decode("utf-8")
- publish_timestamp = article["publish_timestamp"]
- root_source_id_list = json.loads(article["root_source_id_list"]) if article["root_source_id_list"] else []
- # get article mini program info
- article_mini_program_detail = await self.get_article_mini_program_detail(url, root_source_id_list)
- if not article_mini_program_detail:
- return {}
- else:
- try:
- publish_date = transform_to_beijing_date(publish_timestamp)
- # generate T+0, T+1, T+2 date string
- recall_dt_str_list = [
- (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
- for i in range(3)
- ]
- for date_str in recall_dt_str_list:
- for video_index, mini_item in enumerate(
- article_mini_program_detail, 1
- ):
- image_url = mini_item["image_url"]
- nick_name = mini_item["nike_name"]
- # extract video id and root_source_id
- if mini_item.get("root_source_id") and mini_item.get(
- "video_id"
- ):
- root_source_id = mini_item["root_source_id"]
- video_id = mini_item["video_id"]
- else:
- id_info = extract_root_source_id(mini_item["path"])
- root_source_id = id_info["root_source_id"]
- video_id = id_info["video_id"]
- kimi_title = mini_item["title"]
- # self.insert_each_root_source_id(
- # wx_sn=wx_sn,
- # mini_title=kimi_title,
- # mini_name=nick_name,
- # cover_url=image_url,
- # video_index=video_index,
- # root_source_id=root_source_id,
- # video_id=video_id,
- # publish_dt=publish_date.strftime("%Y-%m-%d"),
- # recall_dt=date_str,
- # )
- return {}
        except Exception as e:
            # report through the shared spider-exception handler instead of print
            await handle_spider_exception(
                log_client=self.log_client,
                error=e,
                traceback=traceback.format_exc(),
                trace_id=self.trace_id,
                task_name=self.__class__.__name__,
            )
            return article

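    # A minimal sketch of the helper referenced by the commented-out call above.
    # Assumptions not confirmed by this module: the target table is named
    # long_articles_detail_info, its columns mirror the keyword arguments plus a
    # status column seeded with DEFAULT_STATUS, and the pool exposes an
    # async_save counterpart to async_fetch. The disabled call site above would
    # also need an await if re-enabled.
    async def insert_each_root_source_id(
        self,
        wx_sn,
        mini_title,
        mini_name,
        cover_url,
        video_index,
        root_source_id,
        video_id,
        publish_dt,
        recall_dt,
    ):
        # INSERT IGNORE keeps reruns idempotent if the table defines a suitable unique key
        query = """
            INSERT IGNORE INTO long_articles_detail_info
                (wx_sn, mini_title, mini_name, cover_url, video_index,
                 root_source_id, video_id, publish_dt, recall_dt, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                wx_sn,
                mini_title,
                mini_name,
                cover_url,
                video_index,
                root_source_id,
                video_id,
                publish_dt,
                recall_dt,
                self.DEFAULT_STATUS,
            ),
        )
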
    async def deal(self):
        """Entry point: record yesterday's articles, then refresh detail info."""
        # step 1: record articles to the detail table
        publish_articles_list = await self.get_publish_articles_last_day()
        for article in tqdm(publish_articles_list, desc="updating articles"):
            await self.record_single_article(article)
        # step 2: update root_source_id detail info
|