etl_task.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import oss2
  6. import aiohttp
  7. import aiofiles
  8. import asyncio
  9. from hashlib import md5
  10. from uuid import uuid4
  11. import requests
  12. from fake_useragent import FakeUserAgent
  13. from applications.config import Config
  async def downloadCover(file_path, platform, cover_url):
      """
      Download a video cover image and store it at ``file_path``.

      The request is made with the blocking ``requests`` library, so it is
      handed to the default executor; calling it directly inside this
      coroutine would freeze the event loop (and every other download that is
      being awaited concurrently) until the response arrives.

      :param file_path: local path the image bytes are written to
      :param platform: platform key forwarded to ``requestHeader``
      :param cover_url: URL of the cover image
      :return: ``file_path`` on success; ``None`` when no URL is given, when
               the status code is not 200, or when the body contains
               ``<html>`` (an HTML page was served instead of an image)
      """
      # A task without a cover URL simply has no cover; treat it like any
      # other unusable cover instead of letting the request raise.
      if not cover_url:
          return None

      headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
      loop = asyncio.get_running_loop()
      # The timeout keeps a stalled server from tying up an executor thread forever.
      response = await loop.run_in_executor(
          None,
          lambda: requests.get(url=cover_url, headers=headers, timeout=60),
      )
      if response.status_code != 200 or b"<html>" in response.content:
          return None

      async with aiofiles.open(file_path, "wb") as f:
          await f.write(response.content)
      return file_path
  def requestHeader(platform, url, download_type="video"):
      """
      Build the HTTP request headers used to download from a given platform.

      :param platform: source platform key; "xg_search", "baidu_search",
                       "wx_search" and "dy_search" each get dedicated headers
      :param url: URL being requested; only inspected for "xg_search", where
                  the CDN host found in it selects the ``Host`` header
      :param download_type: when "cover", an "xg_search" URL on an
                            unrecognised host gets browser-like page headers
      :return: dict of headers; empty for any other platform
      """

      def xigua_headers(host):
          # Header set shared by all ixigua CDN requests; only ``Host`` varies.
          return {
              "Accept": "*/*",
              "Accept-Language": "zh-CN,zh;q=0.9",
              "Host": host,
              "User-Agent": FakeUserAgent().chrome,
              "Origin": "https://www.ixigua.com/",
              "Referer": "https://www.ixigua.com/"
          }

      if platform == "xg_search":
          for known_host in ("v9-xg-web-pc.ixigua.com", "v3-xg-web-pc.ixigua.com"):
              if known_host in url:
                  return xigua_headers(known_host)
          if download_type == "cover":
              return {
                  'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                  'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
                  'Cache-Control': 'max-age=0',
                  'Proxy-Connection': 'keep-alive',
                  'Upgrade-Insecure-Requests': '1',
                  'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
              }
          # Unrecognised host on a non-cover download: use the v3 host headers.
          return xigua_headers("v3-xg-web-pc.ixigua.com")

      if platform == "baidu_search":
          return {
              "Accept": "*/*",
              "Accept-Language": "zh-CN,zh;q=0.9",
              "User-Agent": FakeUserAgent().chrome,
          }

      if platform == "wx_search":
          return {
              "Accept": "*/*",
              "Accept-Language": "zh-CN,zh;q=0.9",
              "User-Agent": FakeUserAgent().chrome,
              "Origin": "https://mp.weixin.qq.com",
              "Referer": "https://mp.weixin.qq.com"
          }

      if platform == "dy_search":
          return {
              'accept': '*/*',
              'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
              'priority': 'i',
              'range': 'bytes=0-',
              'referer': 'https://v11-coldf.douyinvod.com/',
              'user-agent': FakeUserAgent().chrome
          }

      return {}
  async def downloadVideo(file_path, platform, video_url, download_type="video"):
      """
      Stream a video to ``file_path``, resuming a partial file when possible.

      If ``file_path`` already holds data, a ``Range`` request for the
      remaining bytes is sent. The new data is appended only when the server
      answers 206 (Partial Content); a 200 answer carries the whole file
      again, so the local file is rewritten from scratch instead of being
      corrupted by appending a full copy after the partial one.

      :param file_path: local path of the video file
      :param platform: platform key forwarded to ``requestHeader``
      :param video_url: URL of the video
      :param download_type: forwarded to ``requestHeader``
      :return: ``file_path``; it is returned even when the response status is
               neither 200 nor 206, in which case the status is printed and
               nothing is written
      """
      headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
      file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
      if file_size > 0:
          # Some platform header sets already contain a lower-case "range"
          # entry; remove it so only one range header is sent.
          headers = {key: value for key, value in headers.items() if key.lower() != "range"}
          headers["Range"] = f"bytes={file_size}-"

      async with aiohttp.ClientSession() as session:
          async with session.get(video_url, headers=headers) as response:
              if response.status not in (200, 206):
                  print(response.status)
                  return file_path
              # Append only when the server really honoured the Range request.
              resuming = file_size > 0 and response.status == 206
              async with aiofiles.open(file_path, "ab" if resuming else "wb") as f:
                  # Download in 1 MB chunks.
                  async for chunk in response.content.iter_chunked(1024 * 1024):
                      await f.write(chunk)
      return file_path
  def generateVideoPath(platform, video_id):
      """
      Generate unique local paths for a video and its cover image.

      The file stem is the MD5 hex digest of "<platform>-<video_id>-<uuid4>",
      so every call yields a new name; both files live in the ``static``
      directory under the current working directory.

      :param platform: platform key mixed into the name
      :param video_id: video id mixed into the name
      :return: tuple ``(video_path, cover_path)`` ending in ``.mp4`` and
               ``.png`` respectively
      """
      seed = f"{platform}-{video_id}-{uuid4()}"
      stem = md5(seed.encode()).hexdigest()
      video_path = os.path.join(os.getcwd(), "static", f"{stem}.mp4")
      image_path = os.path.join(os.getcwd(), "static", f"{stem}.png")
      return video_path, image_path
  async def uploadToOss(local_video_path, download_type):
      """
      Upload a local file to the OSS bucket and return its object key.

      ``put_object_from_file`` is a blocking call, so it is executed on the
      default executor; running it directly in this coroutine would block the
      event loop for the whole duration of the upload.

      :param local_video_path: path of the local file to upload
      :param download_type: sub-folder name placed under ``long_articles/``
                            in the object key
      :return: the object key, ``long_articles/<download_type>/<uuid4>``
      """
      oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
      # SECURITY NOTE(review): credentials are hard-coded in source. They are
      # kept here only so existing deployments keep working; they should be
      # rotated and loaded from configuration or the environment instead.
      access_key_id = "LTAIP6x1l3DXfSxm"
      access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
      endpoint = "oss-cn-hangzhou.aliyuncs.com"
      bucket_name = "art-pubbucket"
      bucket = oss2.Bucket(
          oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
      )
      loop = asyncio.get_running_loop()
      await loop.run_in_executor(
          None,
          lambda: bucket.put_object_from_file(key=oss_video_key, filename=local_video_path),
      )
      return oss_video_key
  class AsyncETL(object):
      """
      Video download pipeline: fetch pending rows, download the video and its
      cover, upload both to OSS and write the outcome back to MySQL.

      ``download_status`` values written by this class: 1 when processing of a
      row starts, 2 on success, 3 when any step raised. Rows with status 0 are
      the ones picked up.
      """

      def __init__(self, mysql_client):
          """
          :param mysql_client: client exposing the ``asyncSelect`` and
                               ``asyncInsert`` coroutines used below
          """
          self.max_retry = 5
          self.mysql_client = mysql_client
          self.article_crawler_videos = Config().articleCrawlerVideos

      async def _setDownloadStatus(self, task_id, status):
          """Write ``status`` into ``download_status`` of the row ``task_id``."""
          status_sql = f"""
              UPDATE {self.article_crawler_videos}
              SET download_status = %s
              WHERE id = %s;
          """
          await self.mysql_client.asyncInsert(
              sql=status_sql,
              params=(status, task_id)
          )

      async def getTasks(self):
          """
          Fetch up to 10 rows whose ``download_status`` is 0, ordered by id.

          :return: list of task dicts with the keys ``id``, ``video_id``,
                   ``platform``, ``video_title``, ``video_url``, ``cover_url``
                   and ``user_id``; an empty list when nothing is pending
          """
          select_sql = f"""
              SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
              FROM {self.article_crawler_videos}
              WHERE download_status = 0
              ORDER BY id
              LIMIT 10;
          """
          rows = await self.mysql_client.asyncSelect(select_sql)
          if not rows:
              return []
          # Dict keys in the same order as the selected columns.
          keys = (
              "id",
              "video_id",
              "platform",
              "video_title",
              "video_url",
              "cover_url",
              "user_id",
          )
          return [
              {key: row[position] for position, key in enumerate(keys)}
              for row in rows
          ]

      async def processTask(self, params):
          """
          Process a single task produced by ``getTasks``.

          The row is first marked with status 1. The video and cover are then
          downloaded and uploaded; on success the OSS keys are stored with
          status 2 (the cover key is ``None`` when no cover was obtained). Any
          exception in those steps is printed and the row is marked with
          status 3.

          :param params: task dict with the keys ``id``, ``video_id``,
                         ``platform``, ``video_title``, ``video_url``,
                         ``cover_url`` and ``user_id``
          :return: None
          """
          await self._setDownloadStatus(params['id'], 1)
          try:
              video_target, cover_target = generateVideoPath(params['platform'], params['video_id'])
              # download the video
              saved_video = await downloadVideo(
                  file_path=video_target,
                  platform=params['platform'],
                  video_url=params['video_url']
              )
              # download the cover
              saved_cover = await downloadCover(
                  file_path=cover_target,
                  platform=params['platform'],
                  cover_url=params['cover_url']
              )
              oss_video = await uploadToOss(
                  local_video_path=saved_video,
                  download_type="video"
              )
              oss_cover = None
              if saved_cover:
                  oss_cover = await uploadToOss(
                      local_video_path=saved_cover,
                      download_type="image"
                  )
              finish_sql = f"""
                  UPDATE {self.article_crawler_videos}
                  SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                  WHERE id = %s;
              """
              await self.mysql_client.asyncInsert(
                  sql=finish_sql,
                  params=(oss_video, oss_cover, 2, params['id'])
              )
          except Exception as e:
              print("failed", e)
              await self._setDownloadStatus(params['id'], 3)

      async def deal(self):
          """
          Run one ETL round: fetch the pending tasks and process them
          concurrently.

          :return: None
          """
          task_list = await self.getTasks()
          if not task_list:
              print("No spider tasks")
              return
          await asyncio.gather(*[self.processTask(task) for task in task_list])