fauna.http.httpx_client

  1import json
  2import logging
  3from contextlib import contextmanager
  4from json import JSONDecodeError
  5from typing import Mapping, Any, Optional, Iterator
  6
  7import httpx
  8
  9from fauna.errors import ClientError, NetworkError
 10from fauna.http.http_client import HTTPResponse, HTTPClient
 11
 12
 13class HTTPXResponse(HTTPResponse):
 14
 15  def __init__(self, response: httpx.Response):
 16    self._r = response
 17
 18  def headers(self) -> Mapping[str, str]:
 19    h = {}
 20    for (k, v) in self._r.headers.items():
 21      h[k] = v
 22    return h
 23
 24  def json(self) -> Any:
 25    try:
 26      decoded = self._r.read().decode("utf-8")
 27      return json.loads(decoded)
 28    except (JSONDecodeError, UnicodeDecodeError) as e:
 29      raise ClientError(
 30          f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid."
 31      ) from e
 32
 33  def text(self) -> str:
 34    return str(self.read(), encoding='utf-8')
 35
 36  def status_code(self) -> int:
 37    return self._r.status_code
 38
 39  def read(self) -> bytes:
 40    return self._r.read()
 41
 42  def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
 43    return self._r.iter_bytes(size)
 44
 45  def close(self) -> None:
 46    try:
 47      self._r.close()
 48    except Exception as e:
 49      raise ClientError("Error closing response") from e
 50
 51
 52class HTTPXClient(HTTPClient):
 53
 54  def __init__(self,
 55               client: httpx.Client,
 56               logger: logging.Logger = logging.getLogger("fauna")):
 57    super(HTTPXClient, self).__init__()
 58    self._c = client
 59    self._logger = logger
 60
 61  def request(
 62      self,
 63      method: str,
 64      url: str,
 65      headers: Mapping[str, str],
 66      data: Mapping[str, Any],
 67  ) -> HTTPResponse:
 68
 69    try:
 70      request = self._c.build_request(
 71          method,
 72          url,
 73          json=data,
 74          headers=headers,
 75      )
 76
 77      if self._logger.isEnabledFor(logging.DEBUG):
 78        headers_to_log = request.headers.copy()
 79        headers_to_log.pop("Authorization")
 80        self._logger.debug(
 81            f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
 82        )
 83
 84    except httpx.InvalidURL as e:
 85      raise ClientError("Invalid URL Format") from e
 86
 87    try:
 88      response = self._c.send(
 89          request,
 90          stream=False,
 91      )
 92
 93      if self._logger.isEnabledFor(logging.DEBUG):
 94        self._logger.debug(
 95            f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
 96        )
 97
 98      return HTTPXResponse(response)
 99    except (httpx.HTTPError, httpx.InvalidURL) as e:
100      raise NetworkError("Exception re-raised from HTTP request") from e
101
102  @contextmanager
103  def stream(
104      self,
105      url: str,
106      headers: Mapping[str, str],
107      data: Mapping[str, Any],
108  ) -> Iterator[Any]:
109    request = self._c.build_request(
110        method="POST",
111        url=url,
112        headers=headers,
113        json=data,
114    )
115
116    if self._logger.isEnabledFor(logging.DEBUG):
117      headers_to_log = request.headers.copy()
118      headers_to_log.pop("Authorization")
119      self._logger.debug(
120          f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
121      )
122
123    response = self._c.send(
124        request=request,
125        stream=True,
126    )
127
128    try:
129      yield self._transform(response)
130    finally:
131      response.close()
132
133  def _transform(self, response):
134    try:
135      for line in response.iter_lines():
136        loaded = json.loads(line)
137        if self._logger.isEnabledFor(logging.DEBUG):
138          self._logger.debug(f"stream.data data={loaded}")
139        yield loaded
140    except httpx.ReadTimeout as e:
141      raise NetworkError("Stream timeout") from e
142    except (httpx.HTTPError, httpx.InvalidURL) as e:
143      raise NetworkError("Exception re-raised from HTTP request") from e
144
145  def close(self):
146    self._c.close()
class HTTPXResponse(fauna.http.http_client.HTTPResponse):
class HTTPXResponse(HTTPResponse):
  """Adapter exposing an ``httpx.Response`` through the ``HTTPResponse`` interface."""

  def __init__(self, response: httpx.Response):
    # Every accessor below delegates to this wrapped httpx response.
    self._r = response

  def headers(self) -> Mapping[str, str]:
    """Return the response headers copied into a plain dict."""
    return {name: value for name, value in self._r.headers.items()}

  def json(self) -> Any:
    """Read the body, decode it as UTF-8 and parse it as JSON.

    Raises:
      ClientError: if the body is not valid UTF-8 or not valid JSON.
    """
    try:
      return json.loads(self._r.read().decode("utf-8"))
    except (JSONDecodeError, UnicodeDecodeError) as err:
      raise ClientError(
          f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid."
      ) from err

  def text(self) -> str:
    """Return the response body decoded as UTF-8 text."""
    raw = self.read()
    return raw.decode('utf-8')

  def status_code(self) -> int:
    """Return the HTTP status code of the wrapped response."""
    wrapped = self._r
    return wrapped.status_code

  def read(self) -> bytes:
    """Return the response body as bytes."""
    body = self._r.read()
    return body

  def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
    """Return the wrapped response's byte iterator, passing ``size`` through."""
    chunks = self._r.iter_bytes(size)
    return chunks

  def close(self) -> None:
    """Close the wrapped response.

    Raises:
      ClientError: if closing the underlying response raises any exception.
    """
    try:
      self._r.close()
    except Exception as err:
      raise ClientError("Error closing response") from err

Helper class that provides a standard way to create an ABC using inheritance.

HTTPXResponse(response: httpx.Response)
16  def __init__(self, response: httpx.Response):
17    self._r = response
def headers(self) -> Mapping[str, str]:
19  def headers(self) -> Mapping[str, str]:
20    h = {}
21    for (k, v) in self._r.headers.items():
22      h[k] = v
23    return h
def json(self) -> Any:
25  def json(self) -> Any:
26    try:
27      decoded = self._r.read().decode("utf-8")
28      return json.loads(decoded)
29    except (JSONDecodeError, UnicodeDecodeError) as e:
30      raise ClientError(
31          f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid."
32      ) from e
def text(self) -> str:
34  def text(self) -> str:
35    return str(self.read(), encoding='utf-8')
def status_code(self) -> int:
37  def status_code(self) -> int:
38    return self._r.status_code
def read(self) -> bytes:
40  def read(self) -> bytes:
41    return self._r.read()
def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
43  def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
44    return self._r.iter_bytes(size)
def close(self) -> None:
46  def close(self) -> None:
47    try:
48      self._r.close()
49    except Exception as e:
50      raise ClientError("Error closing response") from e
class HTTPXClient(fauna.http.http_client.HTTPClient):
 53class HTTPXClient(HTTPClient):
 54
 55  def __init__(self,
 56               client: httpx.Client,
 57               logger: logging.Logger = logging.getLogger("fauna")):
 58    super(HTTPXClient, self).__init__()
 59    self._c = client
 60    self._logger = logger
 61
 62  def request(
 63      self,
 64      method: str,
 65      url: str,
 66      headers: Mapping[str, str],
 67      data: Mapping[str, Any],
 68  ) -> HTTPResponse:
 69
 70    try:
 71      request = self._c.build_request(
 72          method,
 73          url,
 74          json=data,
 75          headers=headers,
 76      )
 77
 78      if self._logger.isEnabledFor(logging.DEBUG):
 79        headers_to_log = request.headers.copy()
 80        headers_to_log.pop("Authorization")
 81        self._logger.debug(
 82            f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
 83        )
 84
 85    except httpx.InvalidURL as e:
 86      raise ClientError("Invalid URL Format") from e
 87
 88    try:
 89      response = self._c.send(
 90          request,
 91          stream=False,
 92      )
 93
 94      if self._logger.isEnabledFor(logging.DEBUG):
 95        self._logger.debug(
 96            f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
 97        )
 98
 99      return HTTPXResponse(response)
100    except (httpx.HTTPError, httpx.InvalidURL) as e:
101      raise NetworkError("Exception re-raised from HTTP request") from e
102
103  @contextmanager
104  def stream(
105      self,
106      url: str,
107      headers: Mapping[str, str],
108      data: Mapping[str, Any],
109  ) -> Iterator[Any]:
110    request = self._c.build_request(
111        method="POST",
112        url=url,
113        headers=headers,
114        json=data,
115    )
116
117    if self._logger.isEnabledFor(logging.DEBUG):
118      headers_to_log = request.headers.copy()
119      headers_to_log.pop("Authorization")
120      self._logger.debug(
121          f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
122      )
123
124    response = self._c.send(
125        request=request,
126        stream=True,
127    )
128
129    try:
130      yield self._transform(response)
131    finally:
132      response.close()
133
134  def _transform(self, response):
135    try:
136      for line in response.iter_lines():
137        loaded = json.loads(line)
138        if self._logger.isEnabledFor(logging.DEBUG):
139          self._logger.debug(f"stream.data data={loaded}")
140        yield loaded
141    except httpx.ReadTimeout as e:
142      raise NetworkError("Stream timeout") from e
143    except (httpx.HTTPError, httpx.InvalidURL) as e:
144      raise NetworkError("Exception re-raised from HTTP request") from e
145
146  def close(self):
147    self._c.close()

Helper class that provides a standard way to create an ABC using inheritance.

HTTPXClient( client: httpx.Client, logger: logging.Logger = <Logger fauna (WARNING)>)
55  def __init__(self,
56               client: httpx.Client,
57               logger: logging.Logger = logging.getLogger("fauna")):
58    super(HTTPXClient, self).__init__()
59    self._c = client
60    self._logger = logger
def request( self, method: str, url: str, headers: Mapping[str, str], data: Mapping[str, Any]) -> fauna.http.http_client.HTTPResponse:
 62  def request(
 63      self,
 64      method: str,
 65      url: str,
 66      headers: Mapping[str, str],
 67      data: Mapping[str, Any],
 68  ) -> HTTPResponse:
 69
 70    try:
 71      request = self._c.build_request(
 72          method,
 73          url,
 74          json=data,
 75          headers=headers,
 76      )
 77
 78      if self._logger.isEnabledFor(logging.DEBUG):
 79        headers_to_log = request.headers.copy()
 80        headers_to_log.pop("Authorization")
 81        self._logger.debug(
 82            f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
 83        )
 84
 85    except httpx.InvalidURL as e:
 86      raise ClientError("Invalid URL Format") from e
 87
 88    try:
 89      response = self._c.send(
 90          request,
 91          stream=False,
 92      )
 93
 94      if self._logger.isEnabledFor(logging.DEBUG):
 95        self._logger.debug(
 96            f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
 97        )
 98
 99      return HTTPXResponse(response)
100    except (httpx.HTTPError, httpx.InvalidURL) as e:
101      raise NetworkError("Exception re-raised from HTTP request") from e
@contextmanager
def stream( self, url: str, headers: Mapping[str, str], data: Mapping[str, Any]) -> Iterator[Any]:
103  @contextmanager
104  def stream(
105      self,
106      url: str,
107      headers: Mapping[str, str],
108      data: Mapping[str, Any],
109  ) -> Iterator[Any]:
110    request = self._c.build_request(
111        method="POST",
112        url=url,
113        headers=headers,
114        json=data,
115    )
116
117    if self._logger.isEnabledFor(logging.DEBUG):
118      headers_to_log = request.headers.copy()
119      headers_to_log.pop("Authorization")
120      self._logger.debug(
121          f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
122      )
123
124    response = self._c.send(
125        request=request,
126        stream=True,
127    )
128
129    try:
130      yield self._transform(response)
131    finally:
132      response.close()
def close(self):
146  def close(self):
147    self._c.close()