123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- """
- @author: luojunhui
- """
- import json
- import time
- import asyncio
- from applications.log import logging
- from .history_task import historyContentIdTask
class NewHistoryTask(historyContentIdTask):
    """
    History task variant that selects articles whose ``publish_flag`` equals
    ``self.const.DO_NOT_NEED_PUBLISH`` and runs them through the inherited
    ``process_task`` coroutine.
    """

    async def get_tasks_new(self):
        """
        Fetch one batch of pending tasks.

        A row from ``self.article_match_video_table`` qualifies when:
          * ``content_status`` equals ``self.const.TASK_INIT_STATUS``;
          * ``process_times`` is at most ``self.const.TASK_MAX_PROCESS_TIMES``;
          * ``publish_flag`` equals ``self.const.DO_NOT_NEED_PUBLISH``;
          * at least 3 rows in ``self.article_crawler_video_table`` with the
            same ``content_id`` have ``download_status`` equal to
            ``self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS``.

        Rows are ordered by ``flow_pool_level`` and then ``request_timestamp``.
        At most ``self.history_coroutines`` rows are returned.

        :return: list of dicts with keys ``trace_id``, ``content_id``,
            ``flow_pool_level``, ``gh_id``, ``process_times`` and
            ``publish_flag``. The list is empty when nothing qualifies.
        """
        # Every interpolated value comes from instance attributes/constants,
        # not from external input.
        select_sql = f"""
            SELECT
                ART.trace_id,
                ART.content_id,
                ART.flow_pool_level,
                ART.gh_id,
                ART.process_times,
                ART.publish_flag
            FROM {self.article_match_video_table} ART
            JOIN (
                select content_id, count(1) as cnt
                from {self.article_crawler_video_table}
                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                group by content_id
            ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
            WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
            AND ART.publish_flag = {self.const.DO_NOT_NEED_PUBLISH}
            ORDER BY ART.flow_pool_level, ART.request_timestamp
            LIMIT {self.history_coroutines};
        """
        rows = await self.mysql_client.async_select(sql=select_sql)
        # NOTE(review): this guards against the client returning None (or
        # another falsy value) for an empty result. Confirm the client's contract.
        if not rows:
            return []
        # Column order matches the SELECT list above.
        return [
            {
                "trace_id": row[0],
                "content_id": row[1],
                "flow_pool_level": row[2],
                "gh_id": row[3],
                "process_times": row[4],
                "publish_flag": row[5],
            }
            for row in rows
        ]

    async def deal_new(self):
        """
        Process one batch of tasks that do not need publishing.

        The original note says these are the ``publish_flag=2`` tasks.
        Presumably ``self.const.DO_NOT_NEED_PUBLISH == 2``; verify.

        Tasks come from ``get_tasks_new`` and all run concurrently through
        ``self.process_task``. The batch size is logged. Afterwards the method
        prints either the elapsed seconds and the task count, or a
        "no existing history articles for now" message. Because
        ``asyncio.gather`` is called without ``return_exceptions``, the first
        exception raised by any ``process_task`` call propagates to the caller.
        """
        task_list = await self.get_tasks_new()
        logging(
            code="newHistory1001",
            info="History content_task Task Got {} this time".format(len(task_list)),
            function="History Contents Task"
        )
        if not task_list:
            # Message: "no existing history articles obtained for now"
            print("暂时未获得历史已存在文章")
            return

        # perf_counter is monotonic. time.time() is wall-clock and can jump
        # when the system clock is adjusted, which would corrupt the duration.
        started = time.perf_counter()
        await asyncio.gather(*(self.process_task(params) for params in task_list))
        elapsed = time.perf_counter() - started
        # Message: "<elapsed> s: processed <n> tasks"
        print("{} s 内处理了{}个任务".format(elapsed, len(task_list)))
|