|  | @@ -1,7 +1,6 @@
 | 
	
		
			
				|  |  |  """
 | 
	
		
			
				|  |  |  @author: luojunhui
 | 
	
		
			
				|  |  |  """
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  import json
 | 
	
		
			
				|  |  |  import time
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -20,7 +19,6 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  |      不存在历史已经发布的文章的匹配流程
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      TASK_INIT_STATUS = 0
 | 
	
		
			
				|  |  |      TASK_KIMI_FINISHED_STATUS = 1
 | 
	
		
			
				|  |  |      TASK_SPIDER_FINISHED_STATUS = 2
 | 
	
	
		
			
				|  | @@ -56,23 +54,22 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      content_status = {self.TASK_PROCESSING_STATUS} 
 | 
	
		
			
				|  |  |                  and process_times <= {self.TASK_MAX_PROCESS_TIMES}; 
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  | -        processing_articles = await self.mysql_client.async_select(
 | 
	
		
			
				|  |  | -            select_processing_sql
 | 
	
		
			
				|  |  | -        )
 | 
	
		
			
				|  |  | +        processing_articles = await self.mysql_client.async_select(select_processing_sql)
 | 
	
		
			
				|  |  |          if processing_articles:
 | 
	
		
			
				|  |  |              processing_list = [
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  |                      "trace_id": item[0],
 | 
	
		
			
				|  |  |                      "content_status_update_time": item[1],
 | 
	
		
			
				|  |  | -                    "process_times": item[2],
 | 
	
		
			
				|  |  | +                    "process_times": item[2]
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  for item in processing_articles
 | 
	
		
			
				|  |  |              ]
 | 
	
		
			
				|  |  |              for obj in processing_list:
 | 
	
		
			
				|  |  | -                if int(time.time()) - obj["content_status_update_time"] >= 3600:
 | 
	
		
			
				|  |  | +                if int(time.time()) - obj['content_status_update_time'] >= 3600:
 | 
	
		
			
				|  |  |                      # 认为该任务失败
 | 
	
		
			
				|  |  |                      await self.roll_back_content_status_when_fails(
 | 
	
		
			
				|  |  | -                        process_times=obj["process_times"] + 1, trace_id=obj["trace_id"]
 | 
	
		
			
				|  |  | +                        process_times=obj['process_times'] + 1,
 | 
	
		
			
				|  |  | +                        trace_id=obj['trace_id']
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |          # 将  process_times > 3 且状态不为 4 的任务的状态修改为失败,
 | 
	
		
			
				|  |  |          update_status_sql = f"""
 | 
	
	
		
			
				|  | @@ -88,8 +85,8 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |              params=(
 | 
	
		
			
				|  |  |                  self.TASK_FAIL_STATUS,
 | 
	
		
			
				|  |  |                  self.TASK_MAX_PROCESS_TIMES,
 | 
	
		
			
				|  |  | -                self.TASK_PUBLISHED_STATUS,
 | 
	
		
			
				|  |  | -            ),
 | 
	
		
			
				|  |  | +                self.TASK_PUBLISHED_STATUS
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |          # 获取  process_times <= 3 且  content_status = 0 的任务
 | 
	
		
			
				|  |  |          select_sql = f"""
 | 
	
	
		
			
				|  | @@ -110,7 +107,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      "content_id": i[1],
 | 
	
		
			
				|  |  |                      "flow_pool_level": i[2],
 | 
	
		
			
				|  |  |                      "gh_id": i[3],
 | 
	
		
			
				|  |  | -                    "process_times": i[4],
 | 
	
		
			
				|  |  | +                    "process_times": i[4]
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  for i in tasks
 | 
	
		
			
				|  |  |              ]
 | 
	
	
		
			
				|  | @@ -134,9 +131,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  |              return False
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    async def update_content_status(
 | 
	
		
			
				|  |  | -        self, new_content_status, trace_id, ori_content_status
 | 
	
		
			
				|  |  | -    ):
 | 
	
		
			
				|  |  | +    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          :param new_content_status:
 | 
	
		
			
				|  |  |          :param trace_id:
 | 
	
	
		
			
				|  | @@ -150,7 +145,12 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      """
 | 
	
		
			
				|  |  |          row_counts = await self.mysql_client.async_insert(
 | 
	
		
			
				|  |  |              sql=update_sql,
 | 
	
		
			
				|  |  | -            params=(new_content_status, int(time.time()), trace_id, ori_content_status),
 | 
	
		
			
				|  |  | +            params=(
 | 
	
		
			
				|  |  | +                new_content_status,
 | 
	
		
			
				|  |  | +                int(time.time()),
 | 
	
		
			
				|  |  | +                trace_id,
 | 
	
		
			
				|  |  | +                ori_content_status
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |          return row_counts
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -176,8 +176,8 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                  int(time.time()),
 | 
	
		
			
				|  |  |                  process_times + 1,
 | 
	
		
			
				|  |  |                  trace_id,
 | 
	
		
			
				|  |  | -                self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | -            ),
 | 
	
		
			
				|  |  | +                self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      async def judge_whether_same_content_id_is_processing(self, content_id):
 | 
	
	
		
			
				|  | @@ -203,7 +203,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      self.TASK_KIMI_FINISHED_STATUS,
 | 
	
		
			
				|  |  |                      self.TASK_SPIDER_FINISHED_STATUS,
 | 
	
		
			
				|  |  |                      self.TASK_ETL_FINISHED_STATUS,
 | 
	
		
			
				|  |  | -                    self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | +                    self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  |                  }:
 | 
	
		
			
				|  |  |                      return True
 | 
	
		
			
				|  |  |              return False
 | 
	
	
		
			
				|  | @@ -229,7 +229,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                  "like_count": i[2],
 | 
	
		
			
				|  |  |                  "video_oss_path": i[3],
 | 
	
		
			
				|  |  |                  "cover_oss_path": i[4],
 | 
	
		
			
				|  |  | -                "uid": i[5],
 | 
	
		
			
				|  |  | +                "uid": i[5]
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |              for i in res_tuple
 | 
	
		
			
				|  |  |          ]
 | 
	
	
		
			
				|  | @@ -258,21 +258,21 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          KIMI_SUCCESS_STATUS = 1
 | 
	
		
			
				|  |  |          KIMI_FAIL_STATUS = 2
 | 
	
		
			
				|  |  | -        content_id = params["content_id"]
 | 
	
		
			
				|  |  | -        trace_id = params["trace_id"]
 | 
	
		
			
				|  |  | -        process_times = params["process_times"]
 | 
	
		
			
				|  |  | +        content_id = params['content_id']
 | 
	
		
			
				|  |  | +        trace_id = params['trace_id']
 | 
	
		
			
				|  |  | +        process_times = params['process_times']
 | 
	
		
			
				|  |  |          kimi_status_code = await self.get_kimi_status(content_id=content_id)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          if kimi_status_code == KIMI_SUCCESS_STATUS:
 | 
	
		
			
				|  |  |              affected_rows = await self.update_content_status(
 | 
	
		
			
				|  |  |                  new_content_status=self.TASK_KIMI_FINISHED_STATUS,
 | 
	
		
			
				|  |  |                  trace_id=trace_id,
 | 
	
		
			
				|  |  | -                ori_content_status=self.TASK_INIT_STATUS,
 | 
	
		
			
				|  |  | +                ori_content_status=self.TASK_INIT_STATUS
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              if affected_rows == 0:
 | 
	
		
			
				|  |  |                  logging(
 | 
	
		
			
				|  |  |                      code="6000",
 | 
	
		
			
				|  |  | -                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
 | 
	
		
			
				|  |  | +                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return
 | 
	
		
			
				|  |  |              get_kimi_sql = f"""
 | 
	
	
		
			
				|  | @@ -285,21 +285,24 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                  "kimi_title": kimi_info[0][1],
 | 
	
		
			
				|  |  |                  "ori_title": kimi_info[0][0],
 | 
	
		
			
				|  |  |                  "kimi_summary": kimi_info[0][2],
 | 
	
		
			
				|  |  | -                "kimi_keys": json.loads(kimi_info[0][3]),
 | 
	
		
			
				|  |  | +                "kimi_keys": json.loads(kimi_info[0][3])
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          elif kimi_status_code == self.ARTICLE_TEXT_TABLE_ERROR:
 | 
	
		
			
				|  |  | -            logging(code="4000", info="long_articles_text表中未找到 content_id")
 | 
	
		
			
				|  |  | +            logging(
 | 
	
		
			
				|  |  | +                code="4000",
 | 
	
		
			
				|  |  | +                info="long_articles_text表中未找到 content_id"
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  |              # 开始处理,讲 content_status 从 0  改为  101
 | 
	
		
			
				|  |  |              affected_rows = await self.update_content_status(
 | 
	
		
			
				|  |  |                  new_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  |                  trace_id=trace_id,
 | 
	
		
			
				|  |  | -                ori_content_status=self.TASK_INIT_STATUS,
 | 
	
		
			
				|  |  | +                ori_content_status=self.TASK_INIT_STATUS
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              if affected_rows == 0:
 | 
	
		
			
				|  |  |                  logging(
 | 
	
		
			
				|  |  |                      code="6000",
 | 
	
		
			
				|  |  | -                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
 | 
	
		
			
				|  |  | +                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return
 | 
	
		
			
				|  |  |              K = KimiServer()
 | 
	
	
		
			
				|  | @@ -313,14 +316,12 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                  article_obj = {
 | 
	
		
			
				|  |  |                      "article_title": res[0][0],
 | 
	
		
			
				|  |  |                      "article_text": res[0][1],
 | 
	
		
			
				|  |  | -                    "content_id": content_id,
 | 
	
		
			
				|  |  | +                    "content_id": content_id
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  kimi_info = await K.search_kimi_schedule(params=article_obj)
 | 
	
		
			
				|  |  | -                kimi_title = kimi_info["k_title"]
 | 
	
		
			
				|  |  | -                content_title = (
 | 
	
		
			
				|  |  | -                    kimi_info["content_title"].replace("'", "").replace('"', "")
 | 
	
		
			
				|  |  | -                )
 | 
	
		
			
				|  |  | -                content_keys = json.dumps(kimi_info["content_keys"], ensure_ascii=False)
 | 
	
		
			
				|  |  | +                kimi_title = kimi_info['k_title']
 | 
	
		
			
				|  |  | +                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
 | 
	
		
			
				|  |  | +                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
 | 
	
		
			
				|  |  |                  update_kimi_sql = f"""
 | 
	
		
			
				|  |  |                          UPDATE {self.article_text_table} 
 | 
	
		
			
				|  |  |                          SET
 | 
	
	
		
			
				|  | @@ -331,24 +332,18 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                          WHERE content_id = %s;"""
 | 
	
		
			
				|  |  |                  await self.mysql_client.async_insert(
 | 
	
		
			
				|  |  |                      sql=update_kimi_sql,
 | 
	
		
			
				|  |  | -                    params=(
 | 
	
		
			
				|  |  | -                        kimi_title,
 | 
	
		
			
				|  |  | -                        content_title,
 | 
	
		
			
				|  |  | -                        content_keys,
 | 
	
		
			
				|  |  | -                        KIMI_SUCCESS_STATUS,
 | 
	
		
			
				|  |  | -                        params["content_id"],
 | 
	
		
			
				|  |  | -                    ),
 | 
	
		
			
				|  |  | +                    params=(kimi_title, content_title, content_keys, KIMI_SUCCESS_STATUS, params['content_id'])
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  await self.update_content_status(
 | 
	
		
			
				|  |  |                      new_content_status=self.TASK_KIMI_FINISHED_STATUS,
 | 
	
		
			
				|  |  |                      trace_id=trace_id,
 | 
	
		
			
				|  |  | -                    ori_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | +                    ori_content_status=self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return {
 | 
	
		
			
				|  |  |                      "kimi_title": kimi_title,
 | 
	
		
			
				|  |  | -                    "ori_title": article_obj["article_title"],
 | 
	
		
			
				|  |  | +                    "ori_title": article_obj['article_title'],
 | 
	
		
			
				|  |  |                      "kimi_summary": content_title,
 | 
	
		
			
				|  |  | -                    "kimi_keys": kimi_info["content_keys"],
 | 
	
		
			
				|  |  | +                    "kimi_keys": kimi_info['content_keys']
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              except Exception as e:
 | 
	
		
			
				|  |  |                  # kimi 任务处理失败
 | 
	
	
		
			
				|  | @@ -359,11 +354,16 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                          WHERE content_id = %s
 | 
	
		
			
				|  |  |                          """
 | 
	
		
			
				|  |  |                  await self.mysql_client.async_insert(
 | 
	
		
			
				|  |  | -                    sql=update_kimi_sql, params=(KIMI_FAIL_STATUS, content_id)
 | 
	
		
			
				|  |  | +                    sql=update_kimi_sql,
 | 
	
		
			
				|  |  | +                    params=(
 | 
	
		
			
				|  |  | +                        KIMI_FAIL_STATUS,
 | 
	
		
			
				|  |  | +                        content_id
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  # 将状态由 101  回退为  0
 | 
	
		
			
				|  |  |                  await self.roll_back_content_status_when_fails(
 | 
	
		
			
				|  |  | -                    process_times=process_times, trace_id=trace_id
 | 
	
		
			
				|  |  | +                    process_times=process_times,
 | 
	
		
			
				|  |  | +                    trace_id=trace_id
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return {}
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -373,10 +373,10 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          :return:
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          SPIDER_INIT_STATUS = 1
 | 
	
		
			
				|  |  | -        trace_id = params["trace_id"]
 | 
	
		
			
				|  |  | -        content_id = params["content_id"]
 | 
	
		
			
				|  |  | -        process_times = params["process_times"]
 | 
	
		
			
				|  |  | -        gh_id = params["gh_id"]
 | 
	
		
			
				|  |  | +        trace_id = params['trace_id']
 | 
	
		
			
				|  |  | +        content_id = params['content_id']
 | 
	
		
			
				|  |  | +        process_times = params['process_times']
 | 
	
		
			
				|  |  | +        gh_id = params['gh_id']
 | 
	
		
			
				|  |  |          select_sql = f"""
 | 
	
		
			
				|  |  |          select count(id) from {self.article_crawler_video_table} where content_id = '{content_id}';
 | 
	
		
			
				|  |  |          """
 | 
	
	
		
			
				|  | @@ -386,50 +386,53 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |              await self.update_content_status(
 | 
	
		
			
				|  |  |                  new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
 | 
	
		
			
				|  |  |                  trace_id=trace_id,
 | 
	
		
			
				|  |  | -                ori_content_status=SPIDER_INIT_STATUS,
 | 
	
		
			
				|  |  | +                ori_content_status=SPIDER_INIT_STATUS
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              return True
 | 
	
		
			
				|  |  |          # 开始处理,将状态由 1 改成  101
 | 
	
		
			
				|  |  |          affected_rows = await self.update_content_status(
 | 
	
		
			
				|  |  |              new_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  |              ori_content_status=SPIDER_INIT_STATUS,
 | 
	
		
			
				|  |  | -            trace_id=trace_id,
 | 
	
		
			
				|  |  | +            trace_id=trace_id
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |          if affected_rows == 0:
 | 
	
		
			
				|  |  |              logging(
 | 
	
		
			
				|  |  | -                code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  | +                code="6000",
 | 
	
		
			
				|  |  | +                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              return False
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  |              search_videos_count = await search_videos_from_web(
 | 
	
		
			
				|  |  |                  info={
 | 
	
		
			
				|  |  | -                    "ori_title": kimi_result["ori_title"],
 | 
	
		
			
				|  |  | -                    "kimi_summary": kimi_result["kimi_summary"],
 | 
	
		
			
				|  |  | -                    "kimi_keys": kimi_result["kimi_keys"],
 | 
	
		
			
				|  |  | +                    "ori_title": kimi_result['ori_title'],
 | 
	
		
			
				|  |  | +                    "kimi_summary": kimi_result['kimi_summary'],
 | 
	
		
			
				|  |  | +                    "kimi_keys": kimi_result['kimi_keys'],
 | 
	
		
			
				|  |  |                      "trace_id": trace_id,
 | 
	
		
			
				|  |  |                      "gh_id": gh_id,
 | 
	
		
			
				|  |  |                      "content_id": content_id,
 | 
	
		
			
				|  |  | -                    "crawler_video_table": self.article_crawler_video_table,
 | 
	
		
			
				|  |  | +                    "crawler_video_table": self.article_crawler_video_table
 | 
	
		
			
				|  |  |                  },
 | 
	
		
			
				|  |  |                  gh_id_map=self.account_map,
 | 
	
		
			
				|  |  | -                db_client=self.mysql_client,
 | 
	
		
			
				|  |  | +                db_client=self.mysql_client
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              if search_videos_count >= 3:
 | 
	
		
			
				|  |  |                  # 表示爬虫任务执行成功, 将状态从 101  改为 2
 | 
	
		
			
				|  |  |                  await self.update_content_status(
 | 
	
		
			
				|  |  |                      new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
 | 
	
		
			
				|  |  |                      trace_id=trace_id,
 | 
	
		
			
				|  |  | -                    ori_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | +                    ori_content_status=self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return True
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  |                  await self.roll_back_content_status_when_fails(
 | 
	
		
			
				|  |  | -                    process_times=process_times + 1, trace_id=trace_id
 | 
	
		
			
				|  |  | +                    process_times=process_times + 1,
 | 
	
		
			
				|  |  | +                    trace_id=trace_id
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return False
 | 
	
		
			
				|  |  |          except Exception as e:
 | 
	
		
			
				|  |  |              await self.roll_back_content_status_when_fails(
 | 
	
		
			
				|  |  | -                process_times=process_times + 1, trace_id=trace_id
 | 
	
		
			
				|  |  | +                process_times=process_times + 1,
 | 
	
		
			
				|  |  | +                trace_id=trace_id
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              print("爬虫处理失败: {}".format(e))
 | 
	
		
			
				|  |  |              return False
 | 
	
	
		
			
				|  | @@ -443,8 +446,8 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
 | 
	
		
			
				|  |  |          VIDEO_DOWNLOAD_FAIL_STATUS = 3
 | 
	
		
			
				|  |  |          ETL_TASK_INIT_STATUS = 2
 | 
	
		
			
				|  |  | -        trace_id = params["trace_id"]
 | 
	
		
			
				|  |  | -        content_id = params["content_id"]
 | 
	
		
			
				|  |  | +        trace_id = params['trace_id']
 | 
	
		
			
				|  |  | +        content_id = params['content_id']
 | 
	
		
			
				|  |  |          # 判断是否有三条已经下载完成的视频
 | 
	
		
			
				|  |  |          select_sql = f"""
 | 
	
		
			
				|  |  |              select count(id) 
 | 
	
	
		
			
				|  | @@ -457,12 +460,12 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |              affect_rows = await self.update_content_status(
 | 
	
		
			
				|  |  |                  ori_content_status=ETL_TASK_INIT_STATUS,
 | 
	
		
			
				|  |  |                  trace_id=trace_id,
 | 
	
		
			
				|  |  | -                new_content_status=self.TASK_ETL_FINISHED_STATUS,
 | 
	
		
			
				|  |  | +                new_content_status=self.TASK_ETL_FINISHED_STATUS
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              if affect_rows == 0:
 | 
	
		
			
				|  |  |                  logging(
 | 
	
		
			
				|  |  |                      code="6000",
 | 
	
		
			
				|  |  | -                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
 | 
	
		
			
				|  |  | +                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return False
 | 
	
		
			
				|  |  |              return True
 | 
	
	
		
			
				|  | @@ -471,12 +474,12 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |              affected_rows = await self.update_content_status(
 | 
	
		
			
				|  |  |                  ori_content_status=ETL_TASK_INIT_STATUS,
 | 
	
		
			
				|  |  |                  trace_id=trace_id,
 | 
	
		
			
				|  |  | -                new_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | +                new_content_status=self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              if affected_rows == 0:
 | 
	
		
			
				|  |  |                  logging(
 | 
	
		
			
				|  |  |                      code="6000",
 | 
	
		
			
				|  |  | -                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return",
 | 
	
		
			
				|  |  | +                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return False
 | 
	
		
			
				|  |  |              select_sql = f"""
 | 
	
	
		
			
				|  | @@ -485,9 +488,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                  WHERE content_id = '{content_id}' and download_status != {VIDEO_DOWNLOAD_SUCCESS_STATUS}
 | 
	
		
			
				|  |  |                  ORDER BY score DESC;
 | 
	
		
			
				|  |  |              """
 | 
	
		
			
				|  |  | -            videos_need_to_download_tuple = await self.mysql_client.async_select(
 | 
	
		
			
				|  |  | -                select_sql
 | 
	
		
			
				|  |  | -            )
 | 
	
		
			
				|  |  | +            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
 | 
	
		
			
				|  |  |              downloaded_count = 0
 | 
	
		
			
				|  |  |              for line in videos_need_to_download_tuple:
 | 
	
		
			
				|  |  |                  params = {
 | 
	
	
		
			
				|  | @@ -498,30 +499,30 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      "video_url": line[4],
 | 
	
		
			
				|  |  |                      "cover_url": line[5],
 | 
	
		
			
				|  |  |                      "user_id": line[6],
 | 
	
		
			
				|  |  | -                    "trace_id": line[7],
 | 
	
		
			
				|  |  | +                    "trace_id": line[7]
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  try:
 | 
	
		
			
				|  |  | -                    local_video_path, local_cover_path = generate_video_path(
 | 
	
		
			
				|  |  | -                        params["platform"], params["video_id"]
 | 
	
		
			
				|  |  | -                    )
 | 
	
		
			
				|  |  | +                    local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
 | 
	
		
			
				|  |  |                      # download videos
 | 
	
		
			
				|  |  |                      file_path = await download_video(
 | 
	
		
			
				|  |  |                          file_path=local_video_path,
 | 
	
		
			
				|  |  | -                        platform=params["platform"],
 | 
	
		
			
				|  |  | -                        video_url=params["video_url"],
 | 
	
		
			
				|  |  | +                        platform=params['platform'],
 | 
	
		
			
				|  |  | +                        video_url=params['video_url']
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |                      # download cover
 | 
	
		
			
				|  |  |                      cover_path = await download_cover(
 | 
	
		
			
				|  |  |                          file_path=local_cover_path,
 | 
	
		
			
				|  |  | -                        platform=params["platform"],
 | 
	
		
			
				|  |  | -                        cover_url=params["cover_url"],
 | 
	
		
			
				|  |  | +                        platform=params['platform'],
 | 
	
		
			
				|  |  | +                        cover_url=params['cover_url']
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |                      oss_video = await upload_to_oss(
 | 
	
		
			
				|  |  | -                        local_video_path=file_path, download_type="video"
 | 
	
		
			
				|  |  | +                        local_video_path=file_path,
 | 
	
		
			
				|  |  | +                        download_type="video"
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |                      if cover_path:
 | 
	
		
			
				|  |  |                          oss_cover = await upload_to_oss(
 | 
	
		
			
				|  |  | -                            local_video_path=cover_path, download_type="image"
 | 
	
		
			
				|  |  | +                            local_video_path=cover_path,
 | 
	
		
			
				|  |  | +                            download_type="image"
 | 
	
		
			
				|  |  |                          )
 | 
	
		
			
				|  |  |                      else:
 | 
	
		
			
				|  |  |                          oss_cover = None
 | 
	
	
		
			
				|  | @@ -536,15 +537,15 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                              oss_video,
 | 
	
		
			
				|  |  |                              oss_cover,
 | 
	
		
			
				|  |  |                              VIDEO_DOWNLOAD_SUCCESS_STATUS,
 | 
	
		
			
				|  |  | -                            params["id"],
 | 
	
		
			
				|  |  | -                        ),
 | 
	
		
			
				|  |  | +                            params['id']
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |                      downloaded_count += 1
 | 
	
		
			
				|  |  |                      if downloaded_count > 3:
 | 
	
		
			
				|  |  |                          await self.update_content_status(
 | 
	
		
			
				|  |  |                              ori_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  |                              trace_id=trace_id,
 | 
	
		
			
				|  |  | -                            new_content_status=self.TASK_ETL_FINISHED_STATUS,
 | 
	
		
			
				|  |  | +                            new_content_status=self.TASK_ETL_FINISHED_STATUS
 | 
	
		
			
				|  |  |                          )
 | 
	
		
			
				|  |  |                          return True
 | 
	
		
			
				|  |  |                  except Exception as e:
 | 
	
	
		
			
				|  | @@ -555,19 +556,19 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      """
 | 
	
		
			
				|  |  |                      await self.mysql_client.async_insert(
 | 
	
		
			
				|  |  |                          sql=update_sql,
 | 
	
		
			
				|  |  | -                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, params["id"]),
 | 
	
		
			
				|  |  | +                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |              if downloaded_count >= 3:
 | 
	
		
			
				|  |  |                  await self.update_content_status(
 | 
	
		
			
				|  |  |                      ori_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  |                      trace_id=trace_id,
 | 
	
		
			
				|  |  | -                    new_content_status=self.TASK_ETL_FINISHED_STATUS,
 | 
	
		
			
				|  |  | +                    new_content_status=self.TASK_ETL_FINISHED_STATUS
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return True
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  |                  await self.roll_back_content_status_when_fails(
 | 
	
		
			
				|  |  | -                    process_times=params["process_times"] + 1,
 | 
	
		
			
				|  |  | -                    trace_id=params["trace_id"],
 | 
	
		
			
				|  |  | +                    process_times=params['process_times'] + 1,
 | 
	
		
			
				|  |  | +                    trace_id=params['trace_id']
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |                  return False
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -579,20 +580,21 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          :return:
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          PUBLISH_DEFAULT_STATUS = 3
 | 
	
		
			
				|  |  | -        gh_id = params["gh_id"]
 | 
	
		
			
				|  |  | -        flow_pool_level = params["flow_pool_level"]
 | 
	
		
			
				|  |  | -        content_id = params["content_id"]
 | 
	
		
			
				|  |  | -        trace_id = params["trace_id"]
 | 
	
		
			
				|  |  | -        process_times = params["process_times"]
 | 
	
		
			
				|  |  | +        gh_id = params['gh_id']
 | 
	
		
			
				|  |  | +        flow_pool_level = params['flow_pool_level']
 | 
	
		
			
				|  |  | +        content_id = params['content_id']
 | 
	
		
			
				|  |  | +        trace_id = params['trace_id']
 | 
	
		
			
				|  |  | +        process_times = params['process_times']
 | 
	
		
			
				|  |  |          # 开始处理,将状态修改为操作状态
 | 
	
		
			
				|  |  |          affected_rows = await self.update_content_status(
 | 
	
		
			
				|  |  |              ori_content_status=PUBLISH_DEFAULT_STATUS,
 | 
	
		
			
				|  |  |              trace_id=trace_id,
 | 
	
		
			
				|  |  | -            new_content_status=self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | +            new_content_status=self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |          if affected_rows == 0:
 | 
	
		
			
				|  |  |              logging(
 | 
	
		
			
				|  |  | -                code="6000", info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  | +                code="6000",
 | 
	
		
			
				|  |  | +                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              return False
 | 
	
		
			
				|  |  |          try:
 | 
	
	
		
			
				|  | @@ -617,21 +619,21 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |              L = []
 | 
	
		
			
				|  |  |              for video_obj in video_list:
 | 
	
		
			
				|  |  |                  params = {
 | 
	
		
			
				|  |  | -                    "videoPath": video_obj["video_oss_path"],
 | 
	
		
			
				|  |  | -                    "uid": video_obj["uid"],
 | 
	
		
			
				|  |  | -                    "title": kimi_title,
 | 
	
		
			
				|  |  | +                    "videoPath": video_obj['video_oss_path'],
 | 
	
		
			
				|  |  | +                    "uid": video_obj['uid'],
 | 
	
		
			
				|  |  | +                    "title": kimi_title
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  publish_response = await publish_to_pq(params)
 | 
	
		
			
				|  |  | -                video_id = publish_response["data"]["id"]
 | 
	
		
			
				|  |  | +                video_id = publish_response['data']['id']
 | 
	
		
			
				|  |  |                  response = await get_pq_video_detail(video_id)
 | 
	
		
			
				|  |  |                  obj = {
 | 
	
		
			
				|  |  | -                    "uid": video_obj["uid"],
 | 
	
		
			
				|  |  | -                    "source": video_obj["platform"],
 | 
	
		
			
				|  |  | +                    "uid": video_obj['uid'],
 | 
	
		
			
				|  |  | +                    "source": video_obj['platform'],
 | 
	
		
			
				|  |  |                      "kimiTitle": kimi_title,
 | 
	
		
			
				|  |  | -                    "videoId": response["data"][0]["id"],
 | 
	
		
			
				|  |  | -                    "videoCover": response["data"][0]["shareImgPath"],
 | 
	
		
			
				|  |  | -                    "videoPath": response["data"][0]["videoPath"],
 | 
	
		
			
				|  |  | -                    "videoOss": video_obj["video_oss_path"],
 | 
	
		
			
				|  |  | +                    "videoId": response['data'][0]['id'],
 | 
	
		
			
				|  |  | +                    "videoCover": response['data'][0]['shareImgPath'],
 | 
	
		
			
				|  |  | +                    "videoPath": response['data'][0]['videoPath'],
 | 
	
		
			
				|  |  | +                    "videoOss": video_obj['video_oss_path']
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  L.append(obj)
 | 
	
		
			
				|  |  |              update_sql = f"""
 | 
	
	
		
			
				|  | @@ -647,12 +649,13 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |                      json.dumps(L, ensure_ascii=False),
 | 
	
		
			
				|  |  |                      process_times + 1,
 | 
	
		
			
				|  |  |                      trace_id,
 | 
	
		
			
				|  |  | -                    self.TASK_PROCESSING_STATUS,
 | 
	
		
			
				|  |  | -                ),
 | 
	
		
			
				|  |  | +                    self.TASK_PROCESSING_STATUS
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |          except Exception as e:
 | 
	
		
			
				|  |  |              await self.roll_back_content_status_when_fails(
 | 
	
		
			
				|  |  | -                process_times=params["process_times"] + 1, trace_id=params["trace_id"]
 | 
	
		
			
				|  |  | +                process_times=params['process_times'] + 1,
 | 
	
		
			
				|  |  | +                trace_id=params['trace_id']
 | 
	
		
			
				|  |  |              )
 | 
	
		
			
				|  |  |              print(e)
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -665,55 +668,79 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          # step1: 执行 kimi 操作
 | 
	
		
			
				|  |  |          # time.sleep(5) # 测试多个进程操作同一个 task 的等待时间
 | 
	
		
			
				|  |  |          kimi_result = await self.kimi_task(params)
 | 
	
		
			
				|  |  | -        trace_id = params["trace_id"]
 | 
	
		
			
				|  |  | +        trace_id = params['trace_id']
 | 
	
		
			
				|  |  |          if kimi_result:
 | 
	
		
			
				|  |  |              # 等待 kimi 操作执行完成之后,开始执行 spider_task
 | 
	
		
			
				|  |  |              print("kimi success")
 | 
	
		
			
				|  |  | -            logging(code=3001, info="kimi success", trace_id=trace_id)
 | 
	
		
			
				|  |  | +            logging(
 | 
	
		
			
				|  |  | +                code=3001,
 | 
	
		
			
				|  |  | +                info="kimi success",
 | 
	
		
			
				|  |  | +                trace_id=trace_id
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  |              spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
 | 
	
		
			
				|  |  |              if spider_flag:
 | 
	
		
			
				|  |  |                  # 等待爬虫执行完成后,开始执行 etl_task
 | 
	
		
			
				|  |  |                  print("spider success")
 | 
	
		
			
				|  |  | -                logging(code=3002, info="spider_success", trace_id=trace_id)
 | 
	
		
			
				|  |  | +                logging(
 | 
	
		
			
				|  |  | +                    code=3002,
 | 
	
		
			
				|  |  | +                    info="spider_success",
 | 
	
		
			
				|  |  | +                    trace_id=trace_id
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  |                  etl_flag = await self.etl_task(params)
 | 
	
		
			
				|  |  |                  if etl_flag:
 | 
	
		
			
				|  |  |                      # 等待下载上传完成,执行发布任务
 | 
	
		
			
				|  |  |                      print("etl success")
 | 
	
		
			
				|  |  | -                    logging(code="3003", info="etl_success", trace_id=trace_id)
 | 
	
		
			
				|  |  | +                    logging(
 | 
	
		
			
				|  |  | +                        code="3003",
 | 
	
		
			
				|  |  | +                        info="etl_success",
 | 
	
		
			
				|  |  | +                        trace_id=trace_id
 | 
	
		
			
				|  |  | +                    )
 | 
	
		
			
				|  |  |                      try:
 | 
	
		
			
				|  |  | -                        await self.publish_task(params, kimi_result["kimi_title"])
 | 
	
		
			
				|  |  | -                        logging(code="3004", info="publish_success", trace_id=trace_id)
 | 
	
		
			
				|  |  | +                        await self.publish_task(params, kimi_result['kimi_title'])
 | 
	
		
			
				|  |  | +                        logging(
 | 
	
		
			
				|  |  | +                            code="3004",
 | 
	
		
			
				|  |  | +                            info="publish_success",
 | 
	
		
			
				|  |  | +                            trace_id=trace_id
 | 
	
		
			
				|  |  | +                        )
 | 
	
		
			
				|  |  |                      except Exception as e:
 | 
	
		
			
				|  |  |                          logging(
 | 
	
		
			
				|  |  |                              code="6004",
 | 
	
		
			
				|  |  |                              info="publish 失败--{}".format(e),
 | 
	
		
			
				|  |  | -                            trace_id=params["trace_id"],
 | 
	
		
			
				|  |  | +                            trace_id=params['trace_id']
 | 
	
		
			
				|  |  |                          )
 | 
	
		
			
				|  |  |                  else:
 | 
	
		
			
				|  |  |                      logging(
 | 
	
		
			
				|  |  | -                        code="6003", info="ETL 处理失败", trace_id=params["trace_id"]
 | 
	
		
			
				|  |  | +                        code="6003",
 | 
	
		
			
				|  |  | +                        info="ETL 处理失败",
 | 
	
		
			
				|  |  | +                        trace_id=params['trace_id']
 | 
	
		
			
				|  |  |                      )
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  | -                logging(code="6002", info="爬虫处理失败", trace_id=params["trace_id"])
 | 
	
		
			
				|  |  | +                logging(
 | 
	
		
			
				|  |  | +                    code="6002",
 | 
	
		
			
				|  |  | +                    info="爬虫处理失败",
 | 
	
		
			
				|  |  | +                    trace_id=params['trace_id']
 | 
	
		
			
				|  |  | +                )
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  | -            logging(code="6001", info="kimi 处理失败", trace_id=params["trace_id"])
 | 
	
		
			
				|  |  | +            logging(
 | 
	
		
			
				|  |  | +                code="6001",
 | 
	
		
			
				|  |  | +                info="kimi 处理失败",
 | 
	
		
			
				|  |  | +                trace_id=params['trace_id']
 | 
	
		
			
				|  |  | +            )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      async def process_task(self, params):
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          处理任务
 | 
	
		
			
				|  |  |          :return:
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  | -        content_id = params["content_id"]
 | 
	
		
			
				|  |  | +        content_id = params['content_id']
 | 
	
		
			
				|  |  |          download_videos = await self.get_video_list(content_id)
 | 
	
		
			
				|  |  |          if not download_videos:
 | 
	
		
			
				|  |  |              # 开始处理, 判断是否有相同的文章 id 正在处理
 | 
	
		
			
				|  |  | -            processing_flag = await self.judge_whether_same_content_id_is_processing(
 | 
	
		
			
				|  |  | -                content_id
 | 
	
		
			
				|  |  | -            )
 | 
	
		
			
				|  |  | +            processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
 | 
	
		
			
				|  |  |              if processing_flag:
 | 
	
		
			
				|  |  |                  logging(
 | 
	
		
			
				|  |  |                      code="9001",
 | 
	
		
			
				|  |  | -                    info="该 content id 正在处理中, 跳过此任务--{}".format(content_id),
 | 
	
		
			
				|  |  | +                    info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
 | 
	
		
			
				|  |  |                  )
 | 
	
		
			
				|  |  |              else:
 | 
	
		
			
				|  |  |                  await self.start_process(params=params)
 | 
	
	
		
			
				|  | @@ -730,7 +757,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          task_dict = {}
 | 
	
		
			
				|  |  |          # 对 content_id去重
 | 
	
		
			
				|  |  |          for task in task_list:
 | 
	
		
			
				|  |  | -            key = task["content_id"]
 | 
	
		
			
				|  |  | +            key = task['content_id']
 | 
	
		
			
				|  |  |              task_dict[key] = task
 | 
	
		
			
				|  |  |          process_list = []
 | 
	
		
			
				|  |  |          for item in task_dict:
 | 
	
	
		
			
				|  | @@ -738,7 +765,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |          logging(
 | 
	
		
			
				|  |  |              code="5001",
 | 
	
		
			
				|  |  |              info="Match Task Got {} this time".format(len(process_list)),
 | 
	
		
			
				|  |  | -            function="Publish Task",
 | 
	
		
			
				|  |  | +            function="Publish Task"
 | 
	
		
			
				|  |  |          )
 | 
	
		
			
				|  |  |          if task_list:
 | 
	
		
			
				|  |  |              total_task = len(process_list)
 | 
	
	
		
			
				|  | @@ -749,4 +776,7 @@ class NewContentIdTask(object):
 | 
	
		
			
				|  |  |              b = time.time()
 | 
	
		
			
				|  |  |              print("处理时间: {} s".format(b - a))
 | 
	
		
			
				|  |  |          else:
 | 
	
		
			
				|  |  | -            logging(code="9008", info="没有要处理的请求")
 | 
	
		
			
				|  |  | +            logging(
 | 
	
		
			
				|  |  | +                code="9008",
 | 
	
		
			
				|  |  | +                info="没有要处理的请求"
 | 
	
		
			
				|  |  | +            )
 |