The OncoKB Development team has conducted API performance tests to identify optimization opportunities and collect key metrics for evaluating overall API capabilities. Below you’ll find key performance indicators such as response times, throughput, and resource utilization across different endpoints.
Annotate Mutation by HGVS
10/24/2024
This test aims to determine the performance of the annotate/mutations/byHGVSg endpoint given that the variants have been annotated and cached by Genome Nexus.
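As a rough illustration of what a single request to this endpoint carries, the sketch below builds the JSON body for a batch of HGVSg strings, using the same `{"hgvsg": ...}` query shape as the test scripts on this page. The helper name `build_hgvsg_payload` and the sample variant strings are assumptions made for illustration; they are not part of OncoKB's tooling.

```python
import json


def build_hgvsg_payload(hgvsg_strings):
    """Return the JSON body for a batch of HGVSg strings (hypothetical helper)."""
    # Strip whitespace and skip blank lines so empty queries are not sent
    queries = [{"hgvsg": s.strip()} for s in hgvsg_strings if s.strip()]
    return json.dumps(queries)


# Sample HGVSg strings chosen for illustration only
payload = build_hgvsg_payload(["7:g.140453136A>T\n", "", "17:g.7577121G>A"])
print(payload)
```

The resulting string is what would be passed as the POST body, one list entry per variant.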
Datasets
We have chosen the following studies for benchmarking:
These tests will be conducted by replicating the production setup. All configurations can be found here.
Test Setup
We will be using Locust.io to write our performance tests.
As a prerequisite, all variants from the WES and WGS datasets were annotated (and cached in Genome Nexus) before benchmarking the OncoKB HGVSg endpoint.
Running Tests
single_thread_test.py
import json
import time

from locust import HttpUser, task, events


# Batch the WES/WGS dataset into chunks of 100 variants
def chunk_variants(variants, chunk_size=100):
    for i in range(0, len(variants), chunk_size):
        yield variants[i:i + chunk_size]


# Data to annotate for test
with open("wgs.txt", "r") as data_file:
    variants_list = [{"hgvsg": row.strip()} for row in data_file]

variant_batches = list(chunk_variants(variants_list))

# Global variables to track total time and batches
start_time = None
total_batches = len(variant_batches)
completed_batches = 0


# Locust test on start listener
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    global start_time
    start_time = time.time()


# When test stops, then print out how long it took
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    end_time = time.time()
    total_time = end_time - start_time
    # Use the real variant count; the last batch may hold fewer than 100
    print(f"Total time to process {len(variants_list)} variants: {total_time:.2f} seconds")


class OncoKBUser(HttpUser):
    @task
    def sendRequest(self):
        global completed_batches
        if completed_batches < total_batches:
            batch = variant_batches[completed_batches]
            # Send POST request with Authorization header
            response = self.client.post(
                "/api/v1/annotate/mutations/byHGVSg",
                headers={
                    "Content-Type": "application/json",
                    "Authorization": "Bearer <token>",
                },
                data=json.dumps(batch),
                timeout=600000,  # Note: request timeouts are expressed in seconds
            )
            # Track batch completion
            completed_batches += 1
            # Check response time for each request
            print(f"Request {completed_batches}/{total_batches} completed in {response.elapsed.total_seconds()} seconds")
        else:
            # All batches sent: stop the test
            self.environment.runner.quit()
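The batching step can be checked in isolation. The minimal sketch below runs the same slicing logic on 250 placeholder queries (made-up stand-ins, not real variants) and shows that the final batch can be smaller than the chunk size, so the true variant count is the sum of the batch sizes rather than the number of batches times 100.

```python
def chunk_variants(variants, chunk_size=100):
    """Yield consecutive slices of at most chunk_size items."""
    for i in range(0, len(variants), chunk_size):
        yield variants[i:i + chunk_size]


# 250 placeholder queries stand in for a real dataset
variants = [{"hgvsg": f"placeholder-{n}"} for n in range(250)]
batches = list(chunk_variants(variants))
batch_sizes = [len(b) for b in batches]
print(batch_sizes)  # → [100, 100, 50]
```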
multi_thread_test.py
import json
import signal
import time
from threading import Lock

from locust import HttpUser, task, events


# Batch the WES/WGS dataset into chunks of 100 variants
def chunk_variants(variants, chunk_size=100):
    for i in range(0, len(variants), chunk_size):
        yield variants[i:i + chunk_size]


# Data to annotate for test
with open("wgs.txt", "r") as data_file:
    variants_list = [{"hgvsg": row.strip()} for row in data_file]

variant_batches = list(chunk_variants(variants_list))

# Global variables to track total time and batches
start_time = None
total_batches = len(variant_batches)
completed_batches = 0
lock = Lock()
active_users = 0


# Locust test on start listener
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    global start_time
    global active_users
    # Initialize active user count with the number of users in the test
    # (this may still be 0 if test_start fires before users are spawned)
    active_users = environment.runner.user_count
    start_time = time.time()


# When test stops, then print out how long it took
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    end_time = time.time()
    total_time = end_time - start_time
    # Use the real variant count; the last batch may hold fewer than 100
    print(f"Total time to process {len(variants_list)} variants: {total_time:.2f} seconds")


class OncoKBUser(HttpUser):
    def get_next_batch(self):
        global completed_batches
        # The lock ensures each batch is handed to exactly one user
        with lock:
            print(completed_batches)
            if completed_batches < total_batches:
                batch = variant_batches[completed_batches]
                completed_batches += 1
                print(batch[0])
                return batch
            else:
                return None  # No more batches available

    @task
    def send5Threads(self):
        batch = self.get_next_batch()
        if batch is None:
            global active_users
            with lock:
                active_users -= 1  # Mark the user as finished
            # If all users are done processing, stop the test
            if active_users < 0:
                print("All users finished processing. Stopping test.")
                self.environment.runner.quit()  # Stop the entire test
                signal.raise_signal(signal.SIGTERM)
            return
        response = self.client.post(
            "/api/v1/annotate/mutations/byHGVSg",
            headers={
                "Content-Type": "application/json",
                "Authorization": "Bearer <token>",
            },
            data=json.dumps(batch),
        )
        print(f"Request {completed_batches}/{total_batches} completed in {response.elapsed.total_seconds()} seconds")
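The core of the multi-user test is handing each batch to exactly one user. The sketch below isolates that idea outside Locust, under the assumption that plain Python threads are an adequate stand-in for Locust users; the `BatchDispenser` class and `worker` function are hypothetical names introduced only for this illustration.

```python
from threading import Lock, Thread


class BatchDispenser:
    """Hand out each batch exactly once across concurrent workers (illustrative)."""

    def __init__(self, batches):
        self._batches = batches
        self._next_index = 0
        self._lock = Lock()

    def next_batch(self):
        # The lock makes the check-and-increment a single atomic step
        with self._lock:
            if self._next_index >= len(self._batches):
                return None  # No more batches available
            batch = self._batches[self._next_index]
            self._next_index += 1
            return batch


def worker(dispenser, claimed):
    # Keep claiming batches until the dispenser is exhausted
    while (batch := dispenser.next_batch()) is not None:
        claimed.append(batch)  # list.append is thread-safe in CPython


dispenser = BatchDispenser([[n] for n in range(20)])
claimed = []
threads = [Thread(target=worker, args=(dispenser, claimed)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(claimed))  # → 20
```

Keeping the index and the lock inside one object avoids module-level globals, which can make the hand-out logic easier to test separately from the HTTP calls.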