"""
@author: luojunhui

Automate a few operations on the AIGC platform by replaying API requests
that were captured from its admin web console.
"""
import json

import requests

from applications.decoratorApi import retryOnTimeout
from applications.denetMysql import DeNetMysql

# Headers copied from a captured browser session of the admin console.
HEADERS = {
    'Accept': 'application/json',
    'Accept-Language': 'zh,zh-CN;q=0.9',
    'Content-Type': 'application/json',
    'Origin': 'http://admin.cybertogether.net',
    'Proxy-Connection': 'keep-alive',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36'
}

# Sent as the "baseInfo" section of every request body.
# NOTE(review): the token is hard-coded in source; consider loading it from
# configuration or the environment instead.
PERSON_COOKIE = {
    "token": "af54cdc404c3464d896745df389b2dce",
    "appType": 9,
    "platform": "pc",
    "appVersionCode": 1000,
    "clientTimestamp": 1,
    "fid": 1,
    "loginUid": 1,
    "pageSource": 1,
    "requestId": 1,
    "rid": 1,
    "uid": 1
}

# Seconds to wait for the AIGC API before giving up. Without a timeout,
# requests blocks indefinitely when the server stops responding.
REQUEST_TIMEOUT = 10


def get_generated_article_title(generate_task_id):
    """
    Fetch the titles of the articles a generate plan has already produced.

    :param generate_task_id: id of the generate plan
    :return: title_set, the set of distinct values selected from the
        ``output`` column for that plan
    """
    db = DeNetMysql()
    # NOTE(review): the id is interpolated straight into the SQL text. If it
    # can ever come from untrusted input, switch to a parameterized query
    # (requires checking what DeNetMysql.select accepts).
    sql = f"""
        SELECT DISTINCT output.output
        FROM produce_plan_exe_record planExeRecord
        JOIN produce_plan_module_output output
            ON output.plan_exe_id = planExeRecord.plan_exe_id
            AND output.produce_module_type = 3
        WHERE planExeRecord.plan_id = '{generate_task_id}';
    """
    title_tuple = db.select(sql)
    return {row[0] for row in title_tuple}


def get_publish_account_from_aigc():
    """
    Fetch the accounts that are currently publishing from the AIGC system.

    :return: a list of dicts with the keys
        name: account name
        ghId: account gh_id
        follower_count: number of followers
        account_init_timestamp: create_timestamp divided by 1000 and
            truncated to an int
        account_type: service_type_info -- 0 subscription account,
            1 subscription account upgraded from a legacy account,
            2 service account
        account_auth: verify_type_info -- -1 not verified, 0 WeChat verified,
            1 Sina Weibo verified, 3 qualification verified but name not yet
            verified, 4 qualification verified, name not yet verified, but
            Sina Weibo verified
    """
    db = DeNetMysql()
    sql = """
        SELECT DISTINCT
            t3.`name`,
            t3.gh_id,
            t3.follower_count,
            t3.create_timestamp,
            t4.service_type_info,
            t4.verify_type_info
        FROM publish_plan t1
        JOIN publish_plan_account t2 ON t1.id = t2.plan_id
        JOIN publish_account t3 ON t2.account_id = t3.id
        LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
        WHERE t1.plan_status = 1 AND t3.channel = 5
        GROUP BY t3.id
        ORDER BY t3.create_timestamp DESC
    """
    info_tuple = db.select(sql)
    info_list = [
        {
            "name": line[0],
            "ghId": line[1],
            "follower_count": line[2],
            "account_init_timestamp": int(line[3] / 1000),
            "account_type": line[4],
            "account_auth": line[5]
        }
        for line in info_tuple
    ]
    return info_list


def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list):
    """
    Create (or extend) a crawler plan from a list of urls.

    :param plan_id: plan id; pass it to add articles to an existing plan,
        otherwise a new id is generated
    :param plan_name: plan name
    :param plan_tag: plan tag
    :param url_list: the input urls
    :return: the decoded JSON response of the save request
    """
    url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
    payload = json.dumps({
        "params": {
            "contentFilters": [],
            "accountFilters": [],
            "filterAccountMatchMode": 1,
            "filterContentMatchMode": 1,
            "selectModeValues": [],
            "searchModeValues": [],
            "contentModal": 3,
            "analyze": {},
            "crawlerComment": 0,
            "inputGroup": None,
            "inputSourceGroups": [],
            "modePublishTime": [],
            "planType": 2,
            "frequencyType": 2,
            "planTag": plan_tag,
            "tagPenetrateFlag": 0,
            "id": plan_id,
            "name": plan_name,
            "channel": 5,
            "crawlerMode": 5,
            "inputModeValues": url_list,
            "modePublishTimeStart": None,
            "modePublishTimeEnd": None,
            "executeRate": None,
            "executeDate": None,
            "executeWindowStart": None,
            "executeWindowEnd": None,
            "executeTimeInterval": None,
            "executeNum": None,
            "addModal": None,
            "addChannel": None,
            "fileUpload": None,
            "prompt": None,
            "acelFlag": None,
            "tasks": []
        },
        "baseInfo": PERSON_COOKIE
    })
    response = requests.post(
        url, headers=HEADERS, data=payload, timeout=REQUEST_TIMEOUT
    )
    return response.json()


def bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id):
    """
    Bind crawler plans to a generate plan that already exists.

    The current plan detail is fetched, the new crawler plans are appended to
    the input sources of its first input source group, and the plan is saved
    back with its other settings copied from the fetched detail.

    :param crawler_task_list: crawler plans to append
    :param generate_task_id: id of the target generate plan
    :return: the decoded JSON response of the save request
    :raises ValueError: if the plan detail could not be fetched or it has no
        input source group to append to
    """
    url = "http://aigc-api.cybertogether.net/aigc/produce/plan/save"
    # get_generate_task_detail returns {} on failure; `or {}` also covers a
    # decorator that might yield None after exhausting retries.
    plan_info = get_generate_task_detail(generate_task_id) or {}
    input_source_groups = plan_info.get("inputSourceGroups")
    if not input_source_groups:
        # Fail with a clear message instead of an opaque TypeError/IndexError,
        # and before any save request is sent.
        raise ValueError(
            f"generate plan {generate_task_id!r} has no inputSourceGroups; "
            "cannot bind crawler tasks"
        )

    input_source_group_0 = input_source_groups[0]
    # A missing/None "inputSources" is treated as "nothing bound yet".
    existed_crawler_task = input_source_group_0.get("inputSources") or []
    input_source_group_0['inputSources'] = [
        *existed_crawler_task,
        *crawler_task_list,
    ]

    payload = json.dumps({
        "params": {
            "contentFilters": [],
            "produceModal": plan_info.get("produceModal"),
            "inputModal": plan_info.get("inputModal"),
            "tasks": plan_info.get("tasks", []),
            "modules": [],
            "moduleGroups": plan_info.get("moduleGroups"),
            "inputSourceGroups": [input_source_group_0],
            "layoutType": plan_info.get("layoutType"),
            "activeManualReview": plan_info.get("activeManualReview"),
            "totalProduceNum": plan_info.get("totalProduceNum"),
            "dailyProduceNum": plan_info.get("dailyProduceNum"),
            "maxConcurrentNum": plan_info.get("maxConcurrentNum"),
            "id": generate_task_id,
            "name": plan_info.get("name"),
            "planTag": plan_info.get("planTag"),
            "tagPenetrateFlag": plan_info.get("tagPenetrateFlag"),
            "inputType": plan_info.get("inputType"),
            "inputChannel": plan_info.get("inputChannel"),
            "activeManualReviewCount": plan_info.get("activeManualReviewCount"),
            "autoComposite": plan_info.get("autoComposite")
        },
        "baseInfo": PERSON_COOKIE
    })
    response = requests.post(
        url, headers=HEADERS, data=payload, timeout=REQUEST_TIMEOUT
    )
    return response.json()


@retryOnTimeout()
def get_generate_task_detail(generate_task_id):
    """
    Fetch the detail of a generate plan (including the crawler plans already
    bound to it) by its id.

    :param generate_task_id: id of the generate plan
    :return: the "data" object of the response when its "msg" is "success",
        otherwise an empty dict
    """
    url = "http://aigc-api.cybertogether.net/aigc/produce/plan/detail"
    payload = json.dumps({
        "params": {
            "id": generate_task_id
        },
        "baseInfo": PERSON_COOKIE
    })
    response = requests.post(
        url, headers=HEADERS, data=payload, timeout=REQUEST_TIMEOUT
    )
    result = response.json()
    # .get avoids a KeyError when the response carries no "msg" field.
    if result.get('msg') == 'success':
        # Always hand callers a dict, even if "data" is null or absent.
        return result.get('data') or {}
    return {}