[Added another one]
This commit is contained in:
parent
3ef595e195
commit
682e4885f2
5 changed files with 83 additions and 6 deletions
|
@ -127,7 +127,7 @@ class Elastic_Testing(DB_Testing):
|
|||
"size": 1000
|
||||
},
|
||||
"aggs": {
|
||||
"total_reply_count": {
|
||||
"last_name_replies": {
|
||||
"sum": {
|
||||
"field": "reply_count"
|
||||
}
|
||||
|
@ -140,7 +140,7 @@ class Elastic_Testing(DB_Testing):
|
|||
agg_results = driver.search(index=INDEX, body=agg_query)
|
||||
|
||||
buckets = agg_results['aggregations']['reply_count_by_last_name']['buckets']
|
||||
reply_counts = {bucket['key']: bucket['total_reply_count']['value'] for bucket in buckets}
|
||||
reply_counts = {bucket['key']: bucket['last_name_replies']['value'] for bucket in buckets}
|
||||
|
||||
update_query = {
|
||||
"script": {
|
||||
|
@ -149,7 +149,7 @@ class Elastic_Testing(DB_Testing):
|
|||
String[] parts = /\\s+/.split(name);
|
||||
String lastName = parts[parts.length - 1];
|
||||
if (params.replyCounts.containsKey(lastName)) {
|
||||
ctx._source.total_reply_count = params.replyCounts.get(lastName);
|
||||
ctx._source.last_name_replies = params.replyCounts.get(lastName);
|
||||
}
|
||||
""",
|
||||
"lang": "painless",
|
||||
|
@ -163,3 +163,49 @@ class Elastic_Testing(DB_Testing):
|
|||
}
|
||||
|
||||
driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed")
|
||||
|
||||
|
||||
def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
|
||||
agg_query = {
|
||||
"size": 0,
|
||||
"aggs": {
|
||||
"reply_count_by_subject": {
|
||||
"terms": {
|
||||
"script": {
|
||||
"source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
|
||||
"lang": "painless"
|
||||
},
|
||||
"size": 1000
|
||||
},
|
||||
"aggs": {
|
||||
"subject_replies": {
|
||||
"sum": {
|
||||
"field": "reply_count"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
agg_results = driver.search(index=INDEX, body=agg_query)
|
||||
|
||||
buckets = agg_results['aggregations']['reply_count_by_subject']['buckets']
|
||||
reply_counts = {bucket['key']: bucket['subject_replies']['value'] for bucket in buckets}
|
||||
|
||||
update_query = {
|
||||
"script": {
|
||||
"source": """
|
||||
ctx._source.subject_replies = params.replyCounts.get(ctx.subject);
|
||||
""",
|
||||
"lang": "painless",
|
||||
"params": {
|
||||
"replyCounts": reply_counts
|
||||
}
|
||||
},
|
||||
"query": {
|
||||
"match_all": {}
|
||||
}
|
||||
}
|
||||
|
||||
driver.update_by_query(index=INDEX, body=update_query, conflicts="proceed")
|
|
@ -137,3 +137,29 @@ class PSQL_Testing(DB_Testing):
|
|||
cursor.execute(updateQuery)
|
||||
cursor.execute(unalterQuery)
|
||||
driver.commit()
|
||||
|
||||
def update_add_replies_per_subject(self, driver: connection) -> str:
|
||||
cursor = driver.cursor()
|
||||
alterQuery = """
|
||||
ALTER TABLE thread ADD COLUMN subject_replies INT;
|
||||
"""
|
||||
updateQuery = """
|
||||
WITH subject_sums AS (
|
||||
SELECT
|
||||
recipient_name,
|
||||
SUM(reply_count) OVER (PARTITION BY SPLIT_PART(recipient_name, ' ', -1)) AS total_replies
|
||||
FROM
|
||||
thread
|
||||
)
|
||||
UPDATE thread AS t
|
||||
SET subject_replies = ln_sums.total_replies
|
||||
FROM subject_sums AS ln_sums
|
||||
WHERE t.recipient_name = ln_sums.recipient_name;
|
||||
"""
|
||||
unalterQuery = """
|
||||
ALTER TABLE thread DROP COLUMN subject_replies;
|
||||
"""
|
||||
cursor.execute(alterQuery)
|
||||
cursor.execute(updateQuery)
|
||||
cursor.execute(unalterQuery)
|
||||
driver.commit()
|
|
@ -51,9 +51,12 @@ class DB_Testing(Generic[T]):
|
|||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update_add_replies_per_last_name(self, driver: T) -> str:
|
||||
def update_add_replies_per_last_name(self, driver: T):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update_add_replies_per_subject(self, driver: T):
|
||||
pass
|
||||
|
||||
def do_tests(self, driver: T) -> Dict[str, float]:
|
||||
stats = {}
|
||||
|
@ -76,6 +79,8 @@ class DB_Testing(Generic[T]):
|
|||
|
||||
# Update
|
||||
stats.update(time_func(f"[{self.driver_name}] Update all thread to add a field with the total replies per last name", "update_add_replies_per_last_name", lambda : self.update_add_replies_per_last_name(driver), NUM_RUNS))
|
||||
stats.update(time_func(f"[{self.driver_name}] Update all thread to add a field with the total replies per subject", "update_add_replies_per_subject", lambda : self.update_add_replies_per_subject(driver), NUM_RUNS))
|
||||
|
||||
|
||||
print("\n")
|
||||
return stats
|
4
main.py
4
main.py
|
@ -89,7 +89,7 @@ if __name__ == "__main__":
|
|||
|
||||
timings = {}
|
||||
|
||||
timings['PSQL'] = test_psql()
|
||||
# timings['PSQL'] = test_psql()
|
||||
timings['ES'] = test_elasticsearch()
|
||||
|
||||
plot(timings)
|
||||
# plot(timings)
|
BIN
plot.png
BIN
plot.png
Binary file not shown.
Before Width: | Height: | Size: 65 KiB After Width: | Height: | Size: 64 KiB |
Loading…
Reference in a new issue