# etl_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import oss2
  6. import aiohttp
  7. import aiofiles
  8. import asyncio
  9. import requests
  10. from datetime import datetime
  11. from hashlib import md5
  12. from uuid import uuid4
  13. from fake_useragent import FakeUserAgent
  14. from applications.config import Config
  15. from applications.log import logging
  16. async def downloadCover(file_path, platform, cover_url):
  17. """
  18. 下载视频封面
  19. :param platform:
  20. :param cover_url:
  21. :param file_path:
  22. :return:
  23. """
  24. headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
  25. response = requests.get(url=cover_url, headers=headers)
  26. if b"<html>" in response.content:
  27. return None
  28. elif response.status_code != 200:
  29. return None
  30. else:
  31. with open(file_path, "wb") as f:
  32. f.write(response.content)
  33. return file_path
  34. def requestHeader(platform, url, download_type="video"):
  35. """
  36. 请求头
  37. :return:
  38. """
  39. if platform == "xg_search":
  40. if "v9-xg-web-pc.ixigua.com" in url:
  41. headers = {
  42. "Accept": "*/*",
  43. "Accept-Language": "zh-CN,zh;q=0.9",
  44. "Host": "v9-xg-web-pc.ixigua.com",
  45. "User-Agent": FakeUserAgent().chrome,
  46. "Origin": "https://www.ixigua.com/",
  47. "Referer": "https://www.ixigua.com/"
  48. }
  49. elif "v3-xg-web-pc.ixigua.com" in url:
  50. headers = {
  51. "Accept": "*/*",
  52. "Accept-Language": "zh-CN,zh;q=0.9",
  53. "Host": "v3-xg-web-pc.ixigua.com",
  54. "User-Agent": FakeUserAgent().chrome,
  55. "Origin": "https://www.ixigua.com/",
  56. "Referer": "https://www.ixigua.com/"
  57. }
  58. elif download_type == "cover":
  59. headers = {
  60. 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
  61. 'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
  62. 'Cache-Control': 'max-age=0',
  63. 'Proxy-Connection': 'keep-alive',
  64. 'Upgrade-Insecure-Requests': '1',
  65. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
  66. }
  67. else:
  68. headers = {
  69. "Accept": "*/*",
  70. "Accept-Language": "zh-CN,zh;q=0.9",
  71. "Host": "v3-xg-web-pc.ixigua.com",
  72. "User-Agent": FakeUserAgent().chrome,
  73. "Origin": "https://www.ixigua.com/",
  74. "Referer": "https://www.ixigua.com/"
  75. }
  76. elif platform == "baidu_search":
  77. headers = {
  78. "Accept": "*/*",
  79. "Accept-Language": "zh-CN,zh;q=0.9",
  80. "User-Agent": FakeUserAgent().chrome,
  81. }
  82. elif platform == "wx_search":
  83. headers = {
  84. "Accept": "*/*",
  85. "Accept-Language": "zh-CN,zh;q=0.9",
  86. "User-Agent": FakeUserAgent().chrome,
  87. "Origin": "https://mp.weixin.qq.com",
  88. "Referer": "https://mp.weixin.qq.com"
  89. }
  90. elif platform == "dy_search":
  91. headers = {
  92. 'accept': '*/*',
  93. 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
  94. 'priority': 'i',
  95. 'range': 'bytes=0-',
  96. 'referer': 'https://v11-coldf.douyinvod.com/',
  97. 'user-agent': FakeUserAgent().chrome
  98. }
  99. else:
  100. headers = {}
  101. return headers
  102. async def downloadVideo(file_path, platform, video_url, download_type="video"):
  103. """
  104. :param download_type:
  105. :param video_url:
  106. :param platform:
  107. :param file_path:
  108. :return:
  109. """
  110. headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
  111. if os.path.exists(file_path):
  112. file_size = os.path.getsize(file_path)
  113. headers["Range"] = f"bytes={file_size}-"
  114. else:
  115. file_size = 0
  116. async with aiohttp.ClientSession() as session:
  117. async with session.get(video_url, headers=headers) as response:
  118. if response.status in [200, 206]:
  119. if file_size > 0:
  120. async with aiofiles.open(file_path, "ab+") as f:
  121. # 以1MB为单位分块下载
  122. async for chunk in response.content.iter_chunked(1024 * 1024):
  123. await f.write(chunk)
  124. else:
  125. async with aiofiles.open(file_path, "wb") as f:
  126. # 以1MB为单位分块下载
  127. async for chunk in response.content.iter_chunked(1024 * 1024):
  128. await f.write(chunk)
  129. else:
  130. print(response.status)
  131. return file_path
  132. def generateVideoPath(platform, video_id):
  133. """
  134. 通过视频信息生成唯一视频地址
  135. :return:
  136. """
  137. index = "{}-{}-{}".format(platform, video_id, uuid4())
  138. index = md5(index.encode()).hexdigest()
  139. file_name = "{}.mp4".format(index)
  140. cover_name = "{}.png".format(index)
  141. file_path = os.path.join(os.getcwd(), "static", file_name)
  142. cover_path = os.path.join(os.getcwd(), "static", cover_name)
  143. return file_path, cover_path
  144. async def uploadToOss(local_video_path, download_type):
  145. """
  146. 把视频上传到 oss
  147. :return:
  148. """
  149. oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
  150. access_key_id = "LTAIP6x1l3DXfSxm"
  151. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  152. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  153. bucket_name = "art-pubbucket"
  154. bucket = oss2.Bucket(
  155. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  156. )
  157. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  158. return oss_video_key
  159. class AsyncETL(object):
  160. """
  161. 视频下载功能
  162. """
  163. def __init__(self, mysql_client):
  164. # self.proxy = {
  165. # "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  166. # "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  167. # }
  168. self.max_retry = 5
  169. self.mysql_client = mysql_client
  170. self.article_crawler_videos = Config().articleCrawlerVideos
  171. async def getTasks(self):
  172. """
  173. 获取视频 id
  174. :return:
  175. """
  176. select_sql = f"""
  177. SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
  178. FROM {self.article_crawler_videos}
  179. WHERE download_status = 0
  180. ORDER BY id
  181. LIMIT 10;
  182. """
  183. result = await self.mysql_client.asyncSelect(select_sql)
  184. if result:
  185. tasks = [
  186. {
  187. "id": line[0],
  188. "video_id": line[1],
  189. "platform": line[2],
  190. "video_title": line[3],
  191. "video_url": line[4],
  192. "cover_url": line[5],
  193. "user_id": line[6]
  194. }
  195. for line in result
  196. ]
  197. return tasks
  198. else:
  199. return []
  200. async def processTask(self, params):
  201. """
  202. 处理 task
  203. :return:
  204. {
  205. "id": line[0],
  206. "video_id": line[1],
  207. "platform": line[2],
  208. "video_title": line[3],
  209. "video_url": line[4],
  210. "cover_url": line[5],
  211. "user_id": line[6]
  212. }
  213. """
  214. update_sql_0 = f"""
  215. UPDATE {self.article_crawler_videos}
  216. SET download_status = %s
  217. WHERE id = %s;
  218. """
  219. await self.mysql_client.asyncInsert(
  220. sql=update_sql_0,
  221. params=(1, params['id'])
  222. )
  223. try:
  224. local_video_path, local_cover_path = generateVideoPath(params['platform'], params['video_id'])
  225. # download videos
  226. file_path = await downloadVideo(
  227. file_path=local_video_path,
  228. platform=params['platform'],
  229. video_url=params['video_url']
  230. )
  231. # download cover
  232. cover_path = await downloadCover(
  233. file_path=local_cover_path,
  234. platform=params['platform'],
  235. cover_url=params['cover_url']
  236. )
  237. oss_video = await uploadToOss(
  238. local_video_path=file_path,
  239. download_type="video"
  240. )
  241. if cover_path:
  242. oss_cover = await uploadToOss(
  243. local_video_path=cover_path,
  244. download_type="image"
  245. )
  246. else:
  247. oss_cover = None
  248. update_sql = f"""
  249. UPDATE {self.article_crawler_videos}
  250. SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
  251. WHERE id = %s;
  252. """
  253. await self.mysql_client.asyncInsert(
  254. sql=update_sql,
  255. params=(
  256. oss_video,
  257. oss_cover,
  258. 2,
  259. params['id']
  260. )
  261. )
  262. except Exception as e:
  263. print("failed", e)
  264. update_sql = f"""
  265. UPDATE {self.article_crawler_videos}
  266. SET download_status = %s
  267. WHERE id = %s;
  268. """
  269. await self.mysql_client.asyncInsert(
  270. sql=update_sql,
  271. params=(3, params['id'])
  272. )
  273. async def deal(self):
  274. """
  275. ETL Deal Task
  276. :return:
  277. """
  278. task_list = await self.getTasks()
  279. logging(
  280. code="5001",
  281. info="ETL Task Got {} this time".format(len(task_list)),
  282. function="ETL"
  283. )
  284. if task_list:
  285. tasks = [self.processTask(params) for params in task_list]
  286. await asyncio.gather(*tasks)