123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- """
- @author: luojunhui
- """
- import traceback
- from typing import Dict, List
- from aiomysql.cursors import DictCursor
- from applications.log import logging
- from applications.config import Config
- from applications.const import rematch_task_const
- from applications.etl_function import download_cover
- from applications.etl_function import download_video
- from applications.etl_function import generate_video_path
- from applications.etl_function import upload_to_oss
- from applications.spider import search_videos_from_web
- from .utils import get_kimi_result
class ReMatchTask(object):
    """
    Re-match task.

    For every content_id whose row in ``article_re_match_record`` is still in
    the initial status, make sure enough legal, downloaded videos exist in the
    crawler video table (re-crawling and downloading when necessary), then
    mark the record as successful.
    """

    def __init__(self, db_client):
        """
        :param db_client: async database client; this class calls its
            ``async_select`` and ``async_insert`` coroutines.
        """
        self.db_client = db_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table

    @staticmethod
    def _video_key(platform, out_video_id) -> str:
        """Build the ``<platform>_<out_video_id>`` key used to match illegal videos."""
        return "{}_{}".format(platform, out_video_id)

    async def _mark_download_failed(self, video_id) -> None:
        """Set ``download_status`` of one crawler-video row to the fail status."""
        update_sql = f"""
            UPDATE {self.article_crawler_video_table}
            SET download_status = %s
            WHERE id = %s;
        """
        await self.db_client.async_insert(
            sql=update_sql,
            params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video_id)
        )

    async def get_tasks(self) -> List[Dict]:
        """
        Fetch the distinct content_ids that still need to be re-matched.

        :return: rows (dicts with a ``content_id`` key) whose status equals
            ``REMATCH_INIT_STATUS``.
        """
        select_sql = f"""
            SELECT DISTINCT content_id
            FROM article_re_match_record
            WHERE status = {rematch_task_const.REMATCH_INIT_STATUS};
        """
        result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
        logging(
            code="rematch_1001",
            function="get_tasks",
            info="获取content_id数量: {}".format(len(result))
        )
        return result

    async def async_download_video_list(self, to_download_videos: List[Dict], illegal_platform_id_list: List[str]) -> int:
        """
        Download each candidate video and its cover, upload them to OSS and
        record the outcome on the crawler-video row.

        :param to_download_videos: rows with ``id``, ``platform``,
            ``out_video_id``, ``video_url`` and ``cover_url`` keys.
        :param illegal_platform_id_list: ``<platform>_<out_video_id>`` keys of
            videos that must be skipped.
        :return: number of videos downloaded and uploaded successfully.
        """
        # Build the lookup once: membership is tested for every video.
        illegal_keys = set(illegal_platform_id_list)
        success_count = 0
        for video in to_download_videos:
            # Skip videos that are flagged as illegal.
            if self._video_key(video['platform'], video['out_video_id']) in illegal_keys:
                continue

            try:
                local_video_path, local_cover_path = generate_video_path(
                    video['platform'], video['out_video_id']
                )
                file_path = await download_video(
                    file_path=local_video_path,
                    platform=video['platform'],
                    video_url=video['video_url']
                )
                if not file_path:
                    # Video download failed: nothing to upload, mark the row as failed.
                    await self._mark_download_failed(video['id'])
                    continue

                cover_path = await download_cover(
                    file_path=local_cover_path,
                    platform=video['platform'],
                    cover_url=video['cover_url']
                )
                oss_video = await upload_to_oss(
                    local_video_path=file_path,
                    download_type="video"
                )
                # The cover is optional; the row is stored with a NULL cover path without it.
                if cover_path:
                    oss_cover = await upload_to_oss(
                        local_video_path=cover_path,
                        download_type="image"
                    )
                else:
                    oss_cover = None

                update_sql = f"""
                    UPDATE {self.article_crawler_video_table}
                    SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                    WHERE id = %s;
                """
                await self.db_client.async_insert(
                    sql=update_sql,
                    params=(
                        oss_video,
                        oss_cover,
                        rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
                        video['id']
                    )
                )
                success_count += 1
                # Stop early once more than MIN_MATCH_VIDEO_NUM videos succeeded.
                # NOTE(review): other sufficiency checks in this class use ">=";
                # confirm whether the strict ">" here is intentional.
                if success_count > rematch_task_const.MIN_MATCH_VIDEO_NUM:
                    return success_count
            except Exception as e:
                # Best effort per video: record why it failed instead of
                # swallowing the error silently, then move on to the next one.
                logging(
                    code="rematch_1005",
                    function="async_download_video_list",
                    data={
                        "error_stack": traceback.format_exc(),
                        "error": str(e),
                        "video_id": video['id']
                    }
                )
                await self._mark_download_failed(video['id'])
        return success_count

    async def download_upload_task(self, content_id: str, illegal_platform_id_list: List[str], legal_not_downloaded_videos=None) -> int:
        """
        Run the download/upload step for one content_id.

        :param content_id: content to process.
        :param illegal_platform_id_list: keys of videos that must be skipped.
        :param legal_not_downloaded_videos: optional pre-selected rows; when
            given (and non-empty) only these are downloaded, otherwise all
            rows of the content_id in the initial download status are queried.
        :return: number of successfully downloaded videos.
        """
        if legal_not_downloaded_videos:
            return await self.async_download_video_list(
                to_download_videos=legal_not_downloaded_videos,
                illegal_platform_id_list=illegal_platform_id_list
            )

        # content_id is a string column: quote it like every other query in
        # this class (the unquoted form breaks for non-numeric ids).
        select_sql = f"""
            SELECT id, out_video_id, platform, video_title, video_url, cover_url
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
            AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS};
        """
        to_download_videos = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
        return await self.async_download_video_list(
            to_download_videos=to_download_videos,
            illegal_platform_id_list=illegal_platform_id_list
        )

    async def spider_task(self, content_id: str) -> int:
        """
        Re-run the web video search for one content_id.

        :return: the value returned by ``search_videos_from_web``.
        """
        kimi_result = await get_kimi_result(
            content_id=content_id,
            article_text_table=self.article_text_table,
            db_client=self.db_client
        )
        # The search runs at content_id granularity, so content_id doubles as trace_id.
        search_videos_count = await search_videos_from_web(
            info={
                "ori_title": kimi_result['ori_title'],
                "kimi_summary": kimi_result['kimi_summary'],
                "kimi_keys": kimi_result['kimi_keys'],
                "trace_id": content_id,
                "gh_id": rematch_task_const.TASK_DEFAULT_GH_ID,
                "content_id": content_id,
                "crawler_video_table": self.article_crawler_video_table
            },
            gh_id_map={},
            db_client=self.db_client
        )
        return search_videos_count

    async def update_each_content_id(self, content_id: str) -> None:
        """
        Bring the videos of one content_id up to date.

        Depending on how many legal videos already exist, this either does
        nothing, downloads the pending ones, or re-crawls and then downloads.
        """
        # NOTE(review): async_select is called without bound parameters in this
        # file, so content_id is interpolated; callers must pass trusted ids.
        select_sql = f"""
            SELECT
                id, platform, out_video_id, video_oss_path, download_status, is_illegal, video_url, cover_url
            FROM
                {self.article_crawler_video_table}
            WHERE content_id = '{content_id}';
        """
        record_list = await self.db_client.async_select(select_sql, cursor_type=DictCursor)

        # Keys (platform + out_video_id) of every video flagged as illegal.
        illegal_platform_id_list = [
            self._video_key(record['platform'], record['out_video_id'])
            for record in record_list
            if record["is_illegal"] == rematch_task_const.VIDEO_UNSAFE
        ]
        illegal_keys = set(illegal_platform_id_list)

        # Split the legal rows by download status; other statuses are ignored.
        legal_downloaded_videos = []
        legal_not_downloaded_videos = []
        for record in record_list:
            if record['is_illegal'] == rematch_task_const.VIDEO_UNSAFE:
                continue
            # A row sharing its key with an illegal row is treated as illegal too.
            if self._video_key(record['platform'], record['out_video_id']) in illegal_keys:
                continue
            download_status = record['download_status']
            if download_status == rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS:
                legal_not_downloaded_videos.append(record)
            elif download_status == rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS:
                legal_downloaded_videos.append(record)

        downloaded_num = len(legal_downloaded_videos)
        pending_num = len(legal_not_downloaded_videos)

        if downloaded_num >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
            # Enough legal videos are already downloaded: nothing to do.
            logging(
                code="rematch_1002",
                info="存在{}条以上待合规已下载内容,无需处理该content_id".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
                data={
                    "content_id": content_id
                }
            )
            return

        if pending_num + downloaded_num >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
            # Enough legal candidates exist: download the pending ones.
            logging(
                code="rematch_1003",
                info="存在{}条以上待合规内容".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
                data={
                    "content_id": content_id,
                    "待下载数量": pending_num,
                    "已下载数量": downloaded_num
                }
            )
            await self.download_upload_task(
                content_id=content_id,
                illegal_platform_id_list=illegal_platform_id_list,
                legal_not_downloaded_videos=legal_not_downloaded_videos
            )
            return

        # Not enough candidates: crawl again, then download whatever is pending.
        logging(
            code="rematch_1004",
            info="重新执行爬虫任务",
            data={
                "content_id": content_id,
            }
        )
        await self.spider_task(content_id=content_id)
        await self.download_upload_task(
            content_id=content_id,
            illegal_platform_id_list=illegal_platform_id_list
        )

    async def check_whether_get_enough_videos(self, content_id: str) -> bool:
        """
        Check whether the content_id has at least ``MIN_MATCH_VIDEO_NUM``
        videos that are both downloaded successfully and marked safe.
        """
        select_sql = f"""
            SELECT count(1)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
            AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
            AND is_illegal = {rematch_task_const.VIDEO_SAFE};
        """
        count_tuple = await self.db_client.async_select(select_sql)
        count = count_tuple[0][0]
        return count >= rematch_task_const.MIN_MATCH_VIDEO_NUM

    async def update_content_id_status(self, content_id: str) -> int:
        """
        Mark every re-match record of the content_id as successful.

        :return: the value returned by ``async_insert`` (the affected row count).
        """
        # Both values are bound as parameters instead of being formatted into the SQL.
        update_sql = """
            UPDATE article_re_match_record
            SET status = %s
            WHERE content_id = %s;
        """
        affected_rows = await self.db_client.async_insert(
            update_sql,
            params=(rematch_task_const.REMATCH_SUCCESS_STATUS, content_id)
        )
        return affected_rows

    async def deal(self):
        """
        Entry point: process every pending content_id.

        A failure on one content_id is logged and does not stop the others.
        """
        task_list = await self.get_tasks()
        for task in task_list:
            content_id = task['content_id']
            try:
                await self.update_each_content_id(content_id=content_id)
                if await self.check_whether_get_enough_videos(content_id=content_id):
                    # Enough videos are available: flip the record to the success status.
                    await self.update_content_id_status(content_id=content_id)
            except Exception as e:
                error_stack = traceback.format_exc()
                logging(
                    code="rematch_1010",
                    function="update_each_content_id",
                    data={
                        "error_stack": error_stack,
                        "error": str(e),
                        "content_id": content_id
                    }
                )
|