# async_etl.py (8.1 KB)
"""
@author: luojunhui

Async ETL for crawled videos: download a video and its cover image, upload
both to Aliyun OSS, then publish them through the piaoquan crawler API.
"""
import asyncio
import os
from hashlib import md5
from uuid import uuid4

import aiofiles
import aiohttp
import oss2
import requests
from fake_useragent import FakeUserAgent
  12. async def upload_to_oss(local_video_path):
  13. """
  14. 把视频上传到 oss
  15. :return:
  16. """
  17. oss_video_key = str(uuid4())
  18. access_key_id = "LTAIP6x1l3DXfSxm"
  19. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  20. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  21. bucket_name = "art-pubbucket"
  22. bucket = oss2.Bucket(
  23. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  24. )
  25. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  26. return oss_video_key
  27. class AsyncETL(object):
  28. """
  29. 视频下载功能
  30. """
  31. def __init__(self, video_obj):
  32. self.platform = video_obj["platform"]
  33. self.video_id = video_obj["video_id"]
  34. self.video_url = video_obj["video_url"]
  35. self.uid = video_obj["user_id"]
  36. self.title = video_obj["video_title"]
  37. self.cover_url = video_obj["cover_url"]
  38. self.proxy = {
  39. "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  40. "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  41. }
  42. self.max_retry = 5
  43. def request_header(self, type_="video"):
  44. """
  45. 请求头
  46. :return:
  47. """
  48. if self.platform == "xg_search":
  49. if "v9-xg-web-pc.ixigua.com" in self.video_url:
  50. headers = {
  51. "Accept": "*/*",
  52. "Accept-Language": "zh-CN,zh;q=0.9",
  53. "Host": "v9-xg-web-pc.ixigua.com",
  54. "User-Agent": FakeUserAgent().chrome,
  55. "Origin": "https://www.ixigua.com/",
  56. "Referer": "https://www.ixigua.com/"
  57. }
  58. elif "v3-xg-web-pc.ixigua.com" in self.video_url:
  59. headers = {
  60. "Accept": "*/*",
  61. "Accept-Language": "zh-CN,zh;q=0.9",
  62. "Host": "v3-xg-web-pc.ixigua.com",
  63. "User-Agent": FakeUserAgent().chrome,
  64. "Origin": "https://www.ixigua.com/",
  65. "Referer": "https://www.ixigua.com/"
  66. }
  67. elif type_ == "cover":
  68. headers = {
  69. 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
  70. 'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
  71. 'Cache-Control': 'max-age=0',
  72. 'Proxy-Connection': 'keep-alive',
  73. 'Upgrade-Insecure-Requests': '1',
  74. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
  75. }
  76. else:
  77. headers = {
  78. "Accept": "*/*",
  79. "Accept-Language": "zh-CN,zh;q=0.9",
  80. "Host": "v3-xg-web-pc.ixigua.com",
  81. "User-Agent": FakeUserAgent().chrome,
  82. "Origin": "https://www.ixigua.com/",
  83. "Referer": "https://www.ixigua.com/"
  84. }
  85. elif self.platform == "baidu_search":
  86. headers = {
  87. "Accept": "*/*",
  88. "Accept-Language": "zh-CN,zh;q=0.9",
  89. "User-Agent": FakeUserAgent().chrome,
  90. }
  91. elif self.platform == "wx_search":
  92. headers = {
  93. "Accept": "*/*",
  94. "Accept-Language": "zh-CN,zh;q=0.9",
  95. "User-Agent": FakeUserAgent().chrome,
  96. "Origin": "https://mp.weixin.qq.com",
  97. "Referer": "https://mp.weixin.qq.com"
  98. }
  99. elif self.platform == "dy_search":
  100. headers = {
  101. 'accept': '*/*',
  102. 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
  103. 'priority': 'i',
  104. 'range': 'bytes=0-',
  105. 'referer': 'https://v11-coldf.douyinvod.com/',
  106. 'user-agent': FakeUserAgent().chrome
  107. }
  108. else:
  109. headers = {}
  110. return headers
  111. def generate_video_path(self):
  112. """
  113. 通过视频信息生成唯一视频地址
  114. :return:
  115. """
  116. index = "{}-{}".format(self.platform, self.video_id)
  117. index = md5(index.encode()).hexdigest()
  118. file_name = "{}.mp4".format(index)
  119. cover_name = "{}.png".format(index)
  120. file_path = os.path.join(os.getcwd(), "videos", file_name)
  121. cover_path = os.path.join(os.getcwd(), "videos", cover_name)
  122. return file_path, cover_path
  123. async def publish_by__request(self, video_path, cover):
  124. """
  125. 发布
  126. :return:
  127. """
  128. url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
  129. headers = {
  130. "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
  131. "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
  132. "referer": "http://appspeed.piaoquantv.com",
  133. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  134. "accept-language": "zh-CN,zh-Hans;q=0.9",
  135. "Content-Type": "application/x-www-form-urlencoded",
  136. }
  137. payload = {
  138. "coverImgPath": cover,
  139. "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
  140. "fileExtensions": "MP4",
  141. "loginUid": self.uid,
  142. "networkType": "Wi-Fi",
  143. "platform": "iOS",
  144. "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
  145. "sessionId": "362290597725ce1fa870d7be4f46dcc2",
  146. "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
  147. "title": self.title,
  148. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  149. "uid": self.uid,
  150. "versionCode": "486",
  151. "versionName": "3.4.12",
  152. "videoFromScene": "1",
  153. "videoPath": video_path,
  154. "viewStatus": "1",
  155. }
  156. response = requests.post(
  157. url=url,
  158. headers=headers,
  159. data=payload,
  160. )
  161. return response.json()
  162. async def download(self, file_path):
  163. """
  164. :param file_path:
  165. :return:
  166. """
  167. headers = self.request_header()
  168. if os.path.exists(file_path):
  169. file_size = os.path.getsize(file_path)
  170. headers["Range"] = f"bytes={file_size}-"
  171. else:
  172. file_size = 0
  173. async with aiohttp.ClientSession() as session:
  174. async with session.get(self.video_url, headers=headers) as response:
  175. if response.status in [200, 206]:
  176. mode = "ab+" if file_size > 0 else "wb"
  177. f = await aiofiles.open(file_path, mode)
  178. await f.write(await response.read())
  179. await f.close()
  180. else:
  181. print(response.status)
  182. return file_path
  183. async def download_cover(self, file_path):
  184. """
  185. 下载视频封面
  186. :param file_path:
  187. :return:
  188. """
  189. headers = self.request_header(type_="cover")
  190. response = requests.get(url=self.cover_url, headers=headers)
  191. with open(file_path, "wb") as f:
  192. f.write(response.content)
  193. return file_path
  194. async def etl_deal(self):
  195. """
  196. ETL Deal Task
  197. :return:
  198. """
  199. local_video_path, local_cover_path = self.generate_video_path()
  200. # download videos
  201. file_path = await self.download(local_video_path)
  202. # download cover
  203. cover_path = await self.download_cover(local_cover_path)
  204. # upload to oss
  205. oss_video = await upload_to_oss(
  206. local_video_path=file_path,
  207. )
  208. oss_cover = await upload_to_oss(
  209. local_video_path=cover_path
  210. )
  211. # publish to pq
  212. result = await self.publish_by__request(
  213. video_path=oss_video,
  214. cover=oss_cover
  215. )
  216. return result["data"]["id"]