""" @author: luojunhui """ import json import asyncio from applications.functions.kimi import KimiServer from applications.log import logging from applications.config import Config class KimiTask(object): """ KIMI task """ def __init__(self, mysql_client): """ :param mysql_client: """ self.mysql_client = mysql_client self.config = Config() async def getTasks(self): """ 获取 tasks :return: """ sql = f""" SELECT content_id, article_title, article_text FROM {self.config.articleText} WHERE kimi_status = 0 limit 5; """ content_list = await self.mysql_client.asyncSelect(sql) if content_list: task_list = [ { "contentId": i[0], "articleTitle": i[1], "articleText": i[2] } for i in content_list ] return task_list else: return [] async def processTask(self, params): """ do something :return: """ K = KimiServer() try: kimi_info = await K.search_kimi_schedule(params=params) kimi_title = kimi_info['k_title'] content_title = kimi_info['content_title'].replace("'", "").replace('"', "") content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False) update_kimi_sql = f""" UPDATE {self.config.articleText} SET kimi_title = %s, kimi_summary = %s, kimi_keys = %s, kimi_status = %s WHERE content_id = %s; """ await self.mysql_client.asyncInsert( sql=update_kimi_sql, params=(kimi_title, content_title, content_keys, 1, params['contentId']) ) except Exception as e: update_kimi_sql = f""" UPDATE {self.config.articleText} SET kimi_status = %s WHERE content_id = %s """ await self.mysql_client.asyncInsert( sql=update_kimi_sql, params=(2, params['contentId']) ) print("kimi error--{}".format(e)) async def deal(self): """ deal function :return: """ task_list = await self.getTasks() logging( code="5003", info="KIMI Task Got {} this time".format(len(task_list)), function="Kimi Task" ) if task_list: tasks = [self.processTask(params) for params in task_list] await asyncio.gather(*tasks) else: print("没有要处理的 kimi 文章")