Performance Testing Distributed Applications with Locust

ids = []
for _ in range(10000):
body = body(
str(uuid.uuid4()),
str(uuid.uuid4()))
r = requests.post(
"http://localhost:8001/endpoint/",
json=body,
headers=headers,
)
r.raise_for_status()
id = r.json()["array"][0]["id"]
ids.append(id)
with open("foobar.json", "w") as outfile:
json.dump(ids, outfile)
^ how to run the same task many times in python3
@events.request_success.add_listener
def additional_success_handler(request_type, name, response_time, response_length, **kwargs):
success = '{"request_type":"%s", "endpoint":"%s", "result":"%s", ' \
'"response_time":%s, "response_length":%s, "other":%s}'
json_string = success % (request_type, name, "OK", response_time, response_length, json.dumps(kwargs))
message = {"type": "success", "payload": json.loads(json_string)}
logger.info(message)
@events.request_failure.add_listener
def additional_failure_handler(request_type, name, response_time, exception, **kwargs):
error = '{"request_type":"%s", "name":"%s", "result":"%s", ' \
'"response_time":%s, "exception":"%s", "other":%s}'
json_string = error % (request_type, name, "ERR", response_time, exception, json.dumps(kwargs))
message = {"type": "error", "payload": json.loads(json_string)}
logger.info(message)
^ success and failures in locust
from concurrent.futures import ThreadPoolExecutor, as_completed
def create_data():
body = "json"
r = requests.post(
'http://endpoint',
json=body,
headers=headers,
)
r.raise_for_status()
answer_id = r.json()['array'][0]['key']
return answer_id
def test_create_data():
with ThreadPoolExecutor(max_workers=15) as executor:
create_answers = [executor.submit(create_data) for _ in range(30)]
list_of_data = [
future.result()
for future in as_completed(create_answers)
]
with open('foobar.json', 'w') as outfile:
json.dump(list_of_data, outfile)##THREADPOOL EXECUTOR
You can get results from the ThreadPoolExecutor in the order that tasks are completed by calling the as_completed() module function.The function takes a collection of Future objects and will return the same Future objects in the order that their associated tasks are completed.
check if tests are there in bash:
if [ -d test ]; then
test_dir=test
elif [ -d tests ]; then
test_dir=tests
else
echo '{"checkName": "pytest", "status": "failure", "message": "Your project needs to have a 'test' directory."}' > /out/check-status.json
exit 0
fi
new perf
import os
import json
import hvac
import random
import logging
from log import logger
from locust import task, events, HttpUser
from typing import Optional, Any
from functools import cached_property
from requests_bayerauth import MsalConfidentialClientAuth
headers = {
"Content-Type": "application/json"
}
@events.quitting.add_listener
def _(environment):
if environment.stats.total.get_response_time_percentile(0.95) > 800:
logging.error("Test failed: 95th percentile response time is < 800 ms")
environment.process_exit_code = 1
else:
environment.process_exit_code = 0
@events.request_success.add_listener
def success_handler(request_type, name, response_time, response_length, **kwargs):
""" success handler to log statistics """
OK_TEMPLATE = '{"request_type":"%s", "name":"%s", "result":"%s", ' \
'"response_time":%s, "response_length":%s, "other":%s}'
json_string = OK_TEMPLATE % (request_type, name, "OK", response_time, response_length, json.dumps(kwargs))
message = {"type": "success", "payload": json.loads(json_string)}
logger.info(message)
@events.request_failure.add_listener
def failure_handler(request_type, name, response_time, exception, **kwargs):
""" failure handler to log statistics """
ERR_TEMPLATE = '{"request_type":"%s", "name":"%s", "result":"%s", ' \
'"response_time":%s, "exception":"%s", "other":%s}'
json_string = ERR_TEMPLATE % (request_type, name, "ERR", response_time, exception, json.dumps(kwargs))
message = {"type": "error", "payload": json.loads(json_string)}
logger.info(message)
with open("data/two_or_more_acres.txt") as f:
content = f.read()
two_plus_acres_polygons = [
line for line in content.splitlines()
]
class GetElevation(HttpUser):
@task
def get_elevation_two_plus_acres(self):
elevation_id = random.choice(two_plus_acres_polygons)
self.client.get(url=f'/api/v1/?filter={elevation_id}',
name='of_test', headers=headers)
cypress
npx cypress run --headless --env TOKEN=$(npm run --silent token-gen)
new dockerfile for locust
FROM locustio/locust:1.4.3
WORKDIR /mnt/locust
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
ADD data/ data/
COPY *.py ./
List EKS Clusters
eksctl get clusters
Capture Node Group name
eksctl get nodegroup — cluster=<clusterName>
eksctl get nodegroup — cluster=eksdemo
Delete Node Group
eksctl delete nodegroup — cluster=<clusterName> — name=<nodegroupName>
eksctl delete nodegroup — cluster=eksdemo1 — name=eksdemo1-ng-public1
Delete Cluster
eksctl delete cluster <clusterName>
eksctl delete cluster eksdemo1