123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- """
- @author: luojunhui
- """
- import json
- import time
- import asyncio
- from applications.config import Config
- from applications.functions.log import logging
- from applications.functions.pqFunctions import publishToPQ
class historyContentIdTask(object):
    """
    Process articles that have already been matched with mini-program videos.

    Picks rows of the article-video table that are still at
    ``content_status = 0`` (with ``process_times <= 3``) and whose content
    already has at least three crawler videos with ``download_status = 2``,
    then publishes those videos to PQ and stores the result on the row.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: async MySQL client; this class awaits its
            ``asyncSelect(sql)`` and ``asyncInsert(sql=..., params=...)``.
        """
        self.mysql_client = mysql_client
        # Build the config once instead of once per attribute.
        config = Config()
        self.article_text = config.articleText
        self.article_video = config.articleVideos
        self.article_crawler_video = config.articleCrawlerVideos
        # Used as the LIMIT of getTaskList, i.e. the max number of rows
        # fetched (and processed concurrently by deal) per run.
        self.history_coroutines = config.getConfigValue("historyArticleCoroutines")

    async def getTaskList(self):
        """
        Fetch the next batch of pending tasks.

        Selects rows with ``content_status = 0`` and ``process_times <= 3``
        whose content_id has at least three crawler videos with
        ``download_status = 2``, oldest ``request_timestamp`` first.

        :return: list of dicts with keys trace_id, content_id,
            flow_pool_level, gh_id, process_times.
        """
        select_sql1 = f"""
            SELECT
                ART.trace_id,
                ART.content_id,
                ART.flow_pool_level,
                ART.gh_id,
                ART.process_times
            FROM {self.article_video} ART
            JOIN (
                select content_id, count(1) as cnt
                from {self.article_crawler_video}
                where download_status = 2
                group by content_id
            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
            WHERE ART.content_status = 0 and ART.process_times <= 3
            ORDER BY request_timestamp
            LIMIT {self.history_coroutines};
        """
        tasks = await self.mysql_client.asyncSelect(sql=select_sql1)
        task_obj_list = [
            {
                "trace_id": item[0],
                "content_id": item[1],
                "flow_pool_level": item[2],
                "gh_id": item[3],
                "process_times": item[4],
            }
            for item in tasks
        ]
        logging(
            code="9001",
            info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
            data=task_obj_list
        )
        return task_obj_list

    async def getVideoList(self, content_id):
        """
        Fetch the downloaded crawler videos of one article.

        :param content_id: article content id.
        :return: list of dicts (platform, play_count, like_count,
            video_oss_path, cover_oss_path, uid) when at least three rows with
            ``download_status = 2`` exist; otherwise an empty list.
        """
        # NOTE(review): content_id is interpolated into the SQL text. Here it
        # comes from our own table, but prefer a parameterised query if
        # asyncSelect supports one.
        sql = f"""
            SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
            FROM {self.article_crawler_video}
            WHERE content_id = '{content_id}' and download_status = 2;
        """
        res_tuple = await self.mysql_client.asyncSelect(sql)
        if len(res_tuple) < 3:
            return []
        return [
            {
                "platform": row[0],
                "play_count": row[1],
                "like_count": row[2],
                "video_oss_path": row[3],
                "cover_oss_path": row[4],
                "uid": row[5],
            }
            for row in res_tuple
        ]

    async def getKimiTitle(self, content_id):
        """
        Look up the kimi title of an article.

        :param content_id: article content id.
        :return: the ``kimi_title`` of the first matching row, or False when
            no row exists.
        """
        # NOTE(review): string-built SQL, same caveat as getVideoList.
        select_sql = f"""
            select kimi_title from {self.article_text} where content_id = '{content_id}';
        """
        res_tuple = await self.mysql_client.asyncSelect(select_sql)
        if res_tuple:
            return res_tuple[0][0]
        return False

    async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
        """
        Publish up to three downloaded videos to PQ and record the result.

        :param trace_id: row of the article-video table to update.
        :param flow_pool_level: flow-pool level of the article ---> str
        :param kimi_title: title used for every published video ---> str
        :param gh_id: official-account id ---> str (currently unused here)
        :param download_videos: downloaded videos ---> list [{}, {}, {}.... ]
        :param process_times: current process count; stored incremented by 1.
        :return: None. The row is updated to content_status = 2 with the
            JSON-encoded list of published videos as ``response``.
        """
        # NOTE(review): a per-level selection used to be stubbed here as a
        # `match` on flow_pool_level (autoArticlePoolLevel1 = headline,
        # Level2 = secondary, Level3 = unknown, Level4 = cold start) whose
        # branches all cleared the list. It never took effect because
        # processTask passed flow_pool_level as a 1-tuple, so every level has
        # been publishing the first three videos. That effective behaviour is
        # kept; add real per-level rules here once they are defined.
        video_list = download_videos[:3]
        published = []
        for video_obj in video_list:
            params = {
                "videoPath": video_obj['video_oss_path'],
                "uid": video_obj['uid'],
                "title": kimi_title
            }
            response = await publishToPQ(params)
            published.append({
                "uid": video_obj['uid'],
                "source": video_obj['platform'],
                "kimiTitle": kimi_title,
                "videoId": response['data']['id'],
                "videoCover": response['data']['shareImgPath'],
                "videoPath": response['data']['videoPath'],
                "videoOss": video_obj['video_oss_path'].split("/")[-1]
            })
        update_sql = f"""
            UPDATE {self.article_video}
            SET content_status = %s, response = %s, process_times = %s
            WHERE trace_id = %s;
        """
        await self.mysql_client.asyncInsert(
            sql=update_sql,
            params=(2, json.dumps(published, ensure_ascii=False), process_times + 1, trace_id)
        )

    async def processTask(self, params):
        """
        Process a single task produced by getTaskList.

        :param params: dict with keys content_id, trace_id, flow_pool_level,
            gh_id, process_times.
        :return: None
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        # Plain str: a trailing comma here previously turned it into a 1-tuple.
        flow_pool_level = params['flow_pool_level']
        gh_id = params['gh_id']
        process_times = params['process_times']
        # Check whether the article still has videos that are not taken down
        # and whether there are three of them; if not, a new crawl task should
        # be started (future improvement).
        download_videos = await self.getVideoList(content_id=content_id)
        if not download_videos:
            return
        # Set content_status to 4 before publishing.
        # NOTE(review): if the kimi title is missing or publishing raises, the
        # row stays at content_status = 4 and getTaskList (which only selects
        # content_status = 0) will never pick it up again — confirm whether a
        # rollback is needed.
        update_sql = f"""
            UPDATE {self.article_video}
            SET content_status = %s
            WHERE trace_id = %s;
        """
        await self.mysql_client.asyncInsert(
            sql=update_sql,
            params=(4, trace_id)
        )
        kimi_title = await self.getKimiTitle(content_id)
        if not kimi_title:
            print("Kimi title 生成失败---后续加报警")
            return
        await self.publishVideosToPq(
            flow_pool_level=flow_pool_level,
            kimi_title=kimi_title,
            gh_id=gh_id,
            trace_id=trace_id,
            download_videos=download_videos,
            process_times=process_times
        )

    async def deal(self):
        """
        Fetch one batch of tasks and process them concurrently.

        :return: None
        :raises: the first exception raised by any task, after every task has
            finished.
        """
        task_list = await self.getTaskList()
        if not task_list:
            logging(
                code="9008",
                info="没有要处理的请求"
            )
            return
        # With the default gather(), the first failure propagates immediately
        # while sibling tasks are left running unawaited; they can then be
        # cancelled mid-flight when the event loop shuts down (e.g. after
        # setting content_status = 4 but before publishing). Let every task
        # finish first.
        results = await asyncio.gather(
            *(self.processTask(params) for params in task_list),
            return_exceptions=True,
        )
        # Keep the original contract: surface the first failure to the caller.
        for result in results:
            if isinstance(result, BaseException):
                raise result
|