import json
from datetime import datetime, timedelta

from tqdm.asyncio import tqdm

from applications.crawler.wechat import get_article_detail
from applications.utils import extract_root_source_id


class MiniProgramConst:
    ARTICLE_SUCCESS_CODE = 0

    # default record status
    DEFAULT_STATUS = 0
    # API request failed
    REQUEST_FAIL_STATUS = -1
    # article has been deleted
    DELETE_STATUS = -2
    # no data returned, reason unknown
    UNKNOWN_STATUS = -3
    # article flagged as violating platform rules
    ILLEGAL_STATUS = -4

    TASK_NAME = "recycle_mini_program_detail_daily"
    ARTICLE_TABLE = "official_articles_v2"
    DETAIL_TABLE = "long_articles_detail_info"

    # look-back window (days) for recycling published articles
    ARTICLE_RECYCLE_DAYS = 1
    # look-back window (days) for updating root_source_id stats
    ROOT_SOURCE_ID_UPDATE_DAYS = 3


class RecycleMiniProgramDetailBase(MiniProgramConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    # build mini-program info objects from root_source_id rows
    @staticmethod
    async def create_mini_info(response: list[dict]) -> list[dict]:
        return [
            {
                "app_id": "wx89e7eb06478361d7",
                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                "image_url": "",
                "nike_name": "票圈 l 3亿人喜欢的视频平台",
                "root_source_id": item["root_source_id"],
                "video_id": item["video_id"],
                "service_type": "0",
                "title": "",
                "type": "card",
            }
            for item in response
        ]
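
    # NOTE: the misspelled "nike_name" key is intentional: handle_single_task
    # below reads the same key from both this helper's output and the crawled
    # article payload, so renaming it in one place would break the other.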

    # fetch first-level && fission details for a single root_source_id
    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
        query = """
            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))

    # fetch articles published within the given look-back window
    async def fetch_published_articles(
        self, run_date: str, date_delta: int
    ) -> list[dict]:
        query = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {self.ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )
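
    # e.g. run_date = "2024-01-02" with date_delta = 1 selects the window
    # [2024-01-01 00:00:00, 2024-01-01 23:59:59]: everything published on the
    # previous day, up to one second before run_date.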

    # fetch root_source_id rows from the database
    async def fetch_root_source_id_from_db(
        self, root_source_id_list: list[str]
    ) -> list[dict[str, str]]:
        query = """
            SELECT video_id, root_source_id
            FROM long_articles_root_source_id
            WHERE root_source_id IN %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(tuple(root_source_id_list),)
        )

    # fetch root_source_ids recorded over the last few days
    async def fetch_root_source_id_in_last_days(
        self, run_date: str, date_delta: int
    ) -> list[dict[str, str]]:
        query = f"""
            SELECT recall_dt, root_source_id
            FROM {self.DETAIL_TABLE}
            WHERE publish_dt
                BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(run_date, date_delta, run_date),
        )

    # check whether a (wx_sn, recall_dt, video_id) record already exists
    async def whether_article_record_exist(self, wx_sn, recall_dt, video_id):
        query = f"""
            SELECT 1 FROM {self.DETAIL_TABLE}
            WHERE wx_sn = %s AND recall_dt = %s AND video_id = %s;
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(wx_sn, recall_dt, video_id),
        )
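
    # The dedup key is (wx_sn, recall_dt, video_id): each article/video pair is
    # recorded once per recall day (T+0..T+2), so the check filters on recall_dt
    # rather than publish_dt.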

    # insert a new root_source_id record
    async def insert_each_root_source_id(self, params: tuple) -> None:
        query = f"""
            INSERT IGNORE INTO {self.DETAIL_TABLE}
                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query, db_name="piaoquan_crawler", params=params
        )

    # update first-level && fission stats for a root_source_id
    async def update_each_root_source_id(self, item: dict) -> None:
        recall_dt = item["recall_dt"]
        root_source_id = item["root_source_id"]
        mini_program_detail = await self.fetch_root_source_id_result(
            root_source_id, recall_dt
        )
        if not mini_program_detail:
            return

        mini_program_detail = mini_program_detail[0]
        query = f"""
            UPDATE {self.DETAIL_TABLE}
            SET first_level = %s,
                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
            WHERE root_source_id = %s AND recall_dt = %s;
        """
        await self.pool.async_save(
            query=query,
            db_name="piaoquan_crawler",
            params=(
                mini_program_detail["first_uv"],
                mini_program_detail["split0"],
                mini_program_detail["split0_head"],
                mini_program_detail["split0_recommend"],
                mini_program_detail["split1"],
                mini_program_detail["split1_head"],
                mini_program_detail["split1_recommend"],
                mini_program_detail["split2"],
                mini_program_detail["split2_head"],
                mini_program_detail["split2_recommend"],
                root_source_id,
                recall_dt,
            ),
        )


class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
    """Business layer: record daily published articles and refresh their stats."""

    async def handle_single_task(self, article):
        """
        record each article into the long_articles_detail_info table
        """
        url, publish_timestamp, wx_sn = (
            article["ContentUrl"],
            article["publish_timestamp"],
            article["wx_sn"].decode(),
        )
        root_source_id_list = (
            json.loads(article["root_source_id_list"])
            if article["root_source_id_list"]
            else []
        )

        # build mini-program info from the stored root_source_ids, if any
        mini_detail = []
        if root_source_id_list:
            root_source_id_detail = await self.fetch_root_source_id_from_db(
                root_source_id_list
            )
            mini_detail = await self.create_mini_info(root_source_id_detail)

        # fallback: the article carries no usable root_source_id_list, so crawl
        # the article page for its mini-program info
        if not mini_detail:
            try:
                article_detail = await get_article_detail(article_link=url)
                response_code = article_detail["code"]
                if response_code == self.ARTICLE_SUCCESS_CODE:
                    mini_detail = article_detail["data"]["data"].get("mini_program", [])
            except Exception as e:
                raise Exception("Weixin Detail Spider Error") from e

        # insert root_source_id info into the MySQL table
        if not mini_detail:
            return

        publish_date = datetime.fromtimestamp(publish_timestamp)
        publish_dt = publish_date.strftime("%Y-%m-%d")
        # record data for the T+0, T+1 and T+2 recall days
        recall_dt_list = [
            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
        ]
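        # e.g. publish_dt = "2024-01-01" gives
        # recall_dt_list = ["2024-01-01", "2024-01-02", "2024-01-03"]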
        for dt in recall_dt_list:
            for video_index, mini_item in enumerate(mini_detail, start=1):
                image_url = mini_item["image_url"]
                nickname = mini_item["nike_name"]
                if mini_item.get("root_source_id") and mini_item.get("video_id"):
                    root_source_id, video_id = (
                        mini_item["root_source_id"],
                        mini_item["video_id"],
                    )
                else:
                    id_info = extract_root_source_id(mini_item["path"])
                    root_source_id, video_id = (
                        id_info["root_source_id"],
                        id_info["video_id"],
                    )

                # skip records that already exist
                if await self.whether_article_record_exist(
                    wx_sn=wx_sn, video_id=video_id, recall_dt=dt
                ):
                    continue

                kimi_title = mini_item["title"]
                await self.insert_each_root_source_id(
                    params=(
                        wx_sn,
                        kimi_title,
                        nickname,
                        image_url,
                        video_index,
                        root_source_id,
                        video_id,
                        publish_dt,
                        dt,
                    )
                )
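
    # Design note: the existence check above is just an optimisation; the
    # INSERT IGNORE in insert_each_root_source_id is what actually guarantees
    # idempotency, assuming the table has a unique key covering
    # (wx_sn, recall_dt, video_id).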

    # record daily published articles && their root_source_id info
    async def record_published_articles_job(self, date_delta: int, run_date: str):
        # get tasks
        tasks = await self.fetch_published_articles(run_date, date_delta)
        for task in tqdm(tasks):
            await self.handle_single_task(task)

    # update first-level && fission stats for each root_source_id
    async def update_mini_program_detail_job(self, date_delta: int, run_date: str):
        mini_info_list = await self.fetch_root_source_id_in_last_days(
            run_date, date_delta
        )
        fail_cnt = 0
        for mini_item in tqdm(
            mini_info_list, total=len(mini_info_list), desc="update_each_root_source_id"
        ):
            try:
                await self.update_each_root_source_id(mini_item)
            except Exception as e:
                fail_cnt += 1
                print(e)

        if fail_cnt:
            # TODO: add bot alert
            raise Exception(f"Fail Count {fail_cnt}")

    # task entry point
    async def deal(self, params: dict):
        # parse request params
        run_date = params.get("run_date")
        if not run_date:
            run_date = datetime.today().strftime("%Y-%m-%d")
        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
        update_date_delta = params.get(
            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
        )

        # run the jobs
        await self.record_published_articles_job(
            date_delta=record_date_delta, run_date=run_date
        )
        await self.update_mini_program_detail_job(
            date_delta=update_date_delta, run_date=run_date
        )
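

# Usage sketch (not part of the original module). `pool` is assumed to expose
# the async_fetch/async_save methods used above; `log_client` is whatever
# logging client the surrounding service provides.
#
#   import asyncio
#
#   async def main():
#       pool = ...        # hypothetical: async MySQL pool
#       log_client = ...  # hypothetical: logging client
#       task = RecycleMiniProgramDetailTask(pool, log_client, trace_id="local-debug")
#       await task.deal(params={"run_date": "2024-01-01"})
#
#   asyncio.run(main())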