Performance Testing Distributed Applications with Locust

Zubair Haque
3 min read · Mar 12, 2021
import json
import uuid

import requests

headers = {"Content-Type": "application/json"}

ids = []
for _ in range(10000):
    # body() is a helper (not shown) that builds the JSON payload from two ids
    payload = body(
        str(uuid.uuid4()),
        str(uuid.uuid4()))
    r = requests.post(
        "http://localhost:8001/endpoint/",
        json=payload,
        headers=headers,
    )
    r.raise_for_status()
    record_id = r.json()["array"][0]["id"]
    ids.append(record_id)

# write the generated ids to disk so the load test can reuse them
with open("foobar.json", "w") as outfile:
    json.dump(ids, outfile)

^ how to run the same task many times in Python 3: this loop seeds the service with 10,000 records before the load test runs

@events.request_success.add_listener
def additional_success_handler(request_type, name, response_time, response_length, **kwargs):
    success = '{"request_type":"%s", "endpoint":"%s", "result":"%s", ' \
              '"response_time":%s, "response_length":%s, "other":%s}'

    json_string = success % (request_type, name, "OK", response_time, response_length, json.dumps(kwargs))
    message = {"type": "success", "payload": json.loads(json_string)}
    logger.info(message)


@events.request_failure.add_listener
def additional_failure_handler(request_type, name, response_time, exception, **kwargs):
    error = '{"request_type":"%s", "name":"%s", "result":"%s", ' \
            '"response_time":%s, "exception":"%s", "other":%s}'
    json_string = error % (request_type, name, "ERR", response_time, exception, json.dumps(kwargs))
    message = {"type": "error", "payload": json.loads(json_string)}
    logger.info(message)

^ success and failure event handlers in Locust
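Both handlers write through a logger object whose import isn't shown here; in the full locustfile further down it comes from "from log import logger". That log module never appears in this post, so the version below is only a minimal sketch of what it might look like:

# log.py -- minimal sketch of the logger module the handlers import.
# The real module isn't shown in the post; this is one plausible version.
import logging
import sys

logger = logging.getLogger("locust-perf")
logger.setLevel(logging.INFO)

_handler = logging.StreamHandler(sys.stdout)
_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(_handler)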

import json

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

headers = {"Content-Type": "application/json"}


def create_data():
    body = "json"  # placeholder payload; swap in the real request body
    r = requests.post(
        'http://endpoint',
        json=body,
        headers=headers,
    )
    r.raise_for_status()
    answer_id = r.json()['array'][0]['key']
    return answer_id


def test_create_data():
    # fan the create_data() call out across 15 worker threads, 30 calls total
    with ThreadPoolExecutor(max_workers=15) as executor:
        create_answers = [executor.submit(create_data) for _ in range(30)]

        list_of_data = [
            future.result()
            for future in as_completed(create_answers)
        ]

    # persist the collected ids for later use
    with open('foobar.json', 'w') as outfile:
        json.dump(list_of_data, outfile)
^ running the same task concurrently with ThreadPoolExecutor

You can collect results from a ThreadPoolExecutor in the order that tasks complete by calling the as_completed() function from concurrent.futures. It takes a collection of Future objects and yields those same futures as their associated tasks finish, which is generally not the order in which they were submitted.
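A tiny standard-library example, unrelated to the API calls above, that shows the completion-order behaviour:

# Demonstrates that as_completed() yields futures in completion order,
# not submission order.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_task(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(slow_task, s) for s in (3, 1, 2)]
    for future in as_completed(futures):
        print(future.result())  # prints 1, then 2, then 3: the order they finished in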

Check whether the project has a test directory in Bash:

if [ -d test ]; then
    test_dir=test
elif [ -d tests ]; then
    test_dir=tests
else
    echo '{"checkName": "pytest", "status": "failure", "message": "Your project needs to have a '\''test'\'' directory."}' > /out/check-status.json
    exit 0
fi

The new perf test, as a full locustfile:

import os
import json
import hvac
import random
import logging
from log import logger
from locust import task, events, HttpUser
from typing import Optional, Any
from functools import cached_property
from requests_bayerauth import MsalConfidentialClientAuth

headers = {
    "Content-Type": "application/json"
}


@events.quitting.add_listener
def _(environment):
    # fail the run if the 95th percentile response time exceeds 800 ms;
    # with --headless this exit code is what makes CI mark the job as failed
    if environment.stats.total.get_response_time_percentile(0.95) > 800:
        logging.error("Test failed: 95th percentile response time exceeded 800 ms")
        environment.process_exit_code = 1
    else:
        environment.process_exit_code = 0


@events.request_success.add_listener
def success_handler(request_type, name, response_time, response_length, **kwargs):
    """ success handler to log statistics """
    OK_TEMPLATE = '{"request_type":"%s", "name":"%s", "result":"%s", ' \
                  '"response_time":%s, "response_length":%s, "other":%s}'

    json_string = OK_TEMPLATE % (request_type, name, "OK", response_time, response_length, json.dumps(kwargs))
    message = {"type": "success", "payload": json.loads(json_string)}
    logger.info(message)


@events.request_failure.add_listener
def failure_handler(request_type, name, response_time, exception, **kwargs):
    """ failure handler to log statistics """
    ERR_TEMPLATE = '{"request_type":"%s", "name":"%s", "result":"%s", ' \
                   '"response_time":%s, "exception":"%s", "other":%s}'
    json_string = ERR_TEMPLATE % (request_type, name, "ERR", response_time, exception, json.dumps(kwargs))
    message = {"type": "error", "payload": json.loads(json_string)}
    logger.info(message)


with open("data/two_or_more_acres.txt") as f:
content = f.read()
two_plus_acres_polygons = [
line for line in content.splitlines()
]


class GetElevation(HttpUser):
@task
def get_elevation_two_plus_acres(self):
elevation_id = random.choice(two_plus_acres_polygons)
self.client.get(url=f'/api/v1/?filter={elevation_id}',
name='of_test', headers=headers)
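The user class above fires requests back to back. If you want each simulated user to pause between requests, or to mix in a second endpoint at a different rate, Locust's wait_time helper and task weights handle that. The sketch below is illustrative only: it assumes it lives in the same locustfile (so it can reuse random, two_plus_acres_polygons and headers), and the /healthz endpoint is made up for the example:

from locust import HttpUser, task, between


class GetElevationPaced(HttpUser):
    # each simulated user waits 1-2 seconds between tasks
    wait_time = between(1, 2)

    @task(3)  # weight 3: runs three times as often as the task below
    def get_elevation_two_plus_acres(self):
        elevation_id = random.choice(two_plus_acres_polygons)
        self.client.get(url=f'/api/v1/?filter={elevation_id}',
                        name='of_test', headers=headers)

    @task(1)
    def health_check(self):
        # hypothetical endpoint, only here to show how a lighter task is mixed in
        self.client.get('/healthz', name='health')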

Running Cypress headless, passing in a token generated by an npm script (--silent keeps npm's own output out of the captured value):

npx cypress run --headless --env TOKEN=$(npm run --silent token-gen)

A new Dockerfile for Locust:

FROM locustio/locust:1.4.3

WORKDIR /mnt/locust
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
ADD data/ data/
COPY *.py ./

List EKS Clusters

eksctl get clusters

Capture Node Group name

eksctl get nodegroup --cluster=<clusterName>
eksctl get nodegroup --cluster=eksdemo

Delete Node Group

eksctl delete nodegroup --cluster=<clusterName> --name=<nodegroupName>
eksctl delete nodegroup --cluster=eksdemo1 --name=eksdemo1-ng-public1

Delete Cluster

eksctl delete cluster <clusterName>
eksctl delete cluster eksdemo1

Zubair Haque

The Engineering Chronicles: I specialize in Automated Deployments