diff --git a/.gitignore b/.gitignore index df2b2b25..47bd55d5 100644 --- a/.gitignore +++ b/.gitignore @@ -134,3 +134,6 @@ dmypy.json # Pyre type checker .pyre/ + +# PyCharm +.idea \ No newline at end of file diff --git a/beir/retrieval/evaluation.py b/beir/retrieval/evaluation.py index 918929a1..e61bde7c 100644 --- a/beir/retrieval/evaluation.py +++ b/beir/retrieval/evaluation.py @@ -4,6 +4,7 @@ from .search.dense import DenseRetrievalExactSearch as DRES from .search.dense import DenseRetrievalFaissSearch as DRFS from .search.lexical import BM25Search as BM25 +from .search.lexical.vespa_search import VespaLexicalSearch from .search.sparse import SparseSearch as SS from .custom_metrics import mrr, recall_cap, hole, top_k_accuracy @@ -11,7 +12,7 @@ class EvaluateRetrieval: - def __init__(self, retriever: Union[Type[DRES], Type[DRFS], Type[BM25], Type[SS]] = None, k_values: List[int] = [1,3,5,10,100,1000], score_function: str = "cos_sim"): + def __init__(self, retriever: Union[Type[DRES], Type[DRFS], Type[BM25], Type[SS], VespaLexicalSearch] = None, k_values: List[int] = [1, 3, 5, 10, 100, 1000], score_function: str = "cos_sim"): self.k_values = k_values self.top_k = max(k_values) self.retriever = retriever diff --git a/beir/retrieval/search/lexical/vespa_search.py b/beir/retrieval/search/lexical/vespa_search.py new file mode 100644 index 00000000..ebe2a143 --- /dev/null +++ b/beir/retrieval/search/lexical/vespa_search.py @@ -0,0 +1,303 @@ +import shutil +import re +from statistics import mean, median +from collections import Counter +from typing import Dict, Optional +from vespa.application import Vespa +from vespa.package import ApplicationPackage, Field, FieldSet, RankProfile, QueryField +from vespa.query import QueryModel, OR, RankProfile as Ranking, WeakAnd +from vespa.deployment import VespaDocker +from tenacity import retry, wait_exponential, stop_after_attempt, RetryError + +REPLACE_SYMBOLS = ["(", ")", " -", " +"] +QUOTES = [ + "\u0022", # quotation mark (") + "\u0027", # apostrophe (') + "\u00ab", # left-pointing double-angle quotation mark + "\u00bb", # right-pointing double-angle quotation mark + "\u2018", # left single quotation mark + "\u2019", # right single quotation mark + "\u201a", # single low-9 quotation mark + "\u201b", # single high-reversed-9 quotation mark + "\u201c", # left double quotation mark + "\u201d", # right double quotation mark + "\u201e", # double low-9 quotation mark + "\u201f", # double high-reversed-9 quotation mark + "\u2039", # single left-pointing angle quotation mark + "\u203a", # single right-pointing angle quotation mark + "\u300c", # left corner bracket + "\u300d", # right corner bracket + "\u300e", # left white corner bracket + "\u300f", # right white corner bracket + "\u301d", # reversed double prime quotation mark + "\u301e", # double prime quotation mark + "\u301f", # low double prime quotation mark + "\ufe41", # presentation form for vertical left corner bracket + "\ufe42", # presentation form for vertical right corner bracket + "\ufe43", # presentation form for vertical left corner white bracket + "\ufe44", # presentation form for vertical right corner white bracket + "\uff02", # fullwidth quotation mark + "\uff07", # fullwidth apostrophe + "\uff62", # halfwidth left corner bracket + "\uff63", # halfwidth right corner bracket +] +REPLACE_SYMBOLS.extend(QUOTES) + + +def replace_symbols(x): + for symbol in REPLACE_SYMBOLS: + x = x.replace(symbol, "") + return x + + +class VespaLexicalSearch: + def __init__( + self, + application_name: str, + match_phase: str = "or", + rank_phase: str = "bm25", + deployment_parameters: Optional[Dict] = None, + initialize: bool = True, + ): + self.results = {} + self.application_name = application_name.replace("-", "") + assert match_phase in [ + "or", + "weak_and", + ], "'match_phase' should be either 'or' or 'weak_and'" + self.match_phase = match_phase + assert rank_phase in [ + "bm25", + "native_rank", + ], "'rank_phase' should be either 'bm25' or 'native_rank'" + self.rank_phase = rank_phase + self.deployment_parameters = deployment_parameters + self.initialize = initialize + self.vespa_docker = None + if self.initialize: + self.app = self.initialise() + assert ( + self.app.get_application_status().status_code == 200 + ), "Application status different than 200." + else: + self.vespa_docker = VespaDocker.from_container_name_or_id( + self.application_name + ) + assert self.deployment_parameters is not None, ( + "if 'initialize' is set to false, 'deployment_parameters' should contain Vespa " + "connection parameters such as 'url' and 'port'" + ) + self.app = Vespa(**self.deployment_parameters) + assert ( + self.app.get_application_status().status_code == 200 + ), "Application status different than 200." + + def initialise(self): + # + # Create Vespa application package + # + app_package = ApplicationPackage(name=self.application_name) + app_package.schema.add_fields( + Field(name="id", type="string", indexing=["attribute", "summary"]), + Field( + name="title", + type="string", + indexing=["index"], + index="enable-bm25", + ), + Field( + name="body", + type="string", + indexing=["index"], + index="enable-bm25", + ), + ) + app_package.schema.add_field_set( + FieldSet(name="default", fields=["title", "body"]) + ) + app_package.schema.add_rank_profile( + rank_profile=RankProfile( + name="bm25", first_phase="bm25(title) + bm25(body)" + ) + ) + app_package.schema.add_rank_profile( + rank_profile=RankProfile( + name="native_rank", first_phase="nativeRank(title,body)" + ) + ) + app_package.query_profile.add_fields(QueryField(name="maxHits", value=10000)) + # + # Deploy application + # + if not self.deployment_parameters: + self.deployment_parameters = {"port": 8089, "container_memory": "12G"} + self.vespa_docker = VespaDocker(**self.deployment_parameters) + app = self.vespa_docker.deploy(application_package=app_package) + app.delete_all_docs( + content_cluster_name=self.application_name + "_content", + schema=self.application_name, + ) + return app + + def remove_app(self): + if self.vespa_docker: + shutil.rmtree( + self.application_name, ignore_errors=True + ) # remove application package folder + self.vespa_docker.container.stop(timeout=600) # stop docker container + self.vespa_docker.container.remove() # rm docker container + + @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(10)) + def send_query_batch( + self, query_batch, query_model, hits, timeout=100, async_connections=50 + ): + query_results = self.app.query_batch( + query_batch=query_batch, + query_model=query_model, + connections=async_connections, + total_timeout=timeout * len(query_batch), + hits=hits, + **{"timeout": str(timeout) + " s", "ranking.softtimeout.enable": "false"} + ) + return query_results + + def process_queries( + self, query_ids, queries, query_model, hits, batch_size, timeout=100, async_connections=50 + ): + results = {} + assert len(query_ids) == len( + queries + ), "There must be one query_id for each query." + query_id_batches = [ + query_ids[i : i + batch_size] for i in range(0, len(query_ids), batch_size) + ] + query_batches = [ + queries[i : i + batch_size] for i in range(0, len(queries), batch_size) + ] + for idx, (query_id_batch, query_batch) in enumerate( + zip(query_id_batches, query_batches) + ): + print( + "{}, {}, {}: {}/{}".format( + self.application_name, + self.match_phase, + self.rank_phase, + idx, + len(query_batches), + ) + ) + try: + query_results = self.send_query_batch( + query_batch=query_batch, + query_model=query_model, + hits=hits, + timeout=timeout, + async_connections=async_connections + ) + number_hits = [x.number_documents_retrieved for x in query_results] + status_code_summary = Counter([x.status_code for x in query_results]) + print( + "Sucessfull queries: {}/{}\nDocuments retrieved. Min: {}, Max: {}, Mean: {}, Median: {}.".format( + status_code_summary[200], + len(query_batch), + min(number_hits), + max(number_hits), + round(mean(number_hits), 2), + round(median(number_hits), 2), + ) + ) + except RetryError: + continue + for (query_id, query_result) in zip(query_id_batch, query_results): + scores = {} + try: + if query_result.hits: + for hit in query_result.hits: + corpus_id = hit["fields"]["id"] + if ( + corpus_id != query_id + ): # See https://github.com/UKPLab/beir/issues/72 + scores[corpus_id] = hit["relevance"] + except KeyError: + continue + results[query_id] = scores + return results + + def search( + self, + corpus: Dict[str, Dict[str, str]], + queries: Dict[str, str], + top_k: int, + *args, + **kwargs + ) -> Dict[str, Dict[str, float]]: + + if self.initialize: + _ = self.index(corpus) + + # retrieve results from BM25 + query_ids = list(queries.keys()) + queries = [queries[qid] for qid in query_ids] + + queries = [ + re.sub(" +", " ", replace_symbols(x)).strip() for x in queries + ] # remove quotes and double spaces from queries + + if self.match_phase == "or": + match_phase = OR() + elif self.match_phase == "weak_and": + match_phase = WeakAnd(hits=top_k) + else: + ValueError("'match_phase' should be either 'or' or 'weak_and'") + + if self.rank_phase not in ["bm25", "native_rank"]: + ValueError("'rank_phase' should be either 'bm25' or 'native_rank'") + + query_model = QueryModel( + name=self.match_phase + "_" + self.rank_phase, + match_phase=match_phase, + rank_profile=Ranking(name=self.rank_phase, list_features=False), + ) + + self.results = self.process_queries( + query_ids=query_ids, + queries=queries, + query_model=query_model, + hits=top_k, + batch_size=1000, + timeout="100 s", + ) + return self.results + + @retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(10)) + def send_feed_batch(self, feed_batch, total_timeout=10000): + feed_results = self.app.feed_batch( + batch=feed_batch, total_timeout=total_timeout + ) + return feed_results + + def index(self, corpus: Dict[str, Dict[str, str]], batch_size=1000): + batch_feed = [ + { + "id": idx, + "fields": { + "id": idx, + "title": corpus[idx].get("title", None), + "body": corpus[idx].get("text", None), + }, + } + for idx in list(corpus.keys()) + ] + mini_batches = [ + batch_feed[i : i + batch_size] + for i in range(0, len(batch_feed), batch_size) + ] + for idx, feed_batch in enumerate(mini_batches): + feed_results = self.send_feed_batch(feed_batch=feed_batch) + status_code_summary = Counter([x.status_code for x in feed_results]) + print( + "Successful documents fed: {}/{}.\nBatch progress: {}/{}.".format( + status_code_summary[200], len(feed_batch), idx, len(mini_batches) + ) + ) + return 0 diff --git a/beir/test_retrieval_lexical_vespa.py b/beir/test_retrieval_lexical_vespa.py new file mode 100644 index 00000000..21af5b73 --- /dev/null +++ b/beir/test_retrieval_lexical_vespa.py @@ -0,0 +1,65 @@ +import unittest +from beir.retrieval.search.lexical.vespa_search import VespaLexicalSearch +from beir.retrieval.evaluation import EvaluateRetrieval + + +class TestVespaSearch(unittest.TestCase): + def setUp(self) -> None: + self.application_name = "vespa_test" + self.corpus = { + "1": {"title": "this is a title 1", "text": "this is text 1"}, + "2": {"title": "this is a title 2", "text": "this is text 2"}, + "3": {"title": "this is a title 3", "text": "this is text 3"}, + } + self.queries = {"1": "this is query 1", "2": "this is query 2"} + + def test_or_bm25(self): + self.model = VespaLexicalSearch( + application_name=self.application_name, initialize=True + ) + retriever = EvaluateRetrieval(self.model) + results = retriever.retrieve(corpus=self.corpus, queries=self.queries) + self.assertEqual({"1", "2"}, set(results.keys())) + for query_id in results.keys(): + self.assertEqual({"1", "2", "3"}, set(results[query_id].keys())) + + def test_or_native_rank(self): + self.model = VespaLexicalSearch( + application_name=self.application_name, + initialize=True, + match_phase="or", + rank_phase="native_rank", + ) + retriever = EvaluateRetrieval(self.model) + results = retriever.retrieve(corpus=self.corpus, queries=self.queries) + self.assertEqual({"1", "2"}, set(results.keys())) + for query_id in results.keys(): + self.assertEqual({"1", "2", "3"}, set(results[query_id].keys())) + + def test_weakand_bm25(self): + self.model = VespaLexicalSearch( + application_name=self.application_name, + initialize=True, + match_phase="weak_and", + ) + retriever = EvaluateRetrieval(self.model) + results = retriever.retrieve(corpus=self.corpus, queries=self.queries) + self.assertEqual({"1", "2"}, set(results.keys())) + for query_id in results.keys(): + self.assertEqual({"1", "2", "3"}, set(results[query_id].keys())) + + def test_weakand_native_rank(self): + self.model = VespaLexicalSearch( + application_name=self.application_name, + initialize=True, + match_phase="weak_and", + rank_phase="native_rank", + ) + retriever = EvaluateRetrieval(self.model) + results = retriever.retrieve(corpus=self.corpus, queries=self.queries) + self.assertEqual({"1", "2"}, set(results.keys())) + for query_id in results.keys(): + self.assertEqual({"1", "2", "3"}, set(results[query_id].keys())) + + def tearDown(self) -> None: + self.model.remove_app() diff --git a/examples/benchmarking/benchmark_lexical_vespa.py b/examples/benchmarking/benchmark_lexical_vespa.py new file mode 100755 index 00000000..25c562d8 --- /dev/null +++ b/examples/benchmarking/benchmark_lexical_vespa.py @@ -0,0 +1,257 @@ +#! /usr/bin/env python3 + +import os +import shutil +from typing import Tuple, Optional, List, Dict +from beir import util +from beir.datasets.data_loader import GenericDataLoader +from beir.retrieval.search.lexical.vespa_search import VespaLexicalSearch +from beir.retrieval.evaluation import EvaluateRetrieval +from pandas import DataFrame + + +def download_and_unzip_dataset(data_dir: str, dataset_name: str) -> str: + """ + Download and unzip dataset + + :param data_dir: Folder path to hold the downloaded files + :param dataset_name: Name of the dataset according to BEIR benchmark + + :return: Return the path of the folder containing the unzipped dataset files. + """ + url = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/{}.zip".format( + dataset_name + ) + data_path = util.download_and_unzip(url, data_dir) + print("Dataset downloaded here: {}".format(data_path)) + return data_path + + +def prepare_data(data_path: str, split_type: str = "test") -> Tuple: + """ + Extract corpus, queries and qrels from the test dataset. + + :param data_path: Folder path that contains the unzipped dataset files. + :param split_type: One of 'train', 'dev' or 'test' set. + + :return: a tuple containing 'corpus', 'queries' and 'qrels'. + """ + corpus, queries, qrels = GenericDataLoader(data_path).load( + split=split_type + ) # or split = "train" or "dev" + return corpus, queries, qrels + + +def parse_match_phase_argument(match_phase: Optional[List[str]] = None) -> List[str]: + """ + Parse match phase argument. + + :param match_phase: An optional list of match phase types to use in the experiments. + Currently supported types are 'weak_and' and 'or'. By default the experiments will use + 'weak_and'. + + :return: A list with all the match phase types to use in the experiments. + """ + if not match_phase: + match_phase_list = ["weak_and"] + else: + assert all( + [x in ["or", "weak_and"] for x in match_phase] + ), "match_phase must be a list containing 'weak_and' and/or 'or'." + match_phase_list = match_phase + return match_phase_list + + +def parse_rank_phase_argument(rank_phase: Optional[List[str]] = None) -> List[str]: + """ + Parse rank phase argument. + + :param rank_phase: An optional list of rank phase types to use in the experiments. + Currently supported types are 'bm25' and 'native_rank'. By default the experiments will use + 'bm25'. + + :return: A list with all the match phase types to use in the experiments. + """ + if not rank_phase: + rank_phase_list = ["bm25"] + else: + assert all( + [x in ["native_rank", "bm25"] for x in rank_phase] + ), "rank_phase must be a list containing 'native_rank' and/or 'bm25'." + rank_phase_list = rank_phase + return rank_phase_list + + +def get_search_results( + dataset_name: str, + corpus: Dict[str, Dict[str, str]], + queries: Dict[str, str], + qrels: Dict[str, Dict[str, int]], + match_phase: Optional[List[str]] = None, + rank_phase: Optional[List[str]] = None, + initialize: bool = True, + remove_app: bool = True, +): + """ + Deploy an Vespa app, feed, query and compute evaluation metrics + + :param dataset_name: Name of the dataset according to BEIR benchmark + :param corpus: Corpus used to feed the app. + :param queries: Queries used to query the app. + :param qrels: Labeled data used to evaluate the query results. + :param match_phase: An optional list of match phase types to use in the experiments. + Currently supported types are 'weak_and' and 'or'. By default the experiments will use + 'weak_and'. + :param rank_phase: An optional list of rank phase types to use in the experiments. + Currently supported types are 'bm25' and 'native_rank'. By default the experiments will use + 'bm25'. + :param initialize: Deploy and feed the app on the first run of the experiments. Default to True. + :param remove_app: Stop and remove the app after the experiments are run. Default to True. + """ + if initialize: + deployment_parameters = None + else: + deployment_parameters = {"url": "http://localhost", "port": 8089} + match_phase_list = parse_match_phase_argument(match_phase=match_phase) + rank_phase_list = parse_rank_phase_argument(rank_phase=rank_phase) + metrics = [] + for match_phase in match_phase_list: + for rank_phase in rank_phase_list: + model = VespaLexicalSearch( + application_name=dataset_name, + match_phase=match_phase, + rank_phase=rank_phase, + initialize=initialize, + deployment_parameters=deployment_parameters, + ) + initialize = False # only initialize the first run + deployment_parameters = {"url": "http://localhost", "port": 8089} + retriever = EvaluateRetrieval(model) + results = retriever.retrieve(corpus, queries) + ndcg, _map, recall, precision = retriever.evaluate( + qrels, results, retriever.k_values + ) + metric = { + "dataset_name": dataset_name, + "match_phase": match_phase, + "rank_phase": rank_phase, + } + metric.update(ndcg) + metric.update(_map) + metric.update(recall) + metric.update(precision) + metrics.append(metric) + if remove_app: + try: + model.remove_app() + except: # todo: could not find how to increase container.remove() timeout (https://github.com/docker/docker-py/issues/2951) + pass + return metrics + + +def benchmark_vespa_lexical( + data_dir: str, + dataset_names: List[str], + split_type: str = "test", + match_phase: Optional[List[str]] = None, + rank_phase: Optional[List[str]] = None, + initialize: bool = True, + remove_dataset: bool = True, + remove_app: bool = True, +): + """ + Benchmark Vespa lexical search app against a suite of BEIR datasets. + + A metrics.csv file will be created at 'data_dir' containing the metrics computed in the experiments. + + :param data_dir: Folder path to hold the downloaded files + :param dataset_names: A list of dataset names according to the BEIR benchmark. + :param split_type: One of 'train', 'dev' or 'test' set. + :param match_phase: An optional list of match phase types to use in the experiments. + Currently supported types are 'weak_and' and 'or'. By default the experiments will use + 'weak_and'. + :param rank_phase: An optional list of rank phase types to use in the experiments. + Currently supported types are 'bm25' and 'native_rank'. By default the experiments will use + 'bm25'. + :param initialize: Deploy and feed the app on the first run of the experiments. Default to True. + :param remove_dataset: Remove dataset files after the experiments are run. Default to True. + :param remove_app: Stop and remove the app after the experiments are run. Default to True. + """ + result = [] + for dataset_name in dataset_names: + print("Dataset: {}".format(dataset_name)) + data_path = download_and_unzip_dataset( + data_dir=data_dir, dataset_name=dataset_name + ) + corpus, queries, qrels = prepare_data( + data_path=data_path, split_type=split_type + ) + metrics = get_search_results( + dataset_name=dataset_name, + corpus=corpus, + queries=queries, + qrels=qrels, + match_phase=match_phase, + rank_phase=rank_phase, + initialize=initialize, + remove_app=remove_app, + ) + output_file = os.path.join(data_dir, "metrics.csv") + if os.path.isfile(output_file): + DataFrame.from_records(metrics).to_csv( + output_file, mode="a", header=False, index=False + ) + else: + DataFrame.from_records(metrics).to_csv( + output_file, mode="w", header=True, index=False + ) + print(metrics) + result.extend(metrics) + if remove_dataset: + shutil.rmtree(os.path.join(data_dir, dataset_name)) + os.remove(os.path.join(data_dir, dataset_name + ".zip")) + return result + + +if __name__ == "__main__": + + data_dir = os.environ.get("DATA_DIR", os.getcwd()) + dataset_names = [ + "scifact", + "trec-covid", + "nfcorpus", + "nq", + "fiqa", + "arguana", + "webis-touche2020", + "cqadupstack", + "quora", + "dbpedia-entity", + "scidocs", + "fever", + "climate-fever", + "hotpotqa", + ] + _ = benchmark_vespa_lexical( + data_dir=data_dir, + dataset_names=dataset_names, + split_type="test", + match_phase=["weak_and"], + rank_phase=["bm25"], + initialize=True, + remove_dataset=True, + remove_app=True, + ) + # + # MS MARCO is the only dataset which uses the dev set to compute the metrics + # + _ = benchmark_vespa_lexical( + data_dir=data_dir, + dataset_names=["msmarco"], + split_type="dev", + match_phase=["weak_and"], + rank_phase=["bm25"], + initialize=True, + remove_dataset=True, + remove_app=True, + ) diff --git a/setup.py b/setup.py index 29de4fdc..652ebc76 100644 --- a/setup.py +++ b/setup.py @@ -21,6 +21,8 @@ 'pytrec_eval', 'faiss_cpu', 'elasticsearch', + 'pyvespa', + 'tenacity', 'tensorflow>=2.2.0', 'tensorflow-text', 'tensorflow-hub'