Coverage for src/zapy/requests/converter.py: 100%
110 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-10 19:35 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-10 19:35 +0000
1import itertools
2import sys
3from collections import defaultdict
4from io import BufferedReader
5from threading import Lock
6from typing import Any, Sequence, Tuple, TypedDict, cast
8from zapy.base import ZapyAuto
9from zapy.templating.eval import exec_sync
10from zapy.templating.templating import evaluate, render
12from .context import ZapyRequestContext, build_context_module
13from .exceptions import error_location
14from .file_loader import ZapyFileInfo
15from .hooks import RequestHook
16from .models import Code, HttpxArguments, KeyValueItem, ZapyRequest, httpx_types
18HttpxFileTypes = httpx_types.FileTypes
19HttpxRequestFiles = httpx_types.RequestFiles
21FORM_TYPES = [
22 "application/x-www-form-urlencoded",
23 "multipart/form-data",
24]
26FileInfo = tuple[str, BufferedReader, *tuple[str, ...]]
27HttpxFile = tuple[str, FileInfo]
28ParameterList = dict[str, list[str]]
class RequestBodyArgs(TypedDict):
    """Body-related keyword arguments produced for an httpx request.

    Each entry is ``None`` when it does not apply to the request's body type.
    """

    # File uploads collected from form items, if any.
    files: HttpxRequestFiles | None
    # Form fields grouped by key, if the body is a form.
    data: ParameterList | None
    # Rendered raw body text, if the body is not a form.
    content: str | None
class RequestConverter:
    """Convert a ``ZapyRequest`` into the keyword arguments of an httpx call.

    Constructing the converter runs the request's script (if any); the
    resulting hooks and variables are kept on the instance.  Calling
    :meth:`build_httpx_args` then evaluates the request's variables and
    renders the URL, params, headers and body against those variables.
    """

    # One lock shared by all instances.  ``_load_script`` replaces the
    # process-wide ``sys.modules["zapy.ctx"]`` entry before executing a
    # script; a lock created per call would never block anyone, so the lock
    # has to live at class level to actually serialize that section.
    # NOTE(review): the lock is not reentrant - a script that builds another
    # ``RequestConverter`` while it is executing would block; confirm that
    # scripts never do this.
    _script_lock = Lock()

    def __init__(self, zapy_request: ZapyRequest, ctx: ZapyRequestContext):
        """Store the request and context, then execute the request script.

        Args:
            zapy_request: The request definition to convert.
            ctx: The context the script is executed against.
        """
        self.zapy_request = zapy_request
        self.ctx = ctx

        # evaluate script
        request_hooks, variables = self._load_script()
        self.request_hooks = request_hooks
        self.variables = variables

    def build_httpx_args(self) -> HttpxArguments:
        """Build the ``HttpxArguments`` for the stored request.

        The request's own variables are evaluated first and merged into
        ``self.variables`` so that the URL, params, headers and body can
        reference them.
        """
        zapy_request: ZapyRequest = self.zapy_request

        # variable_declaration
        self.variables |= self._convert_variables(zapy_request.variables)

        return HttpxArguments(
            method=zapy_request.method,
            url=self._convert_url(zapy_request.endpoint),
            params=self._convert_params(zapy_request.params),
            headers=self._convert_headers(zapy_request.headers, body_content_type=zapy_request.body_type),
            **self._build_httpx_args_body(zapy_request.body_type, zapy_request.body),
        )

    @error_location("body")
    def _build_httpx_args_body(self, body_type: str, body: list[KeyValueItem] | Code | None) -> RequestBodyArgs:
        """Return the ``files`` / ``data`` / ``content`` arguments for the body.

        * no body (``body is None`` or ``body_type == "None"``): all ``None``;
        * a type listed in ``FORM_TYPES``: ``data`` and ``files`` are filled
          from the key/value items;
        * anything else: the body is treated as code, joined and rendered
          into ``content``.
        """
        files, data, content = None, None, None
        if body is None or body_type == "None":
            # Nothing to send: every body argument stays None.
            pass
        elif body_type in FORM_TYPES:
            body = cast(list[KeyValueItem], body)
            data, files = self._convert_body_data(body)
        else:
            body = cast(Code, body)
            content = self.__render(self.__join_code(body))
        return RequestBodyArgs(
            files=files,
            data=data,
            content=content,
        )

    @error_location("body")
    def _convert_body_data(
        self, data_list: list[KeyValueItem]
    ) -> tuple[ParameterList, Sequence[Tuple[str, HttpxFileTypes]]]:
        """Split active form items into plain fields and file uploads.

        Every active item with a non-blank key is evaluated.  A value that is
        a ``ZapyFileInfo`` is opened in binary mode and added to the file
        list; any other value is converted with ``str`` and appended to the
        list of values for its key.

        Returns:
            ``(fields, files)``.  The file handles in ``files`` are returned
            open; closing them is the caller's responsibility.

        Raises:
            Whatever evaluation or ``open`` raises.  Handles opened before
            the failure are closed first so they do not leak.
        """
        # NOTE(review): unlike params/headers/variables, keys are used here
        # without ``strip()``; left unchanged - confirm whether intended.
        fields: defaultdict[str, list[str]] = defaultdict(list)
        files: list[Tuple[str, HttpxFileTypes]] = []
        opened: list[BufferedReader] = []
        try:
            for param in data_list:
                if not (param.active and param.key.strip()):
                    continue
                value = self.__eval_var(param.value)
                if not isinstance(value, ZapyFileInfo):
                    fields[param.key].append(str(value))
                    continue

                # Deliberately not a ``with`` block: the handle must outlive
                # this method because it is handed back to the caller.
                file = open(value.file_location, mode="rb")
                opened.append(file)
                file_info: HttpxFileTypes
                if value.mime_type is ZapyAuto:
                    file_info = (value.file_name, file)
                else:
                    file_info = (value.file_name, file, str(value.mime_type))
                files.append((param.key, file_info))
        except BaseException:
            # A later item failed: release what was already opened, then let
            # the original exception propagate unchanged.
            for handle in opened:
                handle.close()
            raise

        return dict(fields), files

    @error_location("url")
    def _convert_url(self, endpoint: str) -> str:
        """Render the endpoint template against the current variables."""
        return self.__render(endpoint)

    @error_location("params")
    def _convert_params(self, parameter_list: list[KeyValueItem]) -> ParameterList:
        """Render active query parameters, grouped by stripped key.

        All values that share a key are collected in input order, whether or
        not the items are adjacent in ``parameter_list``.  Keys appear in the
        result in order of first occurrence.
        """
        # A plain accumulation is used instead of ``itertools.groupby``:
        # groupby only merges *consecutive* equal keys, so a repeated key
        # separated by another one would overwrite its earlier values.
        grouped: defaultdict[str, list[str]] = defaultdict(list)
        for param in parameter_list:
            key = param.key.strip()
            if param.active and key:
                grouped[key].append(self.__render(param.value))

        return dict(grouped)

    @error_location("headers")
    def _convert_headers(self, header_list: list[KeyValueItem], body_content_type: str | None = None) -> dict[str, str]:
        """Evaluate active headers into a ``{name: value}`` dict.

        A ``Content-Type`` header whose value evaluates to ``ZapyAuto`` is
        replaced by ``body_content_type``.  It is left out entirely when
        there is no body type (``None`` or ``"None"``) or when the body type
        is ``"multipart/form-data"``.  Every other header value is converted
        with ``str``.
        """
        headers = {}
        for kv_item in header_list:
            key = kv_item.key.strip()
            if not (kv_item.active and key):
                continue
            eval_var = self.__eval_var(kv_item.value)
            if key.lower() == "content-type" and eval_var == ZapyAuto:
                # ``None`` is checked as well as "None" so that the default
                # argument never produces a literal "None" header value.
                if body_content_type not in (None, "None", "multipart/form-data"):
                    headers[key] = str(body_content_type)
            else:
                headers[key] = str(eval_var)

        return headers

    @error_location("variables")
    def _convert_variables(self, variable_list: list[KeyValueItem]) -> dict[str, Any | str]:
        """Evaluate active variables into a ``{stripped key: value}`` dict.

        Each value is evaluated against ``self.variables`` as it is when this
        method is called; variables defined earlier in the same list are not
        visible to later ones.
        """
        return {x.key.strip(): self.__eval_var(x.value) for x in variable_list if x.active and x.key.strip()}

    @error_location("script")
    def _load_script(self) -> tuple[RequestHook, dict]:
        """Execute the request script and return ``(request_hook, variables)``.

        The joined script text is also stored on ``self.script``.  The script
        runs with ``print`` bound to the context logger and ``ctx`` bound to
        the context module; the same dict is returned as the variables.  A
        blank script is not executed and yields a default ``RequestHook``.
        """
        script = self.__join_code(self.zapy_request.script)
        self.script = script

        module_context = build_context_module(self.ctx)
        ctx_vars = {
            "print": self.ctx.logger,
            "ctx": module_context,
        }

        if script is None or not script.strip():
            return RequestHook(), ctx_vars

        # ``sys.modules`` is process-wide, so registering the module and
        # running the script must not interleave with another converter
        # doing the same thing.
        with self._script_lock:
            sys.modules["zapy.ctx"] = module_context
            exec_sync(script, ctx_vars)
            request_hook = module_context.hooks.request_hook

        return request_hook, ctx_vars

    def __eval_var(self, value: str) -> Any | str:
        """Evaluate ``value`` against the current variables."""
        return evaluate(value, self.variables)

    def __render(self, source: str) -> str:
        """Render the ``source`` template against the current variables."""
        return render(source, self.variables)

    def __join_code(self, code: Code) -> str:
        """Return ``code`` as one string, joining a list of lines with newlines."""
        if isinstance(code, str):
            return code
        return "\n".join(code)