fauna.client.client

  1from dataclasses import dataclass
  2from datetime import timedelta
  3from typing import Any, Dict, Iterator, Mapping, Optional, Union
  4
  5import fauna
  6from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
  7from fauna.client.retryable import Retryable
  8from fauna.client.utils import _Environment, LastTxnTs
  9from fauna.encoding import FaunaEncoder, FaunaDecoder
 10from fauna.encoding import QuerySuccess, QueryTags, QueryStats
 11from fauna.errors import FaunaError, ClientError, ProtocolError, \
 12  RetryableFaunaException, NetworkError
 13from fauna.http.http_client import HTTPClient
 14from fauna.query import Query, Page, fql
 15from fauna.query.models import StreamToken
 16
 17DefaultHttpConnectTimeout = timedelta(seconds=5)
 18DefaultHttpReadTimeout: Optional[timedelta] = None
 19DefaultHttpWriteTimeout = timedelta(seconds=5)
 20DefaultHttpPoolTimeout = timedelta(seconds=5)
 21DefaultIdleConnectionTimeout = timedelta(seconds=5)
 22DefaultQueryTimeout = timedelta(seconds=5)
 23DefaultClientBufferTimeout = timedelta(seconds=5)
 24DefaultMaxConnections = 20
 25DefaultMaxIdleConnections = 20
 26
 27
 28@dataclass
 29class QueryOptions:
 30  """
 31    A dataclass representing options available for a query.
 32
 33    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
 34    * max_contention_retries - The max number of times to retry the query if contention is encountered.
 35    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
 36    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
 37    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
 38    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
 39    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
 40    """
 41
 42  linearized: Optional[bool] = None
 43  max_contention_retries: Optional[int] = None
 44  query_timeout: Optional[timedelta] = DefaultQueryTimeout
 45  query_tags: Optional[Mapping[str, str]] = None
 46  traceparent: Optional[str] = None
 47  typecheck: Optional[bool] = None
 48  additional_headers: Optional[Dict[str, str]] = None
 49
 50
 51@dataclass
 52class StreamOptions:
 53  """
 54    A dataclass representing options available for a stream.
 55
 56    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
 57    * max_backoff - The maximum backoff in seconds for an individual retry.
 58    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
 59    the timestamp.
 60    * status_events - Indicates if stream should include status events. Status events are periodic events that
 61    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
 62    about the cost of maintaining the stream other than the cost of the received events.
 63    """
 64
 65  max_attempts: Optional[int] = None
 66  max_backoff: Optional[int] = None
 67  start_ts: Optional[int] = None
 68  status_events: bool = False
 69
 70
 71class Client:
 72
 73  def __init__(
 74      self,
 75      endpoint: Optional[str] = None,
 76      secret: Optional[str] = None,
 77      http_client: Optional[HTTPClient] = None,
 78      query_tags: Optional[Mapping[str, str]] = None,
 79      linearized: Optional[bool] = None,
 80      max_contention_retries: Optional[int] = None,
 81      typecheck: Optional[bool] = None,
 82      additional_headers: Optional[Dict[str, str]] = None,
 83      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
 84      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
 85      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
 86      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
 87      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
 88      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
 89      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
 90      max_attempts: int = 3,
 91      max_backoff: int = 20,
 92  ):
 93    """Initializes a Client.
 94
 95        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
 96        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
 97        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
 98        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
 99        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
100        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
101        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
102        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
103        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
104        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
105        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
106        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
107        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
108        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
109        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
110        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
111        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
112        """
113
114    self._set_endpoint(endpoint)
115    self._max_attempts = max_attempts
116    self._max_backoff = max_backoff
117
118    if secret is None:
119      self._auth = _Auth(_Environment.EnvFaunaSecret())
120    else:
121      self._auth = _Auth(secret)
122
123    self._last_txn_ts = LastTxnTs()
124
125    self._query_tags = {}
126    if query_tags is not None:
127      self._query_tags.update(query_tags)
128
129    if query_timeout is not None:
130      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
131    else:
132      self._query_timeout_ms = None
133
134    self._headers: Dict[str, str] = {
135        _Header.AcceptEncoding: "gzip",
136        _Header.ContentType: "application/json;charset=utf-8",
137        _Header.Driver: "python",
138        _Header.DriverEnv: str(_DriverEnvironment()),
139    }
140
141    if typecheck is not None:
142      self._headers[Header.Typecheck] = str(typecheck).lower()
143
144    if linearized is not None:
145      self._headers[Header.Linearized] = str(linearized).lower()
146
147    if max_contention_retries is not None and max_contention_retries > 0:
148      self._headers[Header.MaxContentionRetries] = \
149          f"{max_contention_retries}"
150
151    if additional_headers is not None:
152      self._headers = {
153          **self._headers,
154          **additional_headers,
155      }
156
157    self._session: HTTPClient
158
159    if http_client is not None:
160      self._session = http_client
161    else:
162      if fauna.global_http_client is None:
163        timeout_s: Optional[float] = None
164        if query_timeout is not None and client_buffer_timeout is not None:
165          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
166        read_timeout_s: Optional[float] = None
167        if http_read_timeout is not None:
168          read_timeout_s = http_read_timeout.total_seconds()
169
170        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
171        ) if http_write_timeout is not None else None
172        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
173        ) if http_connect_timeout is not None else None
174        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
175        ) if http_pool_timeout is not None else None
176        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
177        ) if http_idle_timeout is not None else None
178
179        import httpx
180        from fauna.http.httpx_client import HTTPXClient
181        c = HTTPXClient(
182            httpx.Client(
183                http1=True,
184                http2=False,
185                timeout=httpx.Timeout(
186                    timeout=timeout_s,
187                    connect=connect_timeout_s,
188                    read=read_timeout_s,
189                    write=write_timeout_s,
190                    pool=pool_timeout_s,
191                ),
192                limits=httpx.Limits(
193                    max_connections=DefaultMaxConnections,
194                    max_keepalive_connections=DefaultMaxIdleConnections,
195                    keepalive_expiry=idle_timeout_s,
196                ),
197            ))
198        fauna.global_http_client = c
199
200      self._session = fauna.global_http_client
201
202  def close(self):
203    self._session.close()
204    if self._session == fauna.global_http_client:
205      fauna.global_http_client = None
206
207  def set_last_txn_ts(self, txn_ts: int):
208    """
209        Set the last timestamp seen by this client.
210        This has no effect if earlier than stored timestamp.
211
212        .. WARNING:: This should be used only when coordinating timestamps across
213        multiple clients. Moving the timestamp arbitrarily forward into
214        the future will cause transactions to stall.
215
216        :param txn_ts: the new transaction time.
217        """
218    self._last_txn_ts.update_txn_time(txn_ts)
219
220  def get_last_txn_ts(self) -> Optional[int]:
221    """
222        Get the last timestamp seen by this client.
223        :return:
224        """
225    return self._last_txn_ts.time
226
227  def get_query_timeout(self) -> Optional[timedelta]:
228    """
229        Get the query timeout for all queries.
230        """
231    if self._query_timeout_ms is not None:
232      return timedelta(milliseconds=self._query_timeout_ms)
233    else:
234      return None
235
236  def paginate(
237      self,
238      fql: Query,
239      opts: Optional[QueryOptions] = None,
240  ) -> "QueryIterator":
241    """
242        Run a query on Fauna and returning an iterator of results. If the query
243        returns a Page, the iterator will fetch additional Pages until the
244        after token is null. Each call for a page will be retried with exponential
245        backoff up to the max_attempts set in the client's retry policy in the
246        event of a 429 or 502.
247
248        :param fql: A Query
249        :param opts: (Optional) Query Options
250
251        :return: a :class:`QueryResponse`
252
253        :raises NetworkError: HTTP Request failed in transit
254        :raises ProtocolError: HTTP error not from Fauna
255        :raises ServiceError: Fauna returned an error
256        :raises ValueError: Encoding and decoding errors
257        :raises TypeError: Invalid param types
258        """
259
260    if not isinstance(fql, Query):
261      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
262                 f"Query by calling fauna.fql()"
263      raise TypeError(err_msg)
264
265    return QueryIterator(self, fql, opts)
266
267  def query(
268      self,
269      fql: Query,
270      opts: Optional[QueryOptions] = None,
271  ) -> QuerySuccess:
272    """
273        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
274        up to the max_backoff in the event of a 429.
275
276        :param fql: A Query
277        :param opts: (Optional) Query Options
278
279        :return: a :class:`QueryResponse`
280
281        :raises NetworkError: HTTP Request failed in transit
282        :raises ProtocolError: HTTP error not from Fauna
283        :raises ServiceError: Fauna returned an error
284        :raises ValueError: Encoding and decoding errors
285        :raises TypeError: Invalid param types
286        """
287
288    if not isinstance(fql, Query):
289      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
290                 f"Query by calling fauna.fql()"
291      raise TypeError(err_msg)
292
293    try:
294      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
295    except Exception as e:
296      raise ClientError("Failed to encode Query") from e
297
298    retryable = Retryable[QuerySuccess](
299        self._max_attempts,
300        self._max_backoff,
301        self._query,
302        "/query/1",
303        fql=encoded_query,
304        opts=opts,
305    )
306
307    r = retryable.run()
308    r.response.stats.attempts = r.attempts
309    return r.response
310
311  def _query(
312      self,
313      path: str,
314      fql: Mapping[str, Any],
315      arguments: Optional[Mapping[str, Any]] = None,
316      opts: Optional[QueryOptions] = None,
317  ) -> QuerySuccess:
318
319    headers = self._headers.copy()
320    headers[_Header.Format] = "tagged"
321    headers[_Header.Authorization] = self._auth.bearer()
322
323    if self._query_timeout_ms is not None:
324      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
325
326    headers.update(self._last_txn_ts.request_header)
327
328    query_tags = {}
329    if self._query_tags is not None:
330      query_tags.update(self._query_tags)
331
332    if opts is not None:
333      if opts.linearized is not None:
334        headers[Header.Linearized] = str(opts.linearized).lower()
335      if opts.max_contention_retries is not None:
336        headers[Header.MaxContentionRetries] = \
337            f"{opts.max_contention_retries}"
338      if opts.traceparent is not None:
339        headers[Header.Traceparent] = opts.traceparent
340      if opts.query_timeout is not None:
341        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
342        headers[Header.QueryTimeoutMs] = timeout_ms
343      if opts.query_tags is not None:
344        query_tags.update(opts.query_tags)
345      if opts.typecheck is not None:
346        headers[Header.Typecheck] = str(opts.typecheck).lower()
347      if opts.additional_headers is not None:
348        headers.update(opts.additional_headers)
349
350    if len(query_tags) > 0:
351      headers[Header.Tags] = QueryTags.encode(query_tags)
352
353    data: dict[str, Any] = {
354        "query": fql,
355        "arguments": arguments or {},
356    }
357
358    with self._session.request(
359        method="POST",
360        url=self._endpoint + path,
361        headers=headers,
362        data=data,
363    ) as response:
364      status_code = response.status_code()
365      response_json = response.json()
366      headers = response.headers()
367
368      self._check_protocol(response_json, status_code)
369
370      dec: Any = FaunaDecoder.decode(response_json)
371
372      if status_code > 399:
373        FaunaError.parse_error_and_throw(dec, status_code)
374
375      if "txn_ts" in dec:
376        self.set_last_txn_ts(int(response_json["txn_ts"]))
377
378      stats = QueryStats(dec["stats"]) if "stats" in dec else None
379      summary = dec["summary"] if "summary" in dec else None
380      query_tags = QueryTags.decode(
381          dec["query_tags"]) if "query_tags" in dec else None
382      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
383      schema_version = dec["schema_version"] if "schema_version" in dec else None
384      traceparent = headers.get("traceparent", None)
385      static_type = dec["static_type"] if "static_type" in dec else None
386
387      return QuerySuccess(
388          data=dec["data"],
389          query_tags=query_tags,
390          static_type=static_type,
391          stats=stats,
392          summary=summary,
393          traceparent=traceparent,
394          txn_ts=txn_ts,
395          schema_version=schema_version,
396      )
397
398  def stream(
399      self,
400      fql: Union[StreamToken, Query],
401      opts: StreamOptions = StreamOptions()
402  ) -> "StreamIterator":
403    """
404        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
405
406        :param fql: A Query that returns a StreamToken or a StreamToken.
407        :param opts: (Optional) Stream Options.
408
409        :return: a :class:`StreamIterator`
410
411        :raises NetworkError: HTTP Request failed in transit
412        :raises ProtocolError: HTTP error not from Fauna
413        :raises ServiceError: Fauna returned an error
414        :raises ValueError: Encoding and decoding errors
415        :raises TypeError: Invalid param types
416        """
417
418    if isinstance(fql, Query):
419      token = self.query(fql).data
420    else:
421      token = fql
422
423    if not isinstance(token, StreamToken):
424      err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
425      raise TypeError(err_msg)
426
427    headers = self._headers.copy()
428    headers[_Header.Format] = "tagged"
429    headers[_Header.Authorization] = self._auth.bearer()
430
431    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
432                          self._max_attempts, self._max_backoff, opts, token)
433
434  def _check_protocol(self, response_json: Any, status_code):
435    # TODO: Logic to validate wire protocol belongs elsewhere.
436    should_raise = False
437
438    # check for QuerySuccess
439    if status_code <= 399 and "data" not in response_json:
440      should_raise = True
441
442    # check for QueryFailure
443    if status_code > 399:
444      if "error" not in response_json:
445        should_raise = True
446      else:
447        e = response_json["error"]
448        if "code" not in e or "message" not in e:
449          should_raise = True
450
451    if should_raise:
452      raise ProtocolError(
453          status_code,
454          f"Response is in an unknown format: \n{response_json}",
455      )
456
457  def _set_endpoint(self, endpoint):
458    if endpoint is None:
459      endpoint = _Environment.EnvFaunaEndpoint()
460
461    if endpoint[-1:] == "/":
462      endpoint = endpoint[:-1]
463
464    self._endpoint = endpoint
465
466
467class StreamIterator:
468  """A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""
469
470  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
471               endpoint: str, max_attempts: int, max_backoff: int,
472               opts: StreamOptions, token: StreamToken):
473    self._http_client = http_client
474    self._headers = headers
475    self._endpoint = endpoint
476    self._max_attempts = max_attempts
477    self._max_backoff = max_backoff
478    self._opts = opts
479    self._token = token
480    self._stream = None
481    self.last_ts = None
482    self._ctx = self._create_stream()
483
484  def __enter__(self):
485    return self
486
487  def __exit__(self, exc_type, exc_value, exc_traceback):
488    if self._stream is not None:
489      self._stream.close()
490
491    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
492    return False
493
494  def __iter__(self):
495    return self
496
497  def __next__(self):
498    if self._opts.max_attempts is not None:
499      max_attempts = self._opts.max_attempts
500    else:
501      max_attempts = self._max_attempts
502
503    if self._opts.max_backoff is not None:
504      max_backoff = self._opts.max_backoff
505    else:
506      max_backoff = self._max_backoff
507
508    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
509    return retryable.run().response
510
511  def _next_element(self):
512    try:
513      if self._stream is None:
514        try:
515          self._stream = self._ctx.__enter__()
516        except Exception:
517          self._retry_stream()
518
519      if self._stream is not None:
520        event: Any = FaunaDecoder.decode(next(self._stream))
521
522        if event["type"] == "error":
523          FaunaError.parse_error_and_throw(event, 400)
524
525        self.last_ts = event["txn_ts"]
526
527        if event["type"] == "start":
528          return self._next_element()
529
530        if not self._opts.status_events and event["type"] == "status":
531          return self._next_element()
532
533        return event
534
535      raise StopIteration
536    except NetworkError:
537      self._retry_stream()
538
539  def _retry_stream(self):
540    if self._stream is not None:
541      self._stream.close()
542
543    self._stream = None
544
545    try:
546      self._ctx = self._create_stream()
547    except Exception:
548      pass
549    raise RetryableFaunaException
550
551  def _create_stream(self):
552    data: Dict[str, Any] = {"token": self._token.token}
553    if self.last_ts is not None:
554      data["start_ts"] = self.last_ts
555    elif self._opts.start_ts is not None:
556      data["start_ts"] = self._opts.start_ts
557
558    return self._http_client.stream(
559        url=self._endpoint, headers=self._headers, data=data)
560
561  def close(self):
562    if self._stream is not None:
563      self._stream.close()
564
565
566class QueryIterator:
567  """A class to provider an iterator on top of Fauna queries."""
568
569  def __init__(self,
570               client: Client,
571               fql: Query,
572               opts: Optional[QueryOptions] = None):
573    """Initializes the QueryIterator
574
575        :param fql: A Query
576        :param opts: (Optional) Query Options
577
578        :raises TypeError: Invalid param types
579        """
580    if not isinstance(client, Client):
581      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
582                  f"Client by calling fauna.client.Client()"
583      raise TypeError(err_msg)
584
585    if not isinstance(fql, Query):
586      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
587                 f"Query by calling fauna.fql()"
588      raise TypeError(err_msg)
589
590    self.client = client
591    self.fql = fql
592    self.opts = opts
593
594  def __iter__(self) -> Iterator:
595    return self.iter()
596
597  def iter(self) -> Iterator:
598    """
599        A generator function that immediately fetches and yields the results of
600        the stored query. Yields additional pages on subsequent iterations if
601        they exist
602        """
603
604    cursor = None
605    initial_response = self.client.query(self.fql, self.opts)
606
607    if isinstance(initial_response.data, Page):
608      cursor = initial_response.data.after
609      yield initial_response.data.data
610
611      while cursor is not None:
612        next_response = self.client.query(
613            fql("Set.paginate(${after})", after=cursor), self.opts)
614        # TODO: `Set.paginate` does not yet return a `@set` tagged value
615        #       so we will get back a plain object that might not have
616        #       an after property.
617        cursor = next_response.data.get("after")
618        yield next_response.data.get("data")
619
620    else:
621      yield [initial_response.data]
622
623  def flatten(self) -> Iterator:
624    """
625        A generator function that immediately fetches and yields the results of
626        the stored query. Yields each item individually, rather than a whole
627        Page at a time. Fetches additional pages as required if they exist.
628        """
629
630    for page in self.iter():
631      for item in page:
632        yield item
DefaultHttpConnectTimeout = datetime.timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[datetime.timedelta] = None
DefaultHttpWriteTimeout = datetime.timedelta(seconds=5)
DefaultHttpPoolTimeout = datetime.timedelta(seconds=5)
DefaultIdleConnectionTimeout = datetime.timedelta(seconds=5)
DefaultQueryTimeout = datetime.timedelta(seconds=5)
DefaultClientBufferTimeout = datetime.timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20
@dataclass
class QueryOptions:
29@dataclass
30class QueryOptions:
31  """
32    A dataclass representing options available for a query.
33
34    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
35    * max_contention_retries - The max number of times to retry the query if contention is encountered.
36    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
37    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
38    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
39    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
40    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
41    """
42
43  linearized: Optional[bool] = None
44  max_contention_retries: Optional[int] = None
45  query_timeout: Optional[timedelta] = DefaultQueryTimeout
46  query_tags: Optional[Mapping[str, str]] = None
47  traceparent: Optional[str] = None
48  typecheck: Optional[bool] = None
49  additional_headers: Optional[Dict[str, str]] = None

A dataclass representing options available for a query.

  • linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  • max_contention_retries - The max number of times to retry the query if contention is encountered.
  • query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
  • query_tags - Tags to associate with the query. See logging
  • traceparent - A traceparent to associate with the query. See logging Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  • typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
  • additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
QueryOptions( linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), query_tags: Optional[Mapping[str, str]] = None, traceparent: Optional[str] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None)
linearized: Optional[bool] = None
max_contention_retries: Optional[int] = None
query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5)
query_tags: Optional[Mapping[str, str]] = None
traceparent: Optional[str] = None
typecheck: Optional[bool] = None
additional_headers: Optional[Dict[str, str]] = None
@dataclass
class StreamOptions:
52@dataclass
53class StreamOptions:
54  """
55    A dataclass representing options available for a stream.
56
57    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
58    * max_backoff - The maximum backoff in seconds for an individual retry.
59    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
60    the timestamp.
61    * status_events - Indicates if stream should include status events. Status events are periodic events that
62    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
63    about the cost of maintaining the stream other than the cost of the received events.
64    """
65
66  max_attempts: Optional[int] = None
67  max_backoff: Optional[int] = None
68  start_ts: Optional[int] = None
69  status_events: bool = False

A dataclass representing options available for a stream.

  • max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
  • max_backoff - The maximum backoff in seconds for an individual retry.
  • start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp.
  • status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events.
StreamOptions( max_attempts: Optional[int] = None, max_backoff: Optional[int] = None, start_ts: Optional[int] = None, status_events: bool = False)
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
start_ts: Optional[int] = None
status_events: bool = False
class Client:
 72class Client:
 73
 74  def __init__(
 75      self,
 76      endpoint: Optional[str] = None,
 77      secret: Optional[str] = None,
 78      http_client: Optional[HTTPClient] = None,
 79      query_tags: Optional[Mapping[str, str]] = None,
 80      linearized: Optional[bool] = None,
 81      max_contention_retries: Optional[int] = None,
 82      typecheck: Optional[bool] = None,
 83      additional_headers: Optional[Dict[str, str]] = None,
 84      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
 85      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
 86      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
 87      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
 88      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
 89      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
 90      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
 91      max_attempts: int = 3,
 92      max_backoff: int = 20,
 93  ):
 94    """Initializes a Client.
 95
 96        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
 97        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
 98        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
 99        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
100        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
101        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
102        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
103        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
104        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
105        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
106        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
107        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
108        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
109        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
110        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
111        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
112        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
113        """
114
115    self._set_endpoint(endpoint)
116    self._max_attempts = max_attempts
117    self._max_backoff = max_backoff
118
119    if secret is None:
120      self._auth = _Auth(_Environment.EnvFaunaSecret())
121    else:
122      self._auth = _Auth(secret)
123
124    self._last_txn_ts = LastTxnTs()
125
126    self._query_tags = {}
127    if query_tags is not None:
128      self._query_tags.update(query_tags)
129
130    if query_timeout is not None:
131      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
132    else:
133      self._query_timeout_ms = None
134
135    self._headers: Dict[str, str] = {
136        _Header.AcceptEncoding: "gzip",
137        _Header.ContentType: "application/json;charset=utf-8",
138        _Header.Driver: "python",
139        _Header.DriverEnv: str(_DriverEnvironment()),
140    }
141
142    if typecheck is not None:
143      self._headers[Header.Typecheck] = str(typecheck).lower()
144
145    if linearized is not None:
146      self._headers[Header.Linearized] = str(linearized).lower()
147
148    if max_contention_retries is not None and max_contention_retries > 0:
149      self._headers[Header.MaxContentionRetries] = \
150          f"{max_contention_retries}"
151
152    if additional_headers is not None:
153      self._headers = {
154          **self._headers,
155          **additional_headers,
156      }
157
158    self._session: HTTPClient
159
160    if http_client is not None:
161      self._session = http_client
162    else:
163      if fauna.global_http_client is None:
164        timeout_s: Optional[float] = None
165        if query_timeout is not None and client_buffer_timeout is not None:
166          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
167        read_timeout_s: Optional[float] = None
168        if http_read_timeout is not None:
169          read_timeout_s = http_read_timeout.total_seconds()
170
171        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
172        ) if http_write_timeout is not None else None
173        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
174        ) if http_connect_timeout is not None else None
175        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
176        ) if http_pool_timeout is not None else None
177        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
178        ) if http_idle_timeout is not None else None
179
180        import httpx
181        from fauna.http.httpx_client import HTTPXClient
182        c = HTTPXClient(
183            httpx.Client(
184                http1=True,
185                http2=False,
186                timeout=httpx.Timeout(
187                    timeout=timeout_s,
188                    connect=connect_timeout_s,
189                    read=read_timeout_s,
190                    write=write_timeout_s,
191                    pool=pool_timeout_s,
192                ),
193                limits=httpx.Limits(
194                    max_connections=DefaultMaxConnections,
195                    max_keepalive_connections=DefaultMaxIdleConnections,
196                    keepalive_expiry=idle_timeout_s,
197                ),
198            ))
199        fauna.global_http_client = c
200
201      self._session = fauna.global_http_client
202
203  def close(self):
204    self._session.close()
205    if self._session == fauna.global_http_client:
206      fauna.global_http_client = None
207
208  def set_last_txn_ts(self, txn_ts: int):
209    """
210        Set the last timestamp seen by this client.
211        This has no effect if earlier than stored timestamp.
212
213        .. WARNING:: This should be used only when coordinating timestamps across
214        multiple clients. Moving the timestamp arbitrarily forward into
215        the future will cause transactions to stall.
216
217        :param txn_ts: the new transaction time.
218        """
219    self._last_txn_ts.update_txn_time(txn_ts)
220
221  def get_last_txn_ts(self) -> Optional[int]:
222    """
223        Get the last timestamp seen by this client.
224        :return:
225        """
226    return self._last_txn_ts.time
227
228  def get_query_timeout(self) -> Optional[timedelta]:
229    """
230        Get the query timeout for all queries.
231        """
232    if self._query_timeout_ms is not None:
233      return timedelta(milliseconds=self._query_timeout_ms)
234    else:
235      return None
236
237  def paginate(
238      self,
239      fql: Query,
240      opts: Optional[QueryOptions] = None,
241  ) -> "QueryIterator":
242    """
243        Run a query on Fauna and returning an iterator of results. If the query
244        returns a Page, the iterator will fetch additional Pages until the
245        after token is null. Each call for a page will be retried with exponential
246        backoff up to the max_attempts set in the client's retry policy in the
247        event of a 429 or 502.
248
249        :param fql: A Query
250        :param opts: (Optional) Query Options
251
252        :return: a :class:`QueryResponse`
253
254        :raises NetworkError: HTTP Request failed in transit
255        :raises ProtocolError: HTTP error not from Fauna
256        :raises ServiceError: Fauna returned an error
257        :raises ValueError: Encoding and decoding errors
258        :raises TypeError: Invalid param types
259        """
260
261    if not isinstance(fql, Query):
262      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
263                 f"Query by calling fauna.fql()"
264      raise TypeError(err_msg)
265
266    return QueryIterator(self, fql, opts)
267
268  def query(
269      self,
270      fql: Query,
271      opts: Optional[QueryOptions] = None,
272  ) -> QuerySuccess:
273    """
274        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
275        up to the max_backoff in the event of a 429.
276
277        :param fql: A Query
278        :param opts: (Optional) Query Options
279
280        :return: a :class:`QueryResponse`
281
282        :raises NetworkError: HTTP Request failed in transit
283        :raises ProtocolError: HTTP error not from Fauna
284        :raises ServiceError: Fauna returned an error
285        :raises ValueError: Encoding and decoding errors
286        :raises TypeError: Invalid param types
287        """
288
289    if not isinstance(fql, Query):
290      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
291                 f"Query by calling fauna.fql()"
292      raise TypeError(err_msg)
293
294    try:
295      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
296    except Exception as e:
297      raise ClientError("Failed to encode Query") from e
298
299    retryable = Retryable[QuerySuccess](
300        self._max_attempts,
301        self._max_backoff,
302        self._query,
303        "/query/1",
304        fql=encoded_query,
305        opts=opts,
306    )
307
308    r = retryable.run()
309    r.response.stats.attempts = r.attempts
310    return r.response
311
312  def _query(
313      self,
314      path: str,
315      fql: Mapping[str, Any],
316      arguments: Optional[Mapping[str, Any]] = None,
317      opts: Optional[QueryOptions] = None,
318  ) -> QuerySuccess:
319
320    headers = self._headers.copy()
321    headers[_Header.Format] = "tagged"
322    headers[_Header.Authorization] = self._auth.bearer()
323
324    if self._query_timeout_ms is not None:
325      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
326
327    headers.update(self._last_txn_ts.request_header)
328
329    query_tags = {}
330    if self._query_tags is not None:
331      query_tags.update(self._query_tags)
332
333    if opts is not None:
334      if opts.linearized is not None:
335        headers[Header.Linearized] = str(opts.linearized).lower()
336      if opts.max_contention_retries is not None:
337        headers[Header.MaxContentionRetries] = \
338            f"{opts.max_contention_retries}"
339      if opts.traceparent is not None:
340        headers[Header.Traceparent] = opts.traceparent
341      if opts.query_timeout is not None:
342        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
343        headers[Header.QueryTimeoutMs] = timeout_ms
344      if opts.query_tags is not None:
345        query_tags.update(opts.query_tags)
346      if opts.typecheck is not None:
347        headers[Header.Typecheck] = str(opts.typecheck).lower()
348      if opts.additional_headers is not None:
349        headers.update(opts.additional_headers)
350
351    if len(query_tags) > 0:
352      headers[Header.Tags] = QueryTags.encode(query_tags)
353
354    data: dict[str, Any] = {
355        "query": fql,
356        "arguments": arguments or {},
357    }
358
359    with self._session.request(
360        method="POST",
361        url=self._endpoint + path,
362        headers=headers,
363        data=data,
364    ) as response:
365      status_code = response.status_code()
366      response_json = response.json()
367      headers = response.headers()
368
369      self._check_protocol(response_json, status_code)
370
371      dec: Any = FaunaDecoder.decode(response_json)
372
373      if status_code > 399:
374        FaunaError.parse_error_and_throw(dec, status_code)
375
376      if "txn_ts" in dec:
377        self.set_last_txn_ts(int(response_json["txn_ts"]))
378
379      stats = QueryStats(dec["stats"]) if "stats" in dec else None
380      summary = dec["summary"] if "summary" in dec else None
381      query_tags = QueryTags.decode(
382          dec["query_tags"]) if "query_tags" in dec else None
383      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
384      schema_version = dec["schema_version"] if "schema_version" in dec else None
385      traceparent = headers.get("traceparent", None)
386      static_type = dec["static_type"] if "static_type" in dec else None
387
388      return QuerySuccess(
389          data=dec["data"],
390          query_tags=query_tags,
391          static_type=static_type,
392          stats=stats,
393          summary=summary,
394          traceparent=traceparent,
395          txn_ts=txn_ts,
396          schema_version=schema_version,
397      )
398
399  def stream(
400      self,
401      fql: Union[StreamToken, Query],
402      opts: StreamOptions = StreamOptions()
403  ) -> "StreamIterator":
404    """
405        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
406
407        :param fql: A Query that returns a StreamToken or a StreamToken.
408        :param opts: (Optional) Stream Options.
409
410        :return: a :class:`StreamIterator`
411
412        :raises NetworkError: HTTP Request failed in transit
413        :raises ProtocolError: HTTP error not from Fauna
414        :raises ServiceError: Fauna returned an error
415        :raises ValueError: Encoding and decoding errors
416        :raises TypeError: Invalid param types
417        """
418
419    if isinstance(fql, Query):
420      token = self.query(fql).data
421    else:
422      token = fql
423
424    if not isinstance(token, StreamToken):
425      err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
426      raise TypeError(err_msg)
427
428    headers = self._headers.copy()
429    headers[_Header.Format] = "tagged"
430    headers[_Header.Authorization] = self._auth.bearer()
431
432    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
433                          self._max_attempts, self._max_backoff, opts, token)
434
435  def _check_protocol(self, response_json: Any, status_code):
436    # TODO: Logic to validate wire protocol belongs elsewhere.
437    should_raise = False
438
439    # check for QuerySuccess
440    if status_code <= 399 and "data" not in response_json:
441      should_raise = True
442
443    # check for QueryFailure
444    if status_code > 399:
445      if "error" not in response_json:
446        should_raise = True
447      else:
448        e = response_json["error"]
449        if "code" not in e or "message" not in e:
450          should_raise = True
451
452    if should_raise:
453      raise ProtocolError(
454          status_code,
455          f"Response is in an unknown format: \n{response_json}",
456      )
457
458  def _set_endpoint(self, endpoint):
459    if endpoint is None:
460      endpoint = _Environment.EnvFaunaEndpoint()
461
462    if endpoint[-1:] == "/":
463      endpoint = endpoint[:-1]
464
465    self._endpoint = endpoint
Client( endpoint: Optional[str] = None, secret: Optional[str] = None, http_client: Optional[fauna.http.http_client.HTTPClient] = None, query_tags: Optional[Mapping[str, str]] = None, linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None, query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), client_buffer_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_read_timeout: Optional[datetime.timedelta] = None, http_write_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_connect_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_pool_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_idle_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), max_attempts: int = 3, max_backoff: int = 20)
 74  def __init__(
 75      self,
 76      endpoint: Optional[str] = None,
 77      secret: Optional[str] = None,
 78      http_client: Optional[HTTPClient] = None,
 79      query_tags: Optional[Mapping[str, str]] = None,
 80      linearized: Optional[bool] = None,
 81      max_contention_retries: Optional[int] = None,
 82      typecheck: Optional[bool] = None,
 83      additional_headers: Optional[Dict[str, str]] = None,
 84      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
 85      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
 86      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
 87      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
 88      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
 89      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
 90      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
 91      max_attempts: int = 3,
 92      max_backoff: int = 20,
 93  ):
 94    """Initializes a Client.
 95
 96        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
 97        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
 98        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
 99        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
100        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
101        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
102        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
103        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
104        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
105        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
106        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
107        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
108        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
109        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
110        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
111        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
112        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
113        """
114
115    self._set_endpoint(endpoint)
116    self._max_attempts = max_attempts
117    self._max_backoff = max_backoff
118
119    if secret is None:
120      self._auth = _Auth(_Environment.EnvFaunaSecret())
121    else:
122      self._auth = _Auth(secret)
123
124    self._last_txn_ts = LastTxnTs()
125
126    self._query_tags = {}
127    if query_tags is not None:
128      self._query_tags.update(query_tags)
129
130    if query_timeout is not None:
131      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
132    else:
133      self._query_timeout_ms = None
134
135    self._headers: Dict[str, str] = {
136        _Header.AcceptEncoding: "gzip",
137        _Header.ContentType: "application/json;charset=utf-8",
138        _Header.Driver: "python",
139        _Header.DriverEnv: str(_DriverEnvironment()),
140    }
141
142    if typecheck is not None:
143      self._headers[Header.Typecheck] = str(typecheck).lower()
144
145    if linearized is not None:
146      self._headers[Header.Linearized] = str(linearized).lower()
147
148    if max_contention_retries is not None and max_contention_retries > 0:
149      self._headers[Header.MaxContentionRetries] = \
150          f"{max_contention_retries}"
151
152    if additional_headers is not None:
153      self._headers = {
154          **self._headers,
155          **additional_headers,
156      }
157
158    self._session: HTTPClient
159
160    if http_client is not None:
161      self._session = http_client
162    else:
163      if fauna.global_http_client is None:
164        timeout_s: Optional[float] = None
165        if query_timeout is not None and client_buffer_timeout is not None:
166          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
167        read_timeout_s: Optional[float] = None
168        if http_read_timeout is not None:
169          read_timeout_s = http_read_timeout.total_seconds()
170
171        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
172        ) if http_write_timeout is not None else None
173        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
174        ) if http_connect_timeout is not None else None
175        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
176        ) if http_pool_timeout is not None else None
177        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
178        ) if http_idle_timeout is not None else None
179
180        import httpx
181        from fauna.http.httpx_client import HTTPXClient
182        c = HTTPXClient(
183            httpx.Client(
184                http1=True,
185                http2=False,
186                timeout=httpx.Timeout(
187                    timeout=timeout_s,
188                    connect=connect_timeout_s,
189                    read=read_timeout_s,
190                    write=write_timeout_s,
191                    pool=pool_timeout_s,
192                ),
193                limits=httpx.Limits(
194                    max_connections=DefaultMaxConnections,
195                    max_keepalive_connections=DefaultMaxIdleConnections,
196                    keepalive_expiry=idle_timeout_s,
197                ),
198            ))
199        fauna.global_http_client = c
200
201      self._session = fauna.global_http_client

Initializes a Client.

Parameters
  • endpoint: The Fauna Endpoint to use. Defaults to https: //db.fauna.com, or the FAUNA_ENDPOINT env variable.
  • secret: The Fauna Secret to use. Defaults to empty, or the FAUNA_SECRET env variable.
  • http_client: An HTTPClient implementation. Defaults to a global HTTPXClient.
  • **query_tags: Tags to associate with the query. See logging
  • linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  • max_contention_retries: The max number of times to retry the query if contention is encountered.
  • typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
  • additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
  • query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is DefaultQueryTimeout.
  • client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is DefaultClientBufferTimeout, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
  • http_read_timeout: Set HTTP Read timeout, default is DefaultHttpReadTimeout.
  • http_write_timeout: Set HTTP Write timeout, default is DefaultHttpWriteTimeout.
  • http_connect_timeout: Set HTTP Connect timeout, default is DefaultHttpConnectTimeout.
  • http_pool_timeout: Set HTTP Pool timeout, default is DefaultHttpPoolTimeout.
  • http_idle_timeout: Set HTTP Idle timeout, default is DefaultIdleConnectionTimeout.
  • max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
  • max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
def close(self):
203  def close(self):
204    self._session.close()
205    if self._session == fauna.global_http_client:
206      fauna.global_http_client = None
def set_last_txn_ts(self, txn_ts: int):
208  def set_last_txn_ts(self, txn_ts: int):
209    """
210        Set the last timestamp seen by this client.
211        This has no effect if earlier than stored timestamp.
212
213        .. WARNING:: This should be used only when coordinating timestamps across
214        multiple clients. Moving the timestamp arbitrarily forward into
215        the future will cause transactions to stall.
216
217        :param txn_ts: the new transaction time.
218        """
219    self._last_txn_ts.update_txn_time(txn_ts)

Set the last timestamp seen by this client. This has no effect if earlier than stored timestamp.

.. WARNING:: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.

Parameters
  • txn_ts: the new transaction time.
def get_last_txn_ts(self) -> Optional[int]:
221  def get_last_txn_ts(self) -> Optional[int]:
222    """
223        Get the last timestamp seen by this client.
224        :return:
225        """
226    return self._last_txn_ts.time

Get the last timestamp seen by this client.

Returns
def get_query_timeout(self) -> Optional[datetime.timedelta]:
228  def get_query_timeout(self) -> Optional[timedelta]:
229    """
230        Get the query timeout for all queries.
231        """
232    if self._query_timeout_ms is not None:
233      return timedelta(milliseconds=self._query_timeout_ms)
234    else:
235      return None

Get the query timeout for all queries.

def paginate( self, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None) -> QueryIterator:
237  def paginate(
238      self,
239      fql: Query,
240      opts: Optional[QueryOptions] = None,
241  ) -> "QueryIterator":
242    """
243        Run a query on Fauna and returning an iterator of results. If the query
244        returns a Page, the iterator will fetch additional Pages until the
245        after token is null. Each call for a page will be retried with exponential
246        backoff up to the max_attempts set in the client's retry policy in the
247        event of a 429 or 502.
248
249        :param fql: A Query
250        :param opts: (Optional) Query Options
251
252        :return: a :class:`QueryResponse`
253
254        :raises NetworkError: HTTP Request failed in transit
255        :raises ProtocolError: HTTP error not from Fauna
256        :raises ServiceError: Fauna returned an error
257        :raises ValueError: Encoding and decoding errors
258        :raises TypeError: Invalid param types
259        """
260
261    if not isinstance(fql, Query):
262      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
263                 f"Query by calling fauna.fql()"
264      raise TypeError(err_msg)
265
266    return QueryIterator(self, fql, opts)

Run a query on Fauna and returning an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502.

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Returns

a QueryResponse

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def query( self, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None) -> fauna.encoding.wire_protocol.QuerySuccess:
268  def query(
269      self,
270      fql: Query,
271      opts: Optional[QueryOptions] = None,
272  ) -> QuerySuccess:
273    """
274        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
275        up to the max_backoff in the event of a 429.
276
277        :param fql: A Query
278        :param opts: (Optional) Query Options
279
280        :return: a :class:`QueryResponse`
281
282        :raises NetworkError: HTTP Request failed in transit
283        :raises ProtocolError: HTTP error not from Fauna
284        :raises ServiceError: Fauna returned an error
285        :raises ValueError: Encoding and decoding errors
286        :raises TypeError: Invalid param types
287        """
288
289    if not isinstance(fql, Query):
290      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
291                 f"Query by calling fauna.fql()"
292      raise TypeError(err_msg)
293
294    try:
295      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
296    except Exception as e:
297      raise ClientError("Failed to encode Query") from e
298
299    retryable = Retryable[QuerySuccess](
300        self._max_attempts,
301        self._max_backoff,
302        self._query,
303        "/query/1",
304        fql=encoded_query,
305        opts=opts,
306    )
307
308    r = retryable.run()
309    r.response.stats.attempts = r.attempts
310    return r.response

Run a query on Fauna. A query will be retried max_attempt times with exponential backoff up to the max_backoff in the event of a 429.

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Returns

a QueryResponse

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def stream( self, fql: Union[fauna.query.models.StreamToken, fauna.query.query_builder.Query], opts: StreamOptions = StreamOptions(max_attempts=None, max_backoff=None, start_ts=None, status_events=False)) -> StreamIterator:
399  def stream(
400      self,
401      fql: Union[StreamToken, Query],
402      opts: StreamOptions = StreamOptions()
403  ) -> "StreamIterator":
404    """
405        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
406
407        :param fql: A Query that returns a StreamToken or a StreamToken.
408        :param opts: (Optional) Stream Options.
409
410        :return: a :class:`StreamIterator`
411
412        :raises NetworkError: HTTP Request failed in transit
413        :raises ProtocolError: HTTP error not from Fauna
414        :raises ServiceError: Fauna returned an error
415        :raises ValueError: Encoding and decoding errors
416        :raises TypeError: Invalid param types
417        """
418
419    if isinstance(fql, Query):
420      token = self.query(fql).data
421    else:
422      token = fql
423
424    if not isinstance(token, StreamToken):
425      err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
426      raise TypeError(err_msg)
427
428    headers = self._headers.copy()
429    headers[_Header.Format] = "tagged"
430    headers[_Header.Authorization] = self._auth.bearer()
431
432    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
433                          self._max_attempts, self._max_backoff, opts, token)

Opens a Stream in Fauna and returns an iterator that consume Fauna events.

Parameters
  • fql: A Query that returns a StreamToken or a StreamToken.
  • opts: (Optional) Stream Options.
Returns

a StreamIterator

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
class StreamIterator:
468class StreamIterator:
469  """A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""
470
471  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
472               endpoint: str, max_attempts: int, max_backoff: int,
473               opts: StreamOptions, token: StreamToken):
474    self._http_client = http_client
475    self._headers = headers
476    self._endpoint = endpoint
477    self._max_attempts = max_attempts
478    self._max_backoff = max_backoff
479    self._opts = opts
480    self._token = token
481    self._stream = None
482    self.last_ts = None
483    self._ctx = self._create_stream()
484
485  def __enter__(self):
486    return self
487
488  def __exit__(self, exc_type, exc_value, exc_traceback):
489    if self._stream is not None:
490      self._stream.close()
491
492    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
493    return False
494
495  def __iter__(self):
496    return self
497
498  def __next__(self):
499    if self._opts.max_attempts is not None:
500      max_attempts = self._opts.max_attempts
501    else:
502      max_attempts = self._max_attempts
503
504    if self._opts.max_backoff is not None:
505      max_backoff = self._opts.max_backoff
506    else:
507      max_backoff = self._max_backoff
508
509    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
510    return retryable.run().response
511
512  def _next_element(self):
513    try:
514      if self._stream is None:
515        try:
516          self._stream = self._ctx.__enter__()
517        except Exception:
518          self._retry_stream()
519
520      if self._stream is not None:
521        event: Any = FaunaDecoder.decode(next(self._stream))
522
523        if event["type"] == "error":
524          FaunaError.parse_error_and_throw(event, 400)
525
526        self.last_ts = event["txn_ts"]
527
528        if event["type"] == "start":
529          return self._next_element()
530
531        if not self._opts.status_events and event["type"] == "status":
532          return self._next_element()
533
534        return event
535
536      raise StopIteration
537    except NetworkError:
538      self._retry_stream()
539
540  def _retry_stream(self):
541    if self._stream is not None:
542      self._stream.close()
543
544    self._stream = None
545
546    try:
547      self._ctx = self._create_stream()
548    except Exception:
549      pass
550    raise RetryableFaunaException
551
552  def _create_stream(self):
553    data: Dict[str, Any] = {"token": self._token.token}
554    if self.last_ts is not None:
555      data["start_ts"] = self.last_ts
556    elif self._opts.start_ts is not None:
557      data["start_ts"] = self._opts.start_ts
558
559    return self._http_client.stream(
560        url=self._endpoint, headers=self._headers, data=data)
561
562  def close(self):
563    if self._stream is not None:
564      self._stream.close()

A class that mixes a ContextManager and an Iterator so we can detected retryable errors.

StreamIterator( http_client: fauna.http.http_client.HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: StreamOptions, token: fauna.query.models.StreamToken)
471  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
472               endpoint: str, max_attempts: int, max_backoff: int,
473               opts: StreamOptions, token: StreamToken):
474    self._http_client = http_client
475    self._headers = headers
476    self._endpoint = endpoint
477    self._max_attempts = max_attempts
478    self._max_backoff = max_backoff
479    self._opts = opts
480    self._token = token
481    self._stream = None
482    self.last_ts = None
483    self._ctx = self._create_stream()
last_ts
def close(self):
562  def close(self):
563    if self._stream is not None:
564      self._stream.close()
class QueryIterator:
567class QueryIterator:
568  """A class to provider an iterator on top of Fauna queries."""
569
570  def __init__(self,
571               client: Client,
572               fql: Query,
573               opts: Optional[QueryOptions] = None):
574    """Initializes the QueryIterator
575
576        :param fql: A Query
577        :param opts: (Optional) Query Options
578
579        :raises TypeError: Invalid param types
580        """
581    if not isinstance(client, Client):
582      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
583                  f"Client by calling fauna.client.Client()"
584      raise TypeError(err_msg)
585
586    if not isinstance(fql, Query):
587      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
588                 f"Query by calling fauna.fql()"
589      raise TypeError(err_msg)
590
591    self.client = client
592    self.fql = fql
593    self.opts = opts
594
595  def __iter__(self) -> Iterator:
596    return self.iter()
597
598  def iter(self) -> Iterator:
599    """
600        A generator function that immediately fetches and yields the results of
601        the stored query. Yields additional pages on subsequent iterations if
602        they exist
603        """
604
605    cursor = None
606    initial_response = self.client.query(self.fql, self.opts)
607
608    if isinstance(initial_response.data, Page):
609      cursor = initial_response.data.after
610      yield initial_response.data.data
611
612      while cursor is not None:
613        next_response = self.client.query(
614            fql("Set.paginate(${after})", after=cursor), self.opts)
615        # TODO: `Set.paginate` does not yet return a `@set` tagged value
616        #       so we will get back a plain object that might not have
617        #       an after property.
618        cursor = next_response.data.get("after")
619        yield next_response.data.get("data")
620
621    else:
622      yield [initial_response.data]
623
624  def flatten(self) -> Iterator:
625    """
626        A generator function that immediately fetches and yields the results of
627        the stored query. Yields each item individually, rather than a whole
628        Page at a time. Fetches additional pages as required if they exist.
629        """
630
631    for page in self.iter():
632      for item in page:
633        yield item

A class to provider an iterator on top of Fauna queries.

QueryIterator( client: Client, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None)
570  def __init__(self,
571               client: Client,
572               fql: Query,
573               opts: Optional[QueryOptions] = None):
574    """Initializes the QueryIterator
575
576        :param fql: A Query
577        :param opts: (Optional) Query Options
578
579        :raises TypeError: Invalid param types
580        """
581    if not isinstance(client, Client):
582      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
583                  f"Client by calling fauna.client.Client()"
584      raise TypeError(err_msg)
585
586    if not isinstance(fql, Query):
587      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
588                 f"Query by calling fauna.fql()"
589      raise TypeError(err_msg)
590
591    self.client = client
592    self.fql = fql
593    self.opts = opts

Initializes the QueryIterator

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Raises
  • TypeError: Invalid param types
client
fql
opts
def iter(self) -> Iterator:
598  def iter(self) -> Iterator:
599    """
600        A generator function that immediately fetches and yields the results of
601        the stored query. Yields additional pages on subsequent iterations if
602        they exist
603        """
604
605    cursor = None
606    initial_response = self.client.query(self.fql, self.opts)
607
608    if isinstance(initial_response.data, Page):
609      cursor = initial_response.data.after
610      yield initial_response.data.data
611
612      while cursor is not None:
613        next_response = self.client.query(
614            fql("Set.paginate(${after})", after=cursor), self.opts)
615        # TODO: `Set.paginate` does not yet return a `@set` tagged value
616        #       so we will get back a plain object that might not have
617        #       an after property.
618        cursor = next_response.data.get("after")
619        yield next_response.data.get("data")
620
621    else:
622      yield [initial_response.data]

A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist

def flatten(self) -> Iterator:
624  def flatten(self) -> Iterator:
625    """
626        A generator function that immediately fetches and yields the results of
627        the stored query. Yields each item individually, rather than a whole
628        Page at a time. Fetches additional pages as required if they exist.
629        """
630
631    for page in self.iter():
632      for item in page:
633        yield item

A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist.