"""Elasticsearch-backed implementation of the ``DB_Testing`` interface."""

import json

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

from models.data_thread import Threads
from interfaces.db_testing_interface import DB_Testing

INDEX = "threads"

# Number of bulk entries sent per ``driver.bulk`` request in ``add_bulk``.
_BULK_BATCH_SIZE = 1000
# ``update_by_query`` is sliced once per this many entries in ``Threads``.
_THREADS_PER_SLICE = 250
# ``request_timeout`` value passed to the ``update_by_query`` calls.
_UPDATE_TIMEOUT = 120
# Maximum number of buckets requested from the terms aggregations.
_MAX_BUCKETS = 1000


class Elastic_Testing(DB_Testing):
    """Runs the ``DB_Testing`` operations against an Elasticsearch index.

    Attributes:
        shards: Number of primary shards used when the index is created.
        driver_name: Display name, suffixed with the shard count.
        singles_data: One dict per entry of ``Threads`` (a copy of its
            ``__dict__``), indexed one by one in ``add_singles``.
        bulk_data: One string per entry of ``Threads`` holding the bulk
            action line followed by the JSON document, used by ``add_bulk``.
    """

    driver_name = "ES"

    def __init__(self, shards: int = 1) -> None:
        """Prepare the document payloads for an index with ``shards`` shards."""
        super().__init__()
        self.shards = shards
        self.driver_name += f" ({shards} Shards)"

        documents = [thread.__dict__.copy() for thread in Threads]
        self.singles_data = documents
        self.bulk_data = [
            "{\"index\":{}}\n" + json.dumps(document) for document in documents
        ]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _count(driver: Elasticsearch, query: dict) -> str:
        """Run a count request for ``query`` and format the hit count."""
        return f"Got {driver.count(index=INDEX, body=query).body['count']}"

    @staticmethod
    def _reply_totals(
        driver: Elasticsearch, bucket_name: str, sum_name: str, terms: dict
    ) -> dict:
        """Sum ``reply_count`` per bucket of a terms aggregation.

        Args:
            driver: Elasticsearch client.
            bucket_name: Name given to the terms aggregation.
            sum_name: Name given to the nested sum aggregation.
            terms: Body of the terms aggregation (field or script, size).

        Returns:
            Mapping of bucket key to the summed ``reply_count`` value.
        """
        agg_query = {
            "size": 0,
            "aggs": {
                bucket_name: {
                    "terms": terms,
                    "aggs": {sum_name: {"sum": {"field": "reply_count"}}},
                }
            },
        }
        response = driver.search(index=INDEX, body=agg_query)
        buckets = response["aggregations"][bucket_name]["buckets"]
        return {bucket["key"]: bucket[sum_name]["value"] for bucket in buckets}

    @staticmethod
    def _run_update(
        driver: Elasticsearch, script_source: str, reply_counts: dict
    ) -> str:
        """Apply a Painless script to every document via update-by-query.

        ``reply_counts`` is exposed to the script as ``params.replyCounts``.

        Returns:
            ``"Updated N"`` where N is the ``updated`` count reported by
            Elasticsearch (0 if the response carries no such key).
        """
        update_query = {
            "script": {
                "source": script_source,
                "lang": "painless",
                "params": {"replyCounts": reply_counts},
            },
            "query": {"match_all": {}},
        }
        # Integer division yields 0 for fewer than _THREADS_PER_SLICE threads;
        # clamp so the request always asks for at least one slice.
        slices = max(1, len(Threads) // _THREADS_PER_SLICE)
        response = driver.update_by_query(
            index=INDEX,
            body=update_query,
            conflicts="proceed",
            slices=slices,
            request_timeout=_UPDATE_TIMEOUT,
        )
        return f"Updated {response.body.get('updated', 0)}"

    # ------------------------------------------------------------------
    # Index lifecycle
    # ------------------------------------------------------------------

    def delete_table(self, driver: Elasticsearch) -> None:
        """Delete the index if it exists."""
        if driver.indices.exists(index=INDEX):
            driver.indices.delete(index=INDEX)

    def create_table(self, driver: Elasticsearch) -> None:
        """Drop any existing index and create it with ``self.shards`` shards."""
        self.delete_table(driver)
        driver.indices.create(index=INDEX, body={
            'settings': {
                'index': {
                    'number_of_replicas': 0,
                    'number_of_shards': self.shards
                }
            }
        })

    def delete_data(self, driver: Elasticsearch) -> None:
        """Remove all data by deleting the index (same as ``delete_table``)."""
        self.delete_table(driver)

    # ------------------------------------------------------------------
    # Inserts
    # ------------------------------------------------------------------

    def add_bulk(self, driver: Elasticsearch) -> None:
        """Recreate the index and load ``bulk_data`` in batches."""
        self.create_table(driver)
        for start in range(0, len(self.bulk_data), _BULK_BATCH_SIZE):
            batch = self.bulk_data[start:start + _BULK_BATCH_SIZE]
            driver.bulk(index=INDEX, operations='\n'.join(batch), refresh=True)

    def add_singles(self, driver: Elasticsearch) -> None:
        """Index every document of ``singles_data`` with its own request."""
        for document in self.singles_data:
            driver.index(index=INDEX, document=document)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def attach5_mr_mrs(self, driver: Elasticsearch) -> str:
        """Count documents with more than 5 attachments and subject Mr/Mrs."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"range": {"attachment_count": {"gt": 5}}},
                        {"terms": {"subject.keyword": ["Mr", "Mrs"]}},
                    ]
                }
            }
        }
        return self._count(driver, query)

    def search_mails_ends_microsoftcom(self, driver: Elasticsearch) -> str:
        """Count documents whose ``cc_recipients`` match ``*@microsoft.com``."""
        query = {
            "query": {
                "wildcard": {"cc_recipients.keyword": "*@microsoft.com"}
            }
        }
        return self._count(driver, query)

    def search_lorem(self, driver: Elasticsearch) -> str:
        """Count documents whose ``body`` contains a fixed phrase."""
        phrase = (
            "Nullam sit amet turpis elementum ligula vehicula consequat. \n"
            "Morbi a ipsum. Integer a nibh."
        )
        query = {"query": {"match_phrase": {"body": phrase}}}
        return self._count(driver, query)

    def get_sum_attachment_less_5(self, driver: Elasticsearch) -> str:
        """Sum ``attachment_count`` over documents with fewer than 5."""
        query = {
            "size": 0,
            "query": {
                "bool": {
                    "must": [{"range": {"attachment_count": {"lt": 5}}}]
                }
            },
            "aggs": {
                "attachment_count": {"sum": {"field": "attachment_count"}}
            },
        }
        response = driver.search(index=INDEX, body=query)
        total = response.body['aggregations']['attachment_count']['value']
        return f"Got {total}"

    # ------------------------------------------------------------------
    # Updates
    # ------------------------------------------------------------------

    def update_add_replies_per_last_name(self, driver: Elasticsearch) -> str:
        """Store on each document the reply total of its sender's last name.

        The last name is the final whitespace-separated token of
        ``sender_name``. Totals are aggregated first, then written to
        ``last_name_replies`` for documents whose last name has a bucket.

        Returns:
            ``"Updated N"`` with the number of updated documents.
        """
        reply_counts = self._reply_totals(
            driver,
            "reply_count_by_last_name",
            "last_name_replies",
            {
                "script": {
                    "source": "String name = doc['sender_name.keyword'].value; String[] parts = /\\s+/.split(name); return parts[parts.length - 1]",
                    "lang": "painless",
                },
                "size": _MAX_BUCKETS,
            },
        )
        script_source = """
            String name = ctx._source.sender_name;
            String[] parts = /\\s+/.split(name);
            String lastName = parts[parts.length - 1];
            if (params.replyCounts.containsKey(lastName)) {
                ctx._source.last_name_replies = params.replyCounts.get(lastName);
            }
        """
        return self._run_update(driver, script_source, reply_counts)

    def update_add_replies_per_subject(self, driver: Elasticsearch) -> str:
        """Store on each document the reply total of its subject.

        Totals are aggregated per ``subject.keyword`` value, then written to
        ``subject_replies``; subjects without a bucket get a null value.

        Returns:
            ``"Updated N"`` with the number of updated documents.
        """
        reply_counts = self._reply_totals(
            driver,
            "reply_count_by_subject",
            "subject_replies",
            # Bucket by the subject itself, not by the sender's last name.
            {"field": "subject.keyword", "size": _MAX_BUCKETS},
        )
        # Document fields live under ctx._source in update scripts.
        script_source = """
            ctx._source.subject_replies = params.replyCounts.get(ctx._source.subject);
        """
        return self._run_update(driver, script_source, reply_counts)