diff --git a/drivers/elastic.py b/drivers/elastic.py
index 4193c7b..9b92506 100644
--- a/drivers/elastic.py
+++ b/drivers/elastic.py
@@ -8,8 +8,10 @@ INDEX = "threads"
 
 class Elastic_Testing(DB_Testing):
     driver_name = "ES"
-    def __init__(self) -> None:
+    def __init__(self, shards = 1) -> None:
         super().__init__()
+        self.shards = shards
+        self.driver_name += f" ({shards} Shards)"
         self.singles_data = []
         for t in Threads:
             x = t.__dict__.copy()
@@ -30,7 +32,8 @@ class Elastic_Testing(DB_Testing):
         driver.indices.create(index=INDEX, body={
             'settings': {
                 'index': {
-                    'number_of_replicas': 0
+                    'number_of_replicas': 0,
+                    'number_of_shards': self.shards
                 }
             }
         })
diff --git a/drivers/elastic4.py b/drivers/elastic4.py
deleted file mode 100644
index 9e74c9d..0000000
--- a/drivers/elastic4.py
+++ /dev/null
@@ -1,215 +0,0 @@
-from elasticsearch import Elasticsearch
-from elasticsearch.helpers import bulk
-from models.data_thread import Threads
-from interfaces.db_testing_interface import DB_Testing
-import json
-
-INDEX = "threads"
-
-class Elastic_2_Shards_Testing(DB_Testing):
-    driver_name = "ES (4-Shards)"
-    def __init__(self) -> None:
-        super().__init__()
-        self.singles_data = []
-        for t in Threads:
-            x = t.__dict__.copy()
-            self.singles_data.append(x)
-
-        self.bulk_data = []
-        for t in Threads:
-            y = "{\"index\":{}}\n"
-            y += json.dumps(t.__dict__.copy())
-            self.bulk_data.append(y)
-
-    def delete_table(self, driver: Elasticsearch):
-        if driver.indices.exists(index=INDEX):
-            driver.indices.delete(index=INDEX)
-
-    def create_table(self, driver: Elasticsearch):
-        self.delete_table(driver)
-        driver.indices.create(index=INDEX, body={
-            'settings': {
-                'index': {
-                    'number_of_replicas': 0,
-                    'number_of_shards': 2
-                }
-            }
-        })
-
-    def delete_data(self, driver: Elasticsearch):
-        if driver.indices.exists(index=INDEX):
-            driver.indices.delete(index=INDEX)
-
-    def add_bulk(self, driver: Elasticsearch):
-        self.create_table(driver)
-        for i in range(0, len(self.bulk_data), 1000):
-            driver.bulk(index=INDEX, operations = '\n'.join(self.bulk_data[i:i+1000]), refresh=True)
-
-    def add_singles(self, driver: Elasticsearch):
-        for t in self.singles_data:
-            driver.index(index=INDEX, document=t)
-
-    def attach5_mr_mrs(self, driver: Elasticsearch):
-        query = {
-            "query": {
-                "bool": {
-                    "must": [
-                        {
-                            "range": {
-                                "attachment_count": {
-                                    "gt": 5
-                                }
-                            }
-                        },
-                        {
-                            "terms": {
-                                "subject.keyword": ["Mr", "Mrs"]
-                            }
-                        }
-                    ]
-                }
-            }
-        }
-        return f"Got {driver.count(index=INDEX, body=query).body['count']}"
-
-    def search_mails_ends_microsoftcom(self, driver: Elasticsearch):
-        query = {
-            "query": {
-                "wildcard": {
-                    "cc_recipients.keyword": "*@microsoft.com"
-                }
-            }
-        }
-        return f"Got {driver.count(index=INDEX, body=query).body['count']}"
-
-    def search_lorem(self, driver: Elasticsearch):
-        query = {
-            "query": {
-                "match_phrase": {
-                    "body": "Nullam sit amet turpis elementum ligula vehicula consequat. Morbi a ipsum. Integer a nibh."
-                }
-            }
-        }
-        return f"Got {driver.count(index=INDEX, body=query).body['count']}"
-
-    def get_sum_attachment_less_5(self, driver: Elasticsearch):
-        query = {
-            "size": 0,
-            "query": {
-                "bool": {
-                    "must": [
-                        {
-                            "range": {
-                                "attachment_count": {
-                                    "lt": 5
-                                }
-                            }
-                        }
-                    ]
-                }
-            },
-            "aggs": {
-                "attachment_count": {
-                    "sum": {
-                        "field": "attachment_count"
-                    }
-                }
-            }
-        }
-        return f"Got {driver.search(index=INDEX, body=query).body['aggregations']['attachment_count']['value']}"
-
-    def update_add_replies_per_last_name(self, driver: Elasticsearch) -> str:
-        agg_query = {
-            "size": 0,
-            "aggs": {
-                "reply_count_by_last_name": {
-                    "terms": {
-                        "script": {
-                            "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
-                            "lang": "painless"
-                        },
-                        "size": 1000
-                    },
-                    "aggs": {
-                        "last_name_replies": {
-                            "sum": {
-                                "field": "reply_count"
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        agg_results = driver.search(index=INDEX, body=agg_query)
-
-        buckets = agg_results['aggregations']['reply_count_by_last_name']['buckets']
-        reply_counts = {bucket['key']: bucket['last_name_replies']['value'] for bucket in buckets}
-
-        update_query = {
-            "script": {
-                "source": """
-                    String name = ctx._source.sender_name;
-                    String[] parts = /\\s+/.split(name);
-                    String lastName = parts[parts.length - 1];
-                    if (params.replyCounts.containsKey(lastName)) {
-                        ctx._source.last_name_replies = params.replyCounts.get(lastName);
-                    }
-                """,
-                "lang": "painless",
-                "params": {
-                    "replyCounts": reply_counts
-                }
-            },
-            "query": {
-                "match_all": {}
-            }
-        }
-
-        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
-
-
-    def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
-        agg_query = {
-            "size": 0,
-            "aggs": {
-                "reply_count_by_subject": {
-                    "terms": {
-                        "script": {
-                            "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
-                            "lang": "painless"
-                        },
-                        "size": 1000
-                    },
-                    "aggs": {
-                        "subject_replies": {
-                            "sum": {
-                                "field": "reply_count"
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        agg_results = driver.search(index=INDEX, body=agg_query)
-
-        buckets = agg_results['aggregations']['reply_count_by_subject']['buckets']
-        reply_counts = {bucket['key']: bucket['subject_replies']['value'] for bucket in buckets}
-
-        update_query = {
-            "script": {
-                "source": """
-                    ctx._source.subject_replies = params.replyCounts.get(ctx.subject);
-                """,
-                "lang": "painless",
-                "params": {
-                    "replyCounts": reply_counts
-                }
-            },
-            "query": {
-                "match_all": {}
-            }
-        }
-
-        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
diff --git a/main.py b/main.py
index f048065..646a156 100644
--- a/main.py
+++ b/main.py
@@ -2,7 +2,6 @@ from elasticsearch import Elasticsearch
 from psycopg2 import connect
 from drivers.psql import PSQL_Testing
 from drivers.elastic import Elastic_Testing
-from drivers.elastic4 import Elastic_2_Shards_Testing
 from sshtunnel import SSHTunnelForwarder
 from utils.utils import preprocess_json
 from typing_extensions import Dict
@@ -21,24 +20,14 @@ def test_psql():
     psqlClient.autocommit = False
     return PSQL_Testing().do_tests(psqlClient)
 
-def test_elasticsearch():
+def test_elasticsearch(shards = 1):
     es = Elasticsearch(
         'https://127.0.0.1:9200',
         api_key='RVAzeHpvOEJXTnUzZ2RiTkdWX2Q6TksySHBfaEFSWktoQmNPOFFSbm1DUQ==',
         verify_certs=False, # just to not create certificates
         ssl_show_warn=False
     )
-    return Elastic_Testing().do_tests(es)
-
-
-def test_elasticsearch_2_shards():
-    es = Elasticsearch(
-        'https://127.0.0.1:9200',
-        api_key='RVAzeHpvOEJXTnUzZ2RiTkdWX2Q6TksySHBfaEFSWktoQmNPOFFSbm1DUQ==',
-        verify_certs=False, # just to not create certificates
-        ssl_show_warn=False
-    )
-    return Elastic_2_Shards_Testing().do_tests(es)
+    return Elastic_Testing(shards).do_tests(es)
 
 
 def plot(timings: Dict[str, Dict[str, float]]):
@@ -59,7 +48,7 @@ def plot(timings: Dict[str, Dict[str, float]]):
     fig, ax = plt.subplots(figsize=(12, 8))
 
     index = np.arange(len(functions))
-    bar_width = 0.25
+    bar_width = 0.2
 
     for i, driver in enumerate(drivers):
         ax.bar(index + i * bar_width, [values[func][i] for func in functions], bar_width, label=driver)
@@ -74,7 +63,7 @@ def plot(timings: Dict[str, Dict[str, float]]):
 
     for i, x in enumerate(drivers):
         for j, v in enumerate([values[func][i] for func in functions]):
-            plt.text(j + (i * bar_width), v + 0.02, str(round(v, 2)), fontsize=8, ha='center', va='bottom')
+            plt.text(j + (i * bar_width), v + 0.02, str(round(v, 2)), fontsize=6, ha='center', va='bottom')
 
     fig.tight_layout()
     plt.savefig("plot-orodruin-opti.png")
@@ -91,8 +80,8 @@ if __name__ == "__main__":
 
     timings['PSQL'] = test_psql()
     timings['ES'] = test_elasticsearch()
-    timings['ES2shards'] = test_elasticsearch_2_shards()
-
+    timings['ES2shards'] = test_elasticsearch(2)
+    timings['ES4shards'] = test_elasticsearch(4)
     plot(timings)
     print(timings)
 
diff --git a/plot-tma-opti-psql-1-2.png b/plot-tma-opti-psql-1-2.png
new file mode 100644
index 0000000..72998ec
Binary files /dev/null and b/plot-tma-opti-psql-1-2.png differ
diff --git a/plot-tma-opti-psql-1-4.png b/plot-tma-opti-psql-1-4.png
new file mode 100644
index 0000000..f2e91c1
Binary files /dev/null and b/plot-tma-opti-psql-1-4.png differ
diff --git a/plot-tma-opti.png b/plot-tma-opti.png
index f2e91c1..f877bf3 100644
Binary files a/plot-tma-opti.png and b/plot-tma-opti.png differ
diff --git a/plot-tma.png b/plot-tma.png
deleted file mode 100644
index 0e96859..0000000
Binary files a/plot-tma.png and /dev/null differ
diff --git a/plot.png b/plot.png
deleted file mode 100644
index bcec221..0000000
Binary files a/plot.png and /dev/null differ
diff --git a/plot_1.png b/plot_1.png
deleted file mode 100644
index 2fcd52e..0000000
Binary files a/plot_1.png and /dev/null differ
diff --git a/plot_100.png b/plot_100.png
deleted file mode 100644
index baffcfd..0000000
Binary files a/plot_100.png and /dev/null differ
diff --git a/plot_10000.png b/plot_10000.png
deleted file mode 100644
index 9f049be..0000000
Binary files a/plot_10000.png and /dev/null differ