# etl_task.py
"""
@author: luojunhui
"""
import os
import oss2
import aiohttp
import aiofiles
import asyncio
import requests
from datetime import datetime
from hashlib import md5
from uuid import uuid4
from fake_useragent import FakeUserAgent
from applications.config import Config
from applications.log import logging
  16. async def download_cover(file_path, platform, cover_url):
  17. """
  18. 下载视频封面
  19. :param platform:
  20. :param cover_url:
  21. :param file_path:
  22. :return:
  23. """
  24. headers = request_header(platform=platform, url=cover_url, download_type="cover")
  25. response = requests.get(url=cover_url, headers=headers)
  26. if b"<html>" in response.content:
  27. return None
  28. elif response.status_code != 200:
  29. return None
  30. else:
  31. with open(file_path, "wb") as f:
  32. f.write(response.content)
  33. return file_path
  34. def request_header(platform, url, download_type="video"):
  35. """
  36. 请求头
  37. :return:
  38. """
  39. if platform == "xg_search":
  40. if "v9-xg-web-pc.ixigua.com" in url:
  41. headers = {
  42. "Accept": "*/*",
  43. "Accept-Language": "zh-CN,zh;q=0.9",
  44. "Host": "v9-xg-web-pc.ixigua.com",
  45. "User-Agent": FakeUserAgent().chrome,
  46. "Origin": "https://www.ixigua.com/",
  47. "Referer": "https://www.ixigua.com/"
  48. }
  49. elif "v3-xg-web-pc.ixigua.com" in url:
  50. headers = {
  51. "Accept": "*/*",
  52. "Accept-Language": "zh-CN,zh;q=0.9",
  53. "Host": "v3-xg-web-pc.ixigua.com",
  54. "User-Agent": FakeUserAgent().chrome,
  55. "Origin": "https://www.ixigua.com/",
  56. "Referer": "https://www.ixigua.com/"
  57. }
  58. elif download_type == "cover":
  59. headers = {
  60. 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
  61. 'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
  62. 'Cache-Control': 'max-age=0',
  63. 'Proxy-Connection': 'keep-alive',
  64. 'Upgrade-Insecure-Requests': '1',
  65. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
  66. }
  67. else:
  68. headers = {
  69. "Accept": "*/*",
  70. "Accept-Language": "zh-CN,zh;q=0.9",
  71. "Host": "v3-xg-web-pc.ixigua.com",
  72. "User-Agent": FakeUserAgent().chrome,
  73. "Origin": "https://www.ixigua.com/",
  74. "Referer": "https://www.ixigua.com/"
  75. }
  76. elif platform == "baidu_search":
  77. headers = {
  78. "Accept": "*/*",
  79. "Accept-Language": "zh-CN,zh;q=0.9",
  80. "User-Agent": FakeUserAgent().chrome,
  81. }
  82. elif platform == "wx_search":
  83. headers = {
  84. "Accept": "*/*",
  85. "Accept-Language": "zh-CN,zh;q=0.9",
  86. "User-Agent": FakeUserAgent().chrome,
  87. "Origin": "https://mp.weixin.qq.com",
  88. "Referer": "https://mp.weixin.qq.com"
  89. }
  90. elif platform == "dy_search":
  91. headers = {
  92. 'accept': '*/*',
  93. 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
  94. 'priority': 'i',
  95. 'range': 'bytes=0-',
  96. 'referer': 'https://v11-coldf.douyinvod.com/',
  97. 'user-agent': FakeUserAgent().chrome
  98. }
  99. else:
  100. headers = {}
  101. return headers
  102. async def download_video(file_path, platform, video_url, download_type="video"):
  103. """
  104. :param download_type:
  105. :param video_url:
  106. :param platform:
  107. :param file_path:
  108. :return:
  109. """
  110. headers = request_header(platform=platform, url=video_url, download_type=download_type)
  111. if os.path.exists(file_path):
  112. file_size = os.path.getsize(file_path)
  113. headers["Range"] = f"bytes={file_size}-"
  114. else:
  115. file_size = 0
  116. async with aiohttp.ClientSession() as session:
  117. async with session.get(video_url, headers=headers) as response:
  118. if response.status in [200, 206]:
  119. if file_size > 0:
  120. async with aiofiles.open(file_path, "ab+") as f:
  121. # 以1MB为单位分块下载
  122. async for chunk in response.content.iter_chunked(1024 * 1024):
  123. await f.write(chunk)
  124. else:
  125. async with aiofiles.open(file_path, "wb") as f:
  126. # 以1MB为单位分块下载
  127. async for chunk in response.content.iter_chunked(1024 * 1024):
  128. await f.write(chunk)
  129. else:
  130. print(response.status)
  131. return file_path
  132. def generate_video_path(platform, video_id):
  133. """
  134. 通过视频信息生成唯一视频地址
  135. :return:
  136. """
  137. index = "{}-{}-{}".format(platform, video_id, uuid4())
  138. index = md5(index.encode()).hexdigest()
  139. file_name = "{}.mp4".format(index)
  140. cover_name = "{}.png".format(index)
  141. file_path = os.path.join(os.getcwd(), "static", file_name)
  142. cover_path = os.path.join(os.getcwd(), "static", cover_name)
  143. return file_path, cover_path
  144. async def upload_to_oss(local_video_path, download_type):
  145. """
  146. 把视频上传到 oss
  147. :return:
  148. """
  149. oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
  150. access_key_id = "LTAIP6x1l3DXfSxm"
  151. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  152. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  153. bucket_name = "art-pubbucket"
  154. bucket = oss2.Bucket(
  155. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  156. )
  157. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  158. return oss_video_key
  159. class AsyncETL(object):
  160. """
  161. 视频下载功能
  162. """
  163. def __init__(self, mysql_client):
  164. # self.proxy = {
  165. # "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  166. # "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
  167. # }
  168. self.max_retry = 5
  169. self.mysql_client = mysql_client
  170. self.config = Config()
  171. self.article_crawler_video_table = self.config.article_crawler_video_table
  172. self.article_match_video_table = self.config.article_match_video_table
  173. async def get_tasks(self):
  174. """
  175. 获取视频 id
  176. :return:
  177. """
  178. select_sql = f"""
  179. SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
  180. FROM {self.article_crawler_video_table}
  181. WHERE download_status = 0
  182. ORDER BY id
  183. LIMIT 10;
  184. """
  185. result = await self.mysql_client.async_select(select_sql)
  186. if result:
  187. tasks = [
  188. {
  189. "id": line[0],
  190. "video_id": line[1],
  191. "platform": line[2],
  192. "video_title": line[3],
  193. "video_url": line[4],
  194. "cover_url": line[5],
  195. "user_id": line[6],
  196. "trace_id": line[7]
  197. }
  198. for line in result
  199. ]
  200. return tasks
  201. else:
  202. return []
  203. async def process_task(self, params):
  204. """
  205. 处理 task
  206. :return:
  207. """
  208. downloading_status = 1
  209. downloaded_status = 2
  210. download_failed_status = 3
  211. update_sql_0 = f"""
  212. UPDATE {self.article_crawler_video_table}
  213. SET download_status = %s
  214. WHERE id = %s;
  215. """
  216. await self.mysql_client.async_insert(
  217. sql=update_sql_0,
  218. params=(downloading_status, params['id'])
  219. )
  220. try:
  221. local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
  222. # download videos
  223. file_path = await download_video(
  224. file_path=local_video_path,
  225. platform=params['platform'],
  226. video_url=params['video_url']
  227. )
  228. # download cover
  229. cover_path = await download_cover(
  230. file_path=local_cover_path,
  231. platform=params['platform'],
  232. cover_url=params['cover_url']
  233. )
  234. oss_video = await upload_to_oss(
  235. local_video_path=file_path,
  236. download_type="video"
  237. )
  238. if cover_path:
  239. oss_cover = await upload_to_oss(
  240. local_video_path=cover_path,
  241. download_type="image"
  242. )
  243. else:
  244. oss_cover = None
  245. update_sql = f"""
  246. UPDATE {self.article_crawler_video_table}
  247. SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
  248. WHERE id = %s;
  249. """
  250. await self.mysql_client.async_insert(
  251. sql=update_sql,
  252. params=(
  253. oss_video,
  254. oss_cover,
  255. downloaded_status,
  256. params['id']
  257. )
  258. )
  259. except Exception as e:
  260. update_sql = f"""
  261. UPDATE {self.article_crawler_video_table}
  262. SET download_status = %s
  263. WHERE id = %s;
  264. """
  265. await self.mysql_client.async_insert(
  266. sql=update_sql,
  267. params=(download_failed_status, params['id'])
  268. )
  269. print("抓取 failed--{}".format(e))
  270. async def deal(self):
  271. """
  272. ETL Deal Task
  273. :return:
  274. """
  275. task_list = await self.get_tasks()
  276. logging(
  277. code="5001",
  278. info="ETL Task Got {} this time".format(len(task_list)),
  279. function="ETL"
  280. )
  281. if task_list:
  282. tasks = [self.process_task(params) for params in task_list]
  283. await asyncio.gather(*tasks)