- # -*- coding: utf-8 -*-
- # @Author: wangkun
- # @Time: 2023/3/27
- import requests
- from mq_http_sdk.mq_client import *
- from mq_http_sdk.mq_exception import MQExceptionBase
- import os, sys, jieba
- import time
- import random
- import difflib
- sys.path.append(os.getcwd())
- from common.common import Common
- from common.feishu import Feishu
- from common.scheduling_db import MysqlHelper
- # from common import Common
- # from feishu import Feishu
- # from scheduling_db import MysqlHelper
- def get_user_from_mysql(log_type, crawler, source, env, action=''):
- sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
- results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
- if results:
- return results
- else:
- Common.logger(log_type, crawler).warning(f"Crawler: {crawler}, no crawl user list found")
- return []
- def title_like(log_type, crawler, platform, title, env):
- """
- Title similarity check
- :param log_type: log type
- :param crawler: which crawler
- :param platform: crawl channel, e.g. 公众号 / 小年糕
- :param title: video title
- :param env: environment
- :return: True if similarity >= 80%, otherwise False
- """
- select_sql = f""" select video_title from crawler_video where platform="{platform}" """
- video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
- # print(video_list)
- if len(video_list) == 0:
- return False
- for video_dict in video_list:
- video_title = video_dict["video_title"]
- # print(video_title)
- if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
- return True
- return False
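- # A minimal usage sketch (values are hypothetical): title_like returns True as soon as one
- # stored title in crawler_video reaches a difflib quick_ratio of 0.8 against the new title.
- #   is_duplicate = title_like("recommend", "xiaoniangao", "小年糕", "今天真开心", "dev")
- #   if is_duplicate:
- #       pass  # skip the video: a near-identical title was already crawled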
- def get_config_from_mysql(log_type, source, env, text, action=''):
- select_sql = f"""select * from crawler_config where source="{source}" """
- contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
- title_list = []
- filter_list = []
- emoji_list = []
- search_word_list = []
- for content in contents:
- config = content['config']
- config_dict = eval(config)
- for k, v in config_dict.items():
- if k == "title":
- title_list_config = v.split(",")
- for title in title_list_config:
- title_list.append(title)
- if k == "filter":
- filter_list_config = v.split(",")
- for filter_word in filter_list_config:
- filter_list.append(filter_word)
- if k == "emoji":
- emoji_list_config = v.split(",")
- for emoji in emoji_list_config:
- emoji_list.append(emoji)
- if k == "search_word":
- search_word_list_config = v.split(",")
- for search_word in search_word_list_config:
- search_word_list.append(search_word)
- if text == "title":
- return title_list
- elif text == "filter":
- return filter_list
- elif text == "emoji":
- return emoji_list
- elif text == "search_word":
- return search_word_list
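- # A hypothetical illustration of the `config` column this function parses: each entry is a
- # dict-style string whose values are comma-separated word lists (all values below are made up).
- #   config = '{"title": "晚安,早上好", "filter": "测试,广告", "emoji": "🌹,🙏", "search_word": "养生,戏曲"}'
- #   get_config_from_mysql("recommend", "xiaoniangao", "dev", "filter")  # -> ["测试", "广告"]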
- def random_title(log_type, crawler, env, text):
- random_title_list = get_config_from_mysql(log_type, crawler, env, text)
- return random.choice(random_title_list)
- def task_fun(task_str):
- task_str = task_str.replace("'[", '[').replace("]'", ']')
- task_dict = dict(eval(task_str))
- rule = task_dict['rule']
- task_dict['rule'] = dict()
- for item in rule:
- for k, val in item.items():
- task_dict['rule'][k] = val
- rule_dict = task_dict['rule']
- task_dict = {
- "task_dict": task_dict,
- "rule_dict": rule_dict
- }
- return task_dict
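- # A hypothetical sketch of the task string task_fun consumes: "rule" arrives as a quoted
- # list of one-key dicts, which is unwrapped and flattened into rule_dict (values made up).
- #   task_str = """{'taskId': 1, 'rule': '[{"play_cnt": {"min": 100, "max": 0}}, {"period": {"min": 0, "max": 3}}]'}"""
- #   task_fun(task_str)["rule_dict"]  # -> {"play_cnt": {"min": 100, "max": 0}, "period": {"min": 0, "max": 3}}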
- def task_fun_mq(task_str):
- task_str = task_str.replace('"[', '[').replace(']"', ']').replace('\\', '')
- task_dict = dict(eval(task_str))
- rule = task_dict['rule']
- task_dict['rule'] = dict()
- for item in rule:
- for k, val in item.items():
- task_dict['rule'][k] = val
- rule_dict = task_dict['rule']
- task_dict = {
- "task_dict": task_dict,
- "rule_dict": rule_dict
- }
- return task_dict
- def get_consumer(topic_name, group_id):
- # Initialize the client.
- mq_client = MQClient(
- # HTTP client endpoint; see the endpoint section on the instance detail page of the ApsaraMQ for RocketMQ console.
- "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
- # AccessKey ID, the Alibaba Cloud authentication identifier. See "Create an AccessKey" for how to obtain one.
- "LTAI4G7puhXtLyHzHQpD6H7A",
- # AccessKey Secret, the Alibaba Cloud authentication secret. See "Create an AccessKey" for how to obtain one.
- "nEbq3xWNQd1qLpdy2u71qFweHkZjSG"
- )
- # Topic the messages belong to, created in the ApsaraMQ for RocketMQ console.
- # topic_name = "${TOPIC}"
- topic_name = str(topic_name)
- # Group ID created in the ApsaraMQ for RocketMQ console.
- # group_id = "${GROUP_ID}"
- group_id = str(group_id)
- # Instance ID the topic belongs to, created in the ApsaraMQ for RocketMQ console.
- # If the instance has a namespace, the instance ID is required; if it has none, pass an empty string. The namespace is shown on the instance detail page of the console.
- instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
- consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
- return consumer
- def ack_message(log_type, crawler, recv_msgs, consumer):
- # If a message is not acknowledged before msg.next_consume_time, it will be consumed again.
- # The receipt handle carries a timestamp, so the same message gets a different handle on every consumption.
- try:
- receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
- consumer.ack_message(receipt_handle_list)
- Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
- except MQExceptionBase as err:
- Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
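- # A minimal consume/ack sketch tying get_consumer and ack_message together (topic, group and
- # log names are hypothetical; consume_message(batch, wait_seconds) is assumed to be the
- # mq_http_sdk long-polling call):
- #   consumer = get_consumer("topic_crawler", "GID_CRAWLER")
- #   try:
- #       recv_msgs = consumer.consume_message(1, 3)  # pull up to 1 message, long-poll 3s
- #       ack_message("recommend", "xiaoniangao", recv_msgs, consumer)
- #   except MQExceptionBase:
- #       pass  # no message available or a transient MQ error; retry on the next loop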
- def download_rule(log_type, crawler, video_dict, rule_dict):
- """
- Basic rules for deciding whether to download a video
- :param log_type: log type
- :param crawler: which crawler
- :param video_dict: video info, as a dict
- :param rule_dict: rule info, as a dict
- :return: True if the rules are satisfied, otherwise False
- """
- # Normalize video_dict: derive publish_time (ms) from publish_time_stamp
- if "publish_time_stamp" in video_dict.keys():
- video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
- # Normalize video_dict: derive period (days since publish) from publish_time
- if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
- video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
- # Normalize rule_dict: a max of 0 means no upper limit
- for rule_value in rule_dict.values():
- if rule_value["max"] == 0:
- rule_value["max"] = 999999999999999
- # Fill keys that exist in rule_dict but are missing from video_dict
- for rule_key in rule_dict.keys():
- if rule_key not in video_dict.keys():
- video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
- # Compare and return True / False
- for video_key, video_value in video_dict.items():
- for rule_key, rule_value in rule_dict.items():
- if video_key == rule_key == "period":
- result = 0 <= int(video_value) <= int(rule_value["max"])
- Common.logger(log_type, crawler).info(f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}')
- elif video_key == rule_key:
- result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
- Common.logger(log_type, crawler).info(f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]},{result}')
- else:
- result = True
- if result is False:
- return False
- else:
- continue
- return True
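- # A hypothetical illustration of the two structures download_rule compares (keys and
- # thresholds below are made up, not taken from a real task):
- #   rule_dict = {"play_cnt": {"min": 100, "max": 0}, "period": {"min": 0, "max": 3}}
- #   video_dict = {"play_cnt": 5000, "publish_time_stamp": int(time.time()) - 86400}
- #   download_rule("recommend", "xiaoniangao", video_dict, rule_dict)  # -> True if every rule passes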
- def get_word_score(log_type, crawler, score_sheet, word):
- while True:
- score_sheet = Feishu.get_values_batch(log_type, crawler, score_sheet)
- if score_sheet is None:
- time.sleep(1)
- continue
- # The word contributes no score if it does not appear anywhere in the sheet
- if word not in [y for x in score_sheet for y in x]:
- return 0
- for i in range(1, len(score_sheet)):
- if word == score_sheet[i][0]:
- word_score = score_sheet[i][8]
- return word_score
- # The word appears in the sheet but not in the first column: treat it as having no score
- return 0
- def get_title_score(log_type, crawler, stop_sheet, score_sheet, title):
- # Build the stop-word list
- # while True:
- stop_word_list = []
- stop_word_sheet = Feishu.get_values_batch(log_type, crawler, stop_sheet)
- if stop_word_sheet is None:
- return 0
- # time.sleep(1)
- # continue
- for x in stop_word_sheet:
- for y in x:
- if y is None:
- pass
- else:
- stop_word_list.append(y)
- # break
- # Tokenize the title
- cut_word_list = []
- cut_list = jieba.lcut(title)
- for cut_item in cut_list:
- if cut_item == " ":
- continue
- if cut_item in stop_word_list:
- continue
- cut_word_list.append(cut_item)
- # Collect the weight score of each remaining word
- score_list = []
- for word in cut_word_list:
- word_score = get_word_score(log_type, crawler, score_sheet, word)
- score_list.append(word_score)
- # Total weight score of the title
- title_score = sum(score_list)
- return title_score
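- # A rough sketch of the scoring flow (the sheet IDs below come from the __main__ example; the
- # result depends on the Feishu sheets, so the score shown is hypothetical): the title is cut
- # with jieba, stop words are dropped, each remaining word is looked up in column 9 of the
- # score sheet, and the scores are summed.
- #   get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", "像梦一场")  # -> e.g. 2.5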
- def task_unbind(log_type, crawler, taskid, uids, env):
- if env == "dev":
- url = "https://testadmin.piaoquantv.com/manager/crawler/v3/task/unbind"
- else:
- url = "https://admin.piaoquantv.com/manager/crawler/v3/task/unbind"
- params = {
- "taskId": taskid, # task ID
- "uids": uids, # uids to unbind (comma-separated when there are several), e.g. "3222121,213231"
- "operator": "" # defaults to system
- }
- Common.logger(log_type, crawler).info(f"url:{url}")
- Common.logging(log_type, crawler, env, f"url:{url}")
- Common.logger(log_type, crawler).info(f"params:{params}")
- Common.logging(log_type, crawler, env, f"params:{params}")
- response = requests.post(url=url, json=params)
- Common.logger(log_type, crawler).info(f"task_unbind_response:{response.text}")
- Common.logging(log_type, crawler, env, f"task_unbind_response:{response.text}")
- if response.status_code == 200 and response.json()["code"] == 0:
- return "success"
- else:
- return response.text
- if __name__ == "__main__":
- print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", '像梦一场'))
- pass