"""
@author: luojunhui
"""
import hashlib
import html
import logging
import os
import re
import threading
from datetime import datetime, timezone
from uuid import uuid4

import oss2
import requests
from fake_useragent import FakeUserAgent
  14. class Functions(object):
  15. """
  16. functions class
  17. """
  18. @classmethod
  19. def show_desc_to_sta(cls, show_desc):
  20. """
  21. :return:
  22. """
  23. def decode_show_v(show_v):
  24. """
  25. :param show_v:
  26. :return:
  27. """
  28. foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
  29. foo = eval(foo)
  30. return int(foo)
  31. def decode_show_k(show_k):
  32. """
  33. :param show_k:
  34. :return:
  35. """
  36. this_dict = {
  37. '阅读': 'show_view_count', # 文章
  38. '看过': 'show_view_count', # 图文
  39. '观看': 'show_view_count', # 视频
  40. '赞': 'show_like_count',
  41. '付费': 'show_pay_count',
  42. '赞赏': 'show_zs_count',
  43. }
  44. if show_k not in this_dict:
  45. print(f'error from decode_show_k, show_k not found: {show_k}')
  46. return this_dict.get(show_k, 'show_unknown')
  47. show_desc = show_desc.replace('+', '')
  48. sta = {}
  49. for show_kv in show_desc.split('\u2004\u2005'):
  50. if not show_kv:
  51. continue
  52. show_k, show_v = show_kv.split('\u2006')
  53. k = decode_show_k(show_k)
  54. v = decode_show_v(show_v)
  55. sta[k] = v
  56. res = {
  57. 'show_view_count': sta.get('show_view_count', 0),
  58. 'show_like_count': sta.get('show_like_count', 0),
  59. 'show_pay_count': sta.get('show_pay_count', 0),
  60. 'show_zs_count': sta.get('show_zs_count', 0),
  61. }
  62. return res
  63. @classmethod
  64. def generateGzhId(cls, url):
  65. """
  66. generate url
  67. :param url:
  68. :return:
  69. """
  70. biz = url.split("biz=")[1].split("&")[0]
  71. idx = url.split("&idx=")[1].split("&")[0]
  72. sn = url.split("&sn=")[1].split("&")[0]
  73. url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
  74. md5_hash = hashlib.md5()
  75. md5_hash.update(url_bit)
  76. md5_value = md5_hash.hexdigest()
  77. return md5_value
  78. @classmethod
  79. def job_with_thread(cls, job_func):
  80. """
  81. 每个任务放到单个线程中
  82. :param job_func:
  83. :return:
  84. """
  85. job_thread = threading.Thread(target=job_func)
  86. job_thread.start()
  87. @classmethod
  88. def str_to_md5(cls, strings):
  89. """
  90. 字符串转化为 md5 值
  91. :param strings:
  92. :return:
  93. """
  94. # 将字符串转换为字节
  95. original_bytes = strings.encode('utf-8')
  96. # 创建一个md5 hash对象
  97. md5_hash = hashlib.md5()
  98. # 更新hash对象,传入原始字节
  99. md5_hash.update(original_bytes)
  100. # 获取16进制形式的MD5哈希值
  101. md5_value = md5_hash.hexdigest()
  102. return md5_value
  103. @classmethod
  104. def float_to_percentage(cls, value, decimals=3) -> str:
  105. """
  106. 把小数转化为百分数
  107. :param value:
  108. :param decimals:
  109. :return:
  110. """
  111. percentage_value = round(value * 100, decimals)
  112. return "{}%".format(percentage_value)
  113. @classmethod
  114. def str_to_timestamp(cls, date_string, string_format='%Y-%m-%d') -> int:
  115. """
  116. :param string_format:
  117. :param date_string:
  118. :return:
  119. """
  120. date_obj = datetime.strptime(date_string, string_format)
  121. timestamp = date_obj.timestamp()
  122. return int(timestamp)
  123. @classmethod
  124. def timestamp_to_str(cls, timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
  125. """
  126. :param string_format:
  127. :param timestamp:
  128. """
  129. dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
  130. date_string = dt_object.strftime(string_format)
  131. return date_string
  132. @classmethod
  133. def proxy(cls):
  134. """
  135. 快代理
  136. """
  137. # 隧道域名:端口号
  138. tunnel = "l901.kdltps.com:15818"
  139. # 用户名密码方式
  140. username = "t11983523373311"
  141. password = "mtuhdr2z"
  142. proxies = {
  143. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
  144. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
  145. }
  146. return proxies
  147. @classmethod
  148. def get_video_url(cls, article_url):
  149. """
  150. :param article_url:
  151. :return:
  152. """
  153. response = requests.get(
  154. url=article_url,
  155. headers={'User-Agent': FakeUserAgent().random},
  156. )
  157. html_text = response.text
  158. w = re.search(
  159. r"mp_video_trans_info.*url:\s*\(\'(.*?)\'\)\.replace", html_text, re.S | re.M
  160. ).group(1)
  161. url = html.unescape(
  162. re.sub(
  163. r"\\x\d+", lambda x: bytes.fromhex(x.group().replace("\\x", "")).decode(), w
  164. )
  165. )
  166. return url
  167. @classmethod
  168. def get_source_account(cls, article_url: str) -> dict:
  169. """
  170. 获取公众号名称和头像
  171. :param article_url:
  172. :return:
  173. """
  174. response = requests.get(
  175. url=article_url,
  176. headers={'User-Agent': FakeUserAgent().random},
  177. )
  178. html_text = response.text
  179. # 正则表达式用于提取 hit_nickname 和 hit_username
  180. regex_nickname = r"hit_nickname:\s*'([^']+)'"
  181. regex_username = r"hit_username:\s*'([^']+)'"
  182. # 提取 hit_nickname 和 hit_username
  183. nickname = re.search(regex_nickname, html_text)
  184. username = re.search(regex_username, html_text)
  185. # 输出提取的结果
  186. if nickname and username:
  187. return {
  188. 'name': nickname.group(1),
  189. 'gh_id': username.group(1)
  190. }
  191. else:
  192. return {}
  193. @classmethod
  194. def download_gzh_video(cls, article_url):
  195. """
  196. 下载公众号视频
  197. :param article_url:
  198. :return:
  199. """
  200. try:
  201. video_url = cls.get_video_url(article_url)
  202. except Exception as e:
  203. return
  204. save_path = "static/{}.mp4".format(cls.str_to_md5(video_url))
  205. headers = {
  206. 'Accept': '*/*',
  207. 'Accept-Language': 'zh,zh-CN;q=0.9',
  208. 'Connection': 'keep-alive',
  209. 'Origin': 'https://mp.weixin.qq.com',
  210. 'Referer': 'https://mp.weixin.qq.com/',
  211. 'Sec-Fetch-Dest': 'video',
  212. 'Sec-Fetch-Mode': 'cors',
  213. 'Sec-Fetch-Site': 'cross-site',
  214. 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
  215. 'sec-ch-ua': '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
  216. 'sec-ch-ua-mobile': '?0',
  217. 'sec-ch-ua-platform': '"macOS"'
  218. }
  219. res = requests.get(video_url, headers=headers)
  220. with open(save_path, "wb") as f:
  221. f.write(res.content)
  222. TEN_KB = 1024 * 10
  223. if os.path.getsize(save_path) > TEN_KB:
  224. return save_path
  225. else:
  226. return None
  227. @classmethod
  228. def upload_to_oss(cls, local_video_path):
  229. """
  230. 把视频上传到 oss
  231. :return:
  232. """
  233. oss_video_key = "long_articles/video/" + str(uuid4())
  234. access_key_id = "LTAIP6x1l3DXfSxm"
  235. access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
  236. endpoint = "oss-cn-hangzhou.aliyuncs.com"
  237. bucket_name = "art-pubbucket"
  238. bucket = oss2.Bucket(
  239. oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
  240. )
  241. bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
  242. return oss_video_key