# async_etl.py 7.3 KB


"""
@author: luojunhui
"""
import asyncio
import json
import os
import time
from hashlib import md5
from uuid import uuid4

import aiofiles
import aiohttp
import oss2
import requests
from fake_useragent import FakeUserAgent


  14. async def upload_to_oss(local_video_path):
  15. """
  16. 把视频上传到 oss
  17. :return:
  18. """
  19. oss_video_key = str(uuid4())
  20. access_key_id = "LTAIP6x1l3DXfSxm"
  21. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  22. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  23. bucket_name = "art-pubbucket"
  24. bucket = oss2.Bucket(
  25. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  26. )
  27. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  28. return oss_video_key
  29. class AsyncETL(object):
  30. """
  31. 视频下载功能
  32. """
  33. def __init__(self, video_obj):
  34. self.platform = video_obj["platform"]
  35. self.video_id = video_obj["video_id"]
  36. self.video_url = video_obj["video_url"]
  37. self.uid = video_obj["user_id"]
  38. self.title = video_obj["video_title"]
  39. self.cover_url = video_obj["cover_url"]
  40. self.proxy = {
  41. "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  42. "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  43. }
  44. self.max_retry = 5
  45. def request_header(self):
  46. """
  47. 请求头
  48. :return:
  49. """
  50. if self.platform == "xg_search":
  51. if "v9-xg-web-pc.ixigua.com" in self.video_url:
  52. headers = {
  53. "Accept": "*/*",
  54. "Accept-Language": "zh-CN,zh;q=0.9",
  55. "Host": "v9-xg-web-pc.ixigua.com",
  56. "User-Agent": FakeUserAgent().chrome,
  57. "Origin": "https://www.ixigua.com/",
  58. "Referer": "https://www.ixigua.com/"
  59. }
  60. elif "v3-xg-web-pc.ixigua.com" in self.video_url:
  61. headers = {
  62. "Accept": "*/*",
  63. "Accept-Language": "zh-CN,zh;q=0.9",
  64. "Host": "v3-xg-web-pc.ixigua.com",
  65. "User-Agent": FakeUserAgent().chrome,
  66. "Origin": "https://www.ixigua.com/",
  67. "Referer": "https://www.ixigua.com/"
  68. }
  69. else:
  70. headers = {
  71. "Accept": "*/*",
  72. "Accept-Language": "zh-CN,zh;q=0.9",
  73. "Host": "v3-xg-web-pc.ixigua.com",
  74. "User-Agent": FakeUserAgent().chrome,
  75. "Origin": "https://www.ixigua.com/",
  76. "Referer": "https://www.ixigua.com/"
  77. }
  78. elif self.platform == "baidu_search":
  79. headers = {
  80. "Accept": "*/*",
  81. "Accept-Language": "zh-CN,zh;q=0.9",
  82. "User-Agent": FakeUserAgent().chrome,
  83. }
  84. elif self.platform == "wx_search":
  85. headers = {
  86. "Accept": "*/*",
  87. "Accept-Language": "zh-CN,zh;q=0.9",
  88. "User-Agent": FakeUserAgent().chrome,
  89. "Origin": "https://mp.weixin.qq.com",
  90. "Referer": "https://mp.weixin.qq.com"
  91. }
  92. else:
  93. headers = {}
  94. return headers
  95. def generate_video_path(self):
  96. """
  97. 通过视频信息生成唯一视频地址
  98. :return:
  99. """
  100. index = "{}-{}".format(self.platform, self.video_id)
  101. index = md5(index.encode()).hexdigest()
  102. file_name = "{}.mp4".format(index)
  103. cover_name = "{}.png".format(index)
  104. file_path = os.path.join(os.getcwd(), "videos", file_name)
  105. cover_path = os.path.join(os.getcwd(), "videos", cover_name)
  106. return file_path, cover_path
  107. async def publish_by__request(self, video_path, cover):
  108. """
  109. 发布
  110. :return:
  111. """
  112. url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
  113. headers = {
  114. "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
  115. "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
  116. "referer": "http://appspeed.piaoquantv.com",
  117. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  118. "accept-language": "zh-CN,zh-Hans;q=0.9",
  119. "Content-Type": "application/x-www-form-urlencoded",
  120. }
  121. payload = {
  122. "coverImgPath": cover,
  123. "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
  124. "fileExtensions": "MP4",
  125. "loginUid": self.uid,
  126. "networkType": "Wi-Fi",
  127. "platform": "iOS",
  128. "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
  129. "sessionId": "362290597725ce1fa870d7be4f46dcc2",
  130. "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
  131. "title": self.title,
  132. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  133. "uid": self.uid,
  134. "versionCode": "486",
  135. "versionName": "3.4.12",
  136. "videoFromScene": "1",
  137. "videoPath": video_path,
  138. "viewStatus": "1",
  139. }
  140. response = requests.post(
  141. url=url,
  142. headers=headers,
  143. data=payload,
  144. )
  145. return response.json()
  146. async def download(self, file_path):
  147. """
  148. :param file_path:
  149. :return:
  150. """
  151. headers = self.request_header()
  152. if os.path.exists(file_path):
  153. file_size = os.path.getsize(file_path)
  154. headers["Range"] = f"bytes={file_size}-"
  155. else:
  156. file_size = 0
  157. async with aiohttp.ClientSession() as session:
  158. async with session.get(self.video_url, headers=headers) as response:
  159. if response.status in [200, 206]:
  160. mode = "ab+" if file_size > 0 else "wb"
  161. f = await aiofiles.open(file_path, mode)
  162. await f.write(await response.read())
  163. await f.close()
  164. else:
  165. print(response.status)
  166. return file_path
  167. async def download_cover(self, file_path):
  168. """
  169. 下载视频封面
  170. :param file_path:
  171. :return:
  172. """
  173. headers = self.request_header()
  174. response = requests.get(url=self.cover_url, headers=headers)
  175. with open(file_path, "wb") as f:
  176. f.write(response.content)
  177. return file_path
  178. async def etl_deal(self):
  179. """
  180. ETL Deal Task
  181. :return:
  182. """
  183. local_video_path, local_cover_path = self.generate_video_path()
  184. # download videos
  185. file_path = await self.download(local_video_path)
  186. # download cover
  187. cover_path = await self.download_cover(local_cover_path)
  188. # upload to oss
  189. oss_video = await upload_to_oss(
  190. local_video_path=file_path,
  191. )
  192. oss_cover = await upload_to_oss(
  193. local_video_path=cover_path
  194. )
  195. # publish to pq
  196. result = await self.publish_by__request(
  197. video_path=oss_video,
  198. cover=oss_cover
  199. )
  200. print(json.dumps(result, ensure_ascii=False, indent=4))
  201. a = time.time()
  202. os.remove(file_path)
  203. os.remove(cover_path)
  204. b = time.time()
  205. print(b - a)
  206. return result["data"]["id"]