async_etl.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import oss2
  6. import aiohttp
  7. import aiofiles
  8. from hashlib import md5
  9. from uuid import uuid4
  10. import requests
  11. from fake_useragent import FakeUserAgent
  12. async def upload_to_oss(local_video_path):
  13. """
  14. 把视频上传到 oss
  15. :return:
  16. """
  17. oss_video_key = str(uuid4())
  18. access_key_id = "LTAIP6x1l3DXfSxm"
  19. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  20. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  21. bucket_name = "art-pubbucket"
  22. bucket = oss2.Bucket(
  23. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  24. )
  25. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  26. return oss_video_key
  27. class AsyncETL(object):
  28. """
  29. 视频下载功能
  30. """
  31. def __init__(self, video_obj):
  32. self.platform = video_obj["platform"]
  33. self.video_id = video_obj["video_id"]
  34. self.video_url = video_obj["video_url"]
  35. self.uid = video_obj["user_id"]
  36. self.title = video_obj["video_title"]
  37. self.cover_url = video_obj["cover_url"]
  38. self.proxy = {
  39. "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  40. "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  41. }
  42. self.max_retry = 5
  43. def request_header(self):
  44. """
  45. 请求头
  46. :return:
  47. """
  48. if self.platform == "xg_search":
  49. if "v9-xg-web-pc.ixigua.com" in self.video_url:
  50. headers = {
  51. "Accept": "*/*",
  52. "Accept-Language": "zh-CN,zh;q=0.9",
  53. "Host": "v9-xg-web-pc.ixigua.com",
  54. "User-Agent": FakeUserAgent().chrome,
  55. "Origin": "https://www.ixigua.com/",
  56. "Referer": "https://www.ixigua.com/"
  57. }
  58. elif "v3-xg-web-pc.ixigua.com" in self.video_url:
  59. headers = {
  60. "Accept": "*/*",
  61. "Accept-Language": "zh-CN,zh;q=0.9",
  62. "Host": "v3-xg-web-pc.ixigua.com",
  63. "User-Agent": FakeUserAgent().chrome,
  64. "Origin": "https://www.ixigua.com/",
  65. "Referer": "https://www.ixigua.com/"
  66. }
  67. else:
  68. headers = {
  69. "Accept": "*/*",
  70. "Accept-Language": "zh-CN,zh;q=0.9",
  71. "Host": "v3-xg-web-pc.ixigua.com",
  72. "User-Agent": FakeUserAgent().chrome,
  73. "Origin": "https://www.ixigua.com/",
  74. "Referer": "https://www.ixigua.com/"
  75. }
  76. elif self.platform == "baidu_search":
  77. headers = {
  78. "Accept": "*/*",
  79. "Accept-Language": "zh-CN,zh;q=0.9",
  80. "User-Agent": FakeUserAgent().chrome,
  81. }
  82. elif self.platform == "wx_search":
  83. headers = {
  84. "Accept": "*/*",
  85. "Accept-Language": "zh-CN,zh;q=0.9",
  86. "User-Agent": FakeUserAgent().chrome,
  87. "Origin": "https://mp.weixin.qq.com",
  88. "Referer": "https://mp.weixin.qq.com"
  89. }
  90. else:
  91. headers = {}
  92. return headers
  93. def generate_video_path(self):
  94. """
  95. 通过视频信息生成唯一视频地址
  96. :return:
  97. """
  98. index = "{}-{}".format(self.platform, self.video_id)
  99. index = md5(index.encode()).hexdigest()
  100. file_name = "{}.mp4".format(index)
  101. cover_name = "{}.png".format(index)
  102. file_path = os.path.join(os.getcwd(), "videos", file_name)
  103. cover_path = os.path.join(os.getcwd(), "videos", cover_name)
  104. return file_path, cover_path
  105. async def publish_by__request(self, video_path, cover):
  106. """
  107. 发布
  108. :return:
  109. """
  110. url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
  111. headers = {
  112. "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
  113. "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
  114. "referer": "http://appspeed.piaoquantv.com",
  115. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  116. "accept-language": "zh-CN,zh-Hans;q=0.9",
  117. "Content-Type": "application/x-www-form-urlencoded",
  118. }
  119. payload = {
  120. "coverImgPath": cover,
  121. "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
  122. "fileExtensions": "MP4",
  123. "loginUid": self.uid,
  124. "networkType": "Wi-Fi",
  125. "platform": "iOS",
  126. "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
  127. "sessionId": "362290597725ce1fa870d7be4f46dcc2",
  128. "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
  129. "title": self.title,
  130. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  131. "uid": self.uid,
  132. "versionCode": "486",
  133. "versionName": "3.4.12",
  134. "videoFromScene": "1",
  135. "videoPath": video_path,
  136. "viewStatus": "1",
  137. }
  138. response = requests.post(
  139. url=url,
  140. headers=headers,
  141. data=payload,
  142. )
  143. return response.json()
  144. async def download(self, file_path):
  145. """
  146. :param file_path:
  147. :return:
  148. """
  149. headers = self.request_header()
  150. if os.path.exists(file_path):
  151. file_size = os.path.getsize(file_path)
  152. headers["Range"] = f"bytes={file_size}-"
  153. else:
  154. file_size = 0
  155. async with aiohttp.ClientSession() as session:
  156. async with session.get(self.video_url, headers=headers) as response:
  157. if response.status in [200, 206]:
  158. mode = "ab+" if file_size > 0 else "wb"
  159. f = await aiofiles.open(file_path, mode)
  160. await f.write(await response.read())
  161. await f.close()
  162. else:
  163. print(response.status)
  164. return file_path
  165. async def download_cover(self, file_path):
  166. """
  167. 下载视频封面
  168. :param file_path:
  169. :return:
  170. """
  171. headers = self.request_header()
  172. response = requests.get(url=self.cover_url, headers=headers)
  173. with open(file_path, "wb") as f:
  174. f.write(response.content)
  175. return file_path
  176. async def etl_deal(self):
  177. """
  178. ETL Deal Task
  179. :return:
  180. """
  181. local_video_path, local_cover_path = self.generate_video_path()
  182. # download videos
  183. file_path = await self.download(local_video_path)
  184. # download cover
  185. cover_path = await self.download_cover(local_cover_path)
  186. # upload to oss
  187. oss_video = await upload_to_oss(
  188. local_video_path=file_path,
  189. )
  190. oss_cover = await upload_to_oss(
  191. local_video_path=cover_path
  192. )
  193. # publish to pq
  194. result = await self.publish_by__request(
  195. video_path=oss_video,
  196. cover=oss_cover
  197. )
  198. return result["data"]["id"]