1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- """
- @author: luojunhui
- """
- import json
- import asyncio
- from applications.functions.kimi import KimiServer
- from applications.functions.log import logging
- from applications.config import Config
class KimiTask(object):
    """
    Batch job that sends pending articles to Kimi and stores the result.

    Rows are read from the table named by ``Config().articleText``.  The
    ``kimi_status`` column is used as a small state machine by this class:
    ``0`` rows are selected for processing, ``1`` is written after a
    successful update and ``2`` is written when processing raised.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: client object exposing the awaitable methods
            ``asyncSelect(sql)`` and ``asyncInsert(sql=..., params=...)``
            that this class calls.
        """
        self.mysql_client = mysql_client
        self.config = Config()

    async def getTasks(self):
        """
        Fetch up to five rows whose ``kimi_status`` is 0.

        :return: list of dicts with the keys ``contentId``, ``articleTitle``
            and ``articleText``; an empty list when the query yields nothing.
        """
        sql = f"""
            SELECT content_id, article_title, article_text
            FROM {self.config.articleText}
            WHERE kimi_status = 0
            limit 5;
        """
        content_list = await self.mysql_client.asyncSelect(sql)
        if not content_list:
            return []
        # Rows are consumed positionally, in the SELECT column order.
        return [
            {
                "contentId": row[0],
                "articleTitle": row[1],
                "articleText": row[2],
            }
            for row in content_list
        ]

    async def processTask(self, params):
        """
        Run one task through Kimi and persist the outcome.

        On success the row is updated with the Kimi fields and
        ``kimi_status = 1``.  If anything inside the attempt raises, the
        error is printed and the row is marked ``kimi_status = 2``.

        :param params: task dict as produced by :meth:`getTasks`; it is
            passed to ``KimiServer.search_kimi_schedule`` unchanged and its
            ``contentId`` entry selects the row to update.
        :return: None
        :raises Exception: whatever the failure-status write raises; that
            write is intentionally not guarded.
        """
        kimi_server = KimiServer()
        try:
            kimi_info = await kimi_server.search_kimi_schedule(params=params)
            kimi_title = kimi_info['k_title']
            # Quotes are stripped from the title before it is stored.
            content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
            content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
            update_kimi_sql = f"""
                UPDATE {self.config.articleText}
                SET
                    kimi_title = %s,
                    kimi_summary = %s,
                    kimi_keys = %s,
                    kimi_status = %s
                WHERE content_id = %s;
            """
            # NOTE(review): content_title is written to the kimi_summary
            # column -- confirm this mapping is intended.
            await self.mysql_client.asyncInsert(
                sql=update_kimi_sql,
                params=(kimi_title, content_title, content_keys, 1, params['contentId'])
            )
        except Exception as e:
            # Deliberately broad: any failure of a single task must end with
            # the row flagged as failed instead of staying at status 0.
            # Report first, so the message is not lost if the status write
            # below raises as well.
            print("kimi error--{}".format(e))
            update_kimi_sql = f"""
                UPDATE {self.config.articleText}
                SET
                    kimi_status = %s
                WHERE content_id = %s
            """
            await self.mysql_client.asyncInsert(
                sql=update_kimi_sql,
                params=(2, params['contentId'])
            )

    async def deal(self):
        """
        Entry point: load the pending tasks and process them concurrently.

        When there is nothing to do, a log entry with code ``9008`` is
        written instead.

        :return: None
        :raises BaseException: the first exception raised by any
            :meth:`processTask` call, re-raised only after every task of the
            batch has finished.
        """
        task_list = await self.getTasks()
        if not task_list:
            logging(
                code="9008",
                info="没有要处理的 kimi 任务"
            )
            return

        # return_exceptions=True makes gather wait for the whole batch; with
        # the default, the first failure would return control to the caller
        # while the remaining coroutines are still running.
        results = await asyncio.gather(
            *(self.processTask(params) for params in task_list),
            return_exceptions=True
        )
        for result in results:
            if isinstance(result, BaseException):
                raise result
|