123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- """
- @author: luojunhui
- """
- import json
- import asyncio
- from applications.functions.kimi import KimiServer
- from applications.log import logging
- from applications.config import Config
- class KimiTask(object):
- """
- KIMI task
- """
- def __init__(self, mysql_client):
- """
- :param mysql_client:
- """
- self.mysql_client = mysql_client
- self.article_match_video_table = Config().article_match_video_table
- self.article_text_table = Config().article_text_table
- async def get_tasks(self):
- """
- 获取 tasks
- :return:
- """
- sql = f"""
- SELECT content_id, article_title, article_text
- FROM {self.article_text_table}
- WHERE kimi_status = 0
- limit 5;
- """
- content_list = await self.mysql_client.async_select(sql)
- if content_list:
- task_list = [
- {
- "contentId": i[0],
- "articleTitle": i[1],
- "articleText": i[2]
- } for i in content_list
- ]
- return task_list
- else:
- return []
- async def process_task(self, params):
- """
- do something
- :return:
- """
- kimi_success_status = 1
- kimi_fail_status = 2
- K = KimiServer()
- try:
- kimi_info = await K.search_kimi_schedule(params=params)
- kimi_title = kimi_info['k_title']
- content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
- content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
- update_kimi_sql = f"""
- UPDATE {self.article_text_table}
- SET
- kimi_title = %s,
- kimi_summary = %s,
- kimi_keys = %s,
- kimi_status = %s
- WHERE content_id = %s;"""
- await self.mysql_client.async_insert(
- sql=update_kimi_sql,
- params=(kimi_title, content_title, content_keys, kimi_success_status, params['contentId'])
- )
- except Exception as e:
- update_kimi_sql = f"""
- UPDATE {self.article_match_video_table}
- SET
- kimi_status = %s
- WHERE content_id = %s
- """
- await self.mysql_client.async_insert(
- sql=update_kimi_sql,
- params=(kimi_fail_status, params['contentId'])
- )
- print("kimi error--{}".format(e))
- async def deal(self):
- """
- deal function
- :return:
- """
- task_list = await self.get_tasks()
- logging(
- code="5001",
- info="KIMI Task Got {} this time".format(len(task_list)),
- function="Kimi Task"
- )
- if task_list:
- tasks = [self.process_task(params) for params in task_list]
- await asyncio.gather(*tasks)
- else:
- print("没有要处理的 kimi 文章")
|