# functions.py
"""
Miscellaneous helper functions: counter parsing, MD5 hashing,
thread launching and timestamp/date-string conversion.

@author: luojunhui
"""
  4. import threading
  5. import hashlib
  6. from datetime import datetime, timezone
  7. class Functions(object):
  8. """
  9. functions class
  10. """
  11. @classmethod
  12. def show_desc_to_sta(cls, show_desc):
  13. """
  14. :return:
  15. """
  16. def decode_show_v(show_v):
  17. """
  18. :param show_v:
  19. :return:
  20. """
  21. foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
  22. foo = eval(foo)
  23. return int(foo)
  24. def decode_show_k(show_k):
  25. """
  26. :param show_k:
  27. :return:
  28. """
  29. this_dict = {
  30. '阅读': 'show_view_count', # 文章
  31. '看过': 'show_view_count', # 图文
  32. '观看': 'show_view_count', # 视频
  33. '赞': 'show_like_count',
  34. '付费': 'show_pay_count',
  35. '赞赏': 'show_zs_count',
  36. }
  37. if show_k not in this_dict:
  38. print(f'error from decode_show_k, show_k not found: {show_k}')
  39. return this_dict.get(show_k, 'show_unknown')
  40. show_desc = show_desc.replace('+', '')
  41. sta = {}
  42. for show_kv in show_desc.split('\u2004\u2005'):
  43. if not show_kv:
  44. continue
  45. show_k, show_v = show_kv.split('\u2006')
  46. k = decode_show_k(show_k)
  47. v = decode_show_v(show_v)
  48. sta[k] = v
  49. res = {
  50. 'show_view_count': sta.get('show_view_count', 0),
  51. 'show_like_count': sta.get('show_like_count', 0),
  52. 'show_pay_count': sta.get('show_pay_count', 0),
  53. 'show_zs_count': sta.get('show_zs_count', 0),
  54. }
  55. return res
  56. @classmethod
  57. def generateGzhId(cls, url):
  58. """
  59. generate url
  60. :param url:
  61. :return:
  62. """
  63. biz = url.split("biz=")[1].split("&")[0]
  64. idx = url.split("&idx=")[1].split("&")[0]
  65. sn = url.split("&sn=")[1].split("&")[0]
  66. url_bit = "{}-{}-{}".format(biz, idx, sn).encode()
  67. md5_hash = hashlib.md5()
  68. md5_hash.update(url_bit)
  69. md5_value = md5_hash.hexdigest()
  70. return md5_value
  71. @classmethod
  72. def job_with_thread(cls, job_func):
  73. """
  74. 每个任务放到单个线程中
  75. :param job_func:
  76. :return:
  77. """
  78. job_thread = threading.Thread(target=job_func)
  79. job_thread.start()
  80. @classmethod
  81. def str_to_md5(cls, strings):
  82. """
  83. 字符串转化为 md5 值
  84. :param strings:
  85. :return:
  86. """
  87. # 将字符串转换为字节
  88. original_bytes = strings.encode('utf-8')
  89. # 创建一个md5 hash对象
  90. md5_hash = hashlib.md5()
  91. # 更新hash对象,传入原始字节
  92. md5_hash.update(original_bytes)
  93. # 获取16进制形式的MD5哈希值
  94. md5_value = md5_hash.hexdigest()
  95. return md5_value
  96. @classmethod
  97. def float_to_percentage(cls, value, decimals=3) -> str:
  98. """
  99. 把小数转化为百分数
  100. :param value:
  101. :param decimals:
  102. :return:
  103. """
  104. percentage_value = round(value * 100, decimals)
  105. return "{}%".format(percentage_value)
  106. @classmethod
  107. def str_to_timestamp(cls, date_string, string_format='%Y-%m-%d') -> int:
  108. """
  109. :param string_format:
  110. :param date_string:
  111. :return:
  112. """
  113. date_obj = datetime.strptime(date_string, string_format)
  114. timestamp = date_obj.timestamp()
  115. return int(timestamp)
  116. @classmethod
  117. def timestamp_to_str(cls, timestamp, string_format='%Y-%m-%d %H:%M:%S') -> str:
  118. """
  119. :param string_format:
  120. :param timestamp:
  121. """
  122. dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone()
  123. date_string = dt_object.strftime(string_format)
  124. return date_string