Source code for er_evaluation.search.elasticsearch

from er_evaluation.search.http_requests import http_post_request


[docs]class ElasticSearch: """ ElasticSearch client for user-friendly search queries and aggregation. Note: The class supports fields with up to one nested level. The use of higher levels of nesting has not yet been tested. """ def __init__(self, base_url, api_key=None): """ Initialize the ElasticSearch client with a base URL and an optional API key. Args: base_url: The base URL of the Elasticsearch server. api_key: The API key for authentication (optional). """ self.base_url = base_url self.api_key = api_key @staticmethod def _build_headers(api_key=None): headers = {"Content-Type": "application/json"} if api_key: headers["Authorization"] = f"ApiKey {api_key}" return headers
[docs] @staticmethod def process_query(user_query, fields, fuzziness=2): """ Process the user's query and build a search query for Elasticsearch. Args: user_query: The user's query string. fields: A list of fields to search in. fuzziness: The fuzziness level for matching (optional, default: 2). Returns: A dictionary representing the Elasticsearch search query. """ def create_nested_query(field, full_field_path, query): """Helper function to create nested queries""" path = field.split(".") if len(path) > 1: return { "nested": { "path": path[0], "query": create_nested_query(".".join(path[1:]), full_field_path, query), } } else: return {"match": {full_field_path: query}} must_clauses = [] for field in fields: query = {"query": user_query, "fuzziness": fuzziness} must_clauses.append(create_nested_query(field, field, query)) return {"query": {"bool": {"should": must_clauses}}}
@staticmethod def _process_aggregations(agg_fields, agg_size=10000, _source=None, top_hits_size=5): """ Process the aggregation fields and build an aggregation query for Elasticsearch. Args: agg_fields: A list of fields to aggregate on. agg_size: The maximum number of aggregation entries (optional, default: 10000). _source: A list of fields to return for each top hit (optional). top_hits_size: The number of top hits to include for each bucket (optional, default: 5). Returns: A dictionary representing the Elasticsearch aggregation query. """ def create_nested_agg(field, full_field_path, size, _source, top_hits_size): """Helper function to create nested aggregations""" path = field.split(".") if len(path) > 1: return { full_field_path: { "nested": {"path": ".".join(path[:-1])}, "aggs": create_nested_agg(".".join(path[1:]), full_field_path, size, _source, top_hits_size), } } else: aggs = {full_field_path + "_inner": {"terms": {"field": full_field_path, "size": size}, "aggs": {}}} if _source is not None: aggs[full_field_path + "_inner"]["aggs"]["top_hits"] = { "top_hits": {"_source": {"includes": _source}, "size": top_hits_size} } return aggs agg_query = {} for agg_field in agg_fields: agg_query.update(create_nested_agg(agg_field, agg_field, agg_size, _source, top_hits_size)) return agg_query
[docs] def search( self, user_query, index, fields, fuzziness=2, size=10000, agg_fields=None, agg_size=10000, agg_source=None, agg_source_top_hits=5, source=None, timeout=5, max_retries=1, ): """ Perform a search using the user's query on the specified index and fields. Args: user_query: The user's query string. index: The index to search in. fields: A list of fields to search in. fuzziness: The fuzziness level for matching (optional, default: 2). agg_fields: A list of fields to aggregate on (optional). agg_size: The maximum number of aggregation results to return (optional, default: 10000). agg_source: A list of fields to return for each top hit in the aggregations (optional). agg_source_top_hits: The number of top hits to include for each bucket in the aggregations (optional, default: 5). source: A list of fields to include in the response (optional). timeout: The timeout for the request in seconds (optional, default: 5). max_retries: The maximum number of retries for the request (optional, default: 1). Returns: A JSON object containing the search results. Raises: HTTPError: If the request fails. """ search_query = ElasticSearch.process_query(user_query, fields, fuzziness) if source is not None: search_query["_source"] = source search_query["size"] = size if agg_fields: search_query["aggs"] = ElasticSearch._process_aggregations( agg_fields, agg_size, _source=agg_source, top_hits_size=agg_source_top_hits ) headers = ElasticSearch._build_headers(self.api_key) return http_post_request( f"{self.base_url}/{index}/_search", headers, search_query, timeout=timeout, max_retries=max_retries )