# -*- coding: utf-8 -*-
import json
import os
import sys
import time
from datetime import date, datetime, timedelta

import requests
import urllib3
from loguru import logger

sys.path.append(os.getcwd())
# Originally imported from the application package:
# from application.common.feishu.feishu_utils import FeishuUtils
# from application.common.log import Local


class Local(object):
    # Current time, e.g. 2022-04-14 20:13:51.244472
    now = datetime.now()
    # Yesterday, e.g. 2022-04-13
    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
    # Today, e.g. 2022-04-14
    today = date.today()
    # Tomorrow, e.g. 2022-04-15
    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")

    @staticmethod
    def logger(platform, mode):
        """
        Build a loguru logger that writes to a per-platform, per-mode log file.
        """
        # Log directory, created on first use
        log_dir = f"./log_store/{platform}/"
        log_path = os.getcwd() + os.sep + log_dir
        if not os.path.isdir(log_path):
            os.makedirs(log_path)
        # Log file name, e.g. zhongqingkandian-recommend-2022-04-14.log
        log_name = f"{platform}-{mode}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
        # Remove the default handler so logs are not echoed to the console
        logger.remove(handler_id=None)
        # rotation="00:00": start a new file every day at midnight
        # retention="10 days": clean up log files older than ten days
        logger.add(os.path.join(log_dir, log_name), level="INFO",
                   rotation="00:00", retention="10 days", enqueue=True)
        return logger
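
# A minimal usage sketch for Local.logger (illustrative only: the platform and
# mode strings below are the ones this crawler uses elsewhere; the log message
# is an assumption). Not called anywhere by default.
def _demo_local_logger():
    log = Local.logger("zhongqingkandian", "recommend")
    # Lands in ./log_store/zhongqingkandian/zhongqingkandian-recommend-<date>.log
    log.info("logger initialised")
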
# ---------------------------------------------------------------------------
# Feishu sheet helpers: token auth / CRUD / bot alerts
# ---------------------------------------------------------------------------

proxies = {"http": None, "https": None}


class FeishuUtils:
    """
    Helpers for editing Feishu cloud spreadsheets.
    """
    succinct_url = "https://w42nne6hzg.feishu.cn/sheets/"  # Feishu sheet path prefix

    # Map a crawler name to its spreadsheet token
    @classmethod
    def spreadsheettoken(cls, crawler):
        if crawler == "summary":
            return "KsoMsyP2ghleM9tzBfmcEEXBnXg"
        else:
            return crawler

    # Fetch a Feishu API token
    @classmethod
    def get_token(cls):
        """
        Fetch a Feishu API tenant_access_token.
        :return:
        """
        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
        post_data = {"app_id": "cli_a13ad2afa438d00b",  # credentials of the backend account that published the app
                     "app_secret": "4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"}
        urllib3.disable_warnings()
        response = requests.post(url=url, data=post_data, proxies=proxies, verify=False)
        tenant_access_token = response.json()["tenant_access_token"]
        return tenant_access_token

    # Fetch spreadsheet metadata
    @classmethod
    def get_metainfo(cls, crawler):
        """
        Fetch spreadsheet metadata.
        :return:
        """
        try:
            get_metainfo_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                                + cls.spreadsheettoken(crawler) + "/metainfo")
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            params = {
                "extFields": "protectedRange",  # extra fields; protectedRange also returns protected row/column info
                "user_id_type": "open_id"  # type of user id to return: open_id or union_id
            }
            urllib3.disable_warnings()
            r = requests.get(url=get_metainfo_url, headers=headers, params=params, proxies=proxies, verify=False)
            response = json.loads(r.content.decode("utf8"))
            return response
        except Exception as e:
            logger.error("Failed to fetch sheet metadata: {}", e)

    # Read all values from a worksheet
    @classmethod
    def get_values_batch(cls, crawler, sheetid):
        """
        Read all values from a worksheet.
        :param crawler: which crawler's document
        :param sheetid: which worksheet
        :return: all values
        """
        get_values_batch_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                                + cls.spreadsheettoken(crawler) + "/values_batch_get")
        headers = {
            "Authorization": "Bearer " + cls.get_token(),
            "Content-Type": "application/json; charset=utf-8"
        }
        params = {
            "ranges": sheetid,
            "valueRenderOption": "ToString",
            "dateTimeRenderOption": "",
            "user_id_type": "open_id"
        }
        urllib3.disable_warnings()
        r = requests.get(url=get_values_batch_url, headers=headers, params=params, proxies=proxies, verify=False)
        response = json.loads(r.content.decode("utf8"))
        values = response["data"]["valueRanges"][0]["values"]
        return values

    # Insert rows or columns into a worksheet
    @classmethod
    def insert_columns(cls, crawler, sheetid, majordimension, startindex, endindex):
        """
        Insert rows or columns into a worksheet.
        :param crawler: which crawler's cloud document
        :param sheetid: which worksheet
        :param majordimension: ROWS or COLUMNS
        :param startindex: start position
        :param endindex: end position
        """
        try:
            insert_columns_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                                  + cls.spreadsheettoken(crawler) + "/insert_dimension_range")
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "dimension": {
                    "sheetId": sheetid,
                    "majorDimension": majordimension,  # default ROWS; ROWS or COLUMNS
                    "startIndex": startindex,  # start position
                    "endIndex": endindex  # end position
                },
                "inheritStyle": "AFTER"  # BEFORE or AFTER; omit to not inherit style
            }
            urllib3.disable_warnings()
            r = requests.post(url=insert_columns_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error("Failed to insert rows or columns: {}", e)

    # Write data
    @classmethod
    def update_values(cls, crawler, sheetid, ranges, values):
        """
        Write data into a cell range.
        :param crawler: which crawler's cloud document
        :param sheetid: which worksheet
        :param ranges: cell range
        :param values: the data to write, a list
        """
        try:
            update_values_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                                 + cls.spreadsheettoken(crawler) + "/values_batch_update")
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "valueRanges": [
                    {
                        "range": sheetid + "!" + ranges,
                        "values": values
                    },
                ],
            }
            urllib3.disable_warnings()
            r = requests.post(url=update_values_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error("Failed to write data: {}", e)
    # Merge cells
    @classmethod
    def merge_cells(cls, crawler, sheetid, ranges):
        """
        Merge cells.
        :param crawler: which crawler
        :param sheetid: which worksheet
        :param ranges: the cell range to merge
        """
        try:
            merge_cells_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                               + cls.spreadsheettoken(crawler) + "/merge_cells")
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "range": sheetid + "!" + ranges,
                "mergeType": "MERGE_ROWS"
            }
            urllib3.disable_warnings()
            r = requests.post(url=merge_cells_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error("Failed to merge cells: {}", e)

    # Read a single cell
    @classmethod
    def get_range_value(cls, crawler, sheetid, cell):
        """
        Read the content of one cell.
        :param crawler: which crawler
        :param sheetid: which worksheet
        :param cell: which cell
        :return: cell content
        """
        try:
            get_range_value_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                                   + cls.spreadsheettoken(crawler) + "/values/" + sheetid + "!" + cell)
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            params = {
                # FormattedValue: compute and format dates/times per the cell's
                # format (numbers are left unformatted); returns formatted strings
                "valueRenderOption": "FormattedValue",
                "dateTimeRenderOption": "",
                "user_id_type": "open_id"  # type of user id to return: open_id or union_id
            }
            urllib3.disable_warnings()
            r = requests.get(url=get_range_value_url, headers=headers, params=params, proxies=proxies, verify=False)
            return r.json()["data"]["valueRange"]["values"][0]
        except Exception as e:
            logger.error("Failed to read cell data: {}", e)

    # Read the whole sheet as a flat list of non-empty cells
    @classmethod
    def get_sheet_content(cls, crawler, sheet_id):
        try:
            # Was `Feishu.get_values_batch(...)`, an undefined name; use cls instead
            sheet = cls.get_values_batch(crawler, sheet_id)
            content_list = []
            for row in sheet:
                for cell in row:
                    if cell is not None:
                        content_list.append(cell)
            return content_list
        except Exception as e:
            logger.error(f'get_sheet_content:{e}\n')

    # Delete rows or columns; ROWS or COLUMNS
    @classmethod
    def dimension_range(cls, log_type, crawler, sheetid, major_dimension, startindex, endindex):
        """
        Delete rows or columns.
        :param log_type: log path
        :param crawler: which crawler
        :param sheetid: worksheet
        :param major_dimension: default ROWS; ROWS or COLUMNS
        :param startindex: start position
        :param endindex: end position
        :return:
        """
        try:
            dimension_range_url = ("https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
                                   + cls.spreadsheettoken(crawler) + "/dimension_range")
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "dimension": {
                    "sheetId": sheetid,
                    "majorDimension": major_dimension,
                    "startIndex": startindex,
                    "endIndex": endindex
                }
            }
            urllib3.disable_warnings()
            r = requests.delete(url=dimension_range_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error("Failed to delete rows or columns: {}", e)
    # Look up a user's open_id by name
    @classmethod
    def get_userid(cls, username):
        try:
            url = "https://open.feishu.cn/open-apis/user/v1/batch_get_id?"
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            name_phone_dict = {
                "xinxin": "15546206651",
                "muxinyi": "13699208058",
                "wangxueke": "13513479926",
                "yuzhuoyi": "18624010360",
                "luojunhui": "18801281360",
                "fanjun": "15200827642",
                "zhangyong": "17600025055",
                "liukunyu": "18810931977"
            }
            username = name_phone_dict.get(username)
            data = {"mobiles": [username]}
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=data, verify=False, proxies=proxies)
            open_id = r.json()["data"]["mobile_users"][username][0]["open_id"]
            return open_id
        except Exception:
            # Swallow lookup failures; callers treat a missing open_id as optional
            pass

    # Feishu bot alert
    @classmethod
    def bot(cls, log_type, crawler, text, mark_name):
        try:
            headers = {'Content-Type': 'application/json'}
            if crawler == "机器自动改造消息通知":  # machine auto-transform notifications
                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
                users = f"{mark_name}"
            elif crawler == "快手关键词搜索":  # Kuaishou keyword search
                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=U1gySe"
                users = "".join([f'{name}' for _type, name in zip(log_type, mark_name)])
            else:
                url = "https://open.feishu.cn/open-apis/bot/v2/hook/7928f182-08c1-4c4d-b2f7-82e10c93ca80"
                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
                users = f"{mark_name}"
            data = json.dumps({
                "msg_type": "interactive",
                "card": {
                    "config": {
                        "wide_screen_mode": True,
                        "enable_forward": True
                    },
                    "elements": [{
                        "tag": "div",
                        "text": {
                            "content": users + text,
                            "tag": "lark_md"
                        }
                    }, {
                        "actions": [{
                            "tag": "button",
                            "text": {
                                "content": "详情,点击~~~~~",  # "Details, click here"
                                "tag": "lark_md"
                            },
                            "url": sheet_url,
                            "type": "default",
                            "value": {}
                        }],
                        "tag": "action"
                    }],
                    "header": {
                        "title": {
                            "content": "📣消息提醒",  # "Notification"
                            "tag": "plain_text"
                        }
                    }
                }
            })
            urllib3.disable_warnings()
            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
        except Exception as e:
            logger.error(f"bot exception: {e}\n")

    # Feishu bot: transform-plan completion notice
    @classmethod
    def finish_bot(cls, text, url, content):
        try:
            headers = {'Content-Type': 'application/json'}
            data = json.dumps({
                "msg_type": "interactive",
                "card": {
                    "config": {
                        "wide_screen_mode": True,
                        "enable_forward": True
                    },
                    "elements": [{
                        "tag": "div",
                        "text": {
                            "content": text,
                            "tag": "lark_md"
                        }
                    }],
                    "header": {
                        "title": {
                            "content": content,
                            "tag": "plain_text"
                        }
                    }
                }
            })
            urllib3.disable_warnings()
            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
        except Exception as e:
            logger.error(f"finish_bot exception: {e}\n")
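
# A minimal write sketch for FeishuUtils (illustrative only: the spreadsheet
# token and sheet id are the ones req_detail below writes to; the row content
# is a placeholder assumption). Not called anywhere by default.
def _demo_feishu_write():
    values = [["demo title", "http://example.com/video.mp4"]]
    # Open up a fresh row 2, then write into it, mirroring req_detail's pattern
    FeishuUtils.insert_columns("BvScsJKDWhuj1ctUX1mcBzq1nYb", "a338b3", "ROWS", 1, 2)
    time.sleep(0.5)
    FeishuUtils.update_values("BvScsJKDWhuj1ctUX1mcBzq1nYb", "a338b3", "A2:B2", values)
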
class ZhongQingKanDian:
    API_BASE_URL = "http://8.217.192.46:8889"
    COMMON_HEADERS = {
        "Content-Type": "application/json"
    }
    MAX_RETRIES = 3
    TIMEOUT = 10  # request timeout in seconds

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update(self.COMMON_HEADERS)
        # Request counters
        self.recommend_list_request_count = 0
        self.content_recommend_list_request_count = 0
        self.detail_request_count = 0

    def send_request(self, endpoint, data):
        """POST to the crawler API with retries; returns parsed JSON or None."""
        full_url = f"{self.API_BASE_URL}{endpoint}"
        for retry in range(self.MAX_RETRIES):
            try:
                response = self.session.post(full_url, data=data, timeout=self.TIMEOUT)
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                Local.logger("zhongqingkandian", "recommend").info(
                    f"Request to {full_url} failed (attempt {retry + 1}): {e}")
                if retry < self.MAX_RETRIES - 1:
                    time.sleep(2)
            except json.JSONDecodeError as e:
                Local.logger("zhongqingkandian", "recommend").info(
                    f"Failed to decode response from {full_url} (attempt {retry + 1}): {e}")
                if retry < self.MAX_RETRIES - 1:
                    time.sleep(2)
        return None

    def is_response_valid(self, resp):
        """A response is valid when code == 0 and data.data is present."""
        if resp and resp.get("code", -1) == 0:
            data = resp.get("data", {}).get("data")
            return data is not None
        return False

    def req_recommend_list(self):
        """Fetch the recommend feed."""
        url = '/crawler/zhong_qing_kan_dian/recommend'
        body = json.dumps({"cursor": ""})
        resp = self.send_request(url, body)
        if self.is_response_valid(resp):
            self.recommend_list_request_count += 1
            Local.logger("zhongqingkandian", "recommend").info(
                f"Total recommend-feed requests: {self.recommend_list_request_count}, response: {resp}")
            return resp["data"]["data"]
        Local.logger("zhongqingkandian", "recommend").info(
            f"Recommend-feed request failed, unexpected response: {resp}")
        return None

    def req_content_recommend_list(self, content_id):
        """Fetch the related-content feed for one content id."""
        url = '/crawler/zhong_qing_kan_dian/related'
        body = json.dumps({
            "content_id": str(content_id),
            "cursor": ""
        })
        resp = self.send_request(url, body)
        if self.is_response_valid(resp):
            self.content_recommend_list_request_count += 1
            Local.logger("zhongqingkandian", "recommend").info(
                f"Total related-feed requests: {self.content_recommend_list_request_count}, response: {resp}")
            return resp["data"]["data"]
        Local.logger("zhongqingkandian", "recommend").info(
            f"Related-feed request failed, unexpected response: {resp}")
        return None

    def req_detail(self, content_link, label):
        """Fetch the detail for a share link; write video records to the Feishu sheet."""
        url = '/crawler/zhong_qing_kan_dian/detail'
        body = json.dumps({
            "content_link": content_link
        })
        resp = self.send_request(url, body)
        if resp and resp.get("code") == 0:
            self.detail_request_count += 1
            Local.logger("zhongqingkandian", "recommend").info(
                f"Total detail requests: {self.detail_request_count}")
            data = resp["data"]["data"]
            if data["content_type"] == "video":
                video_id = data['channel_content_id']
                video_title = data["title"]
                video_cover = data["image_url_list"][0]['image_url']
                video_url = data["video_url_list"][0]['video_url']
                video_duration = data["video_url_list"][0]['video_duration']
                account_id = data["channel_account_id"]
                account_name = data["channel_account_name"]
                account_avatar = data["avatar"]
                values = [
                    [
                        video_title,
                        video_url,
                        video_duration,
                        video_cover,
                        video_id,
                        content_link,
                        account_name,
                        account_id,
                        account_avatar,
                        label,
                    ]
                ]
                # Insert a fresh row 2, then write the record into it
                FeishuUtils.insert_columns("BvScsJKDWhuj1ctUX1mcBzq1nYb", "a338b3", "ROWS", 1, 2)
                time.sleep(0.5)
                FeishuUtils.update_values("BvScsJKDWhuj1ctUX1mcBzq1nYb", "a338b3", "A2:Z2", values)
                # Return the parsed detail so control_request's `if detail:` check
                # can fire (the original always returned None)
                return data
            else:
                Local.logger("zhongqingkandian", "recommend").info("Not a video; skipping")
        else:
            Local.logger("zhongqingkandian", "recommend").info(
                f"Detail request failed, unexpected response: {resp}")
        return None

    def control_request(self):
        """One pass: recommend feed -> detail per item -> related feed -> detail."""
        recommend_list = self.req_recommend_list()
        if recommend_list:
            for video_obj in recommend_list:
                content_link = video_obj.get("share_url")
                content_id = video_obj.get("id")
                if content_link and content_id:
                    time.sleep(2)
                    detail = self.req_detail(content_link, "推荐")  # label: "recommend"
                    if detail:
                        print(detail)
                    time.sleep(10)
                    content_recommend_list = self.req_content_recommend_list(content_id)
                    if content_recommend_list:
                        for content_obj in content_recommend_list:
                            content_link = content_obj.get("share_info", {}).get("share_url")
                            if content_link:
                                res = self.req_detail(content_link, "内容相关推荐")  # label: "related recommendation"
                                if res:
                                    print(res)

    def run(self):
        # Crawl indefinitely, one control_request pass at a time
        while True:
            self.control_request()


if __name__ == '__main__':
    ZhongQingKanDian().run()
    # ZhongQingKanDian().req_detail('https://vol.youth.cn/1qWiCPOjl1CUewP5?signature=bDjmABzyXE32GNxlOY4pJVbdZfDqw9naZ9vnQ58wq06peMdkrP', 'ceshi')
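
# Bounded usage sketch (illustrative only): run() above loops forever, so a
# smoke test can make a single crawl pass instead:
#
#     crawler = ZhongQingKanDian()
#     crawler.control_request()  # one recommend-feed pass, then exit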