The OncoKB Development team has conducted API performance tests to identify optimization opportunities and to collect key metrics for evaluating the API's overall capabilities. Below you'll find key performance indicators such as response time, throughput, and resource utilization across different endpoints.
Annotate Mutation by HGVS
10/24/2024
This test measures the performance of the annotate/mutations/byHGVSg endpoint in the case where the variants have already been annotated and cached by Genome Nexus.
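For reference, the request body for this endpoint is a JSON array of objects, each carrying a single hgvsg string; a minimal batch, using illustrative genomic coordinates rather than the benchmark data, looks like:

batch = [
    {"hgvsg": "7:g.140453136A>T"},             # BRAF V600E, for illustration only
    {"hgvsg": "17:g.41242962_41242963insGA"},  # illustrative insertion
]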
Datasets
We have chosen the following studies for benchmarking:
These tests are conducted against a replica of the production setup. All configurations can be found here.
Test Setup
We will be using Locust.io to write our performance tests.
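Locust is distributed as a pip package, and the test files below are plain Python modules run through the locust CLI:

pip install locust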
As a prerequisite, all variants from the WES and WGS datasets have been annotated (and cached in Genome Nexus) prior to benchmarking the OncoKB HGVSg endpoint.
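The pre-annotation step itself is outside the scope of the scripts below, but a minimal sketch of warming the cache could look like the following, assuming the public genomenexus.org instance and reusing the same wgs.txt variant list (both are assumptions, not part of the recorded setup):

import json
import requests

# Read the same HGVSg list used by the benchmark scripts
with open("wgs.txt", "r") as f:
    variants = [row.strip() for row in f]

# POST the variants to Genome Nexus in batches so each one is annotated and cached
for i in range(0, len(variants), 100):
    response = requests.post(
        "https://www.genomenexus.org/annotation",  # assumed public instance
        headers={"Content-Type": "application/json"},
        data=json.dumps(variants[i:i + 100]),
    )
    response.raise_for_status()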
Running Tests
single_thread_test.py
import json
import time

from locust import HttpUser, task, events

# Batch the WES/WGS dataset into chunks of 100 variants
def chunk_variants(variants, chunk_size=100):
    for i in range(0, len(variants), chunk_size):
        yield variants[i:i + chunk_size]

# Data to annotate for the test: one HGVSg variant per line
variants_list = []
with open("wgs.txt", "r") as data_file:
    variants_list = [{"hgvsg": row.strip()} for row in data_file]
variant_batches = list(chunk_variants(variants_list))

# Global variables to track total time and batch progress
start_time = None
total_batches = len(variant_batches)
completed_batches = 0

# Locust test-start listener: record when the run begins
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    global start_time
    start_time = time.time()

# When the test stops, print how long the whole run took
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    end_time = time.time()
    total_time = end_time - start_time
    # Use the exact variant count; the last batch may hold fewer than 100
    print(f"Total time to process {len(variants_list)} variants: {total_time:.2f} seconds")

class OncoKBUser(HttpUser):
    @task
    def sendRequest(self):
        global completed_batches
        if completed_batches < total_batches:
            batch = variant_batches[completed_batches]
            # Send POST request with Authorization header
            response = self.client.post(
                "/api/v1/annotate/mutations/byHGVSg",
                headers={"Content-Type": "application/json", "Authorization": "Bearer <token>"},
                data=json.dumps(batch),
                timeout=600,  # requests timeouts are in seconds, not milliseconds
            )
            # Track batch completion and report the response time for each request
            completed_batches += 1
            print(f"Request {completed_batches}/{total_batches} completed in {response.elapsed.total_seconds()} seconds")
        else:
            self.environment.runner.quit()
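A single-user headless run of this script might look like the following; the host URL is a placeholder for whichever environment is under test:

locust -f single_thread_test.py --headless -u 1 -r 1 -H https://www.oncokb.org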
multi_thread_test.py
import json
import signal
import time
from threading import Lock

from locust import HttpUser, task, events
from locust.exception import StopUser

# Batch the WES/WGS dataset into chunks of 100 variants
def chunk_variants(variants, chunk_size=100):
    for i in range(0, len(variants), chunk_size):
        yield variants[i:i + chunk_size]

# Data to annotate for the test: one HGVSg variant per line
variants_list = []
with open("wgs.txt", "r") as data_file:
    variants_list = [{"hgvsg": row.strip()} for row in data_file]
variant_batches = list(chunk_variants(variants_list))

# Global variables to track total time and batch progress
start_time = None
total_batches = len(variant_batches)
completed_batches = 0
lock = Lock()
active_users = 0

# Locust test-start listener: record the start time and the user count
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
    global start_time
    global active_users
    # Initialize the active user count with the number of users in the test
    active_users = environment.runner.user_count
    start_time = time.time()

# When the test stops, print how long the whole run took
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    end_time = time.time()
    total_time = end_time - start_time
    # Use the exact variant count; the last batch may hold fewer than 100
    print(f"Total time to process {len(variants_list)} variants: {total_time:.2f} seconds")

class OncoKBUser(HttpUser):
    def get_next_batch(self):
        # Hand out batches under a lock so concurrent users never process the same one
        global completed_batches
        with lock:
            if completed_batches < total_batches:
                batch = variant_batches[completed_batches]
                completed_batches += 1
                return batch
            return None  # No more batches available

    @task
    def send5Threads(self):
        batch = self.get_next_batch()
        if batch is None:
            global active_users
            with lock:
                active_users -= 1  # Mark this user as finished
                # Once every user is done processing, stop the test
                if active_users <= 0:
                    print("All users finished processing. Stopping test.")
                    self.environment.runner.quit()  # Stop the entire test
                    signal.raise_signal(signal.SIGTERM)
            # Stop scheduling this user so it is not decremented twice
            raise StopUser()
        # Send POST request with Authorization header
        response = self.client.post(
            "/api/v1/annotate/mutations/byHGVSg",
            headers={"Content-Type": "application/json", "Authorization": "Bearer <token>"},
            data=json.dumps(batch),
        )
        print(f"Request {completed_batches}/{total_batches} completed in {response.elapsed.total_seconds()} seconds")