[Dynamic Shards]

This commit is contained in:
Evann Regnault 2024-05-31 17:24:17 +02:00
parent 55a75e9485
commit 7f0d8a7dda
11 changed files with 11 additions and 234 deletions

View file

@@ -8,8 +8,10 @@ INDEX = "threads"
class Elastic_Testing(DB_Testing):
driver_name = "ES"
def __init__(self) -> None:
def __init__(self, shards = 1) -> None:
super().__init__()
self.shards = shards
self.driver_name += f" ({shards} Shards)"
self.singles_data = []
for t in Threads:
x = t.__dict__.copy()
@@ -30,7 +32,8 @@ class Elastic_Testing(DB_Testing):
driver.indices.create(index=INDEX, body={
'settings': {
'index': {
'number_of_replicas': 0
'number_of_replicas': 0,
'number_of_shards': self.shards
}
}
})

View file

@@ -1,215 +0,0 @@
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from models.data_thread import Threads
from interfaces.db_testing_interface import DB_Testing
import json
INDEX = "threads"
class Elastic_2_Shards_Testing(DB_Testing):
    """Elasticsearch benchmark driver that runs every test against an
    index created with 2 primary shards and 0 replicas.

    The constructor pre-renders the ``Threads`` corpus twice: as plain
    dicts for single-document indexing and as newline-delimited
    bulk-API payload lines. Each test method takes a live
    ``Elasticsearch`` client and returns a short result string
    (presumably consumed by ``DB_Testing.do_tests`` — confirm).
    """

    # Fixed: the label previously read "ES (4-Shards)" although
    # create_table() below configures number_of_shards = 2.
    driver_name = "ES (2-Shards)"

    def __init__(self) -> None:
        super().__init__()
        # Plain document copies, indexed one at a time by add_singles().
        self.singles_data = [t.__dict__.copy() for t in Threads]
        # Bulk-API payload: an `index` action line followed by the
        # JSON-encoded document source.
        self.bulk_data = [
            "{\"index\":{}}\n" + json.dumps(t.__dict__.copy())
            for t in Threads
        ]

    def delete_table(self, driver: Elasticsearch) -> None:
        """Drop the benchmark index if it exists."""
        if driver.indices.exists(index=INDEX):
            driver.indices.delete(index=INDEX)

    def create_table(self, driver: Elasticsearch) -> None:
        """(Re)create the index with 2 primary shards and no replicas."""
        self.delete_table(driver)  # always start from a clean index
        driver.indices.create(index=INDEX, body={
            'settings': {
                'index': {
                    'number_of_replicas': 0,
                    'number_of_shards': 2
                }
            }
        })

    def delete_data(self, driver: Elasticsearch) -> None:
        """Remove all documents by dropping the whole index."""
        if driver.indices.exists(index=INDEX):
            driver.indices.delete(index=INDEX)

    def add_bulk(self, driver: Elasticsearch) -> None:
        """Recreate the index, then load the corpus through the bulk
        API in chunks of 1000 documents, refreshing after each chunk."""
        self.create_table(driver)
        for i in range(0, len(self.bulk_data), 1000):
            driver.bulk(
                index=INDEX,
                operations='\n'.join(self.bulk_data[i:i + 1000]),
                refresh=True,
            )

    def add_singles(self, driver: Elasticsearch) -> None:
        """Index every document individually.

        NOTE(review): unlike add_bulk() this does not recreate the
        index first — presumably the test harness orders the calls so
        the index already exists; confirm against DB_Testing.do_tests.
        """
        for doc in self.singles_data:
            driver.index(index=INDEX, document=doc)

    def attach5_mr_mrs(self, driver: Elasticsearch) -> str:
        """Count threads with more than 5 attachments whose subject is
        exactly "Mr" or "Mrs"."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "attachment_count": {
                                    "gt": 5
                                }
                            }
                        },
                        {
                            "terms": {
                                "subject.keyword": ["Mr", "Mrs"]
                            }
                        }
                    ]
                }
            }
        }
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    def search_mails_ends_microsoftcom(self, driver: Elasticsearch) -> str:
        """Count threads with an @microsoft.com address in the CC list."""
        query = {
            "query": {
                "wildcard": {
                    "cc_recipients.keyword": "*@microsoft.com"
                }
            }
        }
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    def search_lorem(self, driver: Elasticsearch) -> str:
        """Count threads whose body contains an exact lorem-ipsum phrase."""
        query = {
            "query": {
                "match_phrase": {
                    "body": "Nullam sit amet turpis elementum ligula vehicula consequat. Morbi a ipsum. Integer a nibh."
                }
            }
        }
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    def get_sum_attachment_less_5(self, driver: Elasticsearch) -> str:
        """Sum attachment_count over threads with fewer than 5 attachments."""
        query = {
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "attachment_count": {
                                    "lt": 5
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "attachment_count": {
                    "sum": {
                        "field": "attachment_count"
                    }
                }
            }
        }
        return f"Got {driver.search(index=INDEX, body=query).body['aggregations']['attachment_count']['value']}"

    def update_add_replies_per_last_name(self, driver: Elasticsearch) -> str:
        """Aggregate total reply_count per sender last name, then write
        that total back onto every matching document as
        ``last_name_replies``."""
        agg_query = {
            "size": 0,
            "aggs": {
                "reply_count_by_last_name": {
                    "terms": {
                        # Bucket by the last whitespace-separated token
                        # of the sender's name.
                        "script": {
                            "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
                            "lang": "painless"
                        },
                        "size": 1000
                    },
                    "aggs": {
                        "last_name_replies": {
                            "sum": {
                                "field": "reply_count"
                            }
                        }
                    }
                }
            }
        }
        agg_results = driver.search(index=INDEX, body=agg_query)
        buckets = agg_results['aggregations']['reply_count_by_last_name']['buckets']
        reply_counts = {bucket['key']: bucket['last_name_replies']['value'] for bucket in buckets}
        update_query = {
            "script": {
                "source": """
                    String name = ctx._source.sender_name;
                    String[] parts = /\\s+/.split(name);
                    String lastName = parts[parts.length - 1];
                    if (params.replyCounts.containsKey(lastName)) {
                        ctx._source.last_name_replies = params.replyCounts.get(lastName);
                    }
                """,
                "lang": "painless",
                "params": {
                    "replyCounts": reply_counts
                }
            },
            "query": {
                "match_all": {}
            }
        }
        # Fixed: len(Threads) // 250 is 0 for corpora under 250 docs,
        # which is an invalid slice count for update_by_query.
        resp = driver.update_by_query(
            index=INDEX, body=update_query, conflicts="proceed",
            slices=max(1, len(Threads) // 250), request_timeout=120)
        # Fixed: the method is annotated -> str but returned None.
        return f"Updated {resp['updated']}"

    def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
        """Aggregate total reply_count per subject, then write that
        total back onto every matching document as ``subject_replies``."""
        agg_query = {
            "size": 0,
            "aggs": {
                "reply_count_by_subject": {
                    "terms": {
                        # Fixed: this aggregation previously reused the
                        # sender-last-name script copy-pasted from
                        # update_add_replies_per_last_name, so bucket
                        # keys were last names and could never match
                        # the subjects looked up by the update script.
                        "field": "subject.keyword",
                        "size": 1000
                    },
                    "aggs": {
                        "subject_replies": {
                            "sum": {
                                "field": "reply_count"
                            }
                        }
                    }
                }
            }
        }
        agg_results = driver.search(index=INDEX, body=agg_query)
        buckets = agg_results['aggregations']['reply_count_by_subject']['buckets']
        reply_counts = {bucket['key']: bucket['subject_replies']['value'] for bucket in buckets}
        update_query = {
            "script": {
                # Fixed: update-by-query scripts expose the document
                # under ctx._source; `ctx.subject` does not exist.
                # Guarded with containsKey to mirror the last-name
                # variant and avoid writing nulls.
                "source": """
                    if (params.replyCounts.containsKey(ctx._source.subject)) {
                        ctx._source.subject_replies = params.replyCounts.get(ctx._source.subject);
                    }
                """,
                "lang": "painless",
                "params": {
                    "replyCounts": reply_counts
                }
            },
            "query": {
                "match_all": {}
            }
        }
        # Fixed: clamp slices to at least 1 (see last-name variant).
        resp = driver.update_by_query(
            index=INDEX, body=update_query, conflicts="proceed",
            slices=max(1, len(Threads) // 250), request_timeout=120)
        # Fixed: the method is annotated -> str but returned None.
        return f"Updated {resp['updated']}"

23
main.py
View file

@@ -2,7 +2,6 @@ from elasticsearch import Elasticsearch
from psycopg2 import connect
from drivers.psql import PSQL_Testing
from drivers.elastic import Elastic_Testing
from drivers.elastic4 import Elastic_2_Shards_Testing
from sshtunnel import SSHTunnelForwarder
from utils.utils import preprocess_json
from typing_extensions import Dict
@@ -21,24 +20,14 @@ def test_psql():
psqlClient.autocommit = False
return PSQL_Testing().do_tests(psqlClient)
def test_elasticsearch():
def test_elasticsearch(shards = 1):
es = Elasticsearch(
'https://127.0.0.1:9200',
api_key='RVAzeHpvOEJXTnUzZ2RiTkdWX2Q6TksySHBfaEFSWktoQmNPOFFSbm1DUQ==',
verify_certs=False, # just to not create certificates
ssl_show_warn=False
)
return Elastic_Testing().do_tests(es)
def test_elasticsearch_2_shards():
es = Elasticsearch(
'https://127.0.0.1:9200',
api_key='RVAzeHpvOEJXTnUzZ2RiTkdWX2Q6TksySHBfaEFSWktoQmNPOFFSbm1DUQ==',
verify_certs=False, # just to not create certificates
ssl_show_warn=False
)
return Elastic_2_Shards_Testing().do_tests(es)
return Elastic_Testing(shards).do_tests(es)
def plot(timings: Dict[str, Dict[str, float]]):
@@ -59,7 +48,7 @@ def plot(timings: Dict[str, Dict[str, float]]):
fig, ax = plt.subplots(figsize=(12, 8))
index = np.arange(len(functions))
bar_width = 0.25
bar_width = 0.2
for i, driver in enumerate(drivers):
ax.bar(index + i * bar_width, [values[func][i] for func in functions], bar_width, label=driver)
@@ -74,7 +63,7 @@ def plot(timings: Dict[str, Dict[str, float]]):
for i, x in enumerate(drivers):
for j, v in enumerate([values[func][i] for func in functions]):
plt.text(j + (i * bar_width), v + 0.02, str(round(v, 2)), fontsize=8, ha='center', va='bottom')
plt.text(j + (i * bar_width), v + 0.02, str(round(v, 2)), fontsize=6, ha='center', va='bottom')
fig.tight_layout()
plt.savefig("plot-orodruin-opti.png")
@@ -91,8 +80,8 @@ if __name__ == "__main__":
timings['PSQL'] = test_psql()
timings['ES'] = test_elasticsearch()
timings['ES2shards'] = test_elasticsearch_2_shards()
timings['ES2shards'] = test_elasticsearch(2)
timings['ES4shards'] = test_elasticsearch(4)
plot(timings)
print(timings)

BIN
plot-tma-opti-psql-1-2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

BIN
plot-tma-opti-psql-1-4.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

After

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB

BIN
plot.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB