# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/27
import difflib
import json
import os
import random
import sys
import time

import jieba
import requests
from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_exception import MQExceptionBase
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.scheduling_db import MysqlHelper

# Imports for running this module directly from inside common/:
# from common import Common
# from feishu import Feishu
# from scheduling_db import MysqlHelper


def get_user_from_mysql(log_type, crawler, source, env, action=""):
    sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
    results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
    if results:
        return results
    else:
        Common.logger(log_type, crawler).warning(
            f"crawler: {crawler}, no crawl user list found"
        )
        return []


def similarity(title1, title2):
    # Tokenize both titles with jieba
    seg1 = jieba.lcut(title1)
    seg2 = jieba.lcut(title2)
    # Build TF-IDF vectors. Join the tokens with spaces so TfidfVectorizer
    # sees each jieba token as a separate word; joining with "" would just
    # rebuild the original strings and defeat the segmentation.
    tfidf_vectorizer = TfidfVectorizer()
    tfidf_matrix = tfidf_vectorizer.fit_transform([" ".join(seg1), " ".join(seg2)])
    # Cosine similarity between the two vectors
    similar = cosine_similarity(tfidf_matrix[0], tfidf_matrix[1])[0][0]
    return similar


def title_like(log_type, crawler, platform, title, env):
    """
    Title similarity check against already-crawled videos.
    :param log_type: log type
    :param crawler: which crawler
    :param platform: crawl channel, e.g. 公众号 / 小年糕
    :param title: video title
    :param env: environment
    :return: True if similarity >= 80%, otherwise False
    """
    select_sql = (
        f""" select video_title from crawler_video where platform="{platform}" """
    )
    video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
    if len(video_list) == 0:
        return False
    for video_dict in video_list:
        video_title = video_dict["video_title"]
        if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
            return True
    return False


def get_config_from_mysql(log_type, source, env, text, action=""):
    select_sql = f"""select * from crawler_config where source="{source}" """
    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
    title_list = []
    filter_list = []
    emoji_list = []
    search_word_list = []
    for content in contents:
        # config is stored as a dict-literal string with comma-separated values
        config = content["config"]
        config_dict = eval(config)
        for k, v in config_dict.items():
            if k == "title":
                title_list.extend(v.split(","))
            if k == "filter":
                filter_list.extend(v.split(","))
            if k == "emoji":
                emoji_list.extend(v.split(","))
            if k == "search_word":
                search_word_list.extend(v.split(","))
    if text == "title":
        return title_list
    elif text == "filter":
        return filter_list
    elif text == "emoji":
        return emoji_list
    elif text == "search_word":
        return search_word_list


def get_rule_from_mysql(task_id, log_type, crawler, env):
    select_rule_sql = f"""select rule from crawler_task_v3 where id={task_id}"""
    rule_list = MysqlHelper.get_values(
        log_type, crawler, select_rule_sql, env, action=""
    )
    return json.loads(rule_list[0]["rule"])


def random_title(log_type, crawler, env, text):
    random_title_list = get_config_from_mysql(log_type, crawler, env, text)
    return random.choice(random_title_list)
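
# A minimal sketch of the task string that task_fun() below is written to
# parse; the exact upstream format is inferred from the replace()/eval()
# calls and is an assumption, not something this file confirms:
#
#     task_str = """{'id': 8, 'rule': '[{"play_cnt": {"min": 500, "max": 0}}]'}"""
#     task = task_fun(task_str)
#     # task["rule_dict"]          -> {"play_cnt": {"min": 500, "max": 0}}
#     # task["task_dict"]["rule"]  -> the same merged dict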


def task_fun(task_str):
    # The "rule" list arrives wrapped in single quotes; strip them so the
    # whole string can be evaluated as one dict literal.
    task_str = task_str.replace("'[", "[").replace("]'", "]")
    task_dict = dict(eval(task_str))
    rule = task_dict["rule"]
    task_dict["rule"] = dict()
    # Merge the list of single-key rule dicts into one flat dict
    for item in rule:
        for k, val in item.items():
            task_dict["rule"][k] = val
    rule_dict = task_dict["rule"]
    task_dict = {"task_dict": task_dict, "rule_dict": rule_dict}
    return task_dict


def task_fun_mq(task_str):
    # Same as task_fun(), but for MQ messages, where the rule list is wrapped
    # in double quotes and backslash-escaped.
    task_str = task_str.replace('"[', "[").replace(']"', "]").replace("\\", "")
    task_dict = dict(eval(task_str))
    rule = task_dict["rule"]
    task_dict["rule"] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict["rule"][k] = val
    rule_dict = task_dict["rule"]
    task_dict = {"task_dict": task_dict, "rule_dict": rule_dict}
    return task_dict


def get_consumer(topic_name, group_id):
    # Initialize the client.
    mq_client = MQClient(
        # HTTP endpoint, shown on the instance detail page of the Alibaba
        # Cloud RocketMQ console.
        "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
        # AccessKey ID for Alibaba Cloud authentication.
        "LTAI4G7puhXtLyHzHQpD6H7A",
        # AccessKey Secret for Alibaba Cloud authentication.
        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG",
    )
    # Topic the messages belong to, created in the RocketMQ console.
    topic_name = str(topic_name)
    # Group ID created in the RocketMQ console.
    group_id = str(group_id)
    # Instance ID. Required when the instance has a namespace; pass an empty
    # string when it does not.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
    return consumer


def ack_message(log_type, crawler, recv_msgs, consumer):
    # A message that is not acked before msg.next_consume_time will be
    # redelivered. Receipt handles are timestamped, so the same message gets
    # a different handle on every delivery.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        Common.logger(log_type, crawler).info(
            f"Ack {len(receipt_handle_list)} Message Succeed.\n"
        )
    except MQExceptionBase as err:
        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")


def download_rule(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules for deciding whether to download a video.
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the video satisfies every rule, otherwise False
    """
    # Normalize video_dict: publish_time_stamp (seconds) -> publish_time (ms)
    if "publish_time_stamp" in video_dict.keys():
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: derive period (days since publish)
    if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
        video_dict["period"] = int(
            (int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000)
        )
    # Normalize rule_dict: a max of 0 means "no upper bound"
    for rule_value in rule_dict.values():
        if rule_value["max"] == 0:
            rule_value["max"] = 999999999999999
    # Backfill keys that exist in rule_dict but not in video_dict, so the
    # comparison below always passes for them
    for rule_key in rule_dict.keys():
        if rule_key not in video_dict.keys():
            video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
    # Compare every field against its rule; False on the first violation
    for video_key, video_value in video_dict.items():
        for rule_key, rule_value in rule_dict.items():
            if video_key == rule_key == "period":
                result = 0 <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(
                    f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}'
                )
            elif video_key == rule_key:
                result = (
                    int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
                )
                Common.logger(log_type, crawler).info(
                    f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]}, {result}'
                )
            else:
                result = True
            if result is False:
                return False
    return True
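
# A minimal sketch of how download_rule() / download_rule_v2() are meant to
# be called; the field names ("play_cnt") and values are illustrative
# assumptions, not a fixed schema:
#
#     video_dict = {"play_cnt": 1200, "publish_time_stamp": int(time.time()) - 86400}
#     rule_dict = {
#         "play_cnt": {"min": 500, "max": 0},  # max 0 means "no upper bound"
#         "period": {"min": 0, "max": 3},      # at most 3 days since publish
#     }
#     if download_rule("recommend", "demo", video_dict, rule_dict):
#         ...  # passed every rule; go ahead and download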


def download_rule_v2(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules for deciding whether to download a video.
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the video satisfies every rule, otherwise False
    """
    # Normalize video_dict: publish_time_stamp (seconds) -> publish_time (ms)
    if video_dict.get("publish_time_stamp"):
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: derive period (days since publish)
    if (
        video_dict.get("publish_time")
        and video_dict.get("period", "noperiod") == "noperiod"
    ):
        video_dict["period"] = int(
            (int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000)
        )
    # Check every field that has a rule; a max of 0 means "no upper bound"
    for key in video_dict:
        if rule_dict.get(key):
            max_value = (
                int(rule_dict[key]["max"])
                if int(rule_dict[key]["max"]) > 0
                else 999999999999999
            )
            if key == "period":
                flag = 0 <= int(video_dict[key]) <= max_value
                Common.logger(log_type, crawler).info(
                    "{}: 0 <= {} <= {}, {}".format(
                        key, video_dict[key], max_value, flag
                    )
                )
            else:
                flag = int(rule_dict[key]["min"]) <= int(video_dict[key]) <= max_value
                Common.logger(log_type, crawler).info(
                    "{}: {} <= {} <= {}, {}".format(
                        key, rule_dict[key]["min"], video_dict[key], max_value, flag
                    )
                )
            if not flag:
                return flag
    return True


def get_word_score(log_type, crawler, score_sheet, word):
    # score_sheet is a Feishu sheet id; retry until the sheet values load
    while True:
        score_values = Feishu.get_values_batch(log_type, crawler, score_sheet)
        if score_values is not None:
            break
        time.sleep(1)
    # Column 0 holds the word, column 8 its weight score
    for i in range(1, len(score_values)):
        if word == score_values[i][0]:
            return score_values[i][8]
    # Word not found anywhere in the sheet
    return 0


def get_title_score(log_type, crawler, stop_sheet, score_sheet, title):
    # Load the stop-word list from Feishu
    stop_word_list = []
    stop_word_sheet = Feishu.get_values_batch(log_type, crawler, stop_sheet)
    if stop_word_sheet is None:
        return 0
    for x in stop_word_sheet:
        for y in x:
            if y is not None:
                stop_word_list.append(y)
    # Tokenize the title, dropping spaces and stop words
    cut_word_list = []
    for cut_item in jieba.lcut(title):
        if cut_item == " ":
            continue
        if cut_item in stop_word_list:
            continue
        cut_word_list.append(cut_item)
    # Look up the weight score of each remaining word
    score_list = []
    for word in cut_word_list:
        word_score = get_word_score(log_type, crawler, score_sheet, word)
        score_list.append(word_score)
    # Total weight score of the title
    title_score = sum(score_list)
    return title_score


def task_unbind(log_type, crawler, taskid, uids, env):
    if env == "dev":
        url = "https://testadmin.piaoquantv.com/manager/crawler/v3/task/unbind"
    else:
        url = "https://admin.piaoquantv.com/manager/crawler/v3/task/unbind"
    params = {
        "taskId": taskid,  # task ID
        "uids": uids,  # uids to unbind, comma-separated, e.g. "3222121,213231"
        "operator": "",  # defaults to system
    }
    Common.logger(log_type, crawler).info(f"url:{url}")
    Common.logging(log_type, crawler, env, f"url:{url}")
    Common.logger(log_type, crawler).info(f"params:{params}")
    Common.logging(log_type, crawler, env, f"params:{params}")
    response = requests.post(url=url, json=params)
    Common.logger(log_type, crawler).info(f"task_unbind_response:{response.text}")
    Common.logging(log_type, crawler, env, f"task_unbind_response:{response.text}")
    if response.status_code == 200 and response.json()["code"] == 0:
        return "success"
    else:
        return response.text


def clean_title(strings):
    # Strip characters that are unsafe in titles or file names; both the
    # ASCII and full-width variants of "?" and the quote marks are removed.
    return (
        strings.strip()
        .replace("\n", "")
        .replace("/", "")
        .replace("\r", "")
        .replace("#", "")
        .replace(".", "。")
        .replace("\\", "")
        .replace("&NBSP", "")
        .replace(":", "")
        .replace("*", "")
        .replace("?", "")
        .replace("?", "")
        .replace('"', "")
        .replace("<", "")
        .replace(">", "")
        .replace("|", "")
        .replace(" ", "")
        .replace("“", "")
        .replace("'", "")
    )


if __name__ == "__main__":
    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", "像梦一场"))
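    # Quick local check of similarity(); this runs purely locally (jieba +
    # scikit-learn) with no MySQL or Feishu access. The sample titles are
    # illustrative.
    print(similarity("像梦一场", "人生如梦一场"))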