kimi_task.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import asyncio
  6. from applications.functions.kimi import KimiServer
  7. from applications.log import logging
  8. from applications.config import Config
  9. class KimiTask(object):
  10. """
  11. KIMI task
  12. """
  13. def __init__(self, mysql_client):
  14. """
  15. :param mysql_client:
  16. """
  17. self.mysql_client = mysql_client
  18. self.article_match_video_table = Config().article_match_video_table
  19. self.article_text_table = Config().article_text_table
  20. async def get_tasks(self):
  21. """
  22. 获取 tasks
  23. :return:
  24. """
  25. sql = f"""
  26. SELECT content_id, article_title, article_text
  27. FROM {self.article_text_table}
  28. WHERE kimi_status = 0
  29. limit 5;
  30. """
  31. content_list = await self.mysql_client.async_select(sql)
  32. if content_list:
  33. task_list = [
  34. {
  35. "contentId": i[0],
  36. "articleTitle": i[1],
  37. "articleText": i[2]
  38. } for i in content_list
  39. ]
  40. return task_list
  41. else:
  42. return []
  43. async def process_task(self, params):
  44. """
  45. do something
  46. :return:
  47. """
  48. kimi_success_status = 1
  49. kimi_fail_status = 2
  50. K = KimiServer()
  51. try:
  52. kimi_info = await K.search_kimi_schedule(params=params)
  53. kimi_title = kimi_info['k_title']
  54. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  55. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  56. update_kimi_sql = f"""
  57. UPDATE {self.article_text_table}
  58. SET
  59. kimi_title = %s,
  60. kimi_summary = %s,
  61. kimi_keys = %s,
  62. kimi_status = %s
  63. WHERE content_id = %s;"""
  64. await self.mysql_client.async_insert(
  65. sql=update_kimi_sql,
  66. params=(kimi_title, content_title, content_keys, kimi_success_status, params['contentId'])
  67. )
  68. except Exception as e:
  69. update_kimi_sql = f"""
  70. UPDATE {self.article_match_video_table}
  71. SET
  72. kimi_status = %s
  73. WHERE content_id = %s
  74. """
  75. await self.mysql_client.async_insert(
  76. sql=update_kimi_sql,
  77. params=(kimi_fail_status, params['contentId'])
  78. )
  79. print("kimi error--{}".format(e))
  80. async def deal(self):
  81. """
  82. deal function
  83. :return:
  84. """
  85. task_list = await self.get_tasks()
  86. logging(
  87. code="5001",
  88. info="KIMI Task Got {} this time".format(len(task_list)),
  89. function="Kimi Task"
  90. )
  91. if task_list:
  92. tasks = [self.process_task(params) for params in task_list]
  93. await asyncio.gather(*tasks)
  94. else:
  95. print("没有要处理的 kimi 文章")