functions.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import re
  6. import html
  7. import hashlib
  8. import threading
  9. import oss2
  10. import requests
  11. from uuid import uuid4
  12. from datetime import datetime, timezone
  13. from fake_useragent import FakeUserAgent
  14. from urllib.parse import urlparse, parse_qs
  15. class Functions(object):
  16. """
  17. functions class
  18. """
  19. @classmethod
  20. def show_desc_to_sta(cls, show_desc):
  21. """
  22. :return:
  23. """
  24. def decode_show_v(show_v):
  25. """
  26. :param show_v:
  27. :return:
  28. """
  29. foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
  30. foo = eval(foo)
  31. return int(foo)
  32. def decode_show_k(show_k):
  33. """
  34. :param show_k:
  35. :return:
  36. """
  37. this_dict = {
  38. '阅读': 'show_view_count', # 文章
  39. '看过': 'show_view_count', # 图文
  40. '观看': 'show_view_count', # 视频
  41. '赞': 'show_like_count',
  42. '付费': 'show_pay_count',
  43. '赞赏': 'show_zs_count',
  44. }
  45. if show_k not in this_dict:
  46. print(f'error from decode_show_k, show_k not found: {show_k}')
  47. return this_dict.get(show_k, 'show_unknown')
  48. show_desc = show_desc.replace('+', '')
  49. sta = {}
  50. for show_kv in show_desc.split('\u2004\u2005'):
  51. if not show_kv:
  52. continue
  53. show_k, show_v = show_kv.split('\u2006')
  54. k = decode_show_k(show_k)
  55. v = decode_show_v(show_v)
  56. sta[k] = v
  57. res = {
  58. 'show_view_count': sta.get('show_view_count', 0),
  59. 'show_like_count': sta.get('show_like_count', 0),
  60. 'show_pay_count': sta.get('show_pay_count', 0),
  61. 'show_zs_count': sta.get('show_zs_count', 0),
  62. }
  63. return res
  64. @classmethod
  65. def generateGzhId(cls, url):
  66. """
  67. generate url
  68. :param url:
  69. :return:
  70. """
  71. biz = url.split("biz=")[1].split("&")[0]
  72. idx = url.split("&idx=")[1].split("&")[0]
  73. sn = url.split("&sn=")[1].split("&")[0]
  74. url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
  75. md5_hash = hashlib.md5()
  76. md5_hash.update(url_bit)
  77. md5_value = md5_hash.hexdigest()
  78. return md5_value
  79. @classmethod
  80. def job_with_thread(cls, job_func):
  81. """
  82. 每个任务放到单个线程中
  83. :param job_func:
  84. :return:
  85. """
  86. job_thread = threading.Thread(target=job_func)
  87. job_thread.start()
  88. @classmethod
  89. def str_to_md5(cls, strings):
  90. """
  91. 字符串转化为 md5 值
  92. :param strings:
  93. :return:
  94. """
  95. # 将字符串转换为字节
  96. original_bytes = strings.encode('utf-8')
  97. # 创建一个md5 hash对象
  98. md5_hash = hashlib.md5()
  99. # 更新hash对象,传入原始字节
  100. md5_hash.update(original_bytes)
  101. # 获取16进制形式的MD5哈希值
  102. md5_value = md5_hash.hexdigest()
  103. return md5_value
  104. @classmethod
  105. def float_to_percentage(cls, value, decimals=3) -> str:
  106. """
  107. 把小数转化为百分数
  108. :param value:
  109. :param decimals:
  110. :return:
  111. """
  112. percentage_value = round(value * 100, decimals)
  113. return "{}%".format(percentage_value)
  114. @classmethod
  115. def str_to_timestamp(cls, date_string, string_format='%Y-%m-%d') -> int:
  116. """
  117. :param string_format:
  118. :param date_string:
  119. :return:
  120. """
  121. date_obj = datetime.strptime(date_string, string_format)
  122. timestamp = date_obj.timestamp()
  123. return int(timestamp)
  124. @classmethod
  125. def timestamp_to_str(cls, timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
  126. """
  127. :param string_format:
  128. :param timestamp:
  129. """
  130. dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
  131. date_string = dt_object.strftime(string_format)
  132. return date_string
  133. @classmethod
  134. def proxy(cls):
  135. """
  136. 快代理
  137. """
  138. # 隧道域名:端口号
  139. tunnel = "l901.kdltps.com:15818"
  140. # 用户名密码方式
  141. username = "t11983523373311"
  142. password = "mtuhdr2z"
  143. proxies = {
  144. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
  145. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
  146. }
  147. return proxies
  148. @classmethod
  149. def get_video_url(cls, article_url):
  150. """
  151. :param article_url:
  152. :return:
  153. """
  154. response = requests.get(
  155. url=article_url,
  156. headers={'User-Agent': FakeUserAgent().random},
  157. # proxies=cls.proxy()
  158. )
  159. html_text = response.text
  160. w = re.search(
  161. r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
  162. ).group(1)
  163. url = html.unescape(
  164. re.sub(
  165. r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
  166. )
  167. )
  168. return url
  169. @classmethod
  170. def get_source_account(cls, article_url: str) -> dict:
  171. """
  172. 获取公众号名称和头像
  173. :param article_url:
  174. :return:
  175. """
  176. response = requests.get(
  177. url=article_url,
  178. headers={'User-Agent': FakeUserAgent().random},
  179. # proxies=cls.proxy()
  180. )
  181. html_text = response.text
  182. # 正则表达式用于提取 hit_nickname 和 hit_username
  183. regex_nickname = r"hit_nickname:\s*'([^']+)'"
  184. regex_username = r"hit_username:\s*'([^']+)'"
  185. # 提取 hit_nickname 和 hit_username
  186. nickname = re.search(regex_nickname, html_text)
  187. username = re.search(regex_username, html_text)
  188. # 输出提取的结果
  189. if nickname and username:
  190. return {
  191. 'name': nickname.group(1),
  192. 'gh_id': username.group(1)
  193. }
  194. else:
  195. return {}
  196. @classmethod
  197. def download_gzh_video(cls, article_url):
  198. """
  199. 下载公众号视频
  200. :param article_url:
  201. :return:
  202. """
  203. try:
  204. video_url = cls.get_video_url(article_url)
  205. except Exception as e:
  206. return
  207. save_path = "static/{}.mp4".format(cls.str_to_md5(video_url))
  208. headers = {
  209. 'Accept': '*/*',
  210. 'Accept-Language': 'zh,zh-CN;q=0.9',
  211. 'Connection': 'keep-alive',
  212. 'Origin': 'https://mp.weixin.qq.com',
  213. 'Referer': 'https://mp.weixin.qq.com/',
  214. 'Sec-Fetch-Dest': 'video',
  215. 'Sec-Fetch-Mode': 'cors',
  216. 'Sec-Fetch-Site': 'cross-site',
  217. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
  218. 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
  219. 'sec-ch-ua-mobile': '?0',
  220. 'sec-ch-ua-platform': '"macOS"'
  221. }
  222. res = requests.get(video_url, headers=headers)
  223. with open(save_path, "wb") as f:
  224. f.write(res.content)
  225. TEN_KB = 1024 * 10
  226. if os.path.getsize(save_path) > TEN_KB:
  227. return save_path
  228. else:
  229. return None
  230. @classmethod
  231. def upload_to_oss(cls, local_video_path):
  232. """
  233. 把视频上传到 oss
  234. :return:
  235. """
  236. oss_video_key = "long_articles/video/" + str(uuid4())
  237. access_key_id = "LTAIP6x1l3DXfSxm"
  238. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  239. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  240. bucket_name = "art-pubbucket"
  241. bucket = oss2.Bucket(
  242. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  243. )
  244. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  245. return oss_video_key
  246. @classmethod
  247. def extract_path(cls, path: str):
  248. """
  249. 提取path参数
  250. :param path:
  251. :return:
  252. """
  253. params = parse_qs(urlparse(path).query)
  254. jump_page = params.get('jumpPage', [None])[0]
  255. if jump_page:
  256. params2 = parse_qs(jump_page)
  257. res = {
  258. "video_id": params2['pages/user-videos?id'][0],
  259. "root_source_id": params2['rootSourceId'][0],
  260. }
  261. return res
  262. else:
  263. return {}
  264. @classmethod
  265. def extract_params_from_url(cls, url: str, key: str):
  266. """
  267. extract params from url
  268. """
  269. params = parse_qs(urlparse(url).query)
  270. info = params.get(key, [])
  271. return info[0] if info else None
  272. @classmethod
  273. def download_baidu_videos(cls, video_url, save_path):
  274. """
  275. :param video_url: baidu video url
  276. :param save_path: save path
  277. """
  278. if os.path.exists(save_path):
  279. return save_path
  280. response = requests.get(
  281. video_url,
  282. headers={
  283. 'User-Agent': FakeUserAgent().chrome,
  284. "Accept": "*/*",
  285. "Accept-Language": "zh-CN,zh;q=0.9"
  286. }
  287. )
  288. with open(save_path, 'wb') as f:
  289. f.write(response.content)
  290. TEN_KB = 1024 * 10
  291. if os.path.getsize(save_path) > TEN_KB:
  292. return save_path
  293. else:
  294. return None