"""
Async ETL helpers: download a video and its cover, upload them to OSS and
publish the result.

@author: luojunhui
"""
import asyncio
import functools
import json
import os
from hashlib import md5
from uuid import uuid4

import aiofiles
import aiohttp
import oss2
import requests
from fake_useragent import FakeUserAgent
  13. async def upload_to_oss(local_video_path):
  14. """
  15. 把视频上传到 oss
  16. :return:
  17. """
  18. oss_video_key = str(uuid4())
  19. access_key_id = "LTAIP6x1l3DXfSxm"
  20. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  21. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  22. bucket_name = "art-pubbucket"
  23. bucket = oss2.Bucket(
  24. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  25. )
  26. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  27. return oss_video_key
  28. class AsyncETL(object):
  29. """
  30. 视频下载功能
  31. """
  32. def __init__(self, video_obj):
  33. self.platform = video_obj["platform"]
  34. self.video_id = video_obj["video_id"]
  35. self.video_url = video_obj["video_url"]
  36. self.uid = video_obj["user_id"]
  37. self.title = video_obj["video_title"]
  38. self.cover_url = video_obj["cover_url"]
  39. self.proxy = {
  40. "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  41. "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  42. }
  43. self.max_retry = 5
  44. def request_header(self, type_="video"):
  45. """
  46. 请求头
  47. :return:
  48. """
  49. if self.platform == "xg_search":
  50. if "v9-xg-web-pc.ixigua.com" in self.video_url:
  51. headers = {
  52. "Accept": "*/*",
  53. "Accept-Language": "zh-CN,zh;q=0.9",
  54. "Host": "v9-xg-web-pc.ixigua.com",
  55. "User-Agent": FakeUserAgent().chrome,
  56. "Origin": "https://www.ixigua.com/",
  57. "Referer": "https://www.ixigua.com/"
  58. }
  59. elif "v3-xg-web-pc.ixigua.com" in self.video_url:
  60. headers = {
  61. "Accept": "*/*",
  62. "Accept-Language": "zh-CN,zh;q=0.9",
  63. "Host": "v3-xg-web-pc.ixigua.com",
  64. "User-Agent": FakeUserAgent().chrome,
  65. "Origin": "https://www.ixigua.com/",
  66. "Referer": "https://www.ixigua.com/"
  67. }
  68. elif type_ == "cover":
  69. headers = {
  70. 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
  71. 'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
  72. 'Cache-Control': 'max-age=0',
  73. 'Proxy-Connection': 'keep-alive',
  74. 'Upgrade-Insecure-Requests': '1',
  75. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
  76. }
  77. else:
  78. headers = {
  79. "Accept": "*/*",
  80. "Accept-Language": "zh-CN,zh;q=0.9",
  81. "Host": "v3-xg-web-pc.ixigua.com",
  82. "User-Agent": FakeUserAgent().chrome,
  83. "Origin": "https://www.ixigua.com/",
  84. "Referer": "https://www.ixigua.com/"
  85. }
  86. elif self.platform == "baidu_search":
  87. headers = {
  88. "Accept": "*/*",
  89. "Accept-Language": "zh-CN,zh;q=0.9",
  90. "User-Agent": FakeUserAgent().chrome,
  91. }
  92. elif self.platform == "wx_search":
  93. headers = {
  94. "Accept": "*/*",
  95. "Accept-Language": "zh-CN,zh;q=0.9",
  96. "User-Agent": FakeUserAgent().chrome,
  97. "Origin": "https://mp.weixin.qq.com",
  98. "Referer": "https://mp.weixin.qq.com"
  99. }
  100. elif self.platform == "dy_search":
  101. headers = {
  102. 'accept': '*/*',
  103. 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
  104. 'priority': 'i',
  105. 'range': 'bytes=0-',
  106. 'referer': 'https://v11-coldf.douyinvod.com/',
  107. 'user-agent': FakeUserAgent().chrome
  108. }
  109. else:
  110. headers = {}
  111. return headers
  112. def generate_video_path(self):
  113. """
  114. 通过视频信息生成唯一视频地址
  115. :return:
  116. """
  117. index = "{}-{}".format(self.platform, self.video_id)
  118. index = md5(index.encode()).hexdigest()
  119. file_name = "{}.mp4".format(index)
  120. cover_name = "{}.png".format(index)
  121. file_path = os.path.join(os.getcwd(), "videos", file_name)
  122. cover_path = os.path.join(os.getcwd(), "videos", cover_name)
  123. return file_path, cover_path
  124. async def publish_by__request(self, video_path, cover):
  125. """
  126. 发布
  127. :return:
  128. """
  129. url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
  130. headers = {
  131. "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
  132. "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
  133. "referer": "http://appspeed.piaoquantv.com",
  134. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  135. "accept-language": "zh-CN,zh-Hans;q=0.9",
  136. "Content-Type": "application/x-www-form-urlencoded",
  137. }
  138. payload = {
  139. "coverImgPath": cover,
  140. "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
  141. "fileExtensions": "MP4",
  142. "loginUid": self.uid,
  143. "networkType": "Wi-Fi",
  144. "platform": "iOS",
  145. "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
  146. "sessionId": "362290597725ce1fa870d7be4f46dcc2",
  147. "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
  148. "title": self.title,
  149. "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
  150. "uid": self.uid,
  151. "versionCode": "486",
  152. "versionName": "3.4.12",
  153. "videoFromScene": "1",
  154. "videoPath": video_path,
  155. "viewStatus": "1",
  156. }
  157. response = requests.post(
  158. url=url,
  159. headers=headers,
  160. data=payload,
  161. )
  162. return response.json()
  163. async def download(self, file_path):
  164. """
  165. :param file_path:
  166. :return:
  167. """
  168. headers = self.request_header()
  169. if os.path.exists(file_path):
  170. file_size = os.path.getsize(file_path)
  171. headers["Range"] = f"bytes={file_size}-"
  172. else:
  173. file_size = 0
  174. async with aiohttp.ClientSession() as session:
  175. async with session.get(self.video_url, headers=headers) as response:
  176. if response.status in [200, 206]:
  177. mode = "ab+" if file_size > 0 else "wb"
  178. f = await aiofiles.open(file_path, mode)
  179. await f.write(await response.read())
  180. await f.close()
  181. else:
  182. print(response.status)
  183. return file_path
  184. async def download_cover(self, file_path):
  185. """
  186. 下载视频封面
  187. :param file_path:
  188. :return:
  189. """
  190. headers = self.request_header(type_="cover")
  191. response = requests.get(url=self.cover_url, headers=headers)
  192. with open(file_path, "wb") as f:
  193. f.write(response.content)
  194. return file_path
  195. async def etl_deal(self):
  196. """
  197. ETL Deal Task
  198. :return:
  199. """
  200. local_video_path, local_cover_path = self.generate_video_path()
  201. # download videos
  202. file_path = await self.download(local_video_path)
  203. # download cover
  204. cover_path = await self.download_cover(local_cover_path)
  205. # upload to oss
  206. oss_video = await upload_to_oss(
  207. local_video_path=file_path,
  208. )
  209. # 读取cover, 若img是html格式,则不上传
  210. with open(cover_path, encoding="utf-8") as f:
  211. img_data = f.read()
  212. if "<html>" in img_data:
  213. oss_cover = None
  214. else:
  215. oss_cover = await upload_to_oss(
  216. local_video_path=cover_path
  217. )
  218. # publish to pq
  219. result = await self.publish_by__request(
  220. video_path=oss_video,
  221. cover=oss_cover
  222. )
  223. return result["data"]["id"]