kimi_task.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import asyncio
  6. from applications.functions.kimi import KimiServer
  7. from applications.log import logging
  8. from applications.config import Config
  9. class KimiTask(object):
  10. """
  11. KIMI task
  12. """
  13. def __init__(self, mysql_client):
  14. """
  15. :param mysql_client:
  16. """
  17. self.mysql_client = mysql_client
  18. self.config = Config()
  19. async def getTasks(self):
  20. """
  21. 获取 tasks
  22. :return:
  23. """
  24. sql = f"""
  25. SELECT content_id, article_title, article_text
  26. FROM {self.config.articleText}
  27. WHERE kimi_status = 0
  28. limit 5;
  29. """
  30. content_list = await self.mysql_client.asyncSelect(sql)
  31. if content_list:
  32. task_list = [
  33. {
  34. "contentId": i[0],
  35. "articleTitle": i[1],
  36. "articleText": i[2]
  37. } for i in content_list
  38. ]
  39. return task_list
  40. else:
  41. return []
  42. async def processTask(self, params):
  43. """
  44. do something
  45. :return:
  46. """
  47. K = KimiServer()
  48. try:
  49. kimi_info = await K.search_kimi_schedule(params=params)
  50. kimi_title = kimi_info['k_title']
  51. content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
  52. content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
  53. update_kimi_sql = f"""
  54. UPDATE {self.config.articleText}
  55. SET
  56. kimi_title = %s,
  57. kimi_summary = %s,
  58. kimi_keys = %s,
  59. kimi_status = %s
  60. WHERE content_id = %s;
  61. """
  62. await self.mysql_client.asyncInsert(
  63. sql=update_kimi_sql,
  64. params=(kimi_title, content_title, content_keys, 1, params['contentId'])
  65. )
  66. except Exception as e:
  67. update_kimi_sql = f"""
  68. UPDATE {self.config.articleText}
  69. SET
  70. kimi_status = %s
  71. WHERE content_id = %s
  72. """
  73. await self.mysql_client.asyncInsert(
  74. sql=update_kimi_sql,
  75. params=(2, params['contentId'])
  76. )
  77. print("kimi error--{}".format(e))
  78. async def deal(self):
  79. """
  80. deal function
  81. :return:
  82. """
  83. task_list = await self.getTasks()
  84. logging(
  85. code="5003",
  86. info="KIMI Task Got {} this time".format(len(task_list)),
  87. function="Kimi Task"
  88. )
  89. if task_list:
  90. tasks = [self.processTask(params) for params in task_list]
  91. await asyncio.gather(*tasks)
  92. else:
  93. print("没有要处理的 kimi 文章")