Coverage for src/zapy/requests/converter.py: 100%

110 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-10 19:35 +0000

1import itertools 

2import sys 

3from collections import defaultdict 

4from io import BufferedReader 

5from threading import Lock 

6from typing import Any, Sequence, Tuple, TypedDict, cast 

7 

8from zapy.base import ZapyAuto 

9from zapy.templating.eval import exec_sync 

10from zapy.templating.templating import evaluate, render 

11 

12from .context import ZapyRequestContext, build_context_module 

13from .exceptions import error_location 

14from .file_loader import ZapyFileInfo 

15from .hooks import RequestHook 

16from .models import Code, HttpxArguments, KeyValueItem, ZapyRequest, httpx_types 

17 

18HttpxFileTypes = httpx_types.FileTypes 

19HttpxRequestFiles = httpx_types.RequestFiles 

20 

21FORM_TYPES = [ 

22 "application/x-www-form-urlencoded", 

23 "multipart/form-data", 

24] 

25 

26FileInfo = tuple[str, BufferedReader, *tuple[str, ...]] 

27HttpxFile = tuple[str, FileInfo] 

28ParameterList = dict[str, list[str]] 

29 

30 

class RequestBodyArgs(TypedDict):
    """Body-related keyword arguments produced for an httpx request.

    ``RequestConverter._build_httpx_args_body`` fills ``data`` and ``files``
    for form body types and ``content`` for any other body type; the
    remaining keys are ``None``.
    """

    # File parts of a form body, or None.
    files: HttpxRequestFiles | None
    # Plain form fields grouped by key, or None.
    data: ParameterList | None
    # Rendered raw body text, or None.
    content: str | None

35 

36 

class RequestConverter:
    """Convert a ``ZapyRequest`` into the arguments of an httpx request.

    Constructing the converter runs the request script (if any), which yields
    the request hooks and the initial variable namespace. Calling
    ``build_httpx_args`` then evaluates the request's variables and renders
    the URL, query parameters, headers and body against that namespace.
    """

    # One lock for all instances: running a script replaces the
    # process-global ``sys.modules["zapy.ctx"]`` entry, so the guard only
    # works if every converter contends on the same lock object.
    _script_lock = Lock()

    def __init__(self, zapy_request: ZapyRequest, ctx: ZapyRequestContext):
        """Store the request and context, then execute the request script.

        Args:
            zapy_request: The request definition to convert.
            ctx: The context made available to the script as ``ctx``.
        """
        self.zapy_request = zapy_request
        self.ctx = ctx

        # evaluate script
        request_hooks, variables = self._load_script()
        self.request_hooks = request_hooks
        self.variables = variables

    def build_httpx_args(self) -> HttpxArguments:
        """Evaluate the request's variables and build the httpx arguments.

        The declared variables are merged into ``self.variables`` first so
        that URL, params, headers and body can all reference them.

        Returns:
            The ``HttpxArguments`` built from method, url, params, headers
            and the body-related keys.
        """
        zapy_request: ZapyRequest = self.zapy_request

        # variable_declaration
        self.variables |= self._convert_variables(self.zapy_request.variables)

        httpx_args = HttpxArguments(
            method=zapy_request.method,
            url=self._convert_url(zapy_request.endpoint),
            params=self._convert_params(zapy_request.params),
            headers=self._convert_headers(zapy_request.headers, body_content_type=zapy_request.body_type),
            **self._build_httpx_args_body(zapy_request.body_type, zapy_request.body),
        )

        return httpx_args

    @error_location("body")
    def _build_httpx_args_body(self, body_type: str, body: list[KeyValueItem] | Code | None) -> RequestBodyArgs:
        """Build the ``files`` / ``data`` / ``content`` arguments for the body.

        Args:
            body_type: The body content type, or the string ``"None"``.
            body: Key/value items for form body types, code for any other
                type, or ``None`` when there is no body.

        Returns:
            A ``RequestBodyArgs`` in which only the keys relevant to
            ``body_type`` are set; all keys are ``None`` when there is no body.
        """
        files, data, content = None, None, None
        if body is not None and body_type != "None":
            if body_type in FORM_TYPES:
                body = cast(list[KeyValueItem], body)
                data, files = self._convert_body_data(body)
            else:
                body = cast(Code, body)
                _body_source = self.__join_code(body)
                content = self.__render(_body_source)
        return RequestBodyArgs(
            files=files,
            data=data,
            content=content,
        )

    @error_location("body")
    def _convert_body_data(
        self, data_list: list[KeyValueItem]
    ) -> tuple[ParameterList, Sequence[Tuple[str, HttpxFileTypes]]]:
        """Split form items into plain fields and file uploads.

        Inactive items and items whose key is blank are skipped. Each value
        is evaluated; a ``ZapyFileInfo`` result becomes a file entry, anything
        else is stringified and appended to the field's value list.

        Args:
            data_list: The form items of the request body.

        Returns:
            A pair ``(fields, files)`` where ``fields`` maps each key to its
            values and ``files`` is a list of ``(key, file_info)`` tuples.

        Raises:
            Whatever evaluating a value or opening a file raises; file
            handles opened before the failure are closed first.
        """
        result_dict: defaultdict[str, list[str]] = defaultdict(list)
        files: list[Tuple[str, HttpxFileTypes]] = []
        opened: list[BufferedReader] = []
        try:
            for param in data_list:
                # Use the stripped key, as params, headers and variables do.
                key = param.key.strip()
                if not (param.active and key):
                    continue
                value = self.__eval_var(param.value)
                if not isinstance(value, ZapyFileInfo):
                    result_dict[key].append(str(value))
                    continue
                name = value.file_name
                # The handle is intentionally left open: it is handed back to
                # the caller inside the ``files`` payload.
                file = open(value.file_location, mode="rb")
                opened.append(file)
                file_info: HttpxFileTypes
                if value.mime_type is ZapyAuto:
                    file_info = (name, file)
                else:
                    file_info = (name, file, str(value.mime_type))
                files.append((key, file_info))
        except BaseException:
            # Nothing will be returned, so nobody else can close these.
            for handle in opened:
                handle.close()
            raise

        return dict(result_dict), files

    @error_location("url")
    def _convert_url(self, endpoint: str) -> str:
        """Render the endpoint template against the current variables."""
        return self.__render(endpoint)

    @error_location("params")
    def _convert_params(self, parameter_list: list[KeyValueItem]) -> ParameterList:
        """Render the active query parameters, grouped by stripped key.

        Values of a repeated key are collected in order of appearance even
        when the repeats are not adjacent in ``parameter_list``.

        Args:
            parameter_list: The query parameter items of the request.

        Returns:
            A dict mapping each key to the list of its rendered values.
        """
        # Accumulate instead of itertools.groupby: groupby only merges
        # *consecutive* equal keys, so ``a, b, a`` would lose the first ``a``.
        grouped: defaultdict[str, list[str]] = defaultdict(list)
        for item in parameter_list:
            key = item.key.strip()
            if item.active and key:
                grouped[key].append(self.__render(item.value))

        return dict(grouped)

    @error_location("headers")
    def _convert_headers(self, header_list: list[KeyValueItem], body_content_type: str | None = None) -> dict[str, str]:
        """Evaluate the active headers into a plain ``{name: value}`` dict.

        A ``Content-Type`` header whose value evaluates to ``ZapyAuto`` is
        replaced by ``body_content_type``; it is omitted when there is no
        body type or the body is ``multipart/form-data``.

        Args:
            header_list: The header items of the request.
            body_content_type: The body type used to fill an automatic
                ``Content-Type`` header.

        Returns:
            A dict of header names to stringified values.
        """
        headers = {}
        for kv_item in header_list:
            key = kv_item.key.strip()
            if not (kv_item.active and key):
                continue
            eval_var = self.__eval_var(kv_item.value)
            # NOTE(review): _convert_body_data tests ``is ZapyAuto`` while this
            # uses ``==``; confirm which comparison ZapyAuto is meant to support.
            if key.lower() == "content-type" and eval_var == ZapyAuto:
                # ``None`` (the parameter default) is treated like the "None"
                # body type; otherwise the header would be the text "None".
                # Presumably multipart is skipped so the HTTP client can emit
                # its own boundary-carrying header — TODO confirm.
                if body_content_type not in (None, "None", "multipart/form-data"):
                    headers[key] = str(body_content_type)
            else:
                headers[key] = str(eval_var)

        return headers

    @error_location("variables")
    def _convert_variables(self, variable_list: list[KeyValueItem]) -> dict[str, Any | str]:
        """Evaluate the active variables into a ``{name: value}`` dict.

        Inactive items and items with a blank key are skipped; a later item
        with the same stripped key overrides an earlier one.
        """
        evaluated: dict[str, Any | str] = {}
        for item in variable_list:
            name = item.key.strip()
            if item.active and name:
                evaluated[name] = self.__eval_var(item.value)
        return evaluated

    @error_location("script")
    def _load_script(self) -> tuple[RequestHook, dict]:
        """Execute the request script and collect its hooks and namespace.

        The script source is stored on ``self.script``. It runs with ``print``
        bound to the context logger and ``ctx`` bound to the context module,
        which is also registered as ``sys.modules["zapy.ctx"]``.

        Returns:
            A pair ``(request_hook, namespace)``. For an empty script the hook
            is a fresh ``RequestHook()`` and nothing is executed.
        """
        script = self.__join_code(self.zapy_request.script)
        self.script = script

        module_context = build_context_module(self.ctx)
        ctx_vars = {
            "print": self.ctx.logger,
            "ctx": module_context,
        }

        if script is None or not script.strip():
            return RequestHook(), ctx_vars

        # A lock created inline (``with Lock():``) is private to the call and
        # excludes nobody; the shared class-level lock actually serialises the
        # sys.modules swap and the script execution.
        with self._script_lock:
            sys.modules["zapy.ctx"] = module_context
            exec_sync(script, ctx_vars)
            request_hook = module_context.hooks.request_hook

        return request_hook, ctx_vars

    def __eval_var(self, value: str) -> Any | str:
        """Evaluate ``value`` against the current variables."""
        return evaluate(value, self.variables)

    def __render(self, source: str) -> str:
        """Render the template ``source`` against the current variables."""
        return render(source, self.variables)

    def __join_code(self, code: Code) -> str:
        """Return ``code`` as one string, joining a sequence of lines with newlines."""
        if isinstance(code, str):
            return code
        return "\n".join(code)