Skip to content

Commit

Permalink
StressAnAPI v1.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
rabuchaim committed Jul 23, 2024
1 parent 7816f53 commit c252f10
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 53 deletions.
4 changes: 2 additions & 2 deletions stressanapi/simple_stressanapi_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Simple StressAnAPI Server v1.0.2"""
"""Simple StressAnAPI Server v1.0.3"""
import sys, os, argparse, threading, time, asyncio, random, logging, datetime as dt
sys.tracebacklimit = 0
try:
Expand Down Expand Up @@ -54,7 +54,7 @@ def get(self,*args):
header_keys = list(self.request.headers.keys())
self.write(f"GET request path: {self.request.path} - header keys: {','.join(header_keys)}")
def post(self,*args):
self.set_status(self.return_a_random_error())
# self.set_status(self.return_a_random_error())
arguments = tornado.escape.json_decode(self.request.body)
arg_keys, arg_values = list(arguments.keys()), list(arguments.values())
self.write(f"POST argument keys: {','.join(arg_keys)} - values: {','.join(arg_values)}")
Expand Down
150 changes: 99 additions & 51 deletions stressanapi/stressanapi.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# encoding: utf-8
# -*- coding: utf-8 -*-
"""StressAnAPI v1.0.2 - An API stress-test tool"""
"""StressAnAPI v1.0.3 - An API stress-test tool"""
"""
____ _ _ _ ____ ___
/ ___|| |_ _ __ ___ ___ ___ / \ _ __ / \ | _ \_ _|
Expand Down Expand Up @@ -29,7 +29,7 @@
sys.tracebacklimit = 0

__appname__ = 'StressAnAPI'
__version__ = '1.0.2'
__version__ = '1.0.3'
__release__ = '15/July/2024'
__descr__ = 'A stress-test tool for API servers'
__url__ = 'https://github.com/rabuchaim/StressAnAPI/'
Expand All @@ -50,6 +50,7 @@
lock = threading.Lock()
rlock = threading.RLock()

# __all__ = ['extract_template_var']
##################################################################################################################################
##################################################################################################################################

Expand Down Expand Up @@ -391,9 +392,11 @@ def displayConfig():
"504":"Gateway Timeout","505":"HTTP Version Not Supported","511":"Network Authentication Required",
##──── Connection Errors (created internally just to handle connection failures to an api server)
"900":"Connection Refused",
"901":"Connection Timeout","902":"Connection reset by peer",
"901":"Connection Timeout",
"902":"Connection reset by peer",
"903":"Remote end closed connection without response",
"904":"Exceeded maximum redirects",
"999":"Unknown Error",
"32":"Broken Pipe"
}

Expand Down Expand Up @@ -478,8 +481,8 @@ def __repr__(self) -> Dict:
return str(dict(sorted({f"{k} {self.descr(k)}": v for k, v in self._dict.items()}.items(), key=lambda x: x[0])))

def save(self, status_code):
# with self._lock:
self._dict[status_code] = self._dict.get(status_code, 0) + 1
with self._lock:
self._dict[status_code] = self._dict.get(status_code, 0) + 1

def reset(self):
with self._lock:
Expand All @@ -499,7 +502,8 @@ def asdict(self) -> Dict:
return self._dict
@property
def asdict_desc(self) -> Dict:
return dict(sorted({f"{k} {self.descr(k)}": v for k, v in self._dict.items()}.items(), key=lambda x: x[0]))
with self._lock:
return dict(sorted({f"{k} {self.descr(k)}": v for k, v in self._dict.items()}.items(), key=lambda x: x[0]))
@property
def values(self) -> Dict:
return {k: v for k, v in self._dict.items() if v > 0}
Expand Down Expand Up @@ -919,42 +923,42 @@ def isValidIPv6(ipv6_address):
except:
return False

def getRandomPrivateIPv4(num_ips=1):
"""Generate an IP address from networks 10.0.0.0/8 or 172.16.0.0/12 or 192.168.0.0/16
# def getRandomPrivateIPv4(num_ips=1):
# """Generate an IP address from networks 10.0.0.0/8 or 172.16.0.0/12 or 192.168.0.0/16

If only 1 IP is requested, returns a string, otherwise returns a list.
# If only 1 IP is requested, returns a string, otherwise returns a list.

If fails for some reason, raise an error.
"""
return_list = []
try:
while (len(return_list) < num_ips):
return_list.append(int2ipv4(random.choice([random.randint(167772160,184549375),random.randint(3232235520,3232301055),random.randint(2886729728,2887778303)])))
except Exception as ERR:
raise Exception(ERR)
return return_list[0] if len(return_list) == 1 else return_list
# If fails for some reason, raise an error.
# """
# return_list = []
# try:
# while (len(return_list) < num_ips):
# return_list.append(int2ipv4(random.choice([random.randint(167772160,184549375),random.randint(3232235520,3232301055),random.randint(2886729728,2887778303)])))
# except Exception as ERR:
# raise Exception(ERR)
# return return_list[0] if len(return_list) == 1 else return_list

def getRandomIPv4(num_ips=1):
"""Generate an IPv4 address from networks 1.0.0.0 until 223.255.255.255
# def getRandomIPv4(num_ips=1):
# """Generate an IPv4 address from networks 1.0.0.0 until 223.255.255.255

If only 1 IP is requested, returns a string, otherwise returns a list.
# If only 1 IP is requested, returns a string, otherwise returns a list.

If fails for some reason, raise an error.
"""
return_list = []
try:
while (len(return_list) < num_ips):
return_list.append(int2ipv4(random.randint(16777216,3758096383)))
except Exception as ERR:
raise Exception(ERR)
return return_list[0] if len(return_list) == 1 else return_list
# If fails for some reason, raise an error.
# """
# return_list = []
# try:
# while (len(return_list) < num_ips):
# return_list.append(int2ipv4(random.randint(16777216,3758096383)))
# except Exception as ERR:
# raise Exception(ERR)
# return return_list[0] if len(return_list) == 1 else return_list

def getRandomIPv42(): # faster
return int2ipv4(random.randint(16777216,3758096383)) # from 1.0.0.0 to 223.255.255.255
# def getRandomIPv42(): # faster
# return int2ipv4(random.randint(16777216,3758096383)) # from 1.0.0.0 to 223.255.255.255

def generatorRandomIPv4():
while True:
yield int2ipv4(random.randint(16777216,3758096383)) # from 1.0.0.0 to 223.255.255.255
# def generatorRandomIPv4():
# while True:
# yield int2ipv4(random.randint(16777216,3758096383)) # from 1.0.0.0 to 223.255.255.255

##################################################################################################################################
##################################################################################################################################
Expand Down Expand Up @@ -1205,14 +1209,15 @@ def getErrorResponseCode(error_message):
try:
if str(error_message).lower().find("connection refused") >= 0:
return 900
elif str(error_message) == "timed out":
elif str(error_message).lower().find("timed out") >= 0:
return 901
elif str(error_message).lower().find("reset by peer") >= 0:
return 902
elif str(error_message).lower().find("closed connection") >= 0:
return 903
else:
return extractErrorCode(error_message)
result = extractErrorCode(error_message)
return 999 if result is None else result
except Exception as ERR:
logDebug(f"getErrorResponseCode: {str(ERR)}")
return 0
Expand Down Expand Up @@ -1257,9 +1262,11 @@ def logResponseSyslog(text_id,method,url,response_code,response_body,elapsed_tim
def logResponse(text_id,method,url,response_code,response_body,elapsed_time):pass
# @showElapsedTimeAverageDecorator(window_size=5000)
def _logResponse(text_id,method,url,response_code,response_body,elapsed_time):
log(f"{G.middot} {text_id} {method} {url} - {getFormattedStatusCode(response_code)} {elapsed_time}")
with lock:
log(f"{G.middot} {text_id} {method} {url} - {getFormattedStatusCode(response_code)} {elapsed_time}")
def _logResponseBody(text_id,method,url,response_code,response_body,elapsed_time):
log(f"{G.middot} {text_id} {method} {url} - {getFormattedStatusCode(response_code)} {cDarkYellow(response_body.strip())} {elapsed_time}")
with lock:
log(f"{G.middot} {text_id} {method} {url} - {getFormattedStatusCode(response_code)} {cDarkYellow(response_body.strip())} {elapsed_time}")

##──── Returns the current date time to be used with log to stdout functions ─────────────────────────────────────────────────────
def getLogDateEmpty():return ""
Expand Down Expand Up @@ -1688,7 +1695,6 @@ def remove9XXFromString(col_str): # remove errors 900 used by internal control
max_size = (classTerminal().max_width-ansiLen(getLogDate())-10)
max_col_size = max_size // 2
stats = {key:val for key,val in httpStats.asdict_desc.items() if val > 0}

##──── Chart
success = [val for key,val in httpStats.asdict.items() if key in G.config.success_status_codes]
errors400 = [val for key,val in httpStats.asdict.items() if key >= 400 and key < 500]
Expand All @@ -1712,7 +1718,6 @@ def remove9XXFromString(col_str): # remove errors 900 used by internal control
col2_str = f"{col2_key}: {stats[col2_key]}" if col2_key else ''
log(f" {remove9XXFromString(col1_str):<{max_col_size}} {remove9XXFromString(col2_str)}")
log(line.middot1s)

log(f">>> {cWhite(f'Statistics of elapsed time of the last {timeStats.window_size} requests:')}")
log("")
stats = timeStats.stats()
Expand All @@ -1722,9 +1727,9 @@ def remove9XXFromString(col_str): # remove errors 900 used by internal control
requests_per_sec = 'Paused!' if G.event_pause.is_set() else '%.0f'%(requests_per_sec)
a = Table(cols=7,max_size=max_size,with_border=False,border_size=0).head(['Total','Req/Sec',' Min Avg Max'.center(len(min_avg_max)),'50th pct','75th pct','90th pct','99th pct']).row([counter.value,requests_per_sec,min_avg_max,pct50,pct75,pct90,pct99]).get_table()
[log(f"{line}") for line in a]

log(line.single)
except Exception as ERR:
logDebug(f"httpStats.asdict: {httpStats.asdict}")
logDebug(f"displayFullHttpStats: {str(ERR)}")

def finishApplication():
Expand Down Expand Up @@ -1767,6 +1772,7 @@ def displayHelp():
# # #### # # # # # # # # # # # # # #
# # # # # # #### # # #### ## ## #### ### # ###
##

class threadMakeRequestsURLLib(threading.Thread):
def __init__(self,timeStats,httpStats):
threading.Thread.__init__(self)
Expand All @@ -1775,21 +1781,40 @@ def __init__(self,timeStats,httpStats):
self.httpStats = httpStats
zfill_len = len(str(len(G.thread_list))) if len(str(len(G.thread_list))) >= 2 else 2
self.text_id = f"[#{self.name.split('-')[1].zfill(zfill_len)}]"

def run(self):
method, url = G.config.method, G.config.url
data = str(json.dumps(G.config.post_data,sort_keys=False,ensure_ascii=False,separators=(",",":"))).encode()
if method == "GET":
req = urllib.request.Request(url=G.config.url,method=G.config.method)
req.full_url = G.config.url
self.method, self.url = G.config.method, G.config.url
if self.method == "GET":
self.post_data = b''
else:
req = urllib.request.Request(url=G.config.url,data=data,method=G.config.method)
self.post_data = str(json.dumps(G.config.post_data,sort_keys=False,ensure_ascii=False,separators=(",",":"))).encode()
self.template_var = self.extract_template_var(self.url)
if self.template_var == '%%randomipv4%%':
self.get_url = self.__get_url_random_ipv4
elif self.template_var == '%%randomipv6%%':
self.get_url = self.__get_url_random_ipv6
elif self.template_var == '%%randomprivateipv4%%':
self.get_url = self.__get_url_random_private_ipv4
elif self.template_var.startswith('%%randomint:'):
try:
self.template_var = self.extract_template_var(self.url)
_, self.random_min, self.random_max = self.template_var.replace("%","").split(":")
except Exception as ERR:
raise StressAnAPIException(f"Failed in config url: invalid url variable '{self.template_var}' - usage: %%randomint:val_min:val_max%% - {str(ERR)}")
try:
self.random_min = int(self.random_min)
self.random_max = int(self.random_max)
except Exception as ERR:
raise StressAnAPIException(f"Failed in '{self.template_var}' - invalid integer values for min and max - usage: %%randomint:val_min:val_max%% - {str(ERR)}")
self.get_url = self.__get_url_random_int

req = urllib.request.Request(url=self.url,method=self.method)
##──── configure StressAnAPI useragent if the user has not configured any other
if 'user-agent' not in [key.lower() for key,val in G.config.headers.items()]:
req.add_header('User-Agent',G.default_user_agent)
##─────────────────────────────────────────────────────────────────────────────
for header_key, header_value in G.config.headers.items():
req.add_header(header_key,header_value)

while True:
if G.event_quit.is_set() or self.stop.is_set():
break
Expand All @@ -1801,13 +1826,15 @@ def run(self):
if self.stop.is_set(): break
try:
with elapsedTimer() as elapsed:
req.full_url = next(self.get_url())
req.data = self.post_data
response_code,response_body = self.urllib_open(req,G.config.timeout)
self.timeStats.save(elapsed.time)
except Exception as ERR:
logDebug(f"urllib_get error: {str(ERR)}")
finally:
counter.incr()
logResponse(self.text_id,method,url,response_code,response_body,elapsed.text())
logResponse(self.text_id,self.method,req.full_url,response_code,response_body,elapsed.text())
time.sleep(G.config.interval)

# @showElapsedTimeAverageDecorator(5000)
Expand All @@ -1817,8 +1844,8 @@ def urllib_open(self,urllib_request,timeout):
response_code = response.getcode()
response_text = response.readline().strip().decode() if response_code not in [202,204] else '\b'
except Exception as ERR:
logDebug(f"urllib_open: {str(ERR)}")
response_code,response_text = getErrorResponseCode(str(ERR)),shortenErrorMessage(str(ERR),128)
logDebug(f"urllib_open: {str(ERR)}")
finally:
counterAverage.mark()
self.httpStats.save(response_code)
Expand All @@ -1835,6 +1862,27 @@ def join(self):
log(f"Unable to join thread {self.name} - {self.native_id}")
return self.name,self.native_id

def extract_template_var(self,text_string):
try:
match = re.search(r"(\%\%.*\%\%)",text_string)
return match.group(1) if match else ''
except Exception as ERR:
logDebug(f"extract_template_var: {str(ERR)}")
return ''

def __get_url_random_int(self):
yield self.url.replace(self.template_var,str(random.randint(self.random_min,self.random_max)))
def __get_url_random_ipv4(self):
yield self.url.replace(self.template_var,int2ipv4(random.randint(16777216,3758096383)))
def __get_url_random_ipv6(self):
yield self.url.replace(self.template_var,':'.join([f'{random.randint(0, 0xffff):04x}' for _ in range(8)]))
def __get_url_random_private_ipv4(self):
yield self.url.replace(self.template_var,int2ipv4(random.choice([random.randint(167772160,184549375),random.randint(3232235520,3232301055),random.randint(2886729728,2887778303)])))
@showElapsedTimeAverageDecorator()
def get_url(self):
yield self.url


##################################################################################################################################
##################################################################################################################################

Expand Down
49 changes: 49 additions & 0 deletions stressanapi/test_stressanapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python3
import unittest, json, os
from stressanapi import runCommand, stripColor, G, validateConfigFile
from stressanapi import threadMakeRequestsURLLib, getErrorResponseCode, getFormattedStatusCode

class TestStressAnAPI(unittest.TestCase):
def test_extract_template_var(self):
variables = {'url://path/%%randomipv4%%/test':'%%randomipv4%%',
'url://path/%%randomipv6%%/test':'%%randomipv6%%',
'url://path/test/%%randomprivateipv4%%':'%%randomprivateipv4%%',
'url://path/customer/%%randomint:1:1000%%':'%%randomint:1:1000%%'
}
for key,val in variables.items():
result = threadMakeRequestsURLLib.extract_template_var(self,key)
self.assertEqual(result, val) # Assert that the result is equal to the expected value

def test_getErrorResponseCode(self):
result = getErrorResponseCode('<urlopen error connection refused>')
self.assertEqual(result, 900)
result = getErrorResponseCode('<urlopen error timed out urlopen error>')
self.assertEqual(result, 901)
result = getErrorResponseCode('<reset by peer urlopen error>')
self.assertEqual(result, 902)
result = getErrorResponseCode('<urlopen error closed connection>')
self.assertEqual(result, 903)
result = getErrorResponseCode('<another 111 unknown 222 message 333>')
self.assertEqual(result, 111)
result = getErrorResponseCode('<another unknown message without error code>')
self.assertEqual(result, 999)

def test_getFormattedStatusCode(self):
result = stripColor(getFormattedStatusCode(200))
self.assertEqual(result,'200 OK')
result = stripColor(getFormattedStatusCode(903))
self.assertEqual(result,'### Remote end closed connection without response')
result = stripColor(getFormattedStatusCode(999))
self.assertEqual(result,'### Unknown Error')

if __name__ == '__main__':
test_file = '/tmp/stressanapi_unit_test.json'
template = G.default_template
template['method'] = "GET"
del template['syslog_server_url']

with open(test_file,'w') as f:
json.dump(G.default_template,f,indent=3)
validateConfigFile(test_file)
unittest.main()
os.remove(test_file)

0 comments on commit c252f10

Please sign in to comment.