"""
@author: luojunhui
"""
import json
import asyncio

from applications.functions.kimi import KimiServer
from applications.log import logging
from applications.config import Config


class KimiTask(object):
    """
    KIMI task.

    Polls the article text table for rows whose ``kimi_status`` is 0, sends
    each one to ``KimiServer`` and writes the result (or a failure marker)
    back to the same table.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: async MySQL client; this class calls its
            ``async_select(sql)`` and ``async_insert(sql=..., params=...)``
            coroutines.
        """
        self.mysql_client = mysql_client
        config = Config()
        self.article_match_video_table = config.article_match_video_table
        self.article_text_table = config.article_text_table

    async def get_tasks(self):
        """
        Fetch up to five pending tasks (``kimi_status = 0``).

        :return: list of dicts with keys ``contentId``, ``articleTitle`` and
            ``articleText``; an empty list when the query yields nothing.
        """
        sql = f"""
            SELECT content_id, article_title, article_text
            FROM {self.article_text_table}
            WHERE kimi_status = 0
            limit 5;
        """
        rows = await self.mysql_client.async_select(sql)
        if not rows:
            return []
        return [
            {
                "contentId": content_id,
                "articleTitle": article_title,
                "articleText": article_text,
            }
            for content_id, article_title, article_text in rows
        ]

    async def process_task(self, params):
        """
        Run one task through KimiServer and persist the outcome.

        On success the row is updated with the Kimi title, the content title
        (single and double quotes stripped), the JSON-encoded keys, and
        ``kimi_status = 1``. On any exception the row is marked with
        ``kimi_status = 2`` and the error is printed.

        :param params: task dict as produced by ``get_tasks``; must contain
            ``contentId``.
        :return: None
        """
        kimi_success_status = 1
        kimi_fail_status = 2
        kimi_server = KimiServer()
        try:
            kimi_info = await kimi_server.search_kimi_schedule(params=params)
            kimi_title = kimi_info['k_title']
            content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
            content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
            update_kimi_sql = f"""
                UPDATE {self.article_text_table}
                SET kimi_title = %s, kimi_summary = %s, kimi_keys = %s, kimi_status = %s
                WHERE content_id = %s;
            """
            await self.mysql_client.async_insert(
                sql=update_kimi_sql,
                params=(
                    kimi_title,
                    content_title,
                    content_keys,
                    kimi_success_status,
                    params['contentId'],
                ),
            )
        except Exception as e:
            # Mark the failure on the table that get_tasks() polls. Writing the
            # status to a different table would leave this row at
            # kimi_status = 0, so it would be picked up again on every run.
            # NOTE(review): the failure status was previously written to
            # article_match_video_table; confirm nothing reads it from there.
            update_kimi_sql = f"""
                UPDATE {self.article_text_table}
                SET kimi_status = %s
                WHERE content_id = %s
            """
            await self.mysql_client.async_insert(
                sql=update_kimi_sql,
                params=(kimi_fail_status, params['contentId']),
            )
            print("kimi error--{}".format(e))

    async def deal(self):
        """
        Entry point: fetch pending tasks and process them concurrently.

        :return: None
        """
        task_list = await self.get_tasks()
        logging(
            code="5001",
            info="KIMI Task Got {} this time".format(len(task_list)),
            function="Kimi Task"
        )
        if task_list:
            tasks = [self.process_task(params) for params in task_list]
            await asyncio.gather(*tasks)
        else:
            print("没有要处理的 kimi 文章")