# process_deal.py
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. from applications.static.config import db_article
  6. from applications.schedule import search_videos
  7. from applications.functions.log import logging
  8. class ProcessDeal(object):
  9. """
  10. 定时执行任务
  11. """
  12. def __init__(self, mysql_client):
  13. """
  14. :param mysql_client:
  15. """
  16. self.mysql_client = mysql_client
  17. async def get_task(self):
  18. """
  19. 获取任务
  20. :return:
  21. """
  22. select_sql1 = f"""
  23. SELECT DISTINCT (content_id)
  24. FROM {db_article}
  25. WHERE content_status = 0 and process_times <= 5
  26. ORDER BY request_time_stamp
  27. ASC
  28. LIMIT 4;
  29. """
  30. print(select_sql1)
  31. content_ids = await self.mysql_client.async_select(select_sql1)
  32. cil = []
  33. for content_id in content_ids:
  34. cil.append(content_id[0])
  35. content_ids_tuple = str(cil).replace("[", "(").replace("]", ")")
  36. if len(content_ids_tuple) > 3:
  37. select_sql = f"""
  38. SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
  39. FROM {db_article}
  40. WHERE content_id in {content_ids_tuple}
  41. ORDER BY request_time_stamp
  42. ASC
  43. """
  44. print(select_sql)
  45. task_list = await self.mysql_client.async_select(sql=select_sql)
  46. task_obj_list = [
  47. {
  48. "trace_id": item[0],
  49. "content_id": item[1],
  50. "gh_id": item[2],
  51. "title": item[3],
  52. "text": item[4],
  53. "content_status": item[5],
  54. "process_times": item[6]
  55. } for item in task_list
  56. ]
  57. logging(
  58. code="9001",
  59. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  60. data=task_obj_list
  61. )
  62. return task_obj_list
  63. else:
  64. return []
  65. async def get_history_contents(self, content_id):
  66. """
  67. check whether the content id exists
  68. :return: trace_id or None
  69. """
  70. select_sql = f"""
  71. SELECT trace_id, content_status
  72. FROM {db_article}
  73. WHERE content_id = '{content_id}'
  74. ORDER BY id DESC;
  75. """
  76. result = await self.mysql_client.async_select(select_sql)
  77. if result:
  78. for item in result:
  79. trace_id, content_status = item
  80. if content_status == 2:
  81. return trace_id
  82. else:
  83. continue
  84. return None
  85. else:
  86. return None
  87. async def judge_content_processing(self, content_id):
  88. """
  89. 判断该content_id是否在处理中
  90. :param content_id:
  91. :return:
  92. """
  93. select_sql = f"""
  94. SELECT trace_id, content_status
  95. FROM {db_article}
  96. WHERE content_id = '{content_id}'
  97. ORDER BY id DESC;
  98. """
  99. result = await self.mysql_client.async_select(select_sql)
  100. if result:
  101. for item in result:
  102. trace_id, content_status = item
  103. if content_status == 1:
  104. return False
  105. return True
  106. else:
  107. return True
  108. async def insert_history_contents_videos(self, history_trace_id, params):
  109. """
  110. 插入历史视频id
  111. :return:
  112. """
  113. select_sql = f"""
  114. SELECT kimi_title, recall_video_id1, recall_video_id2, recall_video_id3
  115. FROM {db_article}
  116. WHERE trace_id = '{history_trace_id}';
  117. """
  118. info = await self.mysql_client.async_select(sql=select_sql)
  119. kimi_title, vid1, vid2, vid3 = info[0]
  120. update_sql = f"""
  121. UPDATE {db_article}
  122. SET
  123. kimi_title='{kimi_title}',
  124. recall_video_id1={vid1},
  125. recall_video_id2={"NULL" if vid2 is None else vid2},
  126. recall_video_id3={"NULL" if vid3 is None else vid3},
  127. content_status=2,
  128. process_times = {int(params['process_times']) + 1}
  129. WHERE trace_id = '{params['trace_id']}'
  130. """
  131. await self.mysql_client.async_insert(update_sql)
  132. logging(
  133. code="9002",
  134. info="已从历史文章更新,历史id: {}".format(history_trace_id),
  135. trace_id=params['trace_id']
  136. )
  137. async def process_video_id(self, title, trace_id, process_times):
  138. """
  139. 如果video_id在标题中,则做特殊处理
  140. :return:
  141. """
  142. video_id = title.split("video_id=")[-1]
  143. update_sql = f"""
  144. UPDATE
  145. {db_article}
  146. SET
  147. recall_video_id1 = '{video_id}',
  148. content_status = 2,
  149. process_times = {int(process_times) + 1}
  150. WHERE
  151. trace_id = '{trace_id}';"""
  152. await self.mysql_client.async_insert(update_sql)
  153. async def start_process(self, params):
  154. """
  155. 开始处理
  156. :param params:
  157. :return:
  158. """
  159. # 更新文章contentId为1, 说明该文章正在处理中
  160. update_sql = f"""
  161. UPDATE {db_article}
  162. SET
  163. content_status = 1
  164. WHERE
  165. trace_id = '{params["trace_id"]}'
  166. """
  167. await self.mysql_client.async_insert(sql=update_sql)
  168. try:
  169. # 判断标题中是否包含video_id
  170. if "video_id=" in params['title']:
  171. logging(
  172. code="9006",
  173. info="视频生成文本测试",
  174. trace_id=params['trace_id']
  175. )
  176. await self.process_video_id(
  177. title=params['title'],
  178. trace_id=params['trace_id'],
  179. process_times=params['process_times']
  180. )
  181. else:
  182. await search_videos(
  183. params={"title": params['title'], "content": params['text'], "trace_id": params['trace_id']},
  184. trace_id=params['trace_id'],
  185. gh_id=params['gh_id'],
  186. mysql_client=self.mysql_client
  187. )
  188. # 执行完成之后,判断是否存在视频id
  189. select_sql = f"""
  190. SELECT recall_video_id1, recall_video_id2, recall_video_id3
  191. FROM {db_article}
  192. WHERE trace_id = '{params["trace_id"]}';
  193. """
  194. result = await self.mysql_client.async_select(sql=select_sql)
  195. vid1, vid2, vid3 = result[0]
  196. if vid1:
  197. update_sql2 = f"""
  198. UPDATE {db_article}
  199. SET
  200. content_status = 2,
  201. process_times = {int(params['process_times']) + 1}
  202. WHERE trace_id = '{params["trace_id"]}';
  203. """
  204. await self.mysql_client.async_insert(sql=update_sql2)
  205. logging(
  206. code="9008",
  207. info="视频搜索成功, 状态修改为2",
  208. trace_id=params['trace_id']
  209. )
  210. else:
  211. update_sql3 = f"""
  212. UPDATE {db_article}
  213. SET
  214. content_status = 0,
  215. process_times = {int(params['process_times']) + 1}
  216. WHERE trace_id = '{params["trace_id"]}';
  217. """
  218. await self.mysql_client.async_insert(sql=update_sql3)
  219. logging(
  220. code="9018",
  221. info="视频搜索失败,回退状态为0",
  222. trace_id=params['trace_id']
  223. )
  224. except Exception as e:
  225. logging(
  226. code="9018",
  227. info="{}异常错误:{}, 回退状态为0".format(params['trace_id'], e),
  228. trace_id=params['trace_id']
  229. )
  230. update_sql4 = f"""
  231. UPDATE {db_article}
  232. SET
  233. content_status = 0,
  234. process_times = {int(params['process_times']) + 1}
  235. WHERE trace_id = '{params["trace_id"]}';
  236. """
  237. await self.mysql_client.async_insert(sql=update_sql4)
  238. async def process_task(self, params):
  239. """
  240. 异步执行
  241. :param params:
  242. :return:
  243. """
  244. content_id = params['content_id']
  245. trace_id = params['trace_id']
  246. # 判断该文章是否已经生成了
  247. history_trace_id = await self.get_history_contents(content_id)
  248. if history_trace_id:
  249. # 说明已经存在了结果, 将该条记录下的video_id拿出来
  250. logging(
  251. code="9001",
  252. info="存在历史文章",
  253. trace_id=trace_id
  254. )
  255. await self.insert_history_contents_videos(history_trace_id, params)
  256. else:
  257. flag = await self.judge_content_processing(content_id)
  258. if flag:
  259. logging(
  260. code="9004",
  261. info="无正在处理的文章ID, 开始处理",
  262. trace_id=trace_id
  263. )
  264. await self.start_process(params=params)
  265. else:
  266. logging(
  267. code="9003",
  268. info="该文章ID正在请求--文章ID {}".format(content_id),
  269. trace_id=trace_id
  270. )
  271. async def deal(self):
  272. """
  273. 处理
  274. :return:
  275. """
  276. task_list = await self.get_task()
  277. task_dict = {}
  278. for task in task_list:
  279. key = task['content_id']
  280. task_dict[key] = task
  281. process_list = []
  282. for item in task_dict:
  283. process_list.append(task_dict[item])
  284. if process_list:
  285. # for task in task_list:
  286. # await self.process_task(task)
  287. tasks = [self.process_task(params) for params in process_list]
  288. await asyncio.gather(*tasks)
  289. else:
  290. logging(
  291. code="9008",
  292. info="没有要处理的请求"
  293. )