""" @author: luojunhui kimi 相关方法 """ import json from typing import Dict from applications.const import new_content_id_task_const from applications.functions.kimi import KimiServer async def get_kimi_status(content_id, article_text_table, db_client) -> int: """ 通过 content_id 获取kimi info :return: """ select_sql = f""" SELECT kimi_status FROM {article_text_table} WHERE content_id = '{content_id}'; """ response = await db_client.async_select(select_sql) if response: kimi_status = response[0][0] return kimi_status else: return new_content_id_task_const.ARTICLE_TEXT_TABLE_ERROR async def get_kimi_result(content_id, article_text_table, db_client) -> Dict: """ 获取kimi的返回结果 """ get_kimi_sql = f""" SELECT article_title, kimi_title, kimi_summary, kimi_keys FROM {article_text_table} WHERE content_id = '{content_id}'; """ kimi_info = await db_client.async_select(get_kimi_sql) if kimi_info: try: response = { "kimi_title": kimi_info[0][1], "ori_title": kimi_info[0][0], "kimi_summary": kimi_info[0][2], "kimi_keys": json.loads(kimi_info[0][3]) } return response except Exception as e: return {} else: return {} async def generate_kimi_result(content_id, article_text_table, db_client, safe_score) -> Dict: """ 为content_id执行kimi操作 """ K = KimiServer() select_sql = f""" SELECT article_title, article_text FROM {article_text_table} WHERE content_id = '{content_id}'; """ res = await db_client.async_select(select_sql) article_obj = { "article_title": res[0][0], "article_text": res[0][1], "content_id": content_id } kimi_info = await K.search_kimi_schedule(params=article_obj, safe_score=safe_score) kimi_title = kimi_info['k_title'] content_title = kimi_info['content_title'].replace("'", "").replace('"', "") content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False) update_kimi_sql = f""" UPDATE {article_text_table} SET kimi_title = %s, kimi_summary = %s, kimi_keys = %s, kimi_status = %s WHERE content_id = %s;""" await db_client.async_insert( sql=update_kimi_sql, params=( kimi_title, content_title, content_keys, new_content_id_task_const.KIMI_SUCCESS_STATUS, content_id) ) return { "kimi_title": kimi_title, "ori_title": article_obj['article_title'], "kimi_summary": content_title, "kimi_keys": kimi_info['content_keys'] } async def update_kimi_status(content_id, article_text_table, db_client, kimi_info, success_status, init_status) -> int: """ 更新 kimi 记录 """ update_sql = f""" UPDATE {article_text_table} SET kimi_title = %s, kimi_summary = %s, kimi_keys = %s, kimi_status = %s WHERE content_id = %s and kimi_status = %s; """ affected_rows = await db_client.async_insert( sql=update_sql, params=( kimi_info['kimi_title'], kimi_info['kimi_summary'], json.dumps(kimi_info['kimi_keys'], ensure_ascii=False), success_status, content_id, init_status ) ) return affected_rows