new_task.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. import json
  6. from static.config import db_article, db_video
  7. from applications.functions.log import logging
  8. from static.config import mysql_coroutines
  9. from applications.functions.common import async_post
  10. async def publishToPQ(video_obj):
  11. """
  12. publish video to pq
  13. :return:
  14. """
  15. oss_path = video_obj['videoPath']
  16. uid = video_obj['uid']
  17. title = video_obj['title']
  18. cover = video_obj['coverPath']
  19. url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
  20. headers = {
  21. "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
  22. "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
  23. "referer": "http://appspeed.piaoquantv.com",
  24. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  25. "accept-language": "zh-CN,zh-Hans;q=0.9",
  26. "Content-Type": "application/x-www-form-urlencoded",
  27. }
  28. payload = {
  29. "coverImgPath": cover,
  30. "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
  31. "fileExtensions": "MP4",
  32. "loginUid": uid,
  33. "networkType": "Wi-Fi",
  34. "platform": "iOS",
  35. "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
  36. "sessionId": "362290597725ce1fa870d7be4f46dcc2",
  37. "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
  38. "title": title,
  39. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  40. "uid": uid,
  41. "versionCode": "486",
  42. "versionName": "3.4.12",
  43. "videoFromScene": "1",
  44. "videoPath": oss_path,
  45. "viewStatus": "1",
  46. }
  47. response = await async_post(url, headers, payload)
  48. return response
  49. async def getPQVideoDetail(video_id):
  50. """
  51. 获取票圈视频详情信息
  52. :return:
  53. """
  54. url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
  55. data = {
  56. "videoIdList": [video_id]
  57. }
  58. header = {
  59. "Content-Type": "application/json",
  60. }
  61. response = await async_post(url, header, json.dumps(data))
  62. return response
  63. async def getNewVideoIds(video_obj_list):
  64. """
  65. video
  66. :return:
  67. """
  68. vid_list = []
  69. for video_obj in video_obj_list:
  70. # video_obj 里面的信息对于历史数据可能不全,需要从pq获取
  71. try:
  72. if len(vid_list) >= 3:
  73. return vid_list
  74. else:
  75. pq_response = await publishToPQ(video_obj)
  76. video_id = pq_response['data']['id']
  77. vid_list.append(video_id)
  78. except:
  79. continue
  80. return vid_list
  81. class MatchTask3(object):
  82. """
  83. 处理已经匹配过小程序的文章
  84. """
  85. def __init__(self, mysql_client):
  86. """
  87. :param mysql_client:
  88. """
  89. self.mysql_client = mysql_client
  90. async def getTaskList(self):
  91. """
  92. 获取任务
  93. :return:
  94. """
  95. select_sql1 = f"""
  96. SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
  97. FROM {db_article}
  98. WHERE content_status = 0 and process_times <= 3
  99. ORDER BY request_time_stamp
  100. ASC
  101. LIMIT {mysql_coroutines};
  102. """
  103. tasks = await self.mysql_client.async_select(sql=select_sql1)
  104. task_obj_list = [
  105. {
  106. "trace_id": item[0],
  107. "content_id": item[1],
  108. "gh_id": item[2],
  109. "title": item[3],
  110. "text": item[4],
  111. "content_status": item[5],
  112. "process_times": item[6]
  113. } for item in tasks
  114. ]
  115. logging(
  116. code="9001",
  117. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  118. data=task_obj_list
  119. )
  120. return task_obj_list
  121. async def getHistoryVideoOssPath(self, content_id):
  122. """
  123. check whether the contents videos exists
  124. :param content_id:
  125. :return:
  126. """
  127. select_sql = f"""
  128. SELECT video_title, uid, video_path, cover_path
  129. FROM {db_video}
  130. where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
  131. """
  132. content_videos = await self.mysql_client.async_select(select_sql)
  133. video_list = [
  134. {
  135. "title": line[0],
  136. "uid": line[1],
  137. "videoPath": line[2],
  138. "coverPath": line[3]
  139. }
  140. for line in content_videos
  141. ]
  142. if len(video_list) >= 3:
  143. return video_list
  144. else:
  145. return None
  146. async def useExistOssPath(self, video_info_list, params):
  147. """
  148. 使用已经存在的视频id
  149. :return:
  150. """
  151. trace_id = params['trace_id']
  152. content_id = params['content_id']
  153. select_sql = f"""
  154. SELECT kimi_title
  155. FROM {db_article}
  156. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  157. """
  158. info = await self.mysql_client.async_select(sql=select_sql)
  159. kimi_title = info[0]
  160. video_id_list = await getNewVideoIds(video_info_list)
  161. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  162. update_sql = f"""
  163. UPDATE {db_article}
  164. SET
  165. kimi_title=%s,
  166. recall_video_id1=%s,
  167. recall_video_id2=%s,
  168. recall_video_id3=%s,
  169. content_status=%s,
  170. process_times = %s
  171. WHERE trace_id = %s
  172. """
  173. await self.mysql_client.async_insert(
  174. sql=update_sql,
  175. params=(
  176. kimi_title,
  177. vid1,
  178. vid2,
  179. vid3,
  180. 2,
  181. int(params['process_times']) + 1,
  182. trace_id
  183. )
  184. )
  185. logging(
  186. code="9002",
  187. info="已从历史文章更新,文章id: {}".format(content_id),
  188. trace_id=trace_id
  189. )
  190. async def processTask(self, params):
  191. """
  192. 异步执行
  193. :param params:
  194. :return:
  195. """
  196. content_id = params['content_id']
  197. trace_id = params['trace_id']
  198. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  199. oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
  200. if oss_path_list:
  201. # 说明已经存在了结果, 将该条记录下的video_oss拿出来
  202. logging(
  203. code="9001",
  204. info="存在历史文章",
  205. trace_id=trace_id
  206. )
  207. await self.useExistOssPath(video_info_list=oss_path_list, params=params)
  208. else:
  209. pass
  210. async def deal(self):
  211. """
  212. 处理
  213. :return:
  214. """
  215. task_list = await self.getTaskList()
  216. task_dict = {}
  217. for task in task_list:
  218. key = task['content_id']
  219. task_dict[key] = task
  220. process_list = []
  221. for item in task_dict:
  222. process_list.append(task_dict[item])
  223. if process_list:
  224. tasks = [self.processTask(params) for params in process_list]
  225. await asyncio.gather(*tasks)
  226. else:
  227. logging(
  228. code="9008",
  229. info="没有要处理的请求"
  230. )