3 from collections import defaultdict
5 from hyper import HTTP20Connection
18 ADML_HOST = 'localhost'
20 ADML_ENDPOINT = ADML_HOST + ':' + ADML_PORT
24 CONTENT_LENGTH = 'content-length'
26 # Flow calculation throw admlf (ADML Flow) fixture:
27 # flow = admlf.getId()
29 # For sequenced tests, the id is just a monotonically increased number from 1.
30 # For parallel tests, this id is sequenced from 1 for every worker, then, globally,
31 # this is a handicap to manage flow ids (tests ids) for ADML FSM (finite state machine).
32 # We consider a base multiplier of 10000, so collisions would take place when workers
33 # reserves more than 10000 flows. With a 'worst-case' assumption of `5 flows per test case`,
34 # you should cover up to 5000 per worker. Anyway, feel free to increase this value,
35 # specially if you are thinking in use pytest and ADML Agent for system tests.
36 FLOW_BASE_MULTIPLIER = 10000
38 ######################
39 # CLASSES & FIXTURES #
40 ######################
42 class Sequencer(object):
43 def __init__(self, request):
45 self.request = request
49 Returns the worker id, or 'master' if not parallel execution is done
52 if hasattr(self.request.config, 'slaveinput'):
53 wid = self.request.config.slaveinput['slaveid']
59 Returns the next identifier value (monotonically increased in every call)
67 # Workers are named: wd0, wd1, wd2, etc.
68 wid_number = int(re.findall(r'\d+', wid)[0])
70 return FLOW_BASE_MULTIPLIER * wid_number + self.sequence
74 @pytest.fixture(scope='session')
79 return Sequencer(request)
86 # CRITICAL ERROR WARNING INFO DEBUG NOSET
87 def setLevelInfo(): logging.getLogger().setLevel(logging.INFO)
88 def setLevelDebug(): logging.getLogger().setLevel(logging.DEBUG)
90 def error(message): logging.getLogger().error(message)
91 def warning(message): logging.getLogger().warning(message)
92 def info(message): logging.getLogger().info(message)
93 def debug(message): logging.getLogger().debug(message)
95 @pytest.fixture(scope='session')
99 MyLogger.logger = logging.getLogger('CT')
102 @pytest.fixture(scope='session')
105 message_bytes = message.encode('ascii')
106 base64_bytes = base64.b64encode(message_bytes)
107 return base64_bytes.decode('ascii')
111 @pytest.fixture(scope='session')
113 def decode(base64_message):
114 base64_bytes = base64_message.encode('ascii')
115 message_bytes = base64.b64decode(base64_bytes)
116 return message_bytes.decode('ascii')
119 # HTTP communication:
120 class RestClient(object):
121 """A client helper to perform rest operations: GET, POST.
124 endpoint: server endpoint to make the HTTP2.0 connection
127 def __init__(self, endpoint):
128 """Return a RestClient object for ADML endpoint."""
129 self._endpoint = endpoint
130 self._ip = self._endpoint.split(':')[0]
131 self._connection = HTTP20Connection(host=self._endpoint)
133 def _log_http(self, kind, method, url, body, headers):
134 length = len(body) if body else 0
136 '{} {}{} {} headers: {!s} data: {}:{!a}'.format(
137 method, self._endpoint, url, kind, headers, length, body))
139 def _log_request(self, method, url, body, headers):
140 self._log_http('REQUEST', method, url, body, headers)
142 def _log_response(self, method, url, response):
144 'RESPONSE:{}'.format(response["status"]), method, url,
145 response["body"], response["headers"])
147 #def log_event(self, level, log_msg):
148 # # Log caller function name and formated message
149 # MyLogger.logger.log(level, inspect.getouterframes(inspect.currentframe())[1].function + ': {!a}'.format(log_msg))
151 def parse(self, response):
152 response_body = response.read(decode_content=True).decode('utf-8')
153 if len(response_body) != 0:
154 response_body_dict = json.loads(response_body)
156 response_body_dict = ''
157 response_data = { "status":response.status, "body":response_body_dict, "headers":response.headers }
160 def request(self, requestMethod, requestUrl, requestBody=None, requestHeaders=None):
162 Returns response data dictionary with 'status', 'body' and 'headers'
164 requestBody = RestClient._pad_body_and_length(requestBody, requestHeaders)
165 self._log_request(requestMethod, requestUrl, requestBody, requestHeaders)
166 self._connection.request(method=requestMethod, url=requestUrl, body=requestBody, headers=requestHeaders)
167 response = self.parse(self._connection.get_response())
168 self._log_response(requestMethod, requestUrl, response)
171 def _pad_body_and_length(requestBody, requestHeaders):
172 """Pad the body and adjust content-length if needed.
173 When the length of the body is multiple of 1024 this function appends
174 one space to the body and increases by one the content-length.
176 This is a workaround for hyper issue 355 [0].
177 The issue has been fixed but it has not been released yet.
179 [0]: https://github.com/Lukasa/hyper/issues/355
182 >>> body, headers = ' '*1024, { 'content-length':'41' }
183 >>> body = RestClient._pad_body_and_length(body, headers)
184 >>> ( len(body), headers['content-length'] )
187 if requestBody and 0 == (len(requestBody) % 1024):
188 logging.warning( "RestClient.request:" +
189 " padding body because" +
190 " its length ({})".format(len(requestBody)) +
191 " is multiple of 1024")
193 content_length = CONTENT_LENGTH
194 if requestHeaders and content_length in requestHeaders:
195 length = int(requestHeaders[content_length])
196 requestHeaders[content_length] = str(length+1)
199 def get(self, requestUrl):
200 return self.request('GET', requestUrl)
202 def post(self, requestUrl, requestBody = None, requestHeaders={'content-type': 'application/json'}):
203 return self.request('POST', requestUrl, requestBody, requestHeaders)
205 def postDict(self, requestUrl, requestBody = None, requestHeaders={'content-type': 'application/json'}):
207 Accepts request body as python dictionary
209 requestBodyJson = None
210 if requestBody: requestBodyJson = json.dumps(requestBody, indent=None, separators=(',', ':'))
211 return self.request('POST', requestUrl, requestBodyJson, requestHeaders)
214 #def delete(self, requestUrl):
215 # return self.request('DELETE', requestUrl)
217 def __assert_received_expected(self, received, expected, what):
218 match = (received == expected)
219 log = "Received {what}: {received} | Expected {what}: {expected}".format(received=received, expected=expected, what=what)
220 if match: MyLogger.info(log)
221 else: MyLogger.error(log)
225 def check_response_status(self, received, expected, **kwargs):
227 received: status code received (got from response data parsed, field 'status')
228 expected: status code expected
230 self.__assert_received_expected(received, expected, "status code")
232 #def check_expected_cause(self, response, **kwargs):
234 # received: response data parsed where field 'body'->'cause' is analyzed
235 # kwargs: aditional regexp to match expected cause
237 # if "expected_cause" in kwargs:
238 # received_content = response["body"]
239 # received_cause = received_content.get("cause", "")
240 # regular_expr_cause = kwargs["expected_cause"]
241 # regular_expr_flag = kwargs.get("regular_expression_flag", 0)
242 # matchObj = re.match(regular_expr_cause, received_cause, regular_expr_flag)
243 # log = 'Test error cause: "{}"~=/{}/.'.format(received_cause, regular_expr_cause)
244 # if matchObj: MyLogger.info(log)
245 # else: MyLogger.error(log)
247 # assert matchObj is not None
249 def check_response_body(self, received, expected, inputJsonString = False):
251 received: body content received (got from response data parsed, field 'body')
252 expected: body content expected
253 inputJsonString: input parameters as json string (default are python dictionaries)
257 received = json.loads(received)
258 expected = json.loads(expected)
260 self.__assert_received_expected(received, expected, "body")
262 def check_response_headers(self, received, expected):
264 received: headers received (got from response data parsed, field 'headers')
265 expected: headers expected
267 self.__assert_received_expected(received, expected, "headers")
269 def assert_response__status_body_headers(self, response, status, bodyDict, headersDict = None):
271 response: Response parsed data
272 status: numeric status code
273 body: body dictionary to match with
274 headers: headers dictionary to match with (by default, not checked: internally length and content-type application/json is verified)
276 self.check_response_status(response["status"], status)
277 self.check_response_body(response["body"], bodyDict)
278 if headersDict: self.check_response_headers(response["headers"], headersDict)
282 self._connection.close()
285 # ADML Client simple fixture
286 @pytest.fixture(scope='session')
288 admlc = RestClient(ADML_ENDPOINT)
291 print("ADMLC Teardown")
293 @pytest.fixture(scope='session')
296 MyLogger.info("Gathering test suite resources ...")
297 for resource in glob.glob('resources/*'):
298 f = open(resource, "r")
299 name = os.path.basename(resource)
300 resourcesDict[name] = f.read()
303 def get_resources(key, **kwargs):
304 # Be careful with templates containing curly braces:
305 # https://stackoverflow.com/questions/5466451/how-can-i-print-literal-curly-brace-characters-in-python-string-and-also-use-fo
306 resource = resourcesDict[key]
309 args = defaultdict (str, kwargs)
310 resource = resource.format_map(args)
320 REQUEST_BODY_DIAMETER_HEX = '''
322 "diameterHex":"{diameterHex}"
325 REQUEST_BODY_NODE = {
331 (ADML_URI_PREFIX, '/decode', REQUEST_BODY_DIAMETER_HEX, ADML_ENDPOINT),
334 # Share RestClient connection for all the tests: session-scoped fixture
335 @pytest.fixture(scope="session", params=PARAMS)
336 def request_data(request):
337 admlc = RestClient(request.param[3])
338 def get_request_data(**kwargs):
339 args = defaultdict (str, kwargs)
340 uri_prefix = request.param[0]
341 request_uri_suffix=request.param[1]
342 formatted_uri=uri_prefix + request_uri_suffix.format_map(args)
343 request_body=request.param[2]
344 formatted_request_body=request_body.format_map(args)
345 return formatted_uri,formatted_request_body,admlc
347 yield get_request_data
349 print("RestClient Teardown")
351 # Fixture usage example: requestUrl,requestBody,admlc = request_data(diameterHex="<hex content>")