task3.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. """
  2. @author: luojunhui
  3. """
import asyncio
import json

from static.config import db_article, db_video, mysql_coroutines
from applications.functions.log import logging
from applications.functions.pqFunctions import *
from applications.functions.apollo import Config
  9. class MatchTask3(object):
  10. """
  11. 处理已经匹配过小程序的文章
  12. """
  13. def __init__(self, pq_client, denet_client, long_article_client):
  14. """
  15. 初始化HistoryArticleMySQLClient
  16. """
  17. self.config = Config(env="prod")
  18. self.pq_client = pq_client
  19. self.denet_client = denet_client
  20. self.long_article_client = long_article_client
  21. self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
  22. async def getTaskList(self):
  23. """
  24. 获取任务
  25. :return:
  26. """
  27. select_sql1 = f"""
  28. SELECT
  29. ART.trace_id,
  30. ART.content_id,
  31. ART.gh_id,
  32. ART.article_title,
  33. ART.article_text,
  34. ART.content_status,
  35. ART.process_times
  36. FROM {db_article} ART
  37. JOIN (
  38. select content_id, count(1) as cnt
  39. from {db_video}
  40. where oss_status = 1
  41. group by content_id
  42. ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
  43. WHERE ART.content_status = 0 and ART.process_times <= 3
  44. ORDER BY request_time_stamp
  45. LIMIT {mysql_coroutines};
  46. """
  47. tasks = await self.pq_client.async_select(sql=select_sql1)
  48. task_obj_list = [
  49. {
  50. "trace_id": item[0],
  51. "content_id": item[1],
  52. "gh_id": item[2],
  53. "title": item[3],
  54. "text": item[4],
  55. "content_status": item[5],
  56. "process_times": item[6]
  57. } for item in tasks
  58. ]
  59. logging(
  60. code="9001",
  61. function="task3.get_task",
  62. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  63. data=[x['content_id'] for x in task_obj_list]
  64. )
  65. return task_obj_list
  66. async def getHistoryVideoOssPath(self, content_id):
  67. """
  68. check whether the contents videos exists
  69. :param content_id:
  70. :return:
  71. """
  72. select_sql = f"""
  73. SELECT video_title, uid, video_path, cover_path
  74. FROM {db_video}
  75. where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
  76. """
  77. content_videos = await self.pq_client.async_select(select_sql)
  78. video_list = [
  79. {
  80. "title": line[0],
  81. "uid": line[1],
  82. "videoPath": line[2],
  83. "coverPath": line[3]
  84. }
  85. for line in content_videos
  86. ]
  87. if len(video_list) >= 3:
  88. return video_list
  89. else:
  90. return None
  91. async def useExistOssPath(self, video_info_list, params):
  92. """
  93. 使用已经存在的视频id
  94. :return:
  95. """
  96. trace_id = params['trace_id']
  97. content_id = params['content_id']
  98. select_sql = f"""
  99. SELECT kimi_title
  100. FROM {db_article}
  101. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  102. """
  103. info = await self.pq_client.async_select(sql=select_sql)
  104. kimi_title = info[0]
  105. video_id_list = await getNewVideoIds(video_info_list)
  106. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  107. update_sql = f"""
  108. UPDATE {db_article}
  109. SET
  110. kimi_title=%s,
  111. recall_video_id1=%s,
  112. recall_video_id2=%s,
  113. recall_video_id3=%s,
  114. content_status=%s,
  115. process_times = %s
  116. WHERE trace_id = %s
  117. """
  118. await self.pq_client.async_insert(
  119. sql=update_sql,
  120. params=(
  121. kimi_title,
  122. vid1,
  123. vid2,
  124. vid3,
  125. 2,
  126. int(params['process_times']) + 1,
  127. trace_id
  128. )
  129. )
  130. logging(
  131. code="9002",
  132. info="已从历史文章更新,文章id: {}".format(content_id),
  133. trace_id=trace_id
  134. )
  135. async def get_content_pool_level(self, content_id) -> str:
  136. """
  137. 获取文章的内容池等级
  138. :param content_id:
  139. """
  140. select_sql = f"""
  141. SELECT produce_plan.plan_tag
  142. FROM produce_plan
  143. JOIN produce_plan_exe_record
  144. ON produce_plan.id = produce_plan_exe_record.plan_id
  145. WHERE produce_plan_exe_record.plan_exe_id = '{content_id}';
  146. """
  147. result = await self.denet_client.async_select(sql=select_sql)
  148. if result:
  149. return result[0][0]
  150. else:
  151. logging(
  152. code="5858",
  153. function="task3.get_content_pool_level",
  154. info="没有找到该文章的内容池等级",
  155. data={'content_id': content_id}
  156. )
  157. return "ERROR"
  158. async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
  159. """
  160. 判断该文章的品类是否属于该账号的品类
  161. :param trace_id:
  162. :param content_id:
  163. :param gh_id:
  164. :return:
  165. """
  166. bad_category_list = self.account_negative_category.get(gh_id, [])
  167. logging(
  168. code="history1101",
  169. info="该账号的 negative 类型列表",
  170. trace_id=trace_id,
  171. data=bad_category_list
  172. )
  173. if bad_category_list:
  174. sql = f"""
  175. SELECT category
  176. FROM article_category
  177. WHERE produce_content_id = '{content_id}';
  178. """
  179. result = await self.long_article_client.async_select(sql)
  180. if result:
  181. category = result[0][0]
  182. logging(
  183. code="history1102",
  184. info="文章的品类-{}".format(category),
  185. trace_id=trace_id
  186. )
  187. if category in bad_category_list:
  188. return True
  189. return False
  190. async def processTask(self, params):
  191. """
  192. 异步执行
  193. :param params:
  194. :return:
  195. """
  196. MISMATCH_STATUS = 95
  197. TASK_INIT_STATUS = 0
  198. content_id = params['content_id']
  199. trace_id = params['trace_id']
  200. gh_id = params['gh_id']
  201. flow_pool_level = await self.get_content_pool_level(content_id)
  202. flow_pool_level_list = flow_pool_level.split("/")
  203. if "autoArticlePoolLevel4" in flow_pool_level_list:
  204. # 判断文章的品类是否属于该账号的 negative 类型
  205. negative_category_status = await self.check_title_category(
  206. content_id=content_id,
  207. gh_id=gh_id,
  208. trace_id=trace_id
  209. )
  210. if negative_category_status:
  211. # 修改状态为品类不匹配状态
  212. logging(
  213. code="history1002",
  214. info="文章属于该账号的 negative 类型",
  215. trace_id=trace_id
  216. )
  217. update_sql = f"""
  218. UPDATE {db_article}
  219. SET content_status = %s
  220. WHERE trace_id = %s and content_status = %s;
  221. """
  222. affected_rows = await self.pq_client.async_insert(
  223. sql=update_sql,
  224. params=(
  225. MISMATCH_STATUS,
  226. trace_id,
  227. TASK_INIT_STATUS
  228. )
  229. )
  230. logging(
  231. code="history1003",
  232. info="已经修改该文章状态为 品类不匹配状态",
  233. trace_id=trace_id
  234. )
  235. if affected_rows == 0:
  236. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  237. return
  238. # 处理完成之后,直接return
  239. return
  240. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  241. oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
  242. if oss_path_list:
  243. # 说明已经存在了结果, 将该条记录下的video_oss拿出来
  244. logging(
  245. code="9001",
  246. info="存在历史文章",
  247. trace_id=trace_id
  248. )
  249. try:
  250. await self.useExistOssPath(video_info_list=oss_path_list, params=params)
  251. except Exception as e:
  252. print(e)
  253. else:
  254. pass
  255. async def deal(self):
  256. """
  257. 处理
  258. :return:
  259. """
  260. task_list = await self.getTaskList()
  261. if task_list:
  262. tasks = [self.processTask(params) for params in task_list]
  263. await asyncio.gather(*tasks)
  264. else:
  265. logging(
  266. code="9008",
  267. info="没有要处理的请求"
  268. )