diff --git a/.gitignore b/.gitignore
index ed8ebf5..b0f2192 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
-__pycache__
\ No newline at end of file
+__pycache__
+.venv
\ No newline at end of file
diff --git a/drivers/elastic.py b/drivers/elastic.py
index 66b6674..4193c7b 100644
--- a/drivers/elastic.py
+++ b/drivers/elastic.py
@@ -2,6 +2,7 @@ from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
 from models.data_thread import Threads
 from interfaces.db_testing_interface import DB_Testing
+import json
 
 INDEX = "threads"
 
@@ -16,8 +17,8 @@ class Elastic_Testing(DB_Testing):
         self.bulk_data = []
         for t in Threads:
-            y = t.__dict__.copy()
-            y["_index"] = INDEX
+            y = "{\"index\":{}}\n"
+            y += json.dumps(t.__dict__.copy())
             self.bulk_data.append(y)
 
     def delete_table(self, driver: Elasticsearch):
@@ -39,7 +40,8 @@ class Elastic_Testing(DB_Testing):
             driver.indices.delete(index=INDEX)
 
     def add_bulk(self, driver: Elasticsearch):
-        bulk(driver, self.bulk_data)
+        for i in range(0, len(self.bulk_data), 1000):
+            driver.bulk(index=INDEX, operations='\n'.join(self.bulk_data[i:i+1000]), refresh=True)
 
     def add_singles(self, driver: Elasticsearch):
         for t in self.singles_data:
@@ -162,7 +164,7 @@ class Elastic_Testing(DB_Testing):
             }
         }
 
-        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed")
+        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
 
     def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
 
@@ -208,4 +210,4 @@ class Elastic_Testing(DB_Testing):
             }
         }
 
-        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed")
\ No newline at end of file
+        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
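The rewritten `add_bulk` above bypasses `elasticsearch.helpers.bulk` and hand-builds the NDJSON body that the Bulk REST API expects: an action line (`{"index":{}}`) followed by the document source, sent in batches of 1000 operations. A minimal standalone sketch of that payload format, with an illustrative `client` and `docs` that are not from the patch; note the Bulk API wants the body terminated by a newline, so the sketch appends one after the join:

```python
import json

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # illustrative local cluster
docs = [{"subject": "Mr", "reply_count": 3}, {"subject": "Mrs", "reply_count": 1}]

CHUNK = 1000  # same batch size the patch uses

# One action line plus one source line per document, as in Elastic_Testing.__init__.
lines = ['{"index":{}}\n' + json.dumps(d) for d in docs]

for i in range(0, len(lines), CHUNK):
    # The Bulk API expects newline-delimited JSON ending with a newline; the
    # 8.x client accepts a pre-serialized string here, as the patch itself does.
    body = "\n".join(lines[i : i + CHUNK]) + "\n"
    client.bulk(index="threads", operations=body, refresh=True)
```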
"subject.keyword": ["Mr", "Mrs"] + } + } + ] + } + } + } + return f"Got {driver.count(index=INDEX, body=query).body['count']}" + + def search_mails_ends_microsoftcom(self, driver: Elasticsearch): + query = { + "query": { + "wildcard": { + "cc_recipients.keyword": "*@microsoft.com" + } + } + } + return f"Got {driver.count(index=INDEX, body=query).body['count']}" + + def search_lorem(self, driver: Elasticsearch): + query = { + "query": { + "match_phrase": { + "body": "Nullam sit amet turpis elementum ligula vehicula consequat. Morbi a ipsum. Integer a nibh." + } + } + } + return f"Got {driver.count(index=INDEX, body=query).body['count']}" + + def get_sum_attachment_less_5(self, driver: Elasticsearch): + query = { + "size": 0, + "query": { + "bool": { + "must": [ + { + "range": { + "attachment_count": { + "lt": 5 + } + } + } + ] + } + }, + "aggs": { + "attachment_count": { + "sum": { + "field": "attachment_count" + } + } + } + } + return f"Got {driver.search(index=INDEX, body=query).body['aggregations']['attachment_count']['value']}" + + def update_add_replies_per_last_name(self, driver: Elasticsearch) -> str: + agg_query = { + "size": 0, + "aggs": { + "reply_count_by_last_name": { + "terms": { + "script": { + "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]", + "lang": "painless" + }, + "size": 1000 + }, + "aggs": { + "last_name_replies": { + "sum": { + "field": "reply_count" + } + } + } + } + } + } + + agg_results = driver.search(index=INDEX, body=agg_query) + + buckets = agg_results['aggregations']['reply_count_by_last_name']['buckets'] + reply_counts = {bucket['key']: bucket['last_name_replies']['value'] for bucket in buckets} + + update_query = { + "script": { + "source": """ + String name = ctx._source.sender_name; + String[] parts = /\\s+/.split(name); + String lastName = parts[parts.length - 1]; + if (params.replyCounts.containsKey(lastName)) { + ctx._source.last_name_replies = params.replyCounts.get(lastName); + } + """, + "lang": "painless", + "params": { + "replyCounts": reply_counts + } + }, + "query": { + "match_all": {} + } + } + + driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120) + + + def update_add_replies_per_subject(self, driver: Elasticsearch) -> str: + agg_query = { + "size": 0, + "aggs": { + "reply_count_by_subject": { + "terms": { + "script": { + "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]", + "lang": "painless" + }, + "size": 1000 + }, + "aggs": { + "subject_replies": { + "sum": { + "field": "reply_count" + } + } + } + } + } + } + + agg_results = driver.search(index=INDEX, body=agg_query) + + buckets = agg_results['aggregations']['reply_count_by_subject']['buckets'] + reply_counts = {bucket['key']: bucket['subject_replies']['value'] for bucket in buckets} + + update_query = { + "script": { + "source": """ + ctx._source.subject_replies = params.replyCounts.get(ctx.subject); + """, + "lang": "painless", + "params": { + "replyCounts": reply_counts + } + }, + "query": { + "match_all": {} + } + } + + driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120) diff --git a/interfaces/db_testing_interface.py b/interfaces/db_testing_interface.py index 1b5e0fc..b260257 100644 --- a/interfaces/db_testing_interface.py +++ b/interfaces/db_testing_interface.py @@ -5,7 +5,7 @@ from 
diff --git a/main.py b/main.py
index 7a6873a..4c4bc59 100644
--- a/main.py
+++ b/main.py
@@ -2,6 +2,7 @@ from elasticsearch import Elasticsearch
 from psycopg2 import connect
 from drivers.psql import PSQL_Testing
 from drivers.elastic import Elastic_Testing
+from drivers.elastic4 import Elastic_4_Shards_Testing
 from sshtunnel import SSHTunnelForwarder
 from utils.utils import preprocess_json
 from typing_extensions import Dict
@@ -10,13 +11,6 @@ import numpy as np
 import matplotlib.pyplot as plt
 
 def test_psql():
-    with SSHTunnelForwarder(
-        ssh_address_or_host=("orodruin.mordor", 22),
-        ssh_username="postgres",
-        ssh_password="postgres",
-        remote_bind_address=('127.0.0.1', 5432),
-        local_bind_address=('127.0.0.1', 5432)
-    ):
     psqlClient = connect(
         database="postgres",
         host="127.0.0.1",
@@ -26,58 +20,64 @@
     )
     psqlClient.autocommit = False
     return PSQL_Testing().do_tests(psqlClient)
-
-
 def test_elasticsearch():
     es = Elasticsearch(
-        'https://orodruin.mordor:9200/',
-        api_key='WjMwVXQ0OEJnUzRTOUVUaVNNVHY6MFh2X3RDcGRRWC1FRVNRZkdhWlYwUQ==',
+        'http://elastic-tma.docker.tma.coe.int:80',
+        api_key='VGxhTnhJOEJGM3NDbkpWLVBzUkg6eHE4c3FuclhTWW1sRm9YN0FkWmRMdw==',
         verify_certs=False, # just to not create certificates
         ssl_show_warn=False
     )
     return Elastic_Testing().do_tests(es)
 
+def test_elasticsearch_4_shards():
+    es = Elasticsearch(
+        'http://elastic-tma.docker.tma.coe.int:80',
+        api_key='VGxhTnhJOEJGM3NDbkpWLVBzUkg6eHE4c3FuclhTWW1sRm9YN0FkWmRMdw==',
+        verify_certs=False, # just to not create certificates
+        ssl_show_warn=False
+    )
+    return Elastic_4_Shards_Testing().do_tests(es)
+
 def plot(timings: Dict[str, Dict[str, float]]):
-    # Transform the dict from {Driver: {Function, timing}} into {Function: [{Drivers, timing}]}
-    usable_dict = defaultdict(lambda: defaultdict(float))
-    for driver_name, function_timing in timings.items():
-        for function_name, timing in function_timing.items():
-            usable_dict[function_name][driver_name] += timing
-    usable_dict = {k: dict(v) for k, v in usable_dict.items()}
-
-    relative_dict = {}
-    for function, systems in usable_dict.items():
-        relative_dict[function] = systems['ES'] / systems['PSQL']
-
-    functions = list(relative_dict.keys())
-    psql_values = [1] * len(functions) # List of ones for the psql values
-
-    es_values = list(relative_dict.values())
+    functions = list(timings['PSQL'].keys())
+    drivers = list(timings.keys())
+    values = {func: [] for func in functions}
 
-    plt.figure(figsize=(12, 8))
+    for func in functions:
+        values[func].append(timings['PSQL'][func])
+
+    for driver in [x for x in drivers if x != 'PSQL']:
+        for func in functions:
+            values[func].append(timings[driver][func] / values[func][0])
+
+    for func in functions:
+        values[func][0] = 1
+
+    fig, ax = plt.subplots(figsize=(12, 8))
     index = np.arange(len(functions))
-    bar_width = 0.35
+    bar_width = 0.25
 
-    plt.bar(index, psql_values, bar_width, label='PSQL', color='lightblue')
-    plt.bar(index + bar_width, es_values, bar_width, label='ES', color='orange')
+    for i, driver in enumerate(drivers):
+        ax.bar(index + i * bar_width, [values[func][i] for func in functions], bar_width, label=driver)
 
-    plt.xlabel('Functions')
-    plt.ylabel('Relative Time')
-    plt.title('Performance of ES Relative to PSQL')
-    plt.xticks(index + bar_width / 2, functions, rotation=45, ha="right")
-    plt.legend()
-    for i, v in enumerate(psql_values):
-        plt.text(i, v + 0.02, str(v), ha='center', va='bottom')
-    for i, v in enumerate(es_values):
-        plt.text(i + bar_width, v + 0.02, str(round(v, 2)), ha='center', va='bottom')
+    ax.set_xlabel('Functions')
+    ax.set_ylabel('Relative Time')
+    ax.set_title('Performance of ES Relative to PSQL')
+    ax.set_xticks(index + bar_width * (len(drivers) - 1) / 2, functions, rotation=45, ha="right")
+    ax.legend()
 
-    plt.tight_layout()
-    plt.savefig("plot.png")
+    for i, driver in enumerate(drivers):
+        for j, v in enumerate([values[func][i] for func in functions]):
+            plt.text(j + (i * bar_width), v + 0.02, str(round(v, 2)), fontsize=8, ha='center', va='bottom')
+
+    fig.tight_layout()
+    plt.savefig("plot-tma-opti.png")
     plt.show()
 
@@ -89,7 +89,10 @@ if __name__ == "__main__":
 
     timings = {}
 
-    # timings['PSQL'] = test_psql()
+    timings['PSQL'] = test_psql()
     timings['ES'] = test_elasticsearch()
+    timings['ES4shards'] = test_elasticsearch_4_shards()
 
-    # plot(timings)
\ No newline at end of file
+
+    plot(timings)
+    print(timings)
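The new `plot()` stores each function's absolute PSQL timing first, divides every other driver's timing by that baseline, then overwrites the baseline entry with 1, so every bar reads as time relative to PSQL. The same normalization as a small pure function, a sketch with made-up sample numbers rather than code from the patch:

```python
from typing import Dict

def normalize(timings: Dict[str, Dict[str, float]],
              baseline: str = "PSQL") -> Dict[str, Dict[str, float]]:
    # Divide each driver's timing by the baseline driver's timing for the
    # same function; the baseline itself comes out as 1.0 everywhere.
    return {
        driver: {func: t / timings[baseline][func] for func, t in funcs.items()}
        for driver, funcs in timings.items()
    }

sample = {"PSQL": {"add_bulk": 2.0}, "ES": {"add_bulk": 1.0}}
print(normalize(sample))  # {'PSQL': {'add_bulk': 1.0}, 'ES': {'add_bulk': 0.5}}
```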
diff --git a/plot-tma-opti.png b/plot-tma-opti.png
new file mode 100644
index 0000000..f2e91c1
Binary files /dev/null and b/plot-tma-opti.png differ
diff --git a/plot-tma.png b/plot-tma.png
new file mode 100644
index 0000000..0e96859
Binary files /dev/null and b/plot-tma.png differ
diff --git a/requirements.txt b/requirements.txt
index 5bd4182..68bcb97 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
-psycopg2==2.9.9
 sshtunnel==0.4.0
 typing_extensions==4.12.0
 typing==3.7.4.3
 elasticsearch==8.13.2
-matplotlib==3.9.0
\ No newline at end of file
+matplotlib==3.9.0
+psycopg2-binary
diff --git a/utils/utils.py b/utils/utils.py
index 2dbbefb..3f5b3b4 100644
--- a/utils/utils.py
+++ b/utils/utils.py
@@ -26,10 +26,11 @@ def time_func(name: str, function_name: str, x : Callable, repeat : Optional[int
 
 
 def preprocess_json():
-    with open('data.json', 'r') as f:
-        data : dict = json.load(f)
-        for d in data:
-            x = Data_Thread()
-            for k in d.keys():
-                x.__setattr__(k, d[k])
-            Threads.append(x)
\ No newline at end of file
+    for _ in range(0, 1):
+        with open('data.json', 'r') as f:
+            data : dict = json.load(f)
+            for d in data:
+                x = Data_Thread()
+                for k in d.keys():
+                    x.__setattr__(k, d[k])
+                Threads.append(x)
\ No newline at end of file
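The `range(0, 1)` wrapper added to `preprocess_json` runs exactly once, so it only changes behavior if the upper bound is edited by hand; it reads like a hook for loading the data set several times to scale the benchmark. A sketch of that reading with an explicit `copies` parameter (the parameter is an assumption, not something the patch defines):

```python
import json

from models.data_thread import Data_Thread, Threads  # assumed to match utils.py's imports

def preprocess_json(copies: int = 1):
    # Load data.json `copies` times, appending every record to Threads, so
    # the benchmark data set can be scaled without editing a loop bound.
    for _ in range(copies):
        with open('data.json', 'r') as f:
            data = json.load(f)
        for d in data:
            thread = Data_Thread()
            for k, v in d.items():
                setattr(thread, k, v)
            Threads.append(thread)
```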