fauna.http.http_client

 1import abc
 2import contextlib
 3from dataclasses import dataclass
 4from typing import Iterator, Mapping, Any
 5
 6
 7@dataclass(frozen=True)
 8class ErrorResponse:
 9  status_code: int
10  error_code: str
11  error_message: str
12  summary: str
13
14
class HTTPResponse(abc.ABC):
  """Abstract interface for an HTTP response.

  Every accessor below is abstract; the only behavior defined here is the
  context-manager protocol, which closes the response when the ``with``
  block exits. Per-method descriptions are derived from the method names
  and annotations; concrete subclasses define the actual behavior.
  """

  @abc.abstractmethod
  def headers(self) -> Mapping[str, str]:
    """Return the response headers as a string-to-string mapping."""

  @abc.abstractmethod
  def status_code(self) -> int:
    """Return the integer status code of the response."""

  @abc.abstractmethod
  def json(self) -> Any:
    """Return the response content as a JSON value."""

  @abc.abstractmethod
  def text(self) -> str:
    """Return the response content as a string."""

  @abc.abstractmethod
  def read(self) -> bytes:
    """Return the response content as bytes."""

  @abc.abstractmethod
  def iter_bytes(self) -> Iterator[bytes]:
    """Return an iterator over chunks of the response content."""

  @abc.abstractmethod
  def close(self):
    """Close the response; invoked automatically by ``__exit__``."""

  def __enter__(self):
    """Enter the ``with`` block, yielding this response itself."""
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Close the response on exit; exceptions are not suppressed."""
    self.close()
    # Returning None (falsy) lets any in-flight exception propagate.
    return None
50
51
class HTTPClient(abc.ABC):
  """Abstract interface for an HTTP client.

  All three methods are abstract; this class defines no behavior of its
  own. Descriptions below are derived from the method names and
  annotations; concrete subclasses define the actual behavior.
  """

  @abc.abstractmethod
  def request(
      self,
      method: str,
      url: str,
      headers: Mapping[str, str],
      data: Mapping[str, Any],
  ) -> HTTPResponse:
    """Issue a request and return its response.

    Args:
      method: Request method name, as a string.
      url: Target URL.
      headers: String-to-string mapping of request headers.
      data: Mapping carrying the request payload.

    Returns:
      An ``HTTPResponse`` for the request.
    """

  @abc.abstractmethod
  @contextlib.contextmanager
  def stream(
      self,
      url: str,
      headers: Mapping[str, str],
      data: Mapping[str, Any],
  ) -> Iterator[Any]:
    """Context manager for a streaming call.

    The ``Iterator[Any]`` annotation describes the undecorated generator
    function; callers receive a context manager. This base body yields
    nothing, so it is only a placeholder for subclasses to override.

    Args:
      url: Target URL.
      headers: String-to-string mapping of request headers.
      data: Mapping carrying the request payload.
    """

  @abc.abstractmethod
  def close(self):
    """Close the client."""
@dataclass(frozen=True)
class ErrorResponse:
 8@dataclass(frozen=True)
 9class ErrorResponse:
10  status_code: int
11  error_code: str
12  error_message: str
13  summary: str
ErrorResponse(status_code: int, error_code: str, error_message: str, summary: str)
status_code: int
error_code: str
error_message: str
summary: str
class HTTPResponse(abc.ABC):
16class HTTPResponse(abc.ABC):
17
18  @abc.abstractmethod
19  def headers(self) -> Mapping[str, str]:
20    pass
21
22  @abc.abstractmethod
23  def status_code(self) -> int:
24    pass
25
26  @abc.abstractmethod
27  def json(self) -> Any:
28    pass
29
30  @abc.abstractmethod
31  def text(self) -> str:
32    pass
33
34  @abc.abstractmethod
35  def read(self) -> bytes:
36    pass
37
38  @abc.abstractmethod
39  def iter_bytes(self) -> Iterator[bytes]:
40    pass
41
42  @abc.abstractmethod
43  def close(self):
44    pass
45
46  def __enter__(self):
47    return self
48
49  def __exit__(self, exc_type, exc_val, exc_tb):
50    self.close()

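Abstract base class for HTTP responses. Concrete subclasses must implement `headers`, `status_code`, `json`, `text`, `read`, `iter_bytes`, and `close`. Instances can be used as context managers: `__enter__` returns the response itself and `__exit__` calls `close()`. (The class defines no docstring of its own, so the text shown here is otherwise inherited from `abc.ABC`: "Helper class that provides a standard way to create an ABC using inheritance.")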

@abc.abstractmethod
def headers(self) -> Mapping[str, str]:
18  @abc.abstractmethod
19  def headers(self) -> Mapping[str, str]:
20    pass
@abc.abstractmethod
def status_code(self) -> int:
22  @abc.abstractmethod
23  def status_code(self) -> int:
24    pass
@abc.abstractmethod
def json(self) -> Any:
26  @abc.abstractmethod
27  def json(self) -> Any:
28    pass
@abc.abstractmethod
def text(self) -> str:
30  @abc.abstractmethod
31  def text(self) -> str:
32    pass
@abc.abstractmethod
def read(self) -> bytes:
34  @abc.abstractmethod
35  def read(self) -> bytes:
36    pass
@abc.abstractmethod
def iter_bytes(self) -> Iterator[bytes]:
38  @abc.abstractmethod
39  def iter_bytes(self) -> Iterator[bytes]:
40    pass
@abc.abstractmethod
def close(self):
42  @abc.abstractmethod
43  def close(self):
44    pass
class HTTPClient(abc.ABC):
53class HTTPClient(abc.ABC):
54
55  @abc.abstractmethod
56  def request(
57      self,
58      method: str,
59      url: str,
60      headers: Mapping[str, str],
61      data: Mapping[str, Any],
62  ) -> HTTPResponse:
63    pass
64
65  @abc.abstractmethod
66  @contextlib.contextmanager
67  def stream(
68      self,
69      url: str,
70      headers: Mapping[str, str],
71      data: Mapping[str, Any],
72  ) -> Iterator[Any]:
73    pass
74
75  @abc.abstractmethod
76  def close(self):
77    pass

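Abstract base class for HTTP clients. Concrete subclasses must implement `request` (annotated to return an `HTTPResponse`), `stream` (declared as a `contextlib.contextmanager`), and `close`. (The class defines no docstring of its own, so the text shown here is otherwise inherited from `abc.ABC`: "Helper class that provides a standard way to create an ABC using inheritance.")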

@abc.abstractmethod
def request( self, method: str, url: str, headers: Mapping[str, str], data: Mapping[str, Any]) -> HTTPResponse:
55  @abc.abstractmethod
56  def request(
57      self,
58      method: str,
59      url: str,
60      headers: Mapping[str, str],
61      data: Mapping[str, Any],
62  ) -> HTTPResponse:
63    pass
@abc.abstractmethod
@contextlib.contextmanager
def stream( self, url: str, headers: Mapping[str, str], data: Mapping[str, Any]) -> Iterator[Any]:
65  @abc.abstractmethod
66  @contextlib.contextmanager
67  def stream(
68      self,
69      url: str,
70      headers: Mapping[str, str],
71      data: Mapping[str, Any],
72  ) -> Iterator[Any]:
73    pass
@abc.abstractmethod
def close(self):
75  @abc.abstractmethod
76  def close(self):
77    pass