
  1from dataclasses import dataclass
  2from datetime import timedelta
  3from typing import Any, Dict, Iterator, Mapping, Optional, Union
  5import fauna
  6from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
  7from fauna.client.retryable import Retryable
  8from fauna.client.utils import _Environment, LastTxnTs
  9from fauna.encoding import FaunaEncoder, FaunaDecoder
 10from fauna.encoding import QuerySuccess, QueryTags, QueryStats
 11from fauna.errors import FaunaError, ClientError, ProtocolError, \
 12  RetryableFaunaException, NetworkError
 13from fauna.http.http_client import HTTPClient
 14from fauna.query import Query, Page, fql
 15from fauna.query.models import StreamToken
 17DefaultHttpConnectTimeout = timedelta(seconds=5)
 18DefaultHttpReadTimeout: Optional[timedelta] = None
 19DefaultHttpWriteTimeout = timedelta(seconds=5)
 20DefaultHttpPoolTimeout = timedelta(seconds=5)
 21DefaultIdleConnectionTimeout = timedelta(seconds=5)
 22DefaultQueryTimeout = timedelta(seconds=5)
 23DefaultClientBufferTimeout = timedelta(seconds=5)
 24DefaultMaxConnections = 20
 25DefaultMaxIdleConnections = 20
 29class QueryOptions:
 30  """
 31    A dataclass representing options available for a query.
 33    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
 34    * max_contention_retries - The max number of times to retry the query if contention is encountered.
 35    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
 36    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
 37    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
 38    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
 39    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
 40    """
 42  linearized: Optional[bool] = None
 43  max_contention_retries: Optional[int] = None
 44  query_timeout: Optional[timedelta] = DefaultQueryTimeout
 45  query_tags: Optional[Mapping[str, str]] = None
 46  traceparent: Optional[str] = None
 47  typecheck: Optional[bool] = None
 48  additional_headers: Optional[Dict[str, str]] = None
 52class StreamOptions:
 53  """
 54    A dataclass representing options available for a stream.
 56    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
 57    * max_backoff - The maximum backoff in seconds for an individual retry.
 58    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
 59    the timestamp.
 60    * status_events - Indicates if stream should include status events. Status events are periodic events that
 61    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
 62    about the cost of maintaining the stream other than the cost of the received events.
 63    """
 65  max_attempts: Optional[int] = None
 66  max_backoff: Optional[int] = None
 67  start_ts: Optional[int] = None
 68  status_events: bool = False
 71class Client:
 73  def __init__(
 74      self,
 75      endpoint: Optional[str] = None,
 76      secret: Optional[str] = None,
 77      http_client: Optional[HTTPClient] = None,
 78      query_tags: Optional[Mapping[str, str]] = None,
 79      linearized: Optional[bool] = None,
 80      max_contention_retries: Optional[int] = None,
 81      typecheck: Optional[bool] = None,
 82      additional_headers: Optional[Dict[str, str]] = None,
 83      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
 84      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
 85      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
 86      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
 87      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
 88      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
 89      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
 90      max_attempts: int = 3,
 91      max_backoff: int = 20,
 92  ):
 93    """Initializes a Client.
 95        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
 96        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
 97        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
 98        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
 99        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
100        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
101        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
102        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
103        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
104        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
105        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
106        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
107        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
108        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
109        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
110        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
111        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
112        """
114    self._set_endpoint(endpoint)
115    self._max_attempts = max_attempts
116    self._max_backoff = max_backoff
118    if secret is None:
119      self._auth = _Auth(_Environment.EnvFaunaSecret())
120    else:
121      self._auth = _Auth(secret)
123    self._last_txn_ts = LastTxnTs()
125    self._query_tags = {}
126    if query_tags is not None:
127      self._query_tags.update(query_tags)
129    if query_timeout is not None:
130      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
131    else:
132      self._query_timeout_ms = None
134    self._headers: Dict[str, str] = {
135        _Header.AcceptEncoding: "gzip",
136        _Header.ContentType: "application/json;charset=utf-8",
137        _Header.Driver: "python",
138        _Header.DriverEnv: str(_DriverEnvironment()),
139    }
141    if typecheck is not None:
142      self._headers[Header.Typecheck] = str(typecheck).lower()
144    if linearized is not None:
145      self._headers[Header.Linearized] = str(linearized).lower()
147    if max_contention_retries is not None and max_contention_retries > 0:
148      self._headers[Header.MaxContentionRetries] = \
149          f"{max_contention_retries}"
151    if additional_headers is not None:
152      self._headers = {
153          **self._headers,
154          **additional_headers,
155      }
157    self._session: HTTPClient
159    if http_client is not None:
160      self._session = http_client
161    else:
162      if fauna.global_http_client is None:
163        timeout_s: Optional[float] = None
164        if query_timeout is not None and client_buffer_timeout is not None:
165          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
166        read_timeout_s: Optional[float] = None
167        if http_read_timeout is not None:
168          read_timeout_s = http_read_timeout.total_seconds()
170        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
171        ) if http_write_timeout is not None else None
172        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
173        ) if http_connect_timeout is not None else None
174        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
175        ) if http_pool_timeout is not None else None
176        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
177        ) if http_idle_timeout is not None else None
179        import httpx
180        from fauna.http.httpx_client import HTTPXClient
181        c = HTTPXClient(
182            httpx.Client(
183                http1=True,
184                http2=False,
185                timeout=httpx.Timeout(
186                    timeout=timeout_s,
187                    connect=connect_timeout_s,
188                    read=read_timeout_s,
189                    write=write_timeout_s,
190                    pool=pool_timeout_s,
191                ),
192                limits=httpx.Limits(
193                    max_connections=DefaultMaxConnections,
194                    max_keepalive_connections=DefaultMaxIdleConnections,
195                    keepalive_expiry=idle_timeout_s,
196                ),
197            ))
198        fauna.global_http_client = c
200      self._session = fauna.global_http_client
202  def close(self):
203    self._session.close()
204    if self._session == fauna.global_http_client:
205      fauna.global_http_client = None
207  def set_last_txn_ts(self, txn_ts: int):
208    """
209        Set the last timestamp seen by this client.
210        This has no effect if earlier than stored timestamp.
212        .. WARNING:: This should be used only when coordinating timestamps across
213        multiple clients. Moving the timestamp arbitrarily forward into
214        the future will cause transactions to stall.
216        :param txn_ts: the new transaction time.
217        """
218    self._last_txn_ts.update_txn_time(txn_ts)
220  def get_last_txn_ts(self) -> Optional[int]:
221    """
222        Get the last timestamp seen by this client.
223        :return:
224        """
225    return self._last_txn_ts.time
227  def get_query_timeout(self) -> Optional[timedelta]:
228    """
229        Get the query timeout for all queries.
230        """
231    if self._query_timeout_ms is not None:
232      return timedelta(milliseconds=self._query_timeout_ms)
233    else:
234      return None
236  def paginate(
237      self,
238      fql: Query,
239      opts: Optional[QueryOptions] = None,
240  ) -> "QueryIterator":
241    """
242        Run a query on Fauna and returning an iterator of results. If the query
243        returns a Page, the iterator will fetch additional Pages until the
244        after token is null. Each call for a page will be retried with exponential
245        backoff up to the max_attempts set in the client's retry policy in the
246        event of a 429 or 502.
248        :param fql: A Query
249        :param opts: (Optional) Query Options
251        :return: a :class:`QueryResponse`
253        :raises NetworkError: HTTP Request failed in transit
254        :raises ProtocolError: HTTP error not from Fauna
255        :raises ServiceError: Fauna returned an error
256        :raises ValueError: Encoding and decoding errors
257        :raises TypeError: Invalid param types
258        """
260    if not isinstance(fql, Query):
261      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
262                 f"Query by calling fauna.fql()"
263      raise TypeError(err_msg)
265    return QueryIterator(self, fql, opts)
267  def query(
268      self,
269      fql: Query,
270      opts: Optional[QueryOptions] = None,
271  ) -> QuerySuccess:
272    """
273        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
274        up to the max_backoff in the event of a 429.
276        :param fql: A Query
277        :param opts: (Optional) Query Options
279        :return: a :class:`QueryResponse`
281        :raises NetworkError: HTTP Request failed in transit
282        :raises ProtocolError: HTTP error not from Fauna
283        :raises ServiceError: Fauna returned an error
284        :raises ValueError: Encoding and decoding errors
285        :raises TypeError: Invalid param types
286        """
288    if not isinstance(fql, Query):
289      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
290                 f"Query by calling fauna.fql()"
291      raise TypeError(err_msg)
293    try:
294      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
295    except Exception as e:
296      raise ClientError("Failed to encode Query") from e
298    retryable = Retryable[QuerySuccess](
299        self._max_attempts,
300        self._max_backoff,
301        self._query,
302        "/query/1",
303        fql=encoded_query,
304        opts=opts,
305    )
307    r = retryable.run()
308    r.response.stats.attempts = r.attempts
309    return r.response
311  def _query(
312      self,
313      path: str,
314      fql: Mapping[str, Any],
315      arguments: Optional[Mapping[str, Any]] = None,
316      opts: Optional[QueryOptions] = None,
317  ) -> QuerySuccess:
319    headers = self._headers.copy()
320    headers[_Header.Format] = "tagged"
321    headers[_Header.Authorization] = self._auth.bearer()
323    if self._query_timeout_ms is not None:
324      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
326    headers.update(self._last_txn_ts.request_header)
328    query_tags = {}
329    if self._query_tags is not None:
330      query_tags.update(self._query_tags)
332    if opts is not None:
333      if opts.linearized is not None:
334        headers[Header.Linearized] = str(opts.linearized).lower()
335      if opts.max_contention_retries is not None:
336        headers[Header.MaxContentionRetries] = \
337            f"{opts.max_contention_retries}"
338      if opts.traceparent is not None:
339        headers[Header.Traceparent] = opts.traceparent
340      if opts.query_timeout is not None:
341        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
342        headers[Header.QueryTimeoutMs] = timeout_ms
343      if opts.query_tags is not None:
344        query_tags.update(opts.query_tags)
345      if opts.typecheck is not None:
346        headers[Header.Typecheck] = str(opts.typecheck).lower()
347      if opts.additional_headers is not None:
348        headers.update(opts.additional_headers)
350    if len(query_tags) > 0:
351      headers[Header.Tags] = QueryTags.encode(query_tags)
353    data: dict[str, Any] = {
354        "query": fql,
355        "arguments": arguments or {},
356    }
358    with self._session.request(
359        method="POST",
360        url=self._endpoint + path,
361        headers=headers,
362        data=data,
363    ) as response:
364      status_code = response.status_code()
365      response_json = response.json()
366      headers = response.headers()
368      self._check_protocol(response_json, status_code)
370      dec: Any = FaunaDecoder.decode(response_json)
372      if status_code > 399:
373        FaunaError.parse_error_and_throw(dec, status_code)
375      if "txn_ts" in dec:
376        self.set_last_txn_ts(int(response_json["txn_ts"]))
378      stats = QueryStats(dec["stats"]) if "stats" in dec else None
379      summary = dec["summary"] if "summary" in dec else None
380      query_tags = QueryTags.decode(
381          dec["query_tags"]) if "query_tags" in dec else None
382      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
383      schema_version = dec["schema_version"] if "schema_version" in dec else None
384      traceparent = headers.get("traceparent", None)
385      static_type = dec["static_type"] if "static_type" in dec else None
387      return QuerySuccess(
388          data=dec["data"],
389          query_tags=query_tags,
390          static_type=static_type,
391          stats=stats,
392          summary=summary,
393          traceparent=traceparent,
394          txn_ts=txn_ts,
395          schema_version=schema_version,
396      )
398  def stream(
399      self,
400      fql: Union[StreamToken, Query],
401      opts: StreamOptions = StreamOptions()
402  ) -> "StreamIterator":
403    """
404        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
406        :param fql: A Query that returns a StreamToken or a StreamToken.
407        :param opts: (Optional) Stream Options.
409        :return: a :class:`StreamIterator`
411        :raises NetworkError: HTTP Request failed in transit
412        :raises ProtocolError: HTTP error not from Fauna
413        :raises ServiceError: Fauna returned an error
414        :raises ValueError: Encoding and decoding errors
415        :raises TypeError: Invalid param types
416        """
418    if isinstance(fql, Query):
419      token = self.query(fql).data
420    else:
421      token = fql
423    if not isinstance(token, StreamToken):
424      err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
425      raise TypeError(err_msg)
427    headers = self._headers.copy()
428    headers[_Header.Format] = "tagged"
429    headers[_Header.Authorization] = self._auth.bearer()
431    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
432                          self._max_attempts, self._max_backoff, opts, token)
434  def _check_protocol(self, response_json: Any, status_code):
435    # TODO: Logic to validate wire protocol belongs elsewhere.
436    should_raise = False
438    # check for QuerySuccess
439    if status_code <= 399 and "data" not in response_json:
440      should_raise = True
442    # check for QueryFailure
443    if status_code > 399:
444      if "error" not in response_json:
445        should_raise = True
446      else:
447        e = response_json["error"]
448        if "code" not in e or "message" not in e:
449          should_raise = True
451    if should_raise:
452      raise ProtocolError(
453          status_code,
454          f"Response is in an unknown format: \n{response_json}",
455      )
457  def _set_endpoint(self, endpoint):
458    if endpoint is None:
459      endpoint = _Environment.EnvFaunaEndpoint()
461    if endpoint[-1:] == "/":
462      endpoint = endpoint[:-1]
464    self._endpoint = endpoint
class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, token: StreamToken):
    """
    Stores the stream settings and creates the initial stream context.

    :param http_client: The HTTP client used to open the stream.
    :param headers: The request headers sent when opening the stream.
    :param endpoint: The full URL of the stream endpoint.
    :param max_attempts: Attempts per event when ``opts.max_attempts`` is unset.
    :param max_backoff: Backoff limit when ``opts.max_backoff`` is unset.
    :param opts: Stream Options
    :param token: The token identifying the stream.
    """
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._token = token
    self._stream = None
    # Timestamp of the most recent event; used as start_ts when reconnecting.
    self.last_ts = None
    self._ctx = self._create_stream()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    """Closes the open stream, if any, and exits the stream context."""
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    """
    Returns the next event, fetching it through a Retryable.

    Values set on the stream options take precedence over the retry settings
    passed to the constructor.
    """
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    """
    Reads events from the stream until one should be handed to the caller.

    "start" events are always skipped, and "status" events are skipped unless
    ``status_events`` is enabled. Every event read updates ``last_ts``.

    :return: the decoded event.

    :raises StopIteration: The stream is exhausted or could not be obtained.
    :raises RetryableFaunaException: The stream failed to open or a
      NetworkError occurred while reading.
    """
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is None:
        raise StopIteration

      # Loop rather than recurse: a long run of skipped events would otherwise
      # add one stack frame per event and could hit the recursion limit.
      while True:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]

        if event["type"] == "start":
          continue

        if not self._opts.status_events and event["type"] == "status":
          continue

        return event
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    """
    Discards the current stream and prepares a new stream context.

    Creating the new context is best effort: a failure there is ignored and
    the previous context is kept.

    :raises RetryableFaunaException: Always, after the reset.
    """
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    """
    Builds the stream request and returns the HTTP client's stream context.

    The request resumes from ``last_ts`` when an event has already been seen,
    and otherwise from ``opts.start_ts`` if that is set.
    """
    data: Dict[str, Any] = {"token": self._token.token}
    if self.last_ts is not None:
      data["start_ts"] = self.last_ts
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    """Closes the open stream, if any."""
    if self._stream is not None:
      self._stream.close()
class QueryIterator:
  """Iterates over the result pages of a Fauna query."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Creates an iterator bound to a client and a query.

    No query is sent until the iterator is consumed.

    :param client: The Client used to run the query.
    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      raise TypeError(
          f"'client' must be a Client but was a {type(client)}. You can build a "
          f"Client by calling fauna.client.Client()")

    if not isinstance(fql, Query):
      raise TypeError(
          f"'fql' must be a Query but was a {type(fql)}. You can build a "
          f"Query by calling fauna.fql()")

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    Generator that runs the stored query and yields its results one page at
    a time.

    A result that is not a Page is yielded once, wrapped in a single-element
    list. For a Page, further pages are requested for as long as an ``after``
    cursor is returned.
    """
    first = self.client.query(self.fql, self.opts)
    first_data = first.data

    # Guard clause: a non-paged result is a single one-item "page".
    if not isinstance(first_data, Page):
      yield [first_data]
      return

    after = first_data.after
    yield first_data.data

    while after is not None:
      follow_up = self.client.query(
          fql("Set.paginate(${after})", after=after), self.opts)
      # TODO: `Set.paginate` does not yet return a `@set` tagged value
      #       so we will get back a plain object that might not have
      #       an after property.
      after = follow_up.data.get("after")
      yield follow_up.data.get("data")

  def flatten(self) -> Iterator:
    """
    Generator that runs the stored query and yields the results item by
    item instead of page by page, requesting further pages as needed.
    """
    for chunk in self.iter():
      for element in chunk:
        yield element
