# conf_task.py

import copy
import logging
import os
import sys
import time

import requests
from flask import Flask, jsonify, request
from dotenv import load_dotenv

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
from conf.config import get_config
from common.db.mysql_help import MysqlHelper

load_dotenv(verbose=True)
env = os.getenv('env')

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False
# MySQL helper instance
mysql_con = MysqlHelper()
conf = get_config()


@app.route("/v1/crawler/task/dellink", methods=["POST"])
def delSpiderLink():
    # Remove a single spider_link from a task and mark its author mapping as deleted.
    data = request.json
    spider_link = data['spider_link']
    task_id = data['task_id']
    # is_del=0 marks the author mapping as deleted
    up_sql = f'update crawler_author_map set is_del=0 where spider_link="{spider_link}"'
    mysql_con.update_values(up_sql)
    sql = f'select spider_link from crawler_task where task_id={task_id}'
    task = mysql_con.get_values(sql)
    spider_links = eval(task[0]['spider_link'])
    # Drop the deleted link from the task's spider_link list
    spider_links = [link for link in spider_links if link != spider_link]
    u_sql = f'update crawler_task set spider_link="{spider_links}" where task_id={task_id}'
    mysql_con.update_values(u_sql)
    if spider_link:
        return jsonify({'code': 200, 'message': '抓取名单删除成功', 'del_link': spider_link})
    else:
        return jsonify({'code': 400, 'message': '抓取名单删除失败', 'del_link': spider_link, 'spider_link': spider_links})
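
# Example request for /v1/crawler/task/dellink (a sketch; assumes the service runs
# locally on port 5050 as in __main__ below, and that the task/link values exist):
#   requests.post('http://127.0.0.1:5050/v1/crawler/task/dellink',
#                 json={'task_id': 1, 'spider_link': 'https://example.com/author/123'})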


@app.route("/v1/crawler/task/getcategory", methods=["GET"])
def getCategory():
    sql = 'select id, content_category from crawler_content_category'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodename", methods=["GET"])
def getModeName():
    sql = 'select id, mode_name from crawler_mode'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodeboard", methods=["GET"])
def getModeBoard():
    sql = 'select id, mode_board from crawler_board'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/user/findmedia", methods=["GET"])
def getMediaInfo():
    data = request.args.to_dict()
    task_id = data['task_id']
    sql = f'select * from crawler_author_map where task_id={task_id} and is_del=1'
    result = mysql_con.get_values(sql)
    task_user_info = []
    for task_info in result:
        media_id = task_info['media_id']
        media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}).json()['content']
        media_name = media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName']
        nick_name = task_info['nick_name']
        spider_link = task_info['spider_link']
        create_user_time = task_info['create_user_time']
        media_data = dict(
            media_name=media_name,
            nick_name=nick_name,
            spider_link=spider_link,
            media_id={'media_id': media_id, 'media_url': conf['media_main_url'].format(media_id)},
            create_user_time=create_user_time * 1000
        )
        task_user_info.append(media_data)
    return jsonify({'code': 200, 'data': task_user_info})
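
# Example request for /v1/crawler/user/findmedia (a sketch; the task_id is illustrative):
#   requests.get('http://127.0.0.1:5050/v1/crawler/user/findmedia', params={'task_id': 1})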


@app.route("/v1/crawler/task/findtask", methods=["GET"])
def getTaskUserInfo():
    # Look up a task by arbitrary crawler_author_map filter conditions
    data = request.args.to_dict()
    values = ''
    for k, v in data.items():
        if isinstance(v, int):
            values += f'{k}={v} and '
        else:
            values += f'{k}="{v}" and '
    sql = f"select task_id from crawler_author_map where {values[:-4]}"  # [:-4] strips the trailing "and "
    res = mysql_con.get_values(sql)
    task_id = res[0]['task_id']
    sql = f'select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task where task_id={task_id} '
    task_info = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': task_info})
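
# Example request for /v1/crawler/task/findtask (a sketch; the filter field and value
# are illustrative and must be crawler_author_map columns):
#   requests.get('http://127.0.0.1:5050/v1/crawler/task/findtask', params={'nick_name': 'demo_user'})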


# GET only
@app.route("/v1/crawler/source/getall", methods=["GET"])
def getSource():
    try:
        # Read the incoming query-string parameters
        get_data = request.args.to_dict()
        sql = 'select source, task_type, spider_name, machine, source_desc, task_type_desc, spider_name_desc from crawler_source'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多数据'})
        source_list = list()
        for source_info in result:
            source_dict = {
                'task_type': [
                    {
                        'type': source_info['task_type'],
                        'description': source_info['task_type_desc'],
                        'spider': {
                            'spider_name': source_info['spider_name'],
                            'description': source_info['spider_name_desc']
                        }
                    }
                ],
                'description': source_info['source_desc'],
                'source': source_info['source'],
                'machine': source_info['machine']
            }
            source_list.append(source_dict)
    except Exception as e:
        return jsonify({'code': '400', 'message': '获取数据源信息失败'})
    return jsonify({'code': '200', 'result': source_list})


@app.route("/v1/crawler/task/checkrepeat", methods=["POST"])
def get_repeat_list():
    data = request.json
    # Extract the submitted links
    spider_links = data.get('spider_link')
    repeat_list = list()
    # Collect links that already exist in crawler_author_map
    for spider_link in spider_links:
        if isinstance(spider_link, int):
            s_sql = f"""select spider_link from crawler_author_map where spider_link={spider_link}"""
        else:
            s_sql = f"""select spider_link from crawler_author_map where spider_link='{spider_link}'"""
        result = mysql_con.get_values(s_sql)
        if result:
            repeat_list.append(spider_link)
    if repeat_list:
        return jsonify({'code': 400, 'message': '名单重复', 'repeat_list': repeat_list})
    else:
        return jsonify({'code': 200, 'message': '抓取名单校验通过', 'repeat_list': repeat_list})
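
# Example request for /v1/crawler/task/checkrepeat (a sketch; the links are illustrative):
#   requests.post('http://127.0.0.1:5050/v1/crawler/task/checkrepeat',
#                 json={'spider_link': ['https://example.com/author/1', 'https://example.com/author/2']})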


@app.route("/v1/crawler/task/insert", methods=["POST"])
def insertTask():
    try:
        data = request.json
        user_data = copy.deepcopy(data)
        tag_name_list = []
        content_tag_list = []
        user_tag = data['user_tag']
        user_content_tag = data['user_content_tag']
        for tag in user_tag:
            tag_name_list.append(tag['tagName'])
        for tag in user_content_tag:
            content_tag_list.append(tag['tagName'])
        if data['min_publish_time']:
            data['min_publish_time'] = int(data['min_publish_time'] / 1000)
        else:
            data['min_publish_time'] = 0
        # if data['min_publish_day']:
        #     data['min_publish_day'] = 0
        data['next_time'] = int(data['next_time'] / 1000)
        data['insert_time'] = int(time.time())
        data['update_time'] = int(time.time())
        data['spider_link'] = str(data['spider_link'])
        data['spider_rule'] = str(data['spider_rule'])
        data['user_tag_info'] = str(user_tag)
        data['content_tag_info'] = str(user_content_tag)
        data['user_tag'] = ','.join(str(i) for i in tag_name_list)
        data['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
        # data['crawler_interval'] = data.pop('interval')
        # Build a comma-separated column list and a matching list of %s placeholders
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
        task_id = mysql_con.insert_values(sql, tuple(data.values()))
        if task_id:
            success_list, fail_list = create_uid(user_data, task_id)
            return jsonify(
                {'code': 200, 'message': 'task create success', 'success_list': success_list, 'fail_list': fail_list})
        return jsonify({'code': 400, 'message': 'task create fail'})
    except Exception as e:
        return jsonify({'code': 500, 'message': f'任务写入失败,原因:{e}'})
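
# Example request body for /v1/crawler/task/insert (a sketch; field values are illustrative
# and the exact set of accepted keys depends on the crawler_task table schema):
#   requests.post('http://127.0.0.1:5050/v1/crawler/task/insert', json={
#       'task_name': 'demo task',
#       'source': 'demo_source',
#       'task_type': 'author',
#       'spider_link': ['https://example.com/author/1'],
#       'spider_rule': {'period': 1},
#       'user_tag': [{'tagName': 'tag_a'}],
#       'user_content_tag': [{'tagName': 'tag_b'}],
#       'min_publish_time': 1672502400000,  # milliseconds
#       'next_time': 1672502400000,         # milliseconds
#       'applets_status': 0,
#       'app_status': 0,
#       'mode_name_id': 0,
#       'mode_board_id': 0,
#       'content_category_id': 0,
#       'create_task_user': 'demo_user',
#   })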


@app.route("/v1/crawler/task/gettask", methods=["POST"])
def getAllTask():
    try:
        get_data = request.json
        page = int(get_data.get('page', 1))
        offset = int(get_data.get('offset', 10))
        start_count = (page * offset) - offset
        if get_data.get('fields'):
            select_data = get_data['fields']
            values = ''
            for k, v in select_data.items():
                if isinstance(v, int):
                    values += f'{k}={v} and '
                else:
                    values += f'{k}="{v}" and '
            sql = f"select task_id from crawler_author_map where {values[:-4]} and is_del=1"  # [:-4] strips the trailing "and "
            res = mysql_con.get_values(sql)
            task_list = list()
            for author_info in res:
                task_id = author_info['task_id']
                sql = f'select * from crawler_task where task_id={task_id} order by update_time desc limit {start_count}, {offset}'
                task_info = mysql_con.get_values(sql)[0]
                task_data = dict(
                    task_id=task_info['task_id'],
                    task_name=task_info['task_name'],
                    source=task_info['source'],
                    task_type=task_info['task_type'],
                    create_task_user=task_info['create_task_user'],
                    insert_time=task_info['insert_time'] * 1000,
                    update_task_user=task_info['update_task_user'],
                    update_time=task_info['update_time'] * 1000
                )
                task_list.append(task_data)
            return jsonify({'code': 200, 'result': task_list, 'total': len(task_list)})
        # LIMIT offset, page_size
        sql = f"""select * from crawler_task order by update_time desc limit {start_count}, {offset} """
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多任务'})
        task_list = list()
        for task_info in result:
            task_data = dict(
                task_id=task_info['task_id'],
                task_name=task_info['task_name'],
                source=task_info['source'],
                task_type=task_info['task_type'],
                create_task_user=task_info['create_task_user'],
                insert_time=task_info['insert_time'] * 1000,
                update_task_user=task_info['update_task_user'],
                update_time=task_info['update_time'] * 1000
            )
            task_list.append(task_data)
        t_sql = """select count(*) from crawler_task"""
        t_res = mysql_con.get_values(t_sql)
        total = t_res[0]['count(*)']
    except Exception as e:
        return jsonify({"code": "400", 'message': "任务列表获取失败"})
    return jsonify({'code': '200', 'result': task_list, 'total': total})
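
# Example request for /v1/crawler/task/gettask (a sketch; 'fields' is optional and the
# filter shown here is illustrative):
#   requests.post('http://127.0.0.1:5050/v1/crawler/task/gettask',
#                 json={'page': 1, 'offset': 10, 'fields': {'create_task_user': 'demo_user'}})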


@app.route("/v1/crawler/task/getone", methods=["GET"])
def getOneTask():
    try:
        get_data = request.args.to_dict()
        task_id = get_data['task_id']
        sql = f'select * from crawler_task where task_id={task_id}'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '400', 'result': [], 'message': 'no data'})
        data = result[0]
        if data['min_publish_time']:
            data['min_publish_time'] = data['min_publish_time'] * 1000
        else:
            data['min_publish_time'] = 0
        data['next_time'] = data['next_time'] * 1000
        data['spider_link'] = eval(data['spider_link'])
        data['spider_rule'] = eval(data['spider_rule'])
        data['user_tag_info'] = eval(data['user_tag_info'])
        data['content_tag_info'] = eval(data['content_tag_info'])
    except Exception as e:
        return jsonify({'code': '500', "message": "获取任务信息失败"})
    return jsonify({'code': '200', 'result': result})


@app.route("/v1/crawler/task/update", methods=["POST"])
def updateTask():
    try:
        data = request.json
        task_id = data.get('task_id')
        task_info = data.get('task_info')
        values = ''
        task_info['min_publish_time'] = int(task_info['min_publish_time'] / 1000)
        task_info['next_time'] = int(task_info['next_time'] / 1000)
        for k, v in task_info.items():
            if isinstance(v, int):
                values += f'{k}={v},'
            else:
                values += f'{k}="{v}",'
        sql = f'update crawler_task set {values[:-1]} where task_id={task_id}'  # [:-1] strips the trailing comma
        result = mysql_con.update_values(sql)
        if result:
            return jsonify({'code': 200, 'message': 'task update success'})
        else:
            return jsonify({'code': 400, 'message': 'task update failed'})
    except Exception as e:
        return jsonify({'code': 400, 'message': '任务更新失败'})
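
# Example request for /v1/crawler/task/update (a sketch; keys under 'task_info' are
# illustrative and must match crawler_task columns):
#   requests.post('http://127.0.0.1:5050/v1/crawler/task/update',
#                 json={'task_id': 1,
#                       'task_info': {'task_name': 'renamed task',
#                                     'min_publish_time': 1672502400000,
#                                     'next_time': 1672502400000}})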


def create_uid(task, task_id):
    # Create a virtual media account for every spider_link in the task and
    # record the mapping in crawler_author_map.
    spider_link = task.get('spider_link')
    source = task.get('source')
    task_type = task.get('task_type')
    applets_status = task.get('applets_status')
    app_status = task.get('app_status')
    user_tag = task.get('user_tag')
    user_content_tag = task.get('user_content_tag')
    mode_name_id = task.get('mode_name_id', 0)
    mode_board_id = task.get('mode_board_id', 0)
    content_category_id = task.get('content_category_id', 0)
    mn_sql = f'select * from crawler_mode_name where id={mode_name_id}'
    mode_name_list = mysql_con.get_values(mn_sql)
    mb_sql = f'select * from crawler_mode_board where id={mode_board_id}'
    mode_board_list = mysql_con.get_values(mb_sql)
    cc_sql = f'select * from crawler_content_category where id={content_category_id}'
    content_category_list = mysql_con.get_values(cc_sql)
    if mode_name_list:
        task['mode_name_str'] = mode_name_list[0]['mode_name']
    else:
        task['mode_name_str'] = ''
    if mode_board_list:
        task['mode_board_str'] = mode_board_list[0]['mode_board']
    else:
        task['mode_board_str'] = ''
    if content_category_list:
        task['content_category_str'] = content_category_list[0]['content_category']
    else:
        task['content_category_str'] = ''
    success_list = list()
    fail_list = list()
    tag_name_list = list()
    content_tag_list = list()
    for tag in user_tag:
        tag_name_list.append(tag['tagName'])
    for tag in user_content_tag:
        content_tag_list.append(tag['tagName'])
    user_tags = ','.join(str(i) for i in tag_name_list)
    user_content_tags = ','.join(str(i) for i in content_tag_list)
    for author_url in spider_link:
        now_time = int(time.time())
        time_array = time.localtime(now_time)
        str_time = time.strftime("%Y-%m-%d", time_array)
        # Build the tag string for the new virtual account
        tags = ""
        tags_list = ['spider', user_tags, task['mode_name_str'], task['mode_board_str'],
                     task['content_category_str'], str_time]
        for v in tags_list:
            if v:
                tags += str(v) + ','
        post_data = {
            # 'count': 1,  # (required) number of accounts: pass 1
            # 'accountType': 4,  # (required) account type: 4 = virtual app account
            'pwd': '',  # password, default 12346
            'nickName': '',  # nickname, default vuser......
            'avatarUrl': '',
            # avatar URL, default http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
            'tagName': tags[:-1],  # multiple values separated by commas
        }
        try:
            response = requests.post(url=conf['media_url'], params=post_data)
            media_id = response.json()['data']
        except Exception as e:
            logging.warning(f'创建账户:{spider_link},失败,原因:{e}')
            fail_list.append(author_url)
            continue
        data = dict(
            spider_link=author_url,
            media_id=media_id,
            source=source,
            task_type=task_type,
            applets_status=applets_status,
            app_status=app_status,
            user_tag=user_tags,
            user_content_tag=user_content_tags,
            insert_time=int(time.time()),
            update_time=int(time.time()),
            create_user_time=now_time,
            mode_name_str=task['mode_name_str'],
            mode_board_str=task['mode_board_str'],
            content_category_str=task['content_category_str'],
            # mode_value_str=mode_value_str,
            task_id=task_id,
            media_main_url=conf['media_main_url'].format(media_id)
        )
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        table = 'crawler_author_map'
        sql = f"""insert into {table}({keys}) VALUES({values})"""
        mysql_con.insert_values(sql, tuple(data.values()))
        user_info = dict(
            outer_id=author_url,
            uid=media_id
        )
        success_list.append(user_info)
    return success_list, fail_list


@app.route("/v1/crawler/author/create", methods=["POST"])
def createUser():
    spider_link = request.json.get('spider_link')
    source = request.json.get('source')
    task_type = request.json.get('task_type')
    applets_status = request.json.get('applets_status')
    app_status = request.json.get('app_status')
    user_tag = request.json.get('user_tag')
    user_content_tag = request.json.get('user_content_tag')
    success_list = list()
    fail_list = list()
    for author_url in spider_link:
        try:
            f_sql = f"""select spider_link from crawler_author_map where spider_link="{author_url}" """
            result = mysql_con.get_values(f_sql)
            if result:
                success_list.append(author_url)
                continue
            else:
                tag_name_list = []
                content_tag_list = []
                for tag in user_tag:
                    tag_name_list.append(tag['tagName'])
                for tag in user_content_tag:
                    content_tag_list.append(tag['tagName'])
                user_tags = ','.join(str(i) for i in tag_name_list)
                user_content_tags = ','.join(str(i) for i in content_tag_list)
                post_data = {
                    # 'count': 1,  # (required) number of accounts: pass 1
                    # 'accountType': 4,  # (required) account type: 4 = virtual app account
                    'pwd': '',  # password, default 12346
                    'nickName': '',  # nickname, default vuser......
                    'avatarUrl': '',
                    # avatar URL, default http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
                    'tagName': user_tags,  # multiple values separated by commas
                }
                response = requests.post(url=conf['media_url'], params=post_data)
                media_id = response.json()['data']
                data = dict(
                    spider_link=author_url,
                    media_id=media_id,
                    source=source,
                    task_type=task_type,
                    applets_status=applets_status,
                    app_status=app_status,
                    user_tag=user_tags,
                    user_content_tag=user_content_tags,
                    insert_time=int(time.time()),
                    update_time=int(time.time())
                )
                keys = ','.join(data.keys())
                values = ','.join(['%s'] * len(data))
                table = 'crawler_author_map'
                sql = f"""insert into {table}({keys}) VALUES({values})"""
                result = mysql_con.insert_values(sql, tuple(data.values()))
                if not result:
                    fail_list.append(author_url)
                else:
                    success_list.append(author_url)
        except Exception as e:
            fail_list.append(author_url)
            continue
    return jsonify({'code': 200, 'result': {'success': success_list, 'fail': fail_list}})
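
# Example request for /v1/crawler/author/create (a sketch; values are illustrative):
#   requests.post('http://127.0.0.1:5050/v1/crawler/author/create',
#                 json={'spider_link': ['https://example.com/author/1'],
#                       'source': 'demo_source', 'task_type': 'author',
#                       'applets_status': 0, 'app_status': 0,
#                       'user_tag': [{'tagName': 'tag_a'}],
#                       'user_content_tag': [{'tagName': 'tag_b'}]})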


if __name__ == "__main__":
    app.run(debug=True, port=5050)
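
# To run locally (a sketch; assumes a .env file providing `env` and a conf/config.py whose
# get_config() resolves media_url, select_media_url and media_main_url):
#   python conf_task.py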