import datetime
import json
import os
import ssl
import time

import requests
from elasticsearch import ApiError, Elasticsearch
from elasticsearch import helpers
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from config import long_articles_config
from config.es_mappings import index_name, mappings, settings

db_client = DatabaseConnector(long_articles_config)
db_client.connect()

ctx = ssl.create_default_context(cafile="config/es_certs.crt")

# NOTE(review): a credential is committed to source control. It is now read from
# the ES_PASSWORD environment variable first; the literal is kept only as a
# fallback so existing runs keep working -- rotate it and drop the fallback.
es_password = os.environ.get("ES_PASSWORD", "nkvvASQuQ0XUGRq5OLvm")

es = Elasticsearch(
    "https://192.168.205.85:9200",
    # Only needed when username/password authentication is enabled on the cluster.
    basic_auth=("elastic", es_password),
    ssl_context=ctx,
)


def create_index():
    """(Re)create the article index ``index_name``.

    Any existing index with that name is deleted first, then a fresh index is
    created from ``settings`` and ``mappings`` (imported from
    ``config.es_mappings``). An ``ApiError`` from the create call is printed
    rather than raised; errors from the exists/delete calls propagate.
    """
    # Defensive (optional) cleanup: drop the old index so creation cannot clash.
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)

    try:
        es.indices.create(index=index_name, settings=settings, mappings=mappings)
    except ApiError as e:
        # The ES error payload lives in ``e.body`` (``{"error": {"type", "reason"}}``);
        # ``e.meta`` only carries transport metadata (status, headers, ...), so the
        # previous ``e.meta.error`` lookup raised AttributeError inside this handler.
        error = e.body.get("error") if isinstance(e.body, dict) else None
        if isinstance(error, dict):
            print(f"❌ Failed: {error.get('type')} – {error.get('reason')}")
        else:
            print(f"❌ Failed: {e.message} – {e.body}")
    else:
        print(f"✅ Index <{index_name}> created.")


def get_max_article_id(default=0):
    """Return the largest ``article_id`` currently searchable in the index.

    Search only sees refreshed documents, so documents indexed moments ago may
    not be reflected yet.

    :param default: value returned when the search yields no hits (an empty
        index); previously this case raised IndexError.
    :return: the ``article_id`` from ``_source`` of the top hit when sorted
        descending by ``article_id``.
    """
    response = es.search(
        index=index_name,
        size=1,
        sort="article_id:desc",
        _source=["article_id"],
    )
    hits = response["hits"]["hits"]
    if not hits:
        return default
    return hits[0]["_source"]["article_id"]


def get_articles(id_, batch_size=10000):
    """Fetch the next batch of rows from ``crawler_meta_article`` as bulk actions.

    :param id_: exclusive lower bound; only rows with ``status = 1`` and
        ``article_id > id_`` are read.
    :param batch_size: maximum number of rows to fetch (default 10000, the
        previously hard-coded limit).
    :return: list of ``helpers.bulk`` action dicts in ascending ``article_id``
        order. Each uses ``article_id`` as the document ``_id``, so indexing the
        same row again overwrites the existing document instead of duplicating it.
    """
    # Plain string rather than an f-string: values are bound as driver parameters.
    fetch_query = """
        select article_id, platform, out_account_id, title
        from crawler_meta_article
        where status = 1 and article_id > %s
        order by article_id
        limit %s;
    """
    results = db_client.fetch(
        fetch_query, cursor_type=DictCursor, params=(id_, int(batch_size))
    )
    # NOTE(review): ``or []`` guards against fetch() returning None for an empty
    # result -- unverified whether DatabaseConnector ever does; harmless if not.
    return [
        {
            "_index": index_name,
            "_id": item["article_id"],
            "_source": {
                "article_id": item["article_id"],
                "platform": item["platform"],
                "out_account_id": item["out_account_id"],
                "title": item["title"],
            },
        }
        for item in results or []
    ]


def search(keyword="刘伯承元帅"):
    """Run a ``match`` query on ``title`` and print each hit, then the elapsed seconds.

    :param keyword: text matched against the ``title`` field; the default is the
        value that was previously hard-coded.
    """
    start = time.perf_counter()
    resp = es.search(
        index=index_name,
        query={"match": {"title": keyword}},
        _source=["article_id", "title"],
    )
    elapsed = time.perf_counter() - start
    for hit in resp["hits"]["hits"]:
        print(hit["_source"])
    print(elapsed)


def get_cluster_docs_stats():
    """Print the cluster's per-node statistics (``es.nodes.stats()``) as indented JSON."""
    stats = es.nodes.stats()
    print(json.dumps(stats.body, indent=4, ensure_ascii=False))


if __name__ == "__main__":
    # Keep indexing batches until the cursor reaches this article_id.
    target_max_id = 27492350
    max_id = get_max_article_id()
    batch_count = 0
    while int(max_id) < target_max_id:
        articles = get_articles(max_id)
        if not articles:
            # No newer rows in MySQL: stop instead of looping forever on the same cursor.
            break
        helpers.bulk(es, articles)
        # Informational only; like search, count may lag until the index refreshes.
        print(es.count(index=index_name))
        # Advance from the batch just indexed instead of re-querying ES: bulk() does
        # not refresh the index, so a search here could still return the old max
        # and cause the same batch to be fetched and indexed again.
        max_id = articles[-1]["_id"]
        batch_count += 1
        print(batch_count)