functions.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import json
  6. import random
  7. import hashlib
  8. from datetime import datetime, timedelta
  9. import uuid
  10. import requests
  11. import urllib.parse
  12. def request_for_info(video_id):
  13. """
  14. 请求数据
  15. :param video_id:
  16. :return:
  17. """
  18. url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
  19. data = {
  20. "videoIdList": [video_id]
  21. }
  22. header = {
  23. "Content-Type": "application/json",
  24. }
  25. response = requests.post(url, headers=header, data=json.dumps(data))
  26. return response.json()
  27. def get_info_lists(vid_list):
  28. """
  29. 获取视频list
  30. :param vid_list:
  31. :return:
  32. """
  33. url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
  34. data = {
  35. "videoIdList": vid_list
  36. }
  37. header = {
  38. "Content-Type": "application/json",
  39. }
  40. response = requests.post(url, headers=header, data=json.dumps(data))
  41. return response.json()
  42. def generate_daily_strings(start_date, end_date):
  43. """
  44. Generate daily date_str
  45. :param start_date:
  46. :param end_date:
  47. :return:
  48. """
  49. start = datetime.strptime(start_date, "%Y%m%d")
  50. end = datetime.strptime(end_date, "%Y%m%d")
  51. current = start
  52. date_strings = []
  53. while current <= end:
  54. date_strings.append(current.strftime("%Y%m%d"))
  55. current += timedelta(days=1)
  56. return date_strings
  57. def whisper(video_id):
  58. """
  59. input video_id, output video_text
  60. :param video_id:
  61. :return:
  62. """
  63. url = "http://61.48.133.26:5999/video_to_text"
  64. body = {
  65. "video_id": video_id
  66. }
  67. header = {
  68. "Content-Type": "application/json",
  69. }
  70. response = requests.post(
  71. url=url,
  72. json=body,
  73. headers=header
  74. )
  75. return response.json()
  76. def hash_title(title):
  77. """
  78. hash map
  79. :param title:
  80. :return:
  81. """
  82. # 创建md5哈希对象
  83. hash_object = hashlib.md5()
  84. # 对标题进行编码
  85. title_bytes = title.encode('utf-8')
  86. # 更新哈希对象
  87. hash_object.update(title_bytes)
  88. # 获取十六进制形式的哈希值
  89. hash_hex = hash_object.hexdigest()
  90. return hash_hex
  91. def create_gzh_path(video_id, shared_uid):
  92. """
  93. :param video_id: 视频 id
  94. :param shared_uid: 分享 id
  95. """
  96. def generate_source_id():
  97. """
  98. generate_source_id
  99. :return:
  100. """
  101. timestamp = str(int(time.time() * 1000))
  102. random_str = str(random.randint(1000, 9999))
  103. hash_input = f"{timestamp}-{random_str}"
  104. return hashlib.md5(hash_input.encode()).hexdigest()
  105. root_share_id = str(uuid.uuid4())
  106. source_id = "video_to_articles" + generate_source_id()
  107. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  108. # 自动把 root_share_id 加入到白名单
  109. # auto_white(root_share_id)
  110. return root_share_id, source_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}"
  111. def chunks(chunk_list, chunk_size):
  112. """
  113. 分页
  114. :param chunk_list:
  115. :param chunk_size:
  116. """
  117. for i in range(0, len(chunk_list), chunk_size):
  118. yield chunk_list[i: i + chunk_size]