[Added multishard elastic + Plots]

Evann Regnault 2024-05-31 15:53:07 +02:00
parent 682e4885f2
commit d07d511a9f
9 changed files with 284 additions and 62 deletions

.gitignore vendored
@@ -1 +1,2 @@
 __pycache__
+.venv

drivers/elastic.py
@@ -2,6 +2,7 @@ from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
 from models.data_thread import Threads
 from interfaces.db_testing_interface import DB_Testing
+import json
 
 INDEX = "threads"
@@ -16,8 +17,8 @@ class Elastic_Testing(DB_Testing):
         self.bulk_data = []
         for t in Threads:
-            y = t.__dict__.copy()
-            y["_index"] = INDEX
+            y = "{\"index\":{}}\n"
+            y += json.dumps(t.__dict__.copy())
             self.bulk_data.append(y)
 
     def delete_table(self, driver: Elasticsearch):
@@ -39,7 +40,8 @@ class Elastic_Testing(DB_Testing):
             driver.indices.delete(index=INDEX)
 
     def add_bulk(self, driver: Elasticsearch):
-        bulk(driver, self.bulk_data)
+        for i in range(0, len(self.bulk_data), 1000):
+            driver.bulk(index=INDEX, operations='\n'.join(self.bulk_data[i:i+1000]), refresh=True)
 
     def add_singles(self, driver: Elasticsearch):
         for t in self.singles_data:
@@ -162,7 +164,7 @@ class Elastic_Testing(DB_Testing):
             }
         }
-        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed")
+        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
 
     def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
@@ -208,4 +210,4 @@ class Elastic_Testing(DB_Testing):
             }
         }
-        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed")
+        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
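
A note on the add_bulk rewrite above: instead of the elasticsearch.helpers.bulk helper, the client's low-level bulk API now receives a raw NDJSON string, 1000 documents per request. A minimal sketch (not the repo's code; toy documents) of the payload format:

import json

docs = [{"subject": "Mr", "reply_count": 3}, {"subject": "Mrs", "reply_count": 1}]
# Each document is preceded by an {"index":{}} action line.
bulk_data = ['{"index":{}}\n' + json.dumps(d) for d in docs]
# Chunks are joined with newlines; the _bulk endpoint expects a trailing newline.
payload = '\n'.join(bulk_data) + '\n'
print(payload)
# {"index":{}}
# {"subject": "Mr", "reply_count": 3}
# {"index":{}}
# {"subject": "Mrs", "reply_count": 1}

Chunking keeps each HTTP request bounded; refresh=True makes documents searchable immediately but forces a refresh per request, which slows indexing and is worth keeping in mind when reading the benchmark numbers.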

drivers/elastic4.py Normal file
@@ -0,0 +1,215 @@
+from elasticsearch import Elasticsearch
+from elasticsearch.helpers import bulk
+from models.data_thread import Threads
+from interfaces.db_testing_interface import DB_Testing
+import json
+
+INDEX = "threads"
+
+class Elastic_4_Shards_Testing(DB_Testing):
+    driver_name = "ES (4-Shards)"
+
+    def __init__(self) -> None:
+        super().__init__()
+        self.singles_data = []
+        for t in Threads:
+            x = t.__dict__.copy()
+            self.singles_data.append(x)
+        self.bulk_data = []
+        for t in Threads:
+            y = "{\"index\":{}}\n"
+            y += json.dumps(t.__dict__.copy())
+            self.bulk_data.append(y)
+
+    def delete_table(self, driver: Elasticsearch):
+        if driver.indices.exists(index=INDEX):
+            driver.indices.delete(index=INDEX)
+
+    def create_table(self, driver: Elasticsearch):
+        self.delete_table(driver)
+        driver.indices.create(index=INDEX, body={
+            'settings': {
+                'index': {
+                    'number_of_replicas': 0,
+                    'number_of_shards': 4
+                }
+            }
+        })
+
+    def delete_data(self, driver: Elasticsearch):
+        if driver.indices.exists(index=INDEX):
+            driver.indices.delete(index=INDEX)
+
+    def add_bulk(self, driver: Elasticsearch):
+        self.create_table(driver)
+        for i in range(0, len(self.bulk_data), 1000):
+            driver.bulk(index=INDEX, operations='\n'.join(self.bulk_data[i:i+1000]), refresh=True)
+
+    def add_singles(self, driver: Elasticsearch):
+        for t in self.singles_data:
+            driver.index(index=INDEX, document=t)
+
+    def attach5_mr_mrs(self, driver: Elasticsearch):
+        query = {
+            "query": {
+                "bool": {
+                    "must": [
+                        {
+                            "range": {
+                                "attachment_count": {
+                                    "gt": 5
+                                }
+                            }
+                        },
+                        {
+                            "terms": {
+                                "subject.keyword": ["Mr", "Mrs"]
+                            }
+                        }
+                    ]
+                }
+            }
+        }
+        return f"Got {driver.count(index=INDEX, body=query).body['count']}"
+
+    def search_mails_ends_microsoftcom(self, driver: Elasticsearch):
+        query = {
+            "query": {
+                "wildcard": {
+                    "cc_recipients.keyword": "*@microsoft.com"
+                }
+            }
+        }
+        return f"Got {driver.count(index=INDEX, body=query).body['count']}"
+
+    def search_lorem(self, driver: Elasticsearch):
+        query = {
+            "query": {
+                "match_phrase": {
+                    "body": "Nullam sit amet turpis elementum ligula vehicula consequat. Morbi a ipsum. Integer a nibh."
+                }
+            }
+        }
+        return f"Got {driver.count(index=INDEX, body=query).body['count']}"
+
+    def get_sum_attachment_less_5(self, driver: Elasticsearch):
+        query = {
+            "size": 0,
+            "query": {
+                "bool": {
+                    "must": [
+                        {
+                            "range": {
+                                "attachment_count": {
+                                    "lt": 5
+                                }
+                            }
+                        }
+                    ]
+                }
+            },
+            "aggs": {
+                "attachment_count": {
+                    "sum": {
+                        "field": "attachment_count"
+                    }
+                }
+            }
+        }
+        return f"Got {driver.search(index=INDEX, body=query).body['aggregations']['attachment_count']['value']}"
+
+    def update_add_replies_per_last_name(self, driver: Elasticsearch) -> str:
+        agg_query = {
+            "size": 0,
+            "aggs": {
+                "reply_count_by_last_name": {
+                    "terms": {
+                        "script": {
+                            "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
+                            "lang": "painless"
+                        },
+                        "size": 1000
+                    },
+                    "aggs": {
+                        "last_name_replies": {
+                            "sum": {
+                                "field": "reply_count"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        agg_results = driver.search(index=INDEX, body=agg_query)
+        buckets = agg_results['aggregations']['reply_count_by_last_name']['buckets']
+        reply_counts = {bucket['key']: bucket['last_name_replies']['value'] for bucket in buckets}
+        update_query = {
+            "script": {
+                "source": """
+                    String name = ctx._source.sender_name;
+                    String[] parts = /\\s+/.split(name);
+                    String lastName = parts[parts.length - 1];
+                    if (params.replyCounts.containsKey(lastName)) {
+                        ctx._source.last_name_replies = params.replyCounts.get(lastName);
+                    }
+                """,
+                "lang": "painless",
+                "params": {
+                    "replyCounts": reply_counts
+                }
+            },
+            "query": {
+                "match_all": {}
+            }
+        }
+        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
+
+    def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
+        agg_query = {
+            "size": 0,
+            "aggs": {
+                "reply_count_by_subject": {
+                    "terms": {
+                        "script": {
+                            "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
+                            "lang": "painless"
+                        },
+                        "size": 1000
+                    },
+                    "aggs": {
+                        "subject_replies": {
+                            "sum": {
+                                "field": "reply_count"
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        agg_results = driver.search(index=INDEX, body=agg_query)
+        buckets = agg_results['aggregations']['reply_count_by_subject']['buckets']
+        reply_counts = {bucket['key']: bucket['subject_replies']['value'] for bucket in buckets}
+        update_query = {
+            "script": {
+                "source": """
+                    ctx._source.subject_replies = params.replyCounts.get(ctx.subject);
+                """,
+                "lang": "painless",
+                "params": {
+                    "replyCounts": reply_counts
+                }
+            },
+            "query": {
+                "match_all": {}
+            }
+        }
+        driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed", slices=len(Threads)//250, request_timeout=120)
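
The update_by_query calls in this file pass slices=len(Threads)//250, so Elasticsearch runs the update as parallel sliced sub-tasks (roughly one slice per 250 documents), together with a longer request timeout. A minimal sketch under assumed names (localhost endpoint, toy script); note the 8.x client routes transport options such as timeouts through .options():

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

es.options(request_timeout=120).update_by_query(
    index="threads",
    query={"match_all": {}},
    script={"source": "ctx._source.touched = true", "lang": "painless"},  # toy script
    conflicts="proceed",  # skip version conflicts instead of aborting
    slices=4,             # or "auto" to let ES pick one slice per shard
)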

interfaces/db_testing_interface.py
@@ -5,7 +5,7 @@ from utils.utils import time_func
 from time import sleep
 
 T = TypeVar('T')
-NUM_RUNS = 10
+NUM_RUNS = 100
 
 class DB_Testing(Generic[T]):
     driver_name: str
@@ -61,8 +61,8 @@ class DB_Testing(Generic[T]):
     def do_tests(self, driver: T) -> Dict[str, float]:
         stats = {}
         print(f"\n### Starting {self.driver_name} Benchmark ###")
-        stats.update(time_func(f"[{self.driver_name}] Reset table", "delete_table", lambda : self.delete_table(driver)))
-        stats.update(time_func(f"[{self.driver_name}] Create table", "create_table", lambda : self.create_table(driver)))
+        self.delete_table(driver)
+        self.create_table(driver)
         stats.update(time_func(f"[{self.driver_name}] Add singles", "add_singles", lambda : self.add_singles(driver)))
         stats.update(time_func(f"[{self.driver_name}] Delete data", "delete_data", lambda : self.delete_data(driver)))
         stats.update(time_func(f"[{self.driver_name}] Add bulk", "add_bulk", lambda : self.add_bulk(driver)))
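
Context for the two changed lines: table reset and creation now run untimed, so setup no longer appears in stats and the plotted bars cover only the data operations. The real time_func lives in utils/utils.py; the following is a rough reconstruction of its shape inferred from how do_tests consumes it (an assumption, not the repo's actual helper):

from time import perf_counter
from typing import Callable, Dict, Optional

def time_func(name: str, function_name: str, x: Callable, repeat: Optional[int] = 1) -> Dict[str, float]:
    # Time the callable, report under the given label, and return a
    # one-entry dict so stats.update() can merge it.
    start = perf_counter()
    for _ in range(repeat):
        x()
    elapsed = perf_counter() - start
    print(f"{name}: {elapsed:.3f}s")
    return {function_name: elapsed}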

main.py
@@ -2,6 +2,7 @@ from elasticsearch import Elasticsearch
 from psycopg2 import connect
 from drivers.psql import PSQL_Testing
 from drivers.elastic import Elastic_Testing
+from drivers.elastic4 import Elastic_4_Shards_Testing
 from sshtunnel import SSHTunnelForwarder
 from utils.utils import preprocess_json
 from typing_extensions import Dict
@@ -10,13 +11,6 @@ import numpy as np
 import matplotlib.pyplot as plt
 
 def test_psql():
-    with SSHTunnelForwarder(
-        ssh_address_or_host=("orodruin.mordor", 22),
-        ssh_username="postgres",
-        ssh_password="postgres",
-        remote_bind_address=('127.0.0.1', 5432),
-        local_bind_address=('127.0.0.1', 5432)
-    ):
     psqlClient = connect(
         database="postgres",
         host="127.0.0.1",
@@ -27,57 +21,63 @@ def test_psql():
     psqlClient.autocommit = False
     return PSQL_Testing().do_tests(psqlClient)
 
 def test_elasticsearch():
     es = Elasticsearch(
-        'https://orodruin.mordor:9200/',
-        api_key='WjMwVXQ0OEJnUzRTOUVUaVNNVHY6MFh2X3RDcGRRWC1FRVNRZkdhWlYwUQ==',
+        'http://elastic-tma.docker.tma.coe.int:80',
+        api_key='VGxhTnhJOEJGM3NDbkpWLVBzUkg6eHE4c3FuclhTWW1sRm9YN0FkWmRMdw==',
         verify_certs=False,  # just to not create certificates
         ssl_show_warn=False
     )
     return Elastic_Testing().do_tests(es)
 
+def test_elasticsearch_4_shards():
+    es = Elasticsearch(
+        'http://elastic-tma.docker.tma.coe.int:80',
+        api_key='VGxhTnhJOEJGM3NDbkpWLVBzUkg6eHE4c3FuclhTWW1sRm9YN0FkWmRMdw==',
+        verify_certs=False,  # just to not create certificates
+        ssl_show_warn=False
+    )
+    return Elastic_4_Shards_Testing().do_tests(es)
+
 def plot(timings: Dict[str, Dict[str, float]]):
-    # Transform the dict from {Driver: {Function, timing}} into {Function: [{Drivers, timing}]}
-    usable_dict = defaultdict(lambda: defaultdict(float))
-    for driver_name, function_timing in timings.items():
-        for function_name, timing in function_timing.items():
-            usable_dict[function_name][driver_name] += timing
-    usable_dict = {k: dict(v) for k, v in usable_dict.items()}
-
-    relative_dict = {}
-    for function, systems in usable_dict.items():
-        relative_dict[function] = systems['ES'] / systems['PSQL']
-
-    functions = list(relative_dict.keys())
-    psql_values = [1] * len(functions)  # List of ones for the psql values
-    es_values = list(relative_dict.values())
-
-    plt.figure(figsize=(12, 8))
-    index = np.arange(len(functions))
-    bar_width = 0.35
-
-    plt.bar(index, psql_values, bar_width, label='PSQL', color='lightblue')
-    plt.bar(index + bar_width, es_values, bar_width, label='ES', color='orange')
-
-    plt.xlabel('Functions')
-    plt.ylabel('Relative Time')
-    plt.title('Performance of ES Relative to PSQL')
-    plt.xticks(index + bar_width / 2, functions, rotation=45, ha="right")
-    plt.legend()
-
-    for i, v in enumerate(psql_values):
-        plt.text(i, v + 0.02, str(v), ha='center', va='bottom')
-    for i, v in enumerate(es_values):
-        plt.text(i + bar_width, v + 0.02, str(round(v, 2)), ha='center', va='bottom')
-
-    plt.tight_layout()
-    plt.savefig("plot.png")
+    functions = list(timings['PSQL'].keys())
+    drivers = list(timings.keys())
+
+    values = {func: [] for func in functions}
+
+    for func in functions:
+        values[func].append(timings['PSQL'][func])
+
+    for driver in [x for x in drivers if x != 'PSQL']:
+        for func in functions:
+            values[func].append(timings[driver][func] / values[func][0])
+
+    for func in functions:
+        values[func][0] = 1
+
+    fig, ax = plt.subplots(figsize=(12, 8))
+    index = np.arange(len(functions))
+    bar_width = 0.25
+
+    for i, driver in enumerate(drivers):
+        ax.bar(index + i * bar_width, [values[func][i] for func in functions], bar_width, label=driver)
+
+    ax.set_xlabel('Functions')
+    ax.set_ylabel('Relative Time')
+    ax.set_title('Performance of ES Relative to PSQL')
+    ax.set_xticks(index + bar_width * (len(drivers) - 1) / 2, functions, rotation=45, ha="right")
+    ax.set_xticklabels(functions)
+    ax.legend()
+
+    for i, x in enumerate(drivers):
+        for j, v in enumerate([values[func][i] for func in functions]):
+            plt.text(j + (i * bar_width), v + 0.02, str(round(v, 2)), fontsize=8, ha='center', va='bottom')
+
+    fig.tight_layout()
+    plt.savefig("plot-tma-opti.png")
     plt.show()
@@ -89,7 +89,10 @@ if __name__ == "__main__":
     timings = {}
-    # timings['PSQL'] = test_psql()
+    timings['PSQL'] = test_psql()
     timings['ES'] = test_elasticsearch()
+    timings['ES4shards'] = test_elasticsearch_4_shards()
-    # plot(timings)
+    plot(timings)
+    print(timings)
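
The rewritten plot() generalizes the old two-driver comparison to any number of drivers: each driver's timing is divided by the PSQL baseline, the PSQL column is then pinned to 1, and one bar group is drawn per benchmarked function. A self-contained sketch of just that normalization step, with toy numbers:

# Toy illustration of the normalization in plot(): PSQL's raw timing is
# stored first, other drivers are expressed relative to it, then the PSQL
# slot itself becomes the 1.0 baseline.
timings = {
    "PSQL": {"add_bulk": 2.0, "search_lorem": 0.5},
    "ES":   {"add_bulk": 1.0, "search_lorem": 0.2},
}
functions = list(timings["PSQL"].keys())
values = {f: [timings["PSQL"][f]] for f in functions}
for driver in (d for d in timings if d != "PSQL"):
    for f in functions:
        values[f].append(timings[driver][f] / values[f][0])
for f in functions:
    values[f][0] = 1
print(values)  # {'add_bulk': [1, 0.5], 'search_lorem': [1, 0.4]}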

BIN plot-tma-opti.png Normal file
Binary file not shown (71 KiB).

BIN plot-tma.png Normal file
Binary file not shown (68 KiB).

requirements.txt
@@ -1,6 +1,6 @@
-psycopg2==2.9.9
 sshtunnel==0.4.0
 typing_extensions==4.12.0
 typing==3.7.4.3
 elasticsearch==8.13.2
 matplotlib==3.9.0
+psycopg2-binary

utils/utils.py
@@ -26,10 +26,11 @@ def time_func(name: str, function_name: str, x : Callable, repeat : Optional[int
 
 def preprocess_json():
-    with open('data.json', 'r') as f:
-        data : dict = json.load(f)
-        for d in data:
-            x = Data_Thread()
-            for k in d.keys():
-                x.__setattr__(k, d[k])
-            Threads.append(x)
+    for x in range(0, 1):
+        with open('data.json', 'r') as f:
+            data : dict = json.load(f)
+            for d in data:
+                x = Data_Thread()
+                for k in d.keys():
+                    x.__setattr__(k, d[k])
+                Threads.append(x)