""" @author: luojunhui """ import threading from datetime import datetime, timezone import hashlib import requests import pymysql class Functions(object): """ functions class """ @classmethod def getTitleScore(cls, title_list, account_name): """ 标题打分 :param title_list: :param account_name: :return: """ url = "http://192.168.100.31:6060/score_list" body = { "account_nickname_list": [account_name], "text_list": title_list, "max_time": None, "min_time": None, "interest_type": "avg", "sim_type": "mean", "rate": 0.1 } response = requests.post(url=url, headers={}, json=body).json() return response @classmethod def getTitleAccountScore(cls, title, account_list): """ 标题打分 :param title: :param account_list: :return: """ url = "http://192.168.100.31:6060/score_list" body = { "account_nickname_list": account_list, "text_list": [title], "max_time": None, "min_time": None, "interest_type": "avg", "sim_type": "mean", "rate": 0.1 } response = requests.post(url=url, headers={}, json=body).json() L = [] for account in account_list: account_score = response[account]['score_list'][0] L.append([account, account_score]) return L @classmethod def matchLinkByIdTuple(cls, channel_id_tuple): """ Use channelContentId to match articleUrl :param channel_id_tuple: :return: """ connection = pymysql.connect( host='rm-bp12k5fuh5zyx31d28o.mysql.rds.aliyuncs.com', port=3306, user='wx2023_ad', password='wx2023_adP@assword1234', db='adplatform', charset='utf8mb4' ) sql = f"""select id, account_id, link, item_index, title from changwen_article where id in {channel_id_tuple};""" cursor = connection.cursor() cursor.execute(sql) article_link = cursor.fetchall() L = {} for line in article_link: key = line[0] value = { "gh_key": "{}_{}".format(line[1], line[3]), "url": line[2], "title": line[4] } L[key] = value return L @classmethod def TitleSimilarity(cls, title_list, target_title): """ 计算标题相似度 :return: """ def title_sim_v2(title_a, title_b, thredhold=0.8): """ :param title_a: :param title_b: :param thredhold: :return: """ if len(title_a) < 1 or len(title_b) < 1: return False set_a = set(title_a) set_b = set(title_b) set_cross = set_a & set_b set_union = set_a | set_b if not set_union: return False min_len = max(min(len(set_a), len(set_b)), 1) rate = len(set_cross) / min_len if rate >= thredhold: return True else: return False for title in title_list: sim_score = title_sim_v2(target_title, title) if sim_score: return True return False @classmethod def show_desc_to_sta(cls, show_desc): """ :return: """ def decode_show_v(show_v): """ :param show_v: :return: """ foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8') foo = eval(foo) return int(foo) def decode_show_k(show_k): """ :param show_k: :return: """ this_dict = { '阅读': 'show_view_count', # 文章 '看过': 'show_view_count', # 图文 '观看': 'show_view_count', # 视频 '赞': 'show_like_count', '付费': 'show_pay_count', '赞赏': 'show_zs_count', } if show_k not in this_dict: print(f'error from decode_show_k, show_k not found: {show_k}') return this_dict.get(show_k, 'show_unknown') show_desc = show_desc.replace('+', '') sta = {} for show_kv in show_desc.split('\u2004\u2005'): if not show_kv: continue show_k, show_v = show_kv.split('\u2006') k = decode_show_k(show_k) v = decode_show_v(show_v) sta[k] = v res = { 'show_view_count': sta.get('show_view_count', 0), 'show_like_count': sta.get('show_like_count', 0), 'show_pay_count': sta.get('show_pay_count', 0), 'show_zs_count': sta.get('show_zs_count', 0), } return res @classmethod def generateGzhId(cls, url): """ generate url :param url: :return: """ biz = url.split("biz=")[1].split("&")[0] idx = url.split("&idx=")[1].split("&")[0] sn = url.split("&sn=")[1].split("&")[0] url_bit = "{}-{}-{}".format(biz, idx, sn).encode() md5_hash = hashlib.md5() md5_hash.update(url_bit) md5_value = md5_hash.hexdigest() return md5_value @classmethod def time_stamp_to_str(cls, timestamp): """ :param timestamp: """ dt_object = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc).astimezone() date_string = dt_object.strftime('%Y-%m-%d %H:%M:%S') return date_string @classmethod def job_with_thread(cls, job_func): """ 每个任务放到单个线程中 :param job_func: :return: """ job_thread = threading.Thread(target=job_func) job_thread.start() @classmethod def str_to_md5(cls, strings): """ 字符串转化为 md5 值 :param strings: :return: """ # 将字符串转换为字节 original_bytes = strings.encode('utf-8') # 创建一个md5 hash对象 md5_hash = hashlib.md5() # 更新hash对象,传入原始字节 md5_hash.update(original_bytes) # 获取16进制形式的MD5哈希值 md5_value = md5_hash.hexdigest() return md5_value