"""
@author: luojunhui
"""
import json
import time
import asyncio

from applications.log import logging
from .history_task import historyContentIdTask


class NewHistoryTask(historyContentIdTask):
    """
    New-style history task.

    Builds on ``historyContentIdTask``: picks pending rows from the article
    match table and hands each of them to ``self.process_task`` (not defined
    in this class; presumably inherited from the base class).
    """

    async def get_tasks_new(self):
        """
        Fetch the next batch of pending tasks.

        Reads from ``self.article_match_video_table`` (alias ART), joined
        with a per-``content_id`` count of rows in
        ``self.article_crawler_video_table`` whose ``download_status`` equals
        ``const.VIDEO_DOWNLOAD_SUCCESS_STATUS``; only content with a count of
        at least 3 is kept. Rows must additionally have
        ``content_status = const.TASK_INIT_STATUS``,
        ``process_times <= const.TASK_MAX_PROCESS_TIMES`` and
        ``publish_flag = const.DO_NOT_NEED_PUBLISH``. Results are ordered by
        ``flow_pool_level`` then ``request_timestamp`` and capped at
        ``self.history_coroutines`` rows.

        :return: list of dicts with the keys ``trace_id``, ``content_id``,
            ``flow_pool_level``, ``gh_id``, ``process_times`` and
            ``publish_flag``
        """
        # Keys of the resulting dicts, in the same order as the SELECT list.
        column_names = (
            "trace_id",
            "content_id",
            "flow_pool_level",
            "gh_id",
            "process_times",
            "publish_flag",
        )
        # Adjacent literals are concatenated into a single statement; every
        # fragment carries its own leading space.
        query = (
            " SELECT ART.trace_id, ART.content_id, ART.flow_pool_level,"
            " ART.gh_id, ART.process_times, ART.publish_flag"
            f" FROM {self.article_match_video_table} ART"
            " JOIN ("
            " select content_id, count(1) as cnt"
            f" from {self.article_crawler_video_table}"
            f" where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}"
            " group by content_id"
            " ) VID on ART.content_id = VID.content_id and VID.cnt >= 3"
            f" WHERE ART.content_status = {self.const.TASK_INIT_STATUS}"
            f" and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}"
            f" AND ART.publish_flag = {self.const.DO_NOT_NEED_PUBLISH}"
            " ORDER BY ART.flow_pool_level, ART.request_timestamp"
            f" LIMIT {self.history_coroutines}; "
        )
        rows = await self.mysql_client.async_select(sql=query)
        # Rows are indexed by position, matching the SELECT column order.
        return [
            {name: row[position] for position, name in enumerate(column_names)}
            for row in rows
        ]

    async def deal_new(self):
        """
        Fetch one batch via ``get_tasks_new`` and process it concurrently.

        The batch size is logged first. When the batch is empty a notice is
        printed and nothing else happens; otherwise every task is passed to
        ``self.process_task``, all resulting awaitables are awaited together
        with ``asyncio.gather``, and the elapsed wall-clock time is printed.

        NOTE(review): the original docstring described these as the
        ``publish_flag=2`` tasks; the query itself filters on
        ``const.DO_NOT_NEED_PUBLISH`` -- confirm that constant is 2.
        """
        pending = await self.get_tasks_new()
        logging(
            code="newHistory1001",
            info="History content_task Task Got {} this time".format(len(pending)),
            function="History Contents Task"
        )
        if not pending:
            # Message: no already-existing history articles obtained for now.
            print("暂时未获得历史已存在文章")
            return

        started_at = time.time()
        await asyncio.gather(*(self.process_task(params) for params in pending))
        elapsed = time.time() - started_at
        # Message: "<elapsed> s" spent processing "<count>" tasks.
        print("{} s 内处理了{}个任务".format(elapsed, len(pending)))