fauna.http.httpx_client
"""HTTPX-backed implementations of the Fauna HTTPClient/HTTPResponse interfaces."""
import json
import logging
from contextlib import contextmanager
from json import JSONDecodeError
from typing import Mapping, Any, Optional, Iterator

import httpx

from fauna.errors import ClientError, NetworkError
from fauna.http.http_client import HTTPResponse, HTTPClient


class HTTPXResponse(HTTPResponse):
    """HTTPResponse implementation that delegates to an ``httpx.Response``."""

    def __init__(self, response: httpx.Response):
        self._r = response

    def headers(self) -> Mapping[str, str]:
        """Return the response headers copied into a plain dict."""
        return dict(self._r.headers.items())

    def json(self) -> Any:
        """Read the body, decode it as UTF-8 and parse it as JSON.

        Raises:
            ClientError: if the body is not valid UTF-8 or not valid JSON.
        """
        try:
            decoded = self._r.read().decode("utf-8")
            return json.loads(decoded)
        except (JSONDecodeError, UnicodeDecodeError) as e:
            raise ClientError(
                f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid."
            ) from e

    def text(self) -> str:
        """Return the response body decoded as UTF-8."""
        return str(self.read(), encoding='utf-8')

    def status_code(self) -> int:
        """Return the HTTP status code of the response."""
        return self._r.status_code

    def read(self) -> bytes:
        """Return the raw response body."""
        return self._r.read()

    def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
        """Iterate over the response body, forwarding ``size`` to httpx."""
        return self._r.iter_bytes(size)

    def close(self) -> None:
        """Close the underlying response.

        Raises:
            ClientError: if closing the underlying response fails.
        """
        try:
            self._r.close()
        except Exception as e:
            raise ClientError("Error closing response") from e


class HTTPXClient(HTTPClient):
    """HTTPClient implementation that sends requests through an ``httpx.Client``."""

    def __init__(self,
                 client: httpx.Client,
                 logger: logging.Logger = logging.getLogger("fauna")):
        super().__init__()
        self._c = client
        self._logger = logger

    def request(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        data: Mapping[str, Any],
    ) -> HTTPResponse:
        """Send a non-streaming request with a JSON body.

        Args:
            method: HTTP method passed to ``build_request``.
            url: Target URL.
            headers: Request headers.
            data: Payload sent as the JSON body.

        Returns:
            The response wrapped in an ``HTTPXResponse``.

        Raises:
            ClientError: if building the request raises ``httpx.InvalidURL``.
            NetworkError: if sending raises ``httpx.HTTPError`` or
                ``httpx.InvalidURL``.
        """
        try:
            request = self._c.build_request(
                method,
                url,
                json=data,
                headers=headers,
            )

            if self._logger.isEnabledFor(logging.DEBUG):
                headers_to_log = request.headers.copy()
                # Keep credentials out of the log. The default makes this a
                # no-op when no Authorization header was supplied; without it
                # enabling DEBUG logging would raise KeyError here.
                headers_to_log.pop("Authorization", None)
                self._logger.debug(
                    f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
                )

        except httpx.InvalidURL as e:
            raise ClientError("Invalid URL Format") from e

        try:
            response = self._c.send(
                request,
                stream=False,
            )

            if self._logger.isEnabledFor(logging.DEBUG):
                self._logger.debug(
                    f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
                )

            return HTTPXResponse(response)
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            raise NetworkError("Exception re-raised from HTTP request") from e

    @contextmanager
    def stream(
        self,
        url: str,
        headers: Mapping[str, str],
        data: Mapping[str, Any],
    ) -> Iterator[Any]:
        """Open a streaming POST request and yield an iterator of parsed events.

        Used as a context manager: the yielded object is the generator
        produced by ``_transform``; the response is closed when the
        ``with`` block exits.

        Args:
            url: Target URL.
            headers: Request headers.
            data: Payload sent as the JSON body.
        """
        # NOTE(review): unlike request(), exceptions raised by build_request()
        # and send() are not translated to ClientError/NetworkError here.
        request = self._c.build_request(
            method="POST",
            url=url,
            headers=headers,
            json=data,
        )

        if self._logger.isEnabledFor(logging.DEBUG):
            headers_to_log = request.headers.copy()
            # Default avoids KeyError when no Authorization header is present.
            headers_to_log.pop("Authorization", None)
            self._logger.debug(
                f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
            )

        response = self._c.send(
            request=request,
            stream=True,
        )

        try:
            yield self._transform(response)
        finally:
            response.close()

    def _transform(self, response):
        """Yield each line of a streamed response parsed as JSON.

        Raises:
            NetworkError: on ``httpx.ReadTimeout``, ``httpx.HTTPError`` or
                ``httpx.InvalidURL`` while reading the stream.
        """
        try:
            for line in response.iter_lines():
                loaded = json.loads(line)
                if self._logger.isEnabledFor(logging.DEBUG):
                    self._logger.debug(f"stream.data data={loaded}")
                yield loaded
        # Checked first so a read timeout gets its own message.
        except httpx.ReadTimeout as e:
            raise NetworkError("Stream timeout") from e
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            raise NetworkError("Exception re-raised from HTTP request") from e

    def close(self):
        """Close the underlying ``httpx.Client``."""
        self._c.close()
class HTTPXResponse(HTTPResponse):
    """Adapter exposing an ``httpx.Response`` through the HTTPResponse interface."""

    def __init__(self, response: httpx.Response):
        self._r = response

    def headers(self) -> Mapping[str, str]:
        """Return a fresh dict holding every response header."""
        return {name: value for name, value in self._r.headers.items()}

    def json(self) -> Any:
        """Parse the UTF-8 decoded body as JSON.

        Raises:
            ClientError: when the body cannot be decoded as UTF-8 or is not
                valid JSON.
        """
        try:
            raw_body = self._r.read()
            return json.loads(raw_body.decode("utf-8"))
        except (JSONDecodeError, UnicodeDecodeError) as decode_error:
            raise ClientError(
                f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid."
            ) from decode_error

    def text(self) -> str:
        """Return the body as a UTF-8 decoded string."""
        raw_body = self.read()
        return raw_body.decode("utf-8")

    def status_code(self) -> int:
        """Return the status code reported by the wrapped response."""
        code = self._r.status_code
        return code

    def read(self) -> bytes:
        """Return the body bytes of the wrapped response."""
        body = self._r.read()
        return body

    def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
        """Return the wrapped response's byte iterator, passing ``size`` through."""
        chunks = self._r.iter_bytes(size)
        return chunks

    def close(self) -> None:
        """Close the wrapped response.

        Raises:
            ClientError: when the wrapped response fails to close.
        """
        try:
            self._r.close()
        except Exception as close_error:
            raise ClientError("Error closing response") from close_error
Helper class that provides a standard way to create an ABC using inheritance.
class HTTPXClient(HTTPClient):
    """HTTPClient implementation that sends requests through an ``httpx.Client``."""

    def __init__(self,
                 client: httpx.Client,
                 logger: logging.Logger = logging.getLogger("fauna")):
        super().__init__()
        self._c = client
        self._logger = logger

    def request(
        self,
        method: str,
        url: str,
        headers: Mapping[str, str],
        data: Mapping[str, Any],
    ) -> HTTPResponse:
        """Send a non-streaming request with a JSON body.

        Args:
            method: HTTP method passed to ``build_request``.
            url: Target URL.
            headers: Request headers.
            data: Payload sent as the JSON body.

        Returns:
            The response wrapped in an ``HTTPXResponse``.

        Raises:
            ClientError: if building the request raises ``httpx.InvalidURL``.
            NetworkError: if sending raises ``httpx.HTTPError`` or
                ``httpx.InvalidURL``.
        """
        try:
            request = self._c.build_request(
                method,
                url,
                json=data,
                headers=headers,
            )

            if self._logger.isEnabledFor(logging.DEBUG):
                headers_to_log = request.headers.copy()
                # Keep credentials out of the log. The default makes this a
                # no-op when no Authorization header was supplied; without it
                # enabling DEBUG logging would raise KeyError here.
                headers_to_log.pop("Authorization", None)
                self._logger.debug(
                    f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
                )

        except httpx.InvalidURL as e:
            raise ClientError("Invalid URL Format") from e

        try:
            response = self._c.send(
                request,
                stream=False,
            )

            if self._logger.isEnabledFor(logging.DEBUG):
                self._logger.debug(
                    f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
                )

            return HTTPXResponse(response)
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            raise NetworkError("Exception re-raised from HTTP request") from e

    @contextmanager
    def stream(
        self,
        url: str,
        headers: Mapping[str, str],
        data: Mapping[str, Any],
    ) -> Iterator[Any]:
        """Open a streaming POST request and yield an iterator of parsed events.

        Used as a context manager: the yielded object is the generator
        produced by ``_transform``; the response is closed when the
        ``with`` block exits.

        Args:
            url: Target URL.
            headers: Request headers.
            data: Payload sent as the JSON body.
        """
        # NOTE(review): unlike request(), exceptions raised by build_request()
        # and send() are not translated to ClientError/NetworkError here.
        request = self._c.build_request(
            method="POST",
            url=url,
            headers=headers,
            json=data,
        )

        if self._logger.isEnabledFor(logging.DEBUG):
            headers_to_log = request.headers.copy()
            # Default avoids KeyError when no Authorization header is present.
            headers_to_log.pop("Authorization", None)
            self._logger.debug(
                f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
            )

        response = self._c.send(
            request=request,
            stream=True,
        )

        try:
            yield self._transform(response)
        finally:
            response.close()

    def _transform(self, response):
        """Yield each line of a streamed response parsed as JSON.

        Raises:
            NetworkError: on ``httpx.ReadTimeout``, ``httpx.HTTPError`` or
                ``httpx.InvalidURL`` while reading the stream.
        """
        try:
            for line in response.iter_lines():
                loaded = json.loads(line)
                if self._logger.isEnabledFor(logging.DEBUG):
                    self._logger.debug(f"stream.data data={loaded}")
                yield loaded
        # Checked first so a read timeout gets its own message.
        except httpx.ReadTimeout as e:
            raise NetworkError("Stream timeout") from e
        except (httpx.HTTPError, httpx.InvalidURL) as e:
            raise NetworkError("Exception re-raised from HTTP request") from e

    def close(self):
        """Close the underlying ``httpx.Client``."""
        self._c.close()
Helper class that provides a standard way to create an ABC using inheritance.
def
request( self, method: str, url: str, headers: Mapping[str, str], data: Mapping[str, Any]) -> fauna.http.http_client.HTTPResponse:
def request(
    self,
    method: str,
    url: str,
    headers: Mapping[str, str],
    data: Mapping[str, Any],
) -> HTTPResponse:
    """Send a non-streaming request with a JSON body.

    Args:
        method: HTTP method passed to ``build_request``.
        url: Target URL.
        headers: Request headers.
        data: Payload sent as the JSON body.

    Returns:
        The response wrapped in an ``HTTPXResponse``.

    Raises:
        ClientError: if building the request raises ``httpx.InvalidURL``.
        NetworkError: if sending raises ``httpx.HTTPError`` or
            ``httpx.InvalidURL``.
    """
    try:
        request = self._c.build_request(
            method,
            url,
            json=data,
            headers=headers,
        )

        if self._logger.isEnabledFor(logging.DEBUG):
            headers_to_log = request.headers.copy()
            # Keep credentials out of the log. The default makes this a
            # no-op when no Authorization header was supplied; without it
            # enabling DEBUG logging would raise KeyError here.
            headers_to_log.pop("Authorization", None)
            self._logger.debug(
                f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
            )

    except httpx.InvalidURL as e:
        raise ClientError("Invalid URL Format") from e

    try:
        response = self._c.send(
            request,
            stream=False,
        )

        if self._logger.isEnabledFor(logging.DEBUG):
            self._logger.debug(
                f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
            )

        return HTTPXResponse(response)
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        raise NetworkError("Exception re-raised from HTTP request") from e
@contextmanager
def
stream( self, url: str, headers: Mapping[str, str], data: Mapping[str, Any]) -> Iterator[Any]:
@contextmanager
def stream(
    self,
    url: str,
    headers: Mapping[str, str],
    data: Mapping[str, Any],
) -> Iterator[Any]:
    """Open a streaming POST request and yield an iterator of parsed events.

    Used as a context manager: the yielded object is whatever
    ``self._transform(response)`` returns; the response is closed when the
    ``with`` block exits.

    Args:
        url: Target URL.
        headers: Request headers.
        data: Payload sent as the JSON body.
    """
    # NOTE(review): unlike request(), exceptions raised by build_request()
    # and send() are not translated to ClientError/NetworkError here.
    request = self._c.build_request(
        method="POST",
        url=url,
        headers=headers,
        json=data,
    )

    if self._logger.isEnabledFor(logging.DEBUG):
        headers_to_log = request.headers.copy()
        # Keep credentials out of the log. The default makes this a no-op
        # when no Authorization header was supplied; without it enabling
        # DEBUG logging would raise KeyError here.
        headers_to_log.pop("Authorization", None)
        self._logger.debug(
            f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
        )

    response = self._c.send(
        request=request,
        stream=True,
    )

    try:
        yield self._transform(response)
    finally:
        response.close()