[Initial] Scripts are done with data
commit 4addf5448d
16 changed files with 1457 additions and 0 deletions
.gitignore (new file, vendored, 1 line)
@@ -0,0 +1 @@
__pycache__
data.json (new file, 1000 lines)
File diff suppressed because one or more lines are too long
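The suppressed data.json records carry the fields declared in models/data_thread.py further down. Purely for illustration (every value below is invented), one record rendered as a Python dict might look like:

# Hypothetical shape of one data.json record; field names come from
# models/data_thread.py below, the values are made up.
record = {
    "sender_name": "John Doe",
    "recipient_name": "Jane Roe",
    "subject": "Mr",
    "body": "Nullam sit amet turpis elementum ligula vehicula consequat.",
    "sent_date": "2024-01-01 10:00:00",
    "received_date": "2024-01-01 10:00:05",
    "attachment_count": 3,
    "is_read": True,
    "is_spam": False,
    "importance_level": "high",
    "reply_count": 2,
    "forward_count": 0,
    "cc_recipients": "someone@microsoft.com",
    "bcc_recipients": "",
    "folder": "inbox",
}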
drivers/__init__.py (new file, empty)
drivers/elastic.py (new file, 115 lines)
@@ -0,0 +1,115 @@
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from models.data_thread import Threads
from interfaces.db_testing_interface import DB_Testing

INDEX = "threads"


class Elastic_Testing(DB_Testing):
    driver_name = "ES"

    def __init__(self) -> None:
        super().__init__()
        # Plain document dicts for one-by-one indexing.
        self.singles_data = []
        for t in Threads:
            x = t.__dict__.copy()
            self.singles_data.append(x)

        # The same documents with an "_index" key, as the bulk helper expects.
        self.bulk_data = []
        for t in Threads:
            y = t.__dict__.copy()
            y["_index"] = INDEX
            self.bulk_data.append(y)

    def delete_table(self, driver: Elasticsearch):
        if driver.indices.exists(index=INDEX):
            driver.indices.delete(index=INDEX)

    def create_table(self, driver: Elasticsearch):
        self.delete_table(driver)
        driver.indices.create(index=INDEX, body={
            'settings': {
                'index': {
                    'number_of_replicas': 0
                }
            }
        })

    def delete_data(self, driver: Elasticsearch):
        if driver.indices.exists(index=INDEX):
            driver.indices.delete(index=INDEX)

    def add_bulk(self, driver: Elasticsearch):
        bulk(driver, self.bulk_data)

    def add_singles(self, driver: Elasticsearch):
        for t in self.singles_data:
            driver.index(index=INDEX, document=t)

    def attach5_mr_mrs(self, driver: Elasticsearch):
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"range": {"attachment_count": {"gt": 5}}},
                        {"terms": {"subject.keyword": ["Mr", "Mrs"]}}
                    ]
                }
            }
        }
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    def search_mails_ends_microsoftcom(self, driver: Elasticsearch):
        query = {"query": {"wildcard": {"cc_recipients.keyword": "*@microsoft.com"}}}
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    def search_lorem(self, driver: Elasticsearch):
        query = {
            "query": {
                "match_phrase": {
                    "body": "Nullam sit amet turpis elementum ligula vehicula consequat. Morbi a ipsum. Integer a nibh."
                }
            }
        }
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    def get_sum_attachment_less_5(self, driver: Elasticsearch):
        query = {
            "size": 0,
            "query": {
                "bool": {
                    "must": [{"range": {"attachment_count": {"lt": 5}}}]
                }
            },
            "aggs": {
                "attachment_count": {"sum": {"field": "attachment_count"}}
            }
        }
        return f"Got {driver.search(index=INDEX, body=query).body['aggregations']['attachment_count']['value']}"
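A note on add_bulk above: elasticsearch.helpers.bulk consumes plain dicts in which metadata keys such as "_index" sit alongside the document fields, which is why __init__ injects "_index" into every copy. A minimal, self-contained sketch (the connection details here are placeholders, not this project's cluster):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Placeholder URL and key, only to make the sketch runnable against a local node.
es = Elasticsearch("https://localhost:9200", api_key="<key>", verify_certs=False)

actions = [
    {"_index": "threads", "subject": "Mr", "attachment_count": 7},
    {"_index": "threads", "subject": "Mrs", "attachment_count": 2},
]
success, errors = bulk(es, actions)  # returns (docs indexed, list of per-doc errors)
print(f"indexed {success} documents")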
drivers/psql.py (new file, 112 lines)
@@ -0,0 +1,112 @@
from psycopg2._psycopg import connection, cursor
from models.data_thread import Data_Thread, Threads
from interfaces.db_testing_interface import DB_Testing


class PSQL_Thread(Data_Thread):
    def __init__(self, thread: Data_Thread) -> None:
        super().__init__()
        # Copy every public attribute of the source thread onto this row object.
        for x in dir(thread):
            if not x.startswith("_"):
                self.__setattr__(x, thread.__getattribute__(x))

    def insert_into_db(self, cursor: cursor):
        insert_query = '''
            INSERT INTO thread (
                sender_name, recipient_name, subject, body, sent_date,
                received_date, attachment_count, is_read, is_spam,
                importance_level, reply_count, forward_count,
                cc_recipients, bcc_recipients, folder
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        '''
        data = (
            self.sender_name, self.recipient_name, self.subject, self.body,
            self.sent_date, self.received_date, self.attachment_count, self.is_read,
            self.is_spam, self.importance_level, self.reply_count, self.forward_count,
            self.cc_recipients, self.bcc_recipients, self.folder
        )
        cursor.execute(insert_query, data)


class PSQL_Testing(DB_Testing):
    driver_name = "PSQL"

    def __init__(self) -> None:
        super().__init__()
        self.data = [PSQL_Thread(x) for x in Threads]

    def create_table(self, driver: connection):
        cursor = driver.cursor()
        createTableQuery = '''
            CREATE TABLE thread (
                thread_id SERIAL PRIMARY KEY,
                sender_name VARCHAR(255) NOT NULL,
                recipient_name VARCHAR(255) NOT NULL,
                subject VARCHAR(255),
                body TEXT,
                sent_date TIMESTAMP,
                received_date TIMESTAMP,
                attachment_count INTEGER,
                is_read BOOLEAN,
                is_spam BOOLEAN,
                importance_level VARCHAR(50),
                reply_count INTEGER,
                forward_count INTEGER,
                cc_recipients TEXT,
                bcc_recipients TEXT,
                folder VARCHAR(100)
            );
        '''
        cursor.execute(createTableQuery)
        driver.commit()

    def delete_table(self, driver: connection):
        cursor = driver.cursor()
        dropQuery = 'DROP TABLE IF EXISTS thread;'
        cursor.execute(dropQuery)
        driver.commit()

    def add_singles(self, driver: connection):
        cursor = driver.cursor()
        for t in self.data:
            t.insert_into_db(cursor)
        driver.commit()

    def add_bulk(self, driver: connection):
        # Same row-by-row loop as add_singles: there is no separate bulk path here.
        cursor = driver.cursor()
        for t in self.data:
            t.insert_into_db(cursor)
        driver.commit()

    def delete_data(self, driver: connection):
        cursor = driver.cursor()
        dropQuery = 'DELETE FROM thread;'
        cursor.execute(dropQuery)
        driver.commit()

    def attach5_mr_mrs(self, driver: connection):
        cursor = driver.cursor()
        searchQuery = "SELECT COUNT(*) FROM thread WHERE attachment_count > 5 AND (subject = 'Mr' OR subject = 'Mrs');"
        cursor.execute(searchQuery)
        driver.commit()
        return f"Got {cursor.fetchone()[0]}"

    def search_mails_ends_microsoftcom(self, driver: connection):
        cursor = driver.cursor()
        searchQuery = "SELECT COUNT(*) FROM thread WHERE cc_recipients LIKE '%@microsoft.com';"
        cursor.execute(searchQuery)
        driver.commit()
        return f"Got {cursor.fetchone()[0]}"

    def search_lorem(self, driver: connection):
        cursor = driver.cursor()
        searchQuery = "SELECT COUNT(*) FROM thread WHERE body LIKE '%Nullam sit amet turpis elementum ligula vehicula consequat. Morbi a ipsum. Integer a nibh.%';"
        cursor.execute(searchQuery)
        driver.commit()
        return f"Got {cursor.fetchone()[0]}"

    def get_sum_attachment_less_5(self, driver: connection):
        cursor = driver.cursor()
        searchQuery = "SELECT SUM(attachment_count) FROM thread WHERE attachment_count < 5;"
        cursor.execute(searchQuery)
        driver.commit()
        return f"Got {cursor.fetchone()[0]}"
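As the comment in add_bulk notes, the PSQL "bulk" path issues the same row-by-row INSERTs as add_singles, just with one final commit. If real batching were wanted, psycopg2's execute_values could page the same statement; a sketch under that assumption (add_bulk_batched and rows are hypothetical names, not part of this commit):

from psycopg2.extras import execute_values

def add_bulk_batched(driver, rows):
    # rows: a list of 15-tuples in the same column order as insert_into_db above.
    cursor = driver.cursor()
    execute_values(cursor, """
        INSERT INTO thread (
            sender_name, recipient_name, subject, body, sent_date,
            received_date, attachment_count, is_read, is_spam,
            importance_level, reply_count, forward_count,
            cc_recipients, bcc_recipients, folder
        ) VALUES %s
    """, rows)
    driver.commit()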
interfaces/__init__.py (new file, empty)
interfaces/db_testing_interface.py (new file, 71 lines)
@@ -0,0 +1,71 @@
from typing import Generic, TypeVar
from typing_extensions import Dict
from abc import abstractmethod
from utils.utils import time_func
from time import sleep

T = TypeVar('T')
NUM_RUNS = 10000


class DB_Testing(Generic[T]):
    # Note: @abstractmethod is advisory here, since the class does not inherit
    # from abc.ABC; nothing forces a subclass to override every method.
    driver_name: str

    @abstractmethod
    def __init__(self) -> None:
        pass

    @abstractmethod
    def delete_table(self, driver: T):
        pass

    @abstractmethod
    def create_table(self, driver: T):
        pass

    @abstractmethod
    def delete_data(self, driver: T):
        pass

    @abstractmethod
    def add_bulk(self, driver: T):
        pass

    @abstractmethod
    def add_singles(self, driver: T):
        pass

    @abstractmethod
    def attach5_mr_mrs(self, driver: T) -> str:
        pass

    @abstractmethod
    def search_mails_ends_microsoftcom(self, driver: T) -> str:
        pass

    @abstractmethod
    def search_lorem(self, driver: T) -> str:
        pass

    @abstractmethod
    def get_sum_attachment_less_5(self, driver: T) -> str:
        pass

    def do_tests(self, driver: T) -> Dict[str, float]:
        stats = {}
        print(f"\n### Starting {self.driver_name} Benchmark ###")
        stats.update(time_func(f"[{self.driver_name}] Reset table", "delete_table", lambda: self.delete_table(driver)))
        stats.update(time_func(f"[{self.driver_name}] Create table", "create_table", lambda: self.create_table(driver)))
        stats.update(time_func(f"[{self.driver_name}] Add singles", "add_singles", lambda: self.add_singles(driver)))
        stats.update(time_func(f"[{self.driver_name}] Delete data", "delete_data", lambda: self.delete_data(driver)))
        stats.update(time_func(f"[{self.driver_name}] Add bulk", "add_bulk", lambda: self.add_bulk(driver)))

        sleep(1)  # Give the backend a moment to finish indexing before the query benchmarks.

        # Query benchmarks, each averaged over NUM_RUNS runs.
        stats.update(time_func(f"[{self.driver_name}] Count mails with more than 5 attachments and a Mr/Mrs subject", "attach5_mr_mrs", lambda: self.attach5_mr_mrs(driver), NUM_RUNS))
        stats.update(time_func(f"[{self.driver_name}] Sum the attachment counts of mails with fewer than 5 attachments", "get_sum_attachment_less_5", lambda: self.get_sum_attachment_less_5(driver), NUM_RUNS))
        stats.update(time_func(f"[{self.driver_name}] Count mails containing a lorem ipsum sentence", "search_lorem", lambda: self.search_lorem(driver), NUM_RUNS))
        stats.update(time_func(f"[{self.driver_name}] Count mails cc'd to an address ending in microsoft.com", "search_mails_ends_microsoftcom", lambda: self.search_mails_ends_microsoftcom(driver), NUM_RUNS))
        print("\n")
        return stats
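DB_Testing fixes the benchmark surface: anything that implements these methods can be handed to do_tests. As a purely hypothetical illustration (SQLite is not part of this commit), a third driver would slot in like this:

import sqlite3
from interfaces.db_testing_interface import DB_Testing

class SQLite_Testing(DB_Testing[sqlite3.Connection]):
    driver_name = "SQLITE"

    def create_table(self, driver: sqlite3.Connection):
        driver.execute("CREATE TABLE thread (subject TEXT, attachment_count INTEGER)")

    def attach5_mr_mrs(self, driver: sqlite3.Connection) -> str:
        count = driver.execute(
            "SELECT COUNT(*) FROM thread"
            " WHERE attachment_count > 5 AND subject IN ('Mr', 'Mrs')"
        ).fetchone()[0]
        return f"Got {count}"

    # ...the remaining methods would follow the same pattern.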
main.py (new file, 95 lines)
@@ -0,0 +1,95 @@
from elasticsearch import Elasticsearch
from psycopg2 import connect
from drivers.psql import PSQL_Testing
from drivers.elastic import Elastic_Testing
from sshtunnel import SSHTunnelForwarder
from utils.utils import preprocess_json
from typing_extensions import Dict
from collections import defaultdict
import numpy as np
import matplotlib.pyplot as plt


def test_psql():
    with SSHTunnelForwarder(
        ssh_address_or_host=("orodruin.mordor", 22),
        ssh_username="postgres",
        ssh_password="postgres",
        remote_bind_address=('127.0.0.1', 5432),
        local_bind_address=('127.0.0.1', 5432)
    ):
        psqlClient = connect(
            database="postgres",
            host="127.0.0.1",
            user="postgres",
            password="postgres",
            port="5432"
        )
        psqlClient.autocommit = False
        return PSQL_Testing().do_tests(psqlClient)


def test_elasticsearch():
    es = Elasticsearch(
        'https://orodruin.mordor:9200/',
        api_key='WjMwVXQ0OEJnUzRTOUVUaVNNVHY6MFh2X3RDcGRRWC1FRVNRZkdhWlYwUQ==',
        verify_certs=False,  # skip certificate verification rather than provisioning certs
        ssl_show_warn=False
    )
    return Elastic_Testing().do_tests(es)


def plot(timings: Dict[str, Dict[str, float]]):
    # Reshape {driver: {function: timing}} into {function: {driver: timing}}.
    usable_dict = defaultdict(lambda: defaultdict(float))
    for driver_name, function_timing in timings.items():
        for function_name, timing in function_timing.items():
            usable_dict[function_name][driver_name] += timing
    usable_dict = {k: dict(v) for k, v in usable_dict.items()}

    # Express each ES timing relative to the PSQL timing for the same function.
    relative_dict = {}
    for function, systems in usable_dict.items():
        relative_dict[function] = systems['ES'] / systems['PSQL']

    functions = list(relative_dict.keys())
    psql_values = [1] * len(functions)  # PSQL is the baseline, so all ones
    es_values = list(relative_dict.values())

    plt.figure(figsize=(12, 8))
    index = np.arange(len(functions))
    bar_width = 0.35

    plt.bar(index, psql_values, bar_width, label='PSQL', color='lightblue')
    plt.bar(index + bar_width, es_values, bar_width, label='ES', color='orange')

    plt.xlabel('Functions')
    plt.ylabel('Relative Time')
    plt.title('Performance of ES Relative to PSQL')
    plt.xticks(index + bar_width / 2, functions, rotation=45, ha="right")
    plt.legend()

    for i, v in enumerate(psql_values):
        plt.text(i, v + 0.02, str(v), ha='center', va='bottom')
    for i, v in enumerate(es_values):
        plt.text(i + bar_width, v + 0.02, str(round(v, 2)), ha='center', va='bottom')

    plt.tight_layout()
    plt.savefig("plot.png")
    plt.show()


if __name__ == "__main__":
    print("### Preprocessing JSON ###")
    preprocess_json()

    timings = {}
    timings['PSQL'] = test_psql()
    timings['ES'] = test_elasticsearch()

    plot(timings)
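To trace plot()'s reshaping concretely, a tiny worked example with invented timings:

# Invented numbers, only to trace the transformation inside plot().
timings = {"PSQL": {"add_bulk": 0.50}, "ES": {"add_bulk": 0.25}}
# usable_dict reshapes this to:  {"add_bulk": {"PSQL": 0.50, "ES": 0.25}}
# relative_dict then becomes:    {"add_bulk": 0.5}  (ES took half the PSQL time)
# so the chart draws a PSQL bar at 1.0 and an ES bar at 0.5 for add_bulk.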
models/__init__.py (new file, empty)
models/data_thread.py (new file, 22 lines)
@@ -0,0 +1,22 @@
from typing_extensions import List
import json


class Data_Thread:
    sender_name: str
    recipient_name: str
    subject: str
    body: str
    sent_date: str
    received_date: str
    attachment_count: int
    is_read: bool
    is_spam: bool
    importance_level: str
    reply_count: int
    forward_count: int
    cc_recipients: str
    bcc_recipients: str
    folder: str


# Global list populated by utils.preprocess_json() at startup.
Threads: List[Data_Thread] = []
plot_1.png (new binary file, 56 KiB, not shown)
plot_100.png (new binary file, 56 KiB, not shown)
plot_10000.png (new binary file, 56 KiB, not shown)
requirements.txt (new file, 6 lines)
@@ -0,0 +1,6 @@
psycopg2==2.9.9
sshtunnel==0.4.0
typing_extensions==4.12.0
typing==3.7.4.3
elasticsearch==8.13.2
matplotlib==3.9.0
utils/__init__.py (new file, empty)
utils/utils.py (new file, 35 lines)
@@ -0,0 +1,35 @@
import json
from typing import Callable, Optional
from typing_extensions import Dict
import time

from models.data_thread import Data_Thread, Threads


def time_func(name: str, function_name: str, x: Callable, repeat: Optional[int] = None) -> Dict[str, float]:
    if not repeat:
        # Single timed call.
        a = time.perf_counter()
        r = x()
        timed = time.perf_counter() - a
        print(f"{name} took {timed:.3f}s")
        if r:
            print(f"|> {r}")
        return {function_name: timed}
    else:
        # Repeat the call and report the average.
        timings = []
        for _ in range(repeat):
            a = time.perf_counter()
            r = x()
            timings.append(time.perf_counter() - a)
        print(f"{name} took {sum(timings)/len(timings):.3f}s on average over {repeat} runs")
        if r:
            print(f"|> {r}")
        return {function_name: sum(timings) / len(timings)}


def preprocess_json():
    # Load data.json and materialise each record as a Data_Thread in the global Threads list.
    with open('data.json', 'r') as f:
        data: list = json.load(f)
        for d in data:
            x = Data_Thread()
            for k in d.keys():
                x.__setattr__(k, d[k])
            Threads.append(x)
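A usage sketch for time_func, mirroring how do_tests calls it (the names and workloads below are arbitrary):

from utils.utils import time_func

stats = {}
# One-off timing: returns {"build": elapsed_seconds}.
stats.update(time_func("Build index", "build", lambda: sum(range(10_000))))
# Averaged timing over 100 runs, as the query benchmarks use NUM_RUNS.
stats.update(time_func("Query", "query", lambda: 41 + 1, repeat=100))
print(stats)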