"""Ad-hoc debug script for the gongzhonghao (WeChat Official Account) author crawler:
looks up an account, pulls one page of its article list, and splits the crawl user
list into per-token shards."""
import asyncio
import json
import time

import requests
import urllib3

from common.public import task_fun_mq, get_consumer, ack_message
from common.scheduling_db import MysqlHelper
from common.common import Common
from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor

token_d = {
    "token": "883406306",
    "cookie": "appmsglist_action_3524986952=card; ua_id=j6t2xNuC0mv6dLVbAAAAAMPRLKj1sVGSlMDwNFJKE3s=; wxuin=93278011749821; mm_lang=zh_CN; pgv_pvid=6815195556; noticeLoginFlag=1; remember_acct=2071735594%40qq.com; rewardsn=; wxtokenkey=777; _clck=3930572231|1|ff1|0; uuid=680bd7f128bf80058bc62dd82ff85c96; rand_info=CAESIBtaIUDyVXWwBRD33d7CafRp3rV5rXK7mcvYCy4Yvnn+; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=Dx0Yxt5o9JJuMyndtyu3+JZBym0Dcjy6QqjPcfp+xwsLHf3Y+L9ZmP+kDX6o4t9r; slave_sid=WjV0MXhZZXlrcG9BTGVOZjBEOUlyUFptMWEyN2JNcXlpeU5kcGIyVm9IZUZOV3J1RElKb29KTDJIRHRYaGZtNnVSbklua1FOdUNsX3NoQWE4RFVKM0lKbDkzU25wblRGTDhDWFJteExtMHBjZGwyanZKOVVCWmE1UmNxT3FaZWNsd0VrVm52eEpLakFocGVz; slave_user=gh_d284c09295eb; xid=675798a4e148cb559bed6bb65681ebf9; _clsk=1a6iklq|1694746372692|2|1|mp.weixin.qq.com/weheat-agent/payload/record",
}


def get_user_info(token_dict):
    """Search an official account by name and return its nickname, fakeid and avatar URL."""
    url = "https://mp.weixin.qq.com/cgi-bin/searchbiz?"
    headers = {
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "zh-CN,zh;q=0.9",
        "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?"
        "t=media/appmsg_edit_v2&action=edit&isNew=1"
        "&type=77&createType=5&token=1011071554&lang=zh_CN",
        "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36",
        "x-requested-with": "XMLHttpRequest",
        "cookie": token_dict["cookie"],
    }
    params = {
        "action": "search_biz",
        "begin": "0",
        "count": "5",
        "query": "生活小妙招小助手",
        "token": token_dict["token"],
        "lang": "zh_CN",
        "f": "json",
        "ajax": "1",
    }
    # proxies = Common.tunnel_proxies()
    # print(proxies)
    urllib3.disable_warnings()
    r = requests.get(url=url, headers=headers, params=params, verify=False)
    r.close()
    print(r.json())
    # Error handling kept from the class-based crawler for reference; names such as
    # cls, log_type, crawler, env, Feishu and task_unbind are not defined in this script.
    # if r.json()["base_resp"]["err_msg"] == "invalid session":
    #     Common.logger(log_type, crawler).warning(
    #         f"status_code:{r.status_code}, get_fakeid:{r.text}\n"
    #     )
    #     # Common.logging(
    #     #     log_type,
    #     #     crawler,
    #     #     env,
    #     #     f"status_code:{r.status_code}, get_fakeid:{r.text}\n",
    #     # )
    #     cls.release_token(log_type, crawler, env, token_dict["token_id"], -2)
    #     if 20 >= datetime.datetime.now().hour >= 10:
    #         Feishu.bot(
    #             log_type,
    #             crawler,
    #             f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/",
    #         )
    #     time.sleep(60 * 15)
    #     continue
    # if r.json()["base_resp"]["err_msg"] == "freq control":
    #     Common.logger(log_type, crawler).warning(
    #         f"status_code:{r.status_code}, get_fakeid:{r.text}\n"
    #     )
    #     # Common.logging(
    #     #     log_type,
    #     #     crawler,
    #     #     env,
    #     #     f"status_code:{r.status_code}, get_fakeid:{r.text}\n",
    #     # )
    #     cls.release_token(log_type, crawler, env, token_dict["token_id"], -2)
    #     if 20 >= datetime.datetime.now().hour >= 10:
    #         Feishu.bot(
    #             log_type,
    #             crawler,
    #             f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/",
    #         )
    #     time.sleep(60 * 15)
    #     continue
    # if r.json()["base_resp"]["err_msg"] == "ok" and len(r.json()["list"]) == 0:
    #     Common.logger(log_type, crawler).warning(
    #         f"status_code:{r.status_code}, get_fakeid:{r.text}\n"
    #     )
    #     # Common.logging(
    #     #     log_type,
    #     #     crawler,
    #     #     env,
    #     #     f"status_code:{r.status_code}, get_fakeid:{r.text}\n",
    #     # )
    #     unbind_msg = task_unbind(
    #         log_type=log_type,
    #         crawler=crawler,
    #         taskid=task_dict["id"],
    #         uids=str(user_dict["uid"]),
    #         env=env,
    #     )
    #     if unbind_msg == "success":
    #         if 20 >= datetime.datetime.now().hour >= 10:
    #             Feishu.bot(
    #                 log_type,
    #                 crawler,
    #                 f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n",
    #             )
    #         # Common.logging(
    #         #     log_type,
    #         #     crawler,
    #         #     env,
    #         #     f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n",
    #         # )
    #     else:
    #         Common.logger(log_type, crawler).warning(f"unbind_msg:{unbind_msg}")
    #         # Common.logging(log_type, crawler, env, f"unbind_msg:{unbind_msg}")
    #     return None
    user_info_dict = {
        "user_name": r.json()["list"][0]["nickname"],
        "user_id": r.json()["list"][0]["fakeid"],
        "avatar_url": r.json()["list"][0]["round_head_img"],
    }
    return user_info_dict

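# Illustrative helper (a sketch, not part of the original crawler): the commented-out
# handling above branches on the base_resp.err_msg values "invalid session" and
# "freq control", plus the "ok"-but-empty result. A standalone classifier like this
# could be shared by get_user_info and get_videoList; the function name and the
# return labels are assumptions made here for illustration only.
def classify_search_response(resp_json):
    """Return "ok", "invalid_session", "freq_control" or "empty_result" for a searchbiz reply."""
    err_msg = resp_json.get("base_resp", {}).get("err_msg", "")
    if err_msg == "invalid session":
        return "invalid_session"
    if err_msg == "freq control":
        return "freq_control"
    if err_msg == "ok" and len(resp_json.get("list", [])) == 0:
        return "empty_result"
    return "ok"
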
r.json()["base_resp"]["err_msg"] == "ok" and len(r.json()["list"]) == 0: # Common.logger(log_type, crawler).warning( # f"status_code:{r.status_code}, get_fakeid:{r.text}\n" # ) # # Common.logging( # # log_type, # # crawler, # # env, # # f"status_code:{r.status_code}, get_fakeid:{r.text}\n", # # ) # unbind_msg = task_unbind( # log_type=log_type, # crawler=crawler, # taskid=task_dict["id"], # uids=str(user_dict["uid"]), # env=env, # ) # if unbind_msg == "success": # if 20 >= datetime.datetime.now().hour >= 10: # Feishu.bot( # log_type, # crawler, # f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n", # ) # # Common.logging( # # log_type, # # crawler, # # env, # # f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n", # # ) # else: # Common.logger(log_type, crawler).warning(f"unbind_msg:{unbind_msg}") # # Common.logging(log_type, crawler, env, f"unbind_msg:{unbind_msg}") # return None user_info_dict = { "user_name": r.json()["list"][0]["nickname"], "user_id": r.json()["list"][0]["fakeid"], "avatar_url": r.json()["list"][0]["round_head_img"], } return user_info_dict def get_videoList(token_dict, user_dict): begin = 0 url = "https://mp.weixin.qq.com/cgi-bin/appmsg?" headers = { "accept": "*/*", "accept-encoding": "gzip, deflate, br", "accept-language": "zh-CN,zh;q=0.9", "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?" "t=media/appmsg_edit_v2&action=edit&isNew=1" "&type=77&createType=5&token=" + str(token_dict["token"]) + "&lang=zh_CN", "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" " (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36", "x-requested-with": "XMLHttpRequest", "cookie": token_dict["cookie"], } params = { "action": "list_ex", "begin": str(begin), "count": "5", "fakeid": user_dict["user_id"], "type": "9", "query": "", "token": str(token_dict["token"]), "lang": "zh_CN", "f": "json", "ajax": "1", } urllib3.disable_warnings() r = requests.get(url=url, headers=headers, params=params, verify=False) print(r.url) r.close() print(r.json()) if r.json()["base_resp"]["err_msg"] == "invalid session": time.sleep(60 * 15) print("invalid session") if r.json()["base_resp"]["err_msg"] == "freq control": print("freq control") if ( r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002 ): print("invalid args") if "app_msg_list" not in r.json(): print("no app_msg_list") if len(r.json()["app_msg_list"]) == 0: print("没有更多视频了\n") return else: begin += 5 app_msg_list = r.json()["app_msg_list"] for article in app_msg_list: # try: create_time = article.get("create_time", 0) publish_time_stamp = int(create_time) publish_time_str = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp) ) article_url = article.get("link", "") video_dict = { "video_id": article.get("aid", ""), "video_title": article.get("title", "") .replace(" ", "") .replace('"', "") .replace("'", ""), "publish_time_stamp": publish_time_stamp, "publish_time_str": publish_time_str, "user_name": user_dict["user_name"], "play_cnt": 0, "comment_cnt": 0, "like_cnt": 0, "share_cnt": 0, "user_id": user_dict["user_id"], "avatar_url": user_dict["avatar_url"], "cover_url": article.get("cover", ""), "article_url": article.get("link", ""), # "video_url": cls.get_video_url(article_url, env), 
"video_url": "url", "session": f"gongzhonghao-author1-{int(time.time())}", } print(video_dict) # for k, v in video_dict.items(): # Common.logger(log_type, crawler).info(f"{k}:{v}") # Common.logging( # log_type, crawler, env, f"video_dict:{video_dict}" # ) # if int(time.time()) - publish_time_stamp > 3600 * 24 * int( # rule_dict.get("period", {}).get("max", 1000) # ): # Common.logger(log_type, crawler).info( # f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n" # ) # # Common.logging( # # log_type, # # crawler, # # env, # # f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n", # # ) # return # # if ( # video_dict["article_url"] == 0 # or video_dict["video_url"] == 0 # ): # Common.logger(log_type, crawler).info("文章涉嫌违反相关法律法规和政策\n") # # Common.logging(log_type, crawler, env, "文章涉嫌违反相关法律法规和政策\n") # # 标题敏感词过滤 # elif ( # any( # str(word) # if str(word) in video_dict["video_title"] # else False # for word in get_config_from_mysql( # log_type=log_type, # source=crawler, # env=env, # text="filter", # action="", # ) # ) # is True # ): # Common.logger(log_type, crawler).info("标题已中过滤词\n") # # Common.logging(log_type, crawler, env, "标题已中过滤词\n") # # 已下载判断 # elif ( # cls.repeat_video( # log_type, crawler, video_dict["video_id"], env # ) # != 0 # ): # Common.logger(log_type, crawler).info("视频已下载\n") # # Common.logging(log_type, crawler, env, "视频已下载\n") # # 标题相似度 # elif ( # title_like( # log_type, # crawler, # video_dict["video_title"], # cls.platform, # env, # ) # is True # ): # Common.logger(log_type, crawler).info( # f'标题相似度>=80%:{video_dict["video_title"]}\n' # ) # # Common.logging( # # log_type, # # crawler, # # env, # # f'标题相似度>=80%:{video_dict["video_title"]}\n', # # ) # else: # video_dict["out_user_id"] = video_dict["user_id"] # video_dict["platform"] = crawler # video_dict["strategy"] = log_type # video_dict["out_video_id"] = video_dict["video_id"] # video_dict["width"] = 0 # video_dict["height"] = 0 # video_dict["crawler_rule"] = json.dumps(rule_dict) # video_dict["user_id"] = user_dict["uid"] # video_dict["publish_time"] = video_dict["publish_time_str"] # mq.send_msg(video_dict) # except Exception as e: # Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n") # Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n") # Common.logger(log_type, crawler).info("休眠 60 秒\n") # Common.logging(log_type, crawler, env, "休眠 60 秒\n") time.sleep(60) # 分割列表 def chunks(data_list, chunk_size): """ :param data_list: 列表 :param chunk_size: 每个子列表的长度 :return: 大列表包小列表[[], [], [], []......] 
""" for i in range(0, len(data_list), chunk_size): yield data_list[i: i + chunk_size] async def get_author_videos(args): await asyncio.sleep(1) print(args['log_type']) await GongzhonghaoAuthor.get_all_videos( log_type=args['log_type'], crawler=args['crawler'], task_dict=args['task_dict'], token_index=args['token_index'], rule_dict=args['rule_dict'], user_list=args['user_list'], env=args['env'] ) if __name__ == "__main__": mess = { "createTime": 1684500378438, "id": 27, "interval": 86400, "machine": "aliyun", "mode": "author", "operator": "xxl", "rule": "[{\"duration\":{\"min\":20,\"max\":2700}},{\"period\":{\"min\":1,\"max\":2}}]", "source": "gongzhonghao", "spiderName": "run_gzh2_author", "startTime": 1693493854438, "status": 0, "taskName": "公众号_2", "updateTime": 1688572800179 } # 解析 task_dict rule_list = json.loads(mess['rule']) rule_dict = {} for item in rule_list: for key, val in item.items(): rule_dict[key] = val mess['rule'] = rule_dict task_dict = mess # 解析 user_list task_id = task_dict["id"] select_user_sql = ( f"""select * from crawler_user_v3 where task_id={task_id}""" ) user_list = MysqlHelper.get_values( "author", "gongzhonghao", select_user_sql, "prod", action="" ) print(len(user_list)) user_list = chunks(user_list, 250) print(user_list) for index, i in enumerate(user_list): with open("/Users/luojunhui/cyber/gzh_spider/test_AB/200/user_list_{}.json".format(index + 1), "w", encoding="utf-8") as f: f.write(json.dumps(i, ensure_ascii=False, indent=4)) # print(user_list) # loop = asyncio.get_event_loop() # arg_list = [] # for index, sub_list in enumerate(user_list): # arg = {'log_type': "author{}".format(index + 1), 'crawler': "gongzhonghao", 'token_index': index + 1, # 'task_dict': task_dict, 'rule_dict': rule_dict, 'user_list': sub_list, 'env': 'prod'} # arg_list.append(arg) # # coroutines_list = [get_author_videos(arg) for arg in arg_list] # # # async def test(): # await asyncio.gather(*coroutines_list) # asyncio.run(test()) user_d = get_user_info(token_d) # print(user_d) # # # # get_videoList(token_d, user_d)