123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
"""
Content rematch task: periodically re-search and re-download web videos for
content IDs that have fewer than the required number of matched videos.

@author: luojunhui
"""
- import time
- import asyncio
- import datetime
- from applications.db import AsyncMySQLClient
- from applications.spider import search_videos_from_web
- from tasks.utils.kimi_task import get_kimi_result
- from tasks.utils.etl_task import async_download_videos
class ContentRematch:
    """
    Re-match videos for content IDs whose count of successfully downloaded
    videos is below the required minimum: search the web again, then
    download the newly found videos.
    """

    # Minimum number of downloaded videos a content_id needs to count as matched.
    MIN_MATCHED_VIDEOS = 3

    def __init__(self, db_client):
        """
        :param db_client: async MySQL client exposing ``async_select``
            (project type — assumed awaitable, returning row tuples).
        """
        self.db_client = db_client

    async def get_content_list(self):
        """
        Fetch up to 10 content_ids whose downloaded-video count
        (``download_status = 2``) is below ``MIN_MATCHED_VIDEOS``.

        :return: rows of ``(content_id, count)``
        """
        select_sql = f"""
            SELECT content_id, count(1)
            FROM long_articles_crawler_videos
            WHERE download_status = 2
            GROUP BY content_id
            HAVING count(1) < {self.MIN_MATCHED_VIDEOS}
            LIMIT 10;
        """
        content_list = await self.db_client.async_select(select_sql)
        return content_list

    async def get_illegal_out_ids(self, content_id: str) -> list:
        """
        Fetch external video ids flagged as illegal for one content_id.

        :param content_id: content identifier to look up
        :return: list of ``"{platform}_{out_video_id}"`` strings; empty when none
        """
        # NOTE(security): content_id is interpolated into SQL directly. It comes
        # from our own DB here, but switch to a parameterized query if the
        # db_client API supports placeholders.
        select_sql = f"""
            SELECT platform, out_video_id
            FROM long_articles_crawler_videos
            WHERE content_id = '{content_id}' and is_illegal = 1;
        """
        response = await self.db_client.async_select(select_sql)
        if not response:
            return []
        return ["{}_{}".format(platform, out_video_id) for platform, out_video_id in response]

    async def spider_task(self, params, kimi_result):
        """
        Search the web for videos matching the Kimi analysis of a content_id
        and record them in the crawler table.

        :param params: dict containing at least ``content_id``
        :param kimi_result: dict with ``ori_title``, ``kimi_summary``, ``kimi_keys``
        :return: True when at least ``MIN_MATCHED_VIDEOS`` videos were found
        """
        content_id = params['content_id']
        gh_id = "test_gh_id"
        trace_id = "rematch_{}".format(content_id)
        try:
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": "long_articles_crawler_videos",
                },
                gh_id_map={},
                db_client=self.db_client
            )
            return search_videos_count >= self.MIN_MATCHED_VIDEOS
        except Exception as e:
            # Best-effort per content_id: don't let one failure kill the batch,
            # but surface the error instead of swallowing it silently.
            print("spider_task failed for content_id {}: {}".format(content_id, e))
            return False

    async def etl_task(self, content_id, trace_id):
        """
        Download the crawled videos for a content_id, skipping ones flagged
        as illegal.

        :param content_id: content identifier being processed
        :param trace_id: trace id used for logging/tracking downstream
        :return: True when at least ``MIN_MATCHED_VIDEOS`` videos were downloaded
        """
        illegal_videos = await self.get_illegal_out_ids(content_id)
        downloaded_count = await async_download_videos(
            trace_id=trace_id,
            content_id=content_id,
            article_crawler_video_table="long_articles_crawler_videos",
            db_client=self.db_client,
            illegal_videos=illegal_videos
        )
        return downloaded_count >= self.MIN_MATCHED_VIDEOS

    async def process_each_content(self, content):
        """
        Run the full rematch pipeline (Kimi result -> spider -> ETL) for one
        ``(content_id, count)`` row; bail out early at the first failed stage.

        :param content: row whose first element is the content_id
        :return: True on full success, None when any stage fails
        """
        content_id = content[0]
        kimi_result = await get_kimi_result(
            content_id=content_id,
            db_client=self.db_client,
            article_text_table="long_articles_text",
        )
        if not kimi_result:
            return
        spider_flag = await self.spider_task(
            params={
                "content_id": content_id,
            },
            kimi_result=kimi_result
        )
        if not spider_flag:
            return
        etl_flag = await self.etl_task(
            content_id=content_id,
            trace_id="rematch_{}".format(content_id)
        )
        if not etl_flag:
            return
        return True

    async def deal(self):
        """
        Process every under-matched content_id returned by ``get_content_list``.
        """
        content_list = await self.get_content_list()
        for content in content_list:
            print(content)
            await self.process_each_content(content)
async def main_job():
    """
    Run one rematch pass: open the long-articles DB pool and process
    every under-matched content_id.
    """
    async with AsyncMySQLClient() as pool:
        rematch_task = ContentRematch(pool)
        await rematch_task.deal()
if __name__ == '__main__':
    # Run the rematch job forever, waiting 60 seconds between passes.
    while True:
        asyncio.run(main_job())
        # str(...) instead of the direct dunder call .__str__()
        now_str = str(datetime.datetime.now())
        print("{} 请求执行完成, 等待60s".format(now_str))
        time.sleep(60)
|