fauna.client.client

  1import logging
  2from dataclasses import dataclass
  3from datetime import timedelta
  4from typing import Any, Dict, Iterator, Mapping, Optional, Union, List
  5
  6import fauna
  7from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
  8from fauna.client.retryable import Retryable
  9from fauna.client.utils import _Environment, LastTxnTs
 10from fauna.encoding import FaunaEncoder, FaunaDecoder
 11from fauna.encoding import QuerySuccess, QueryTags, QueryStats
 12from fauna.errors import FaunaError, ClientError, ProtocolError, \
 13  RetryableFaunaException, NetworkError
 14from fauna.http.http_client import HTTPClient
 15from fauna.query import EventSource, Query, Page, fql
 16
 17logger = logging.getLogger("fauna")
 18
 19DefaultHttpConnectTimeout = timedelta(seconds=5)
 20DefaultHttpReadTimeout: Optional[timedelta] = None
 21DefaultHttpWriteTimeout = timedelta(seconds=5)
 22DefaultHttpPoolTimeout = timedelta(seconds=5)
 23DefaultIdleConnectionTimeout = timedelta(seconds=5)
 24DefaultQueryTimeout = timedelta(seconds=5)
 25DefaultClientBufferTimeout = timedelta(seconds=5)
 26DefaultMaxConnections = 20
 27DefaultMaxIdleConnections = 20
 28
 29
 30@dataclass
 31class QueryOptions:
 32  """
 33    A dataclass representing options available for a query.
 34
 35    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
 36    * max_contention_retries - The max number of times to retry the query if contention is encountered.
 37    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
 38    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
 39    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
 40    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
 41    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
 42    """
 43
 44  linearized: Optional[bool] = None
 45  max_contention_retries: Optional[int] = None
 46  query_timeout: Optional[timedelta] = DefaultQueryTimeout
 47  query_tags: Optional[Mapping[str, str]] = None
 48  traceparent: Optional[str] = None
 49  typecheck: Optional[bool] = None
 50  additional_headers: Optional[Dict[str, str]] = None
 51
 52
 53@dataclass
 54class StreamOptions:
 55  """
 56    A dataclass representing options available for a stream.
 57
 58    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
 59    * max_backoff - The maximum backoff in seconds for an individual retry.
 60    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
 61    the timestamp.
 62    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
 63    * status_events - Indicates if stream should include status events. Status events are periodic events that
 64    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
 65    about the cost of maintaining the stream other than the cost of the received events.
 66    """
 67
 68  max_attempts: Optional[int] = None
 69  max_backoff: Optional[int] = None
 70  start_ts: Optional[int] = None
 71  cursor: Optional[str] = None
 72  status_events: bool = False
 73
 74
 75@dataclass
 76class FeedOptions:
 77  """
 78    A dataclass representing options available for an Event Feed.
 79
 80    * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
 81    * max_backoff - The maximum backoff in seconds for an individual retry.
 82    * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
 83    * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
 84    the timestamp.
 85    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
 86    * page_size - The desired number of events per page.
 87    """
 88  max_attempts: Optional[int] = None
 89  max_backoff: Optional[int] = None
 90  query_timeout: Optional[timedelta] = None
 91  page_size: Optional[int] = None
 92  start_ts: Optional[int] = None
 93  cursor: Optional[str] = None
 94
 95
 96class Client:
 97
 98  def __init__(
 99      self,
100      endpoint: Optional[str] = None,
101      secret: Optional[str] = None,
102      http_client: Optional[HTTPClient] = None,
103      query_tags: Optional[Mapping[str, str]] = None,
104      linearized: Optional[bool] = None,
105      max_contention_retries: Optional[int] = None,
106      typecheck: Optional[bool] = None,
107      additional_headers: Optional[Dict[str, str]] = None,
108      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
109      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
110      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
111      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
112      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
113      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
114      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
115      max_attempts: int = 3,
116      max_backoff: int = 20,
117  ):
118    """Initializes a Client.
119
120        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
121        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
122        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
123        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
124        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
125        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
126        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
127        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
128        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
129        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
130        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
131        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
132        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
133        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
134        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
135        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
136        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
137        """
138
139    self._set_endpoint(endpoint)
140    self._max_attempts = max_attempts
141    self._max_backoff = max_backoff
142
143    if secret is None:
144      self._auth = _Auth(_Environment.EnvFaunaSecret())
145    else:
146      self._auth = _Auth(secret)
147
148    self._last_txn_ts = LastTxnTs()
149
150    self._query_tags = {}
151    if query_tags is not None:
152      self._query_tags.update(query_tags)
153
154    if query_timeout is not None:
155      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
156    else:
157      self._query_timeout_ms = None
158
159    self._headers: Dict[str, str] = {
160        _Header.AcceptEncoding: "gzip",
161        _Header.ContentType: "application/json;charset=utf-8",
162        _Header.Driver: "python",
163        _Header.DriverEnv: str(_DriverEnvironment()),
164    }
165
166    if typecheck is not None:
167      self._headers[Header.Typecheck] = str(typecheck).lower()
168
169    if linearized is not None:
170      self._headers[Header.Linearized] = str(linearized).lower()
171
172    if max_contention_retries is not None and max_contention_retries > 0:
173      self._headers[Header.MaxContentionRetries] = \
174          f"{max_contention_retries}"
175
176    if additional_headers is not None:
177      self._headers = {
178          **self._headers,
179          **additional_headers,
180      }
181
182    self._session: HTTPClient
183
184    if http_client is not None:
185      self._session = http_client
186    else:
187      if fauna.global_http_client is None:
188        timeout_s: Optional[float] = None
189        if query_timeout is not None and client_buffer_timeout is not None:
190          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
191        read_timeout_s: Optional[float] = None
192        if http_read_timeout is not None:
193          read_timeout_s = http_read_timeout.total_seconds()
194
195        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
196        ) if http_write_timeout is not None else None
197        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
198        ) if http_connect_timeout is not None else None
199        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
200        ) if http_pool_timeout is not None else None
201        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
202        ) if http_idle_timeout is not None else None
203
204        import httpx
205        from fauna.http.httpx_client import HTTPXClient
206        c = HTTPXClient(
207            httpx.Client(
208                http1=True,
209                http2=False,
210                timeout=httpx.Timeout(
211                    timeout=timeout_s,
212                    connect=connect_timeout_s,
213                    read=read_timeout_s,
214                    write=write_timeout_s,
215                    pool=pool_timeout_s,
216                ),
217                limits=httpx.Limits(
218                    max_connections=DefaultMaxConnections,
219                    max_keepalive_connections=DefaultMaxIdleConnections,
220                    keepalive_expiry=idle_timeout_s,
221                ),
222            ), logger)
223        fauna.global_http_client = c
224
225      self._session = fauna.global_http_client
226
227  def close(self):
228    self._session.close()
229    if self._session == fauna.global_http_client:
230      fauna.global_http_client = None
231
232  def set_last_txn_ts(self, txn_ts: int):
233    """
234        Set the last timestamp seen by this client.
235        This has no effect if earlier than stored timestamp.
236
237        .. WARNING:: This should be used only when coordinating timestamps across
238        multiple clients. Moving the timestamp arbitrarily forward into
239        the future will cause transactions to stall.
240
241        :param txn_ts: the new transaction time.
242        """
243    self._last_txn_ts.update_txn_time(txn_ts)
244
245  def get_last_txn_ts(self) -> Optional[int]:
246    """
247        Get the last timestamp seen by this client.
248        :return:
249        """
250    return self._last_txn_ts.time
251
252  def get_query_timeout(self) -> Optional[timedelta]:
253    """
254        Get the query timeout for all queries.
255        """
256    if self._query_timeout_ms is not None:
257      return timedelta(milliseconds=self._query_timeout_ms)
258    else:
259      return None
260
261  def paginate(
262      self,
263      fql: Query,
264      opts: Optional[QueryOptions] = None,
265  ) -> "QueryIterator":
266    """
267        Run a query on Fauna and returning an iterator of results. If the query
268        returns a Page, the iterator will fetch additional Pages until the
269        after token is null. Each call for a page will be retried with exponential
270        backoff up to the max_attempts set in the client's retry policy in the
271        event of a 429 or 502.
272
273        :param fql: A Query
274        :param opts: (Optional) Query Options
275
276        :return: a :class:`QueryResponse`
277
278        :raises NetworkError: HTTP Request failed in transit
279        :raises ProtocolError: HTTP error not from Fauna
280        :raises ServiceError: Fauna returned an error
281        :raises ValueError: Encoding and decoding errors
282        :raises TypeError: Invalid param types
283        """
284
285    if not isinstance(fql, Query):
286      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
287                 f"Query by calling fauna.fql()"
288      raise TypeError(err_msg)
289
290    return QueryIterator(self, fql, opts)
291
292  def query(
293      self,
294      fql: Query,
295      opts: Optional[QueryOptions] = None,
296  ) -> QuerySuccess:
297    """
298        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
299        up to the max_backoff in the event of a 429.
300
301        :param fql: A Query
302        :param opts: (Optional) Query Options
303
304        :return: a :class:`QueryResponse`
305
306        :raises NetworkError: HTTP Request failed in transit
307        :raises ProtocolError: HTTP error not from Fauna
308        :raises ServiceError: Fauna returned an error
309        :raises ValueError: Encoding and decoding errors
310        :raises TypeError: Invalid param types
311        """
312
313    if not isinstance(fql, Query):
314      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
315                 f"Query by calling fauna.fql()"
316      raise TypeError(err_msg)
317
318    try:
319      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
320    except Exception as e:
321      raise ClientError("Failed to encode Query") from e
322
323    retryable = Retryable[QuerySuccess](
324        self._max_attempts,
325        self._max_backoff,
326        self._query,
327        "/query/1",
328        fql=encoded_query,
329        opts=opts,
330    )
331
332    r = retryable.run()
333    r.response.stats.attempts = r.attempts
334    return r.response
335
336  def _query(
337      self,
338      path: str,
339      fql: Mapping[str, Any],
340      arguments: Optional[Mapping[str, Any]] = None,
341      opts: Optional[QueryOptions] = None,
342  ) -> QuerySuccess:
343
344    headers = self._headers.copy()
345    headers[_Header.Format] = "tagged"
346    headers[_Header.Authorization] = self._auth.bearer()
347
348    if self._query_timeout_ms is not None:
349      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
350
351    headers.update(self._last_txn_ts.request_header)
352
353    query_tags = {}
354    if self._query_tags is not None:
355      query_tags.update(self._query_tags)
356
357    if opts is not None:
358      if opts.linearized is not None:
359        headers[Header.Linearized] = str(opts.linearized).lower()
360      if opts.max_contention_retries is not None:
361        headers[Header.MaxContentionRetries] = \
362            f"{opts.max_contention_retries}"
363      if opts.traceparent is not None:
364        headers[Header.Traceparent] = opts.traceparent
365      if opts.query_timeout is not None:
366        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
367        headers[Header.QueryTimeoutMs] = timeout_ms
368      if opts.query_tags is not None:
369        query_tags.update(opts.query_tags)
370      if opts.typecheck is not None:
371        headers[Header.Typecheck] = str(opts.typecheck).lower()
372      if opts.additional_headers is not None:
373        headers.update(opts.additional_headers)
374
375    if len(query_tags) > 0:
376      headers[Header.Tags] = QueryTags.encode(query_tags)
377
378    data: dict[str, Any] = {
379        "query": fql,
380        "arguments": arguments or {},
381    }
382
383    with self._session.request(
384        method="POST",
385        url=self._endpoint + path,
386        headers=headers,
387        data=data,
388    ) as response:
389      status_code = response.status_code()
390      response_json = response.json()
391      headers = response.headers()
392
393      self._check_protocol(response_json, status_code)
394
395      dec: Any = FaunaDecoder.decode(response_json)
396
397      if status_code > 399:
398        FaunaError.parse_error_and_throw(dec, status_code)
399
400      if "txn_ts" in dec:
401        self.set_last_txn_ts(int(response_json["txn_ts"]))
402
403      stats = QueryStats(dec["stats"]) if "stats" in dec else None
404      summary = dec["summary"] if "summary" in dec else None
405      query_tags = QueryTags.decode(
406          dec["query_tags"]) if "query_tags" in dec else None
407      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
408      schema_version = dec["schema_version"] if "schema_version" in dec else None
409      traceparent = headers.get("traceparent", None)
410      static_type = dec["static_type"] if "static_type" in dec else None
411
412      return QuerySuccess(
413          data=dec["data"],
414          query_tags=query_tags,
415          static_type=static_type,
416          stats=stats,
417          summary=summary,
418          traceparent=traceparent,
419          txn_ts=txn_ts,
420          schema_version=schema_version,
421      )
422
423  def stream(
424      self,
425      fql: Union[EventSource, Query],
426      opts: StreamOptions = StreamOptions()
427  ) -> "StreamIterator":
428    """
429        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
430
431        :param fql: An EventSource or a Query that returns an EventSource.
432        :param opts: (Optional) Stream Options.
433
434        :return: a :class:`StreamIterator`
435
436        :raises ClientError: Invalid options provided
437        :raises NetworkError: HTTP Request failed in transit
438        :raises ProtocolError: HTTP error not from Fauna
439        :raises ServiceError: Fauna returned an error
440        :raises ValueError: Encoding and decoding errors
441        :raises TypeError: Invalid param types
442        """
443
444    if isinstance(fql, Query):
445      if opts.cursor is not None:
446        raise ClientError(
447            "The 'cursor' configuration can only be used with an event source.")
448
449      source = self.query(fql).data
450    else:
451      source = fql
452
453    if not isinstance(source, EventSource):
454      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
455      raise TypeError(err_msg)
456
457    headers = self._headers.copy()
458    headers[_Header.Format] = "tagged"
459    headers[_Header.Authorization] = self._auth.bearer()
460
461    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
462                          self._max_attempts, self._max_backoff, opts, source)
463
464  def feed(
465      self,
466      source: Union[EventSource, Query],
467      opts: FeedOptions = FeedOptions(),
468  ) -> "FeedIterator":
469    """
470        Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
471
472        :param source: An EventSource or a Query that returns an EventSource.
473        :param opts: (Optional) Event Feed options.
474
475        :return: a :class:`FeedIterator`
476
477        :raises ClientError: Invalid options provided
478        :raises NetworkError: HTTP Request failed in transit
479        :raises ProtocolError: HTTP error not from Fauna
480        :raises ServiceError: Fauna returned an error
481        :raises ValueError: Encoding and decoding errors
482        :raises TypeError: Invalid param types
483        """
484
485    if isinstance(source, Query):
486      source = self.query(source).data
487
488    if not isinstance(source, EventSource):
489      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
490      raise TypeError(err_msg)
491
492    headers = self._headers.copy()
493    headers[_Header.Format] = "tagged"
494    headers[_Header.Authorization] = self._auth.bearer()
495
496    if opts.query_timeout is not None:
497      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
498      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
499    elif self._query_timeout_ms is not None:
500      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
501
502    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
503                        self._max_attempts, self._max_backoff, opts, source)
504
505  def _check_protocol(self, response_json: Any, status_code):
506    # TODO: Logic to validate wire protocol belongs elsewhere.
507    should_raise = False
508
509    # check for QuerySuccess
510    if status_code <= 399 and "data" not in response_json:
511      should_raise = True
512
513    # check for QueryFailure
514    if status_code > 399:
515      if "error" not in response_json:
516        should_raise = True
517      else:
518        e = response_json["error"]
519        if "code" not in e or "message" not in e:
520          should_raise = True
521
522    if should_raise:
523      raise ProtocolError(
524          status_code,
525          f"Response is in an unknown format: \n{response_json}",
526      )
527
528  def _set_endpoint(self, endpoint):
529    if endpoint is None:
530      endpoint = _Environment.EnvFaunaEndpoint()
531
532    if endpoint[-1:] == "/":
533      endpoint = endpoint[:-1]
534
535    self._endpoint = endpoint
536
537
class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors.

    The stream context is created up front but only entered when the first
    element is requested. After a network failure the stream is re-created,
    resuming from the last cursor seen when there is one.
    """

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, source: EventSource):
    """Initializes the StreamIterator.

        :param http_client: HTTP client used to open the stream.
        :param headers: Request headers sent when opening the stream.
        :param endpoint: Full URL of the stream endpoint.
        :param max_attempts: Attempts per element when ``opts.max_attempts`` is not set.
        :param max_backoff: Max backoff when ``opts.max_backoff`` is not set.
        :param opts: Stream Options.
        :param source: The event source whose token is streamed.

        :raises TypeError: Both ``start_ts`` and ``cursor`` are set in ``opts``
        """
    # Validate first so invalid options never trigger any stream set-up.
    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
      raise TypeError(err_msg)

    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._source = source
    self._stream = None
    self.last_ts = None
    self.last_cursor = None
    self._ctx = self._create_stream()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    # NOTE(review): the context is exited even when it was never entered
    # (no element was requested) - confirm the HTTP client's stream context
    # manager tolerates that.
    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    """Return the next event, retrying through Retryable on retryable errors."""
    # Per-stream options take precedence over the client-level settings.
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    """Read events until one should be delivered to the caller.

        "start" events are always skipped and "status" events are skipped
        unless ``opts.status_events`` is set; both still update ``last_ts``
        and ``last_cursor``. A NetworkError, or a failure to open the stream,
        resets the stream and raises RetryableFaunaException.
        """
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        # Loop instead of recursing per skipped event: a long run of filtered
        # status events must not be able to exhaust the interpreter's
        # recursion limit.
        while True:
          event: Any = FaunaDecoder.decode(next(self._stream))

          if event["type"] == "error":
            FaunaError.parse_error_and_throw(event, 400)

          self.last_ts = event["txn_ts"]
          self.last_cursor = event.get('cursor')

          if event["type"] == "start":
            continue

          if not self._opts.status_events and event["type"] == "status":
            continue

          return event

      raise StopIteration
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    """Drop the current stream, prepare a new context and signal a retry."""
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    # Best effort: if a new context cannot be built here, the retry still
    # goes ahead and the next attempt goes through the previous context.
    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    """Build the stream context, resuming from the most specific position known."""
    data: Dict[str, Any] = {"token": self._source.token}
    # Priority: last cursor seen, then the configured cursor, then start_ts.
    if self.last_cursor is not None:
      data["cursor"] = self.last_cursor
    elif self._opts.cursor is not None:
      data["cursor"] = self._opts.cursor
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    """Close the underlying stream if one is open."""
    if self._stream is not None:
      self._stream.close()
643
644
class FeedPage:
  """One page of Event Feed events together with its cursor and stats."""

  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
    self.stats = stats
    self.cursor = cursor
    self._events = events

  def __len__(self):
    """Return the number of events in the page."""
    return len(self._events)

  def __iter__(self) -> Iterator[Any]:
    """Yield the page's events in order; an "error" event is passed to the error parser first."""
    for item in self._events:
      is_error = item["type"] == "error"
      if is_error:
        FaunaError.parse_error_and_throw(item, 400)
      yield item
660
661
class FeedIterator:
  """A class to provide an iterator on top of Event Feed pages."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
               max_attempts: int, max_backoff: int, opts: FeedOptions,
               source: EventSource):
    """Initializes the FeedIterator.

        :param http: HTTP client used to request pages.
        :param headers: Request headers sent with every page request.
        :param endpoint: Full URL of the feed endpoint.
        :param max_attempts: Attempts per page when ``opts.max_attempts`` is not set.
        :param max_backoff: Max backoff when ``opts.max_backoff`` is not set.
        :param opts: Event Feed options.
        :param source: The event source whose token is requested.

        :raises TypeError: Both ``start_ts`` and ``cursor`` are set in ``opts``
        """
    self._http = http
    self._headers = headers
    self._endpoint = endpoint

    # Only fall back to the client-level values when the option is unset.
    # An `or` fallback would also discard an explicit 0.
    if opts.max_attempts is not None:
      self._max_attempts = opts.max_attempts
    else:
      self._max_attempts = max_attempts

    if opts.max_backoff is not None:
      self._max_backoff = opts.max_backoff
    else:
      self._max_backoff = max_backoff

    self._request: Dict[str, Any] = {"token": source.token}
    self._is_done = False

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
      raise TypeError(err_msg)

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[FeedPage]:
    # Clearing the flag lets a new iteration request pages again, using
    # whatever cursor the request currently holds.
    self._is_done = False
    return self

  def __next__(self) -> FeedPage:
    """Return the next page, retrying through Retryable on retryable errors."""
    if self._is_done:
      raise StopIteration

    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
                               self._next_page)
    return retryable.run().response

  def _next_page(self) -> FeedPage:
    """Request one page and advance the stored request to the returned cursor."""
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      status_code = response.status_code()
      decoded: Any = FaunaDecoder.decode(response.json())

      if status_code > 399:
        FaunaError.parse_error_and_throw(decoded, status_code)

      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      # From here on the cursor is the position, so start_ts is dropped.
      if "start_ts" in self._request:
        del self._request["start_ts"]

      return FeedPage(decoded["events"], decoded["cursor"],
                      QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields events instead of pages of events."""
    for page in self:
      for event in page:
        yield event
727
728
class QueryIterator:
  """A class to provide an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

        :param client: The Client used to run the query
        :param fql: A Query
        :param opts: (Optional) Query Options

        :raises TypeError: Invalid param types
        """
    if not isinstance(client, Client):
      raise TypeError(
          f"'client' must be a Client but was a {type(client)}. You can build a "
          f"Client by calling fauna.client.Client()")

    if not isinstance(fql, Query):
      raise TypeError(
          f"'fql' must be a Query but was a {type(fql)}. You can build a "
          f"Query by calling fauna.fql()")

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
        A generator that runs the stored query on first use and yields its
        results one page at a time. When the result is a Page, further pages
        are fetched for as long as an after cursor is returned; any other
        result is yielded as a single one-element page.
        """

    first = self.client.query(self.fql, self.opts)

    # A non-Page result is wrapped so callers always receive a list.
    if not isinstance(first.data, Page):
      yield [first.data]
      return

    after = first.data.after
    yield first.data.data

    while after is not None:
      following = self.client.query(
          fql("Set.paginate(${after})", after=after), self.opts)
      # TODO: `Set.paginate` does not yet return a `@set` tagged value
      #       so we will get back a plain object that might not have
      #       an after property.
      after = following.data.get("after")
      yield following.data.get("data")

  def flatten(self) -> Iterator:
    """
        A generator that runs the stored query on first use and yields each
        item individually rather than a whole Page at a time, fetching
        additional pages as required.
        """

    for page in self.iter():
      yield from page
logger = <Logger fauna (WARNING)>
DefaultHttpConnectTimeout = datetime.timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[datetime.timedelta] = None
DefaultHttpWriteTimeout = datetime.timedelta(seconds=5)
DefaultHttpPoolTimeout = datetime.timedelta(seconds=5)
DefaultIdleConnectionTimeout = datetime.timedelta(seconds=5)
DefaultQueryTimeout = datetime.timedelta(seconds=5)
DefaultClientBufferTimeout = datetime.timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20
@dataclass
class QueryOptions:
@dataclass
class QueryOptions:
  """
    Per-query settings that can be passed alongside a query.

    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
    * max_contention_retries - The max number of times to retry the query if contention is encountered.
    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
    """

  # Field order is part of the generated __init__ signature; keep it stable.
  linearized: Optional[bool] = None
  max_contention_retries: Optional[int] = None
  # NOTE: this default is not None, so whenever an options object is passed
  # to a query its query_timeout is sent as the timeout header, replacing the
  # value configured on the Client. Pass query_timeout=None to keep the
  # client-level timeout.
  query_timeout: Optional[timedelta] = DefaultQueryTimeout
  query_tags: Optional[Mapping[str, str]] = None
  traceparent: Optional[str] = None
  typecheck: Optional[bool] = None
  additional_headers: Optional[Dict[str, str]] = None

A dataclass representing options available for a query.

  • linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  • max_contention_retries - The max number of times to retry the query if contention is encountered.
  • query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
  • query_tags - Tags to associate with the query. See logging
  • traceparent - A traceparent to associate with the query. See logging Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  • typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
  • additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
QueryOptions( linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), query_tags: Optional[Mapping[str, str]] = None, traceparent: Optional[str] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None)
linearized: Optional[bool] = None
max_contention_retries: Optional[int] = None
query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5)
query_tags: Optional[Mapping[str, str]] = None
traceparent: Optional[str] = None
typecheck: Optional[bool] = None
additional_headers: Optional[Dict[str, str]] = None
@dataclass
class StreamOptions:
@dataclass
class StreamOptions:
  """
    Settings that control how an event stream is opened and retried.

    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
    * max_backoff - The maximum backoff in seconds for an individual retry.
    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
    the timestamp.
    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
    * status_events - Indicates if stream should include status events. Status events are periodic events that
    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
    about the cost of maintaining the stream other than the cost of the received events.
    """

  # Field order is part of the generated __init__ signature; keep it stable.
  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None
  status_events: bool = False

A dataclass representing options available for a stream.

  • max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
  • max_backoff - The maximum backoff in seconds for an individual retry.
  • start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp.
  • cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
  • status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events.
StreamOptions( max_attempts: Optional[int] = None, max_backoff: Optional[int] = None, start_ts: Optional[int] = None, cursor: Optional[str] = None, status_events: bool = False)
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
start_ts: Optional[int] = None
cursor: Optional[str] = None
status_events: bool = False
@dataclass
class FeedOptions:
@dataclass
class FeedOptions:
  """
    Settings that control how an Event Feed is requested and retried.

    * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
    * max_backoff - The maximum backoff in seconds for an individual retry.
    * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
    * page_size - The desired number of events per page.
    * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
    the timestamp.
    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
    """

  # Field order is part of the generated __init__ signature; keep it stable.
  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  query_timeout: Optional[timedelta] = None
  page_size: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None

A dataclass representing options available for an Event Feed.

  • max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
  • max_backoff - The maximum backoff in seconds for an individual retry.
  • query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
  • start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after the timestamp.
  • cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
  • page_size - The desired number of events per page.
FeedOptions( max_attempts: Optional[int] = None, max_backoff: Optional[int] = None, query_timeout: Optional[datetime.timedelta] = None, page_size: Optional[int] = None, start_ts: Optional[int] = None, cursor: Optional[str] = None)
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
query_timeout: Optional[datetime.timedelta] = None
page_size: Optional[int] = None
start_ts: Optional[int] = None
cursor: Optional[str] = None
class Client:
 97class Client:
 98
 99  def __init__(
100      self,
101      endpoint: Optional[str] = None,
102      secret: Optional[str] = None,
103      http_client: Optional[HTTPClient] = None,
104      query_tags: Optional[Mapping[str, str]] = None,
105      linearized: Optional[bool] = None,
106      max_contention_retries: Optional[int] = None,
107      typecheck: Optional[bool] = None,
108      additional_headers: Optional[Dict[str, str]] = None,
109      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
110      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
111      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
112      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
113      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
114      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
115      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
116      max_attempts: int = 3,
117      max_backoff: int = 20,
118  ):
119    """Initializes a Client.
120
121        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
122        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
123        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
124        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
125        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
126        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
127        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
128        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
129        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
130        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
131        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
132        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
133        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
134        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
135        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
136        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
137        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
138        """
139
140    self._set_endpoint(endpoint)
141    self._max_attempts = max_attempts
142    self._max_backoff = max_backoff
143
144    if secret is None:
145      self._auth = _Auth(_Environment.EnvFaunaSecret())
146    else:
147      self._auth = _Auth(secret)
148
149    self._last_txn_ts = LastTxnTs()
150
151    self._query_tags = {}
152    if query_tags is not None:
153      self._query_tags.update(query_tags)
154
155    if query_timeout is not None:
156      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
157    else:
158      self._query_timeout_ms = None
159
160    self._headers: Dict[str, str] = {
161        _Header.AcceptEncoding: "gzip",
162        _Header.ContentType: "application/json;charset=utf-8",
163        _Header.Driver: "python",
164        _Header.DriverEnv: str(_DriverEnvironment()),
165    }
166
167    if typecheck is not None:
168      self._headers[Header.Typecheck] = str(typecheck).lower()
169
170    if linearized is not None:
171      self._headers[Header.Linearized] = str(linearized).lower()
172
173    if max_contention_retries is not None and max_contention_retries > 0:
174      self._headers[Header.MaxContentionRetries] = \
175          f"{max_contention_retries}"
176
177    if additional_headers is not None:
178      self._headers = {
179          **self._headers,
180          **additional_headers,
181      }
182
183    self._session: HTTPClient
184
185    if http_client is not None:
186      self._session = http_client
187    else:
188      if fauna.global_http_client is None:
189        timeout_s: Optional[float] = None
190        if query_timeout is not None and client_buffer_timeout is not None:
191          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
192        read_timeout_s: Optional[float] = None
193        if http_read_timeout is not None:
194          read_timeout_s = http_read_timeout.total_seconds()
195
196        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
197        ) if http_write_timeout is not None else None
198        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
199        ) if http_connect_timeout is not None else None
200        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
201        ) if http_pool_timeout is not None else None
202        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
203        ) if http_idle_timeout is not None else None
204
205        import httpx
206        from fauna.http.httpx_client import HTTPXClient
207        c = HTTPXClient(
208            httpx.Client(
209                http1=True,
210                http2=False,
211                timeout=httpx.Timeout(
212                    timeout=timeout_s,
213                    connect=connect_timeout_s,
214                    read=read_timeout_s,
215                    write=write_timeout_s,
216                    pool=pool_timeout_s,
217                ),
218                limits=httpx.Limits(
219                    max_connections=DefaultMaxConnections,
220                    max_keepalive_connections=DefaultMaxIdleConnections,
221                    keepalive_expiry=idle_timeout_s,
222                ),
223            ), logger)
224        fauna.global_http_client = c
225
226      self._session = fauna.global_http_client
227
228  def close(self):
229    self._session.close()
230    if self._session == fauna.global_http_client:
231      fauna.global_http_client = None
232
233  def set_last_txn_ts(self, txn_ts: int):
234    """
235        Set the last timestamp seen by this client.
236        This has no effect if earlier than stored timestamp.
237
238        .. WARNING:: This should be used only when coordinating timestamps across
239        multiple clients. Moving the timestamp arbitrarily forward into
240        the future will cause transactions to stall.
241
242        :param txn_ts: the new transaction time.
243        """
244    self._last_txn_ts.update_txn_time(txn_ts)
245
246  def get_last_txn_ts(self) -> Optional[int]:
247    """
248        Get the last timestamp seen by this client.
249        :return:
250        """
251    return self._last_txn_ts.time
252
253  def get_query_timeout(self) -> Optional[timedelta]:
254    """
255        Get the query timeout for all queries.
256        """
257    if self._query_timeout_ms is not None:
258      return timedelta(milliseconds=self._query_timeout_ms)
259    else:
260      return None
261
262  def paginate(
263      self,
264      fql: Query,
265      opts: Optional[QueryOptions] = None,
266  ) -> "QueryIterator":
267    """
268        Run a query on Fauna and returning an iterator of results. If the query
269        returns a Page, the iterator will fetch additional Pages until the
270        after token is null. Each call for a page will be retried with exponential
271        backoff up to the max_attempts set in the client's retry policy in the
272        event of a 429 or 502.
273
274        :param fql: A Query
275        :param opts: (Optional) Query Options
276
277        :return: a :class:`QueryResponse`
278
279        :raises NetworkError: HTTP Request failed in transit
280        :raises ProtocolError: HTTP error not from Fauna
281        :raises ServiceError: Fauna returned an error
282        :raises ValueError: Encoding and decoding errors
283        :raises TypeError: Invalid param types
284        """
285
286    if not isinstance(fql, Query):
287      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
288                 f"Query by calling fauna.fql()"
289      raise TypeError(err_msg)
290
291    return QueryIterator(self, fql, opts)
292
293  def query(
294      self,
295      fql: Query,
296      opts: Optional[QueryOptions] = None,
297  ) -> QuerySuccess:
298    """
299        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
300        up to the max_backoff in the event of a 429.
301
302        :param fql: A Query
303        :param opts: (Optional) Query Options
304
305        :return: a :class:`QueryResponse`
306
307        :raises NetworkError: HTTP Request failed in transit
308        :raises ProtocolError: HTTP error not from Fauna
309        :raises ServiceError: Fauna returned an error
310        :raises ValueError: Encoding and decoding errors
311        :raises TypeError: Invalid param types
312        """
313
314    if not isinstance(fql, Query):
315      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
316                 f"Query by calling fauna.fql()"
317      raise TypeError(err_msg)
318
319    try:
320      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
321    except Exception as e:
322      raise ClientError("Failed to encode Query") from e
323
324    retryable = Retryable[QuerySuccess](
325        self._max_attempts,
326        self._max_backoff,
327        self._query,
328        "/query/1",
329        fql=encoded_query,
330        opts=opts,
331    )
332
333    r = retryable.run()
334    r.response.stats.attempts = r.attempts
335    return r.response
336
337  def _query(
338      self,
339      path: str,
340      fql: Mapping[str, Any],
341      arguments: Optional[Mapping[str, Any]] = None,
342      opts: Optional[QueryOptions] = None,
343  ) -> QuerySuccess:
344
345    headers = self._headers.copy()
346    headers[_Header.Format] = "tagged"
347    headers[_Header.Authorization] = self._auth.bearer()
348
349    if self._query_timeout_ms is not None:
350      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
351
352    headers.update(self._last_txn_ts.request_header)
353
354    query_tags = {}
355    if self._query_tags is not None:
356      query_tags.update(self._query_tags)
357
358    if opts is not None:
359      if opts.linearized is not None:
360        headers[Header.Linearized] = str(opts.linearized).lower()
361      if opts.max_contention_retries is not None:
362        headers[Header.MaxContentionRetries] = \
363            f"{opts.max_contention_retries}"
364      if opts.traceparent is not None:
365        headers[Header.Traceparent] = opts.traceparent
366      if opts.query_timeout is not None:
367        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
368        headers[Header.QueryTimeoutMs] = timeout_ms
369      if opts.query_tags is not None:
370        query_tags.update(opts.query_tags)
371      if opts.typecheck is not None:
372        headers[Header.Typecheck] = str(opts.typecheck).lower()
373      if opts.additional_headers is not None:
374        headers.update(opts.additional_headers)
375
376    if len(query_tags) > 0:
377      headers[Header.Tags] = QueryTags.encode(query_tags)
378
379    data: dict[str, Any] = {
380        "query": fql,
381        "arguments": arguments or {},
382    }
383
384    with self._session.request(
385        method="POST",
386        url=self._endpoint + path,
387        headers=headers,
388        data=data,
389    ) as response:
390      status_code = response.status_code()
391      response_json = response.json()
392      headers = response.headers()
393
394      self._check_protocol(response_json, status_code)
395
396      dec: Any = FaunaDecoder.decode(response_json)
397
398      if status_code > 399:
399        FaunaError.parse_error_and_throw(dec, status_code)
400
401      if "txn_ts" in dec:
402        self.set_last_txn_ts(int(response_json["txn_ts"]))
403
404      stats = QueryStats(dec["stats"]) if "stats" in dec else None
405      summary = dec["summary"] if "summary" in dec else None
406      query_tags = QueryTags.decode(
407          dec["query_tags"]) if "query_tags" in dec else None
408      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
409      schema_version = dec["schema_version"] if "schema_version" in dec else None
410      traceparent = headers.get("traceparent", None)
411      static_type = dec["static_type"] if "static_type" in dec else None
412
413      return QuerySuccess(
414          data=dec["data"],
415          query_tags=query_tags,
416          static_type=static_type,
417          stats=stats,
418          summary=summary,
419          traceparent=traceparent,
420          txn_ts=txn_ts,
421          schema_version=schema_version,
422      )
423
424  def stream(
425      self,
426      fql: Union[EventSource, Query],
427      opts: StreamOptions = StreamOptions()
428  ) -> "StreamIterator":
429    """
430        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
431
432        :param fql: An EventSource or a Query that returns an EventSource.
433        :param opts: (Optional) Stream Options.
434
435        :return: a :class:`StreamIterator`
436
437        :raises ClientError: Invalid options provided
438        :raises NetworkError: HTTP Request failed in transit
439        :raises ProtocolError: HTTP error not from Fauna
440        :raises ServiceError: Fauna returned an error
441        :raises ValueError: Encoding and decoding errors
442        :raises TypeError: Invalid param types
443        """
444
445    if isinstance(fql, Query):
446      if opts.cursor is not None:
447        raise ClientError(
448            "The 'cursor' configuration can only be used with an event source.")
449
450      source = self.query(fql).data
451    else:
452      source = fql
453
454    if not isinstance(source, EventSource):
455      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
456      raise TypeError(err_msg)
457
458    headers = self._headers.copy()
459    headers[_Header.Format] = "tagged"
460    headers[_Header.Authorization] = self._auth.bearer()
461
462    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
463                          self._max_attempts, self._max_backoff, opts, source)
464
465  def feed(
466      self,
467      source: Union[EventSource, Query],
468      opts: FeedOptions = FeedOptions(),
469  ) -> "FeedIterator":
470    """
471        Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
472
473        :param source: An EventSource or a Query that returns an EventSource.
474        :param opts: (Optional) Event Feed options.
475
476        :return: a :class:`FeedIterator`
477
478        :raises ClientError: Invalid options provided
479        :raises NetworkError: HTTP Request failed in transit
480        :raises ProtocolError: HTTP error not from Fauna
481        :raises ServiceError: Fauna returned an error
482        :raises ValueError: Encoding and decoding errors
483        :raises TypeError: Invalid param types
484        """
485
486    if isinstance(source, Query):
487      source = self.query(source).data
488
489    if not isinstance(source, EventSource):
490      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
491      raise TypeError(err_msg)
492
493    headers = self._headers.copy()
494    headers[_Header.Format] = "tagged"
495    headers[_Header.Authorization] = self._auth.bearer()
496
497    if opts.query_timeout is not None:
498      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
499      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
500    elif self._query_timeout_ms is not None:
501      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
502
503    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
504                        self._max_attempts, self._max_backoff, opts, source)
505
506  def _check_protocol(self, response_json: Any, status_code):
507    # TODO: Logic to validate wire protocol belongs elsewhere.
508    should_raise = False
509
510    # check for QuerySuccess
511    if status_code <= 399 and "data" not in response_json:
512      should_raise = True
513
514    # check for QueryFailure
515    if status_code > 399:
516      if "error" not in response_json:
517        should_raise = True
518      else:
519        e = response_json["error"]
520        if "code" not in e or "message" not in e:
521          should_raise = True
522
523    if should_raise:
524      raise ProtocolError(
525          status_code,
526          f"Response is in an unknown format: \n{response_json}",
527      )
528
529  def _set_endpoint(self, endpoint):
530    if endpoint is None:
531      endpoint = _Environment.EnvFaunaEndpoint()
532
533    if endpoint[-1:] == "/":
534      endpoint = endpoint[:-1]
535
536    self._endpoint = endpoint
Client( endpoint: Optional[str] = None, secret: Optional[str] = None, http_client: Optional[fauna.http.http_client.HTTPClient] = None, query_tags: Optional[Mapping[str, str]] = None, linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None, query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), client_buffer_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_read_timeout: Optional[datetime.timedelta] = None, http_write_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_connect_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_pool_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_idle_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), max_attempts: int = 3, max_backoff: int = 20)
 99  def __init__(
100      self,
101      endpoint: Optional[str] = None,
102      secret: Optional[str] = None,
103      http_client: Optional[HTTPClient] = None,
104      query_tags: Optional[Mapping[str, str]] = None,
105      linearized: Optional[bool] = None,
106      max_contention_retries: Optional[int] = None,
107      typecheck: Optional[bool] = None,
108      additional_headers: Optional[Dict[str, str]] = None,
109      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
110      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
111      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
112      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
113      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
114      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
115      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
116      max_attempts: int = 3,
117      max_backoff: int = 20,
118  ):
119    """Initializes a Client.
120
121        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
122        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
123        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
124        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
125        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
126        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
127        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
128        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
129        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
130        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
131        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
132        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
133        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
134        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
135        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
136        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
137        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
138        """
139
140    self._set_endpoint(endpoint)
141    self._max_attempts = max_attempts
142    self._max_backoff = max_backoff
143
144    if secret is None:
145      self._auth = _Auth(_Environment.EnvFaunaSecret())
146    else:
147      self._auth = _Auth(secret)
148
149    self._last_txn_ts = LastTxnTs()
150
151    self._query_tags = {}
152    if query_tags is not None:
153      self._query_tags.update(query_tags)
154
155    if query_timeout is not None:
156      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
157    else:
158      self._query_timeout_ms = None
159
160    self._headers: Dict[str, str] = {
161        _Header.AcceptEncoding: "gzip",
162        _Header.ContentType: "application/json;charset=utf-8",
163        _Header.Driver: "python",
164        _Header.DriverEnv: str(_DriverEnvironment()),
165    }
166
167    if typecheck is not None:
168      self._headers[Header.Typecheck] = str(typecheck).lower()
169
170    if linearized is not None:
171      self._headers[Header.Linearized] = str(linearized).lower()
172
173    if max_contention_retries is not None and max_contention_retries > 0:
174      self._headers[Header.MaxContentionRetries] = \
175          f"{max_contention_retries}"
176
177    if additional_headers is not None:
178      self._headers = {
179          **self._headers,
180          **additional_headers,
181      }
182
183    self._session: HTTPClient
184
185    if http_client is not None:
186      self._session = http_client
187    else:
188      if fauna.global_http_client is None:
189        timeout_s: Optional[float] = None
190        if query_timeout is not None and client_buffer_timeout is not None:
191          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
192        read_timeout_s: Optional[float] = None
193        if http_read_timeout is not None:
194          read_timeout_s = http_read_timeout.total_seconds()
195
196        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
197        ) if http_write_timeout is not None else None
198        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
199        ) if http_connect_timeout is not None else None
200        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
201        ) if http_pool_timeout is not None else None
202        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
203        ) if http_idle_timeout is not None else None
204
205        import httpx
206        from fauna.http.httpx_client import HTTPXClient
207        c = HTTPXClient(
208            httpx.Client(
209                http1=True,
210                http2=False,
211                timeout=httpx.Timeout(
212                    timeout=timeout_s,
213                    connect=connect_timeout_s,
214                    read=read_timeout_s,
215                    write=write_timeout_s,
216                    pool=pool_timeout_s,
217                ),
218                limits=httpx.Limits(
219                    max_connections=DefaultMaxConnections,
220                    max_keepalive_connections=DefaultMaxIdleConnections,
221                    keepalive_expiry=idle_timeout_s,
222                ),
223            ), logger)
224        fauna.global_http_client = c
225
226      self._session = fauna.global_http_client

Initializes a Client.

Parameters
  • endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the FAUNA_ENDPOINT env variable.
  • secret: The Fauna Secret to use. Defaults to empty, or the FAUNA_SECRET env variable.
  • http_client: An HTTPClient implementation. Defaults to a global HTTPXClient.
  • query_tags: Tags to associate with the query. See logging
  • linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  • max_contention_retries: The max number of times to retry the query if contention is encountered.
  • typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
  • additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
  • query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is DefaultQueryTimeout.
  • client_buffer_timeout: Time beyond query_timeout at which the client will abort a request if it has not received a response. The default is DefaultClientBufferTimeout, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
  • http_read_timeout: Set HTTP Read timeout, default is DefaultHttpReadTimeout.
  • http_write_timeout: Set HTTP Write timeout, default is DefaultHttpWriteTimeout.
  • http_connect_timeout: Set HTTP Connect timeout, default is DefaultHttpConnectTimeout.
  • http_pool_timeout: Set HTTP Pool timeout, default is DefaultHttpPoolTimeout.
  • http_idle_timeout: Set HTTP Idle timeout, default is DefaultIdleConnectionTimeout.
  • max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
  • max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
def close(self):
228  def close(self):
229    self._session.close()
230    if self._session == fauna.global_http_client:
231      fauna.global_http_client = None
def set_last_txn_ts(self, txn_ts: int):
233  def set_last_txn_ts(self, txn_ts: int):
234    """
235        Set the last timestamp seen by this client.
236        This has no effect if earlier than stored timestamp.
237
238        .. WARNING:: This should be used only when coordinating timestamps across
239        multiple clients. Moving the timestamp arbitrarily forward into
240        the future will cause transactions to stall.
241
242        :param txn_ts: the new transaction time.
243        """
244    self._last_txn_ts.update_txn_time(txn_ts)

Set the last timestamp seen by this client. This has no effect if earlier than stored timestamp.

Warning: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.

Parameters
  • txn_ts: the new transaction time.
def get_last_txn_ts(self) -> Optional[int]:
246  def get_last_txn_ts(self) -> Optional[int]:
247    """
248        Get the last timestamp seen by this client.
249        :return:
250        """
251    return self._last_txn_ts.time

Get the last timestamp seen by this client.

Returns

the last timestamp seen by this client, or None.
def get_query_timeout(self) -> Optional[datetime.timedelta]:
253  def get_query_timeout(self) -> Optional[timedelta]:
254    """
255        Get the query timeout for all queries.
256        """
257    if self._query_timeout_ms is not None:
258      return timedelta(milliseconds=self._query_timeout_ms)
259    else:
260      return None

Get the query timeout for all queries.

def paginate( self, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None) -> QueryIterator:
262  def paginate(
263      self,
264      fql: Query,
265      opts: Optional[QueryOptions] = None,
266  ) -> "QueryIterator":
267    """
268        Run a query on Fauna and returning an iterator of results. If the query
269        returns a Page, the iterator will fetch additional Pages until the
270        after token is null. Each call for a page will be retried with exponential
271        backoff up to the max_attempts set in the client's retry policy in the
272        event of a 429 or 502.
273
274        :param fql: A Query
275        :param opts: (Optional) Query Options
276
277        :return: a :class:`QueryResponse`
278
279        :raises NetworkError: HTTP Request failed in transit
280        :raises ProtocolError: HTTP error not from Fauna
281        :raises ServiceError: Fauna returned an error
282        :raises ValueError: Encoding and decoding errors
283        :raises TypeError: Invalid param types
284        """
285
286    if not isinstance(fql, Query):
287      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
288                 f"Query by calling fauna.fql()"
289      raise TypeError(err_msg)
290
291    return QueryIterator(self, fql, opts)

Run a query on Fauna and return an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502.

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Returns

a QueryIterator

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def query( self, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None) -> fauna.encoding.wire_protocol.QuerySuccess:
293  def query(
294      self,
295      fql: Query,
296      opts: Optional[QueryOptions] = None,
297  ) -> QuerySuccess:
298    """
299        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
300        up to the max_backoff in the event of a 429.
301
302        :param fql: A Query
303        :param opts: (Optional) Query Options
304
305        :return: a :class:`QueryResponse`
306
307        :raises NetworkError: HTTP Request failed in transit
308        :raises ProtocolError: HTTP error not from Fauna
309        :raises ServiceError: Fauna returned an error
310        :raises ValueError: Encoding and decoding errors
311        :raises TypeError: Invalid param types
312        """
313
314    if not isinstance(fql, Query):
315      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
316                 f"Query by calling fauna.fql()"
317      raise TypeError(err_msg)
318
319    try:
320      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
321    except Exception as e:
322      raise ClientError("Failed to encode Query") from e
323
324    retryable = Retryable[QuerySuccess](
325        self._max_attempts,
326        self._max_backoff,
327        self._query,
328        "/query/1",
329        fql=encoded_query,
330        opts=opts,
331    )
332
333    r = retryable.run()
334    r.response.stats.attempts = r.attempts
335    return r.response

Run a query on Fauna. A query will be retried up to max_attempts times with exponential backoff up to the max_backoff in the event of a 429.

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Returns

a QuerySuccess

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def stream( self, fql: Union[fauna.query.models.EventSource, fauna.query.query_builder.Query], opts: StreamOptions = StreamOptions(max_attempts=None, max_backoff=None, start_ts=None, cursor=None, status_events=False)) -> StreamIterator:
424  def stream(
425      self,
426      fql: Union[EventSource, Query],
427      opts: StreamOptions = StreamOptions()
428  ) -> "StreamIterator":
429    """
430        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
431
432        :param fql: An EventSource or a Query that returns an EventSource.
433        :param opts: (Optional) Stream Options.
434
435        :return: a :class:`StreamIterator`
436
437        :raises ClientError: Invalid options provided
438        :raises NetworkError: HTTP Request failed in transit
439        :raises ProtocolError: HTTP error not from Fauna
440        :raises ServiceError: Fauna returned an error
441        :raises ValueError: Encoding and decoding errors
442        :raises TypeError: Invalid param types
443        """
444
445    if isinstance(fql, Query):
446      if opts.cursor is not None:
447        raise ClientError(
448            "The 'cursor' configuration can only be used with an event source.")
449
450      source = self.query(fql).data
451    else:
452      source = fql
453
454    if not isinstance(source, EventSource):
455      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
456      raise TypeError(err_msg)
457
458    headers = self._headers.copy()
459    headers[_Header.Format] = "tagged"
460    headers[_Header.Authorization] = self._auth.bearer()
461
462    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
463                          self._max_attempts, self._max_backoff, opts, source)

Opens a Stream in Fauna and returns an iterator that consumes Fauna events.

Parameters
  • fql: An EventSource or a Query that returns an EventSource.
  • opts: (Optional) Stream Options.
Returns

a StreamIterator

Raises
  • ClientError: Invalid options provided
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def feed( self, source: Union[fauna.query.models.EventSource, fauna.query.query_builder.Query], opts: FeedOptions = FeedOptions(max_attempts=None, max_backoff=None, query_timeout=None, page_size=None, start_ts=None, cursor=None)) -> FeedIterator:
465  def feed(
466      self,
467      source: Union[EventSource, Query],
468      opts: FeedOptions = FeedOptions(),
469  ) -> "FeedIterator":
470    """
471        Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
472
473        :param source: An EventSource or a Query that returns an EventSource.
474        :param opts: (Optional) Event Feed options.
475
476        :return: a :class:`FeedIterator`
477
478        :raises ClientError: Invalid options provided
479        :raises NetworkError: HTTP Request failed in transit
480        :raises ProtocolError: HTTP error not from Fauna
481        :raises ServiceError: Fauna returned an error
482        :raises ValueError: Encoding and decoding errors
483        :raises TypeError: Invalid param types
484        """
485
486    if isinstance(source, Query):
487      source = self.query(source).data
488
489    if not isinstance(source, EventSource):
490      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
491      raise TypeError(err_msg)
492
493    headers = self._headers.copy()
494    headers[_Header.Format] = "tagged"
495    headers[_Header.Authorization] = self._auth.bearer()
496
497    if opts.query_timeout is not None:
498      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
499      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
500    elif self._query_timeout_ms is not None:
501      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
502
503    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
504                        self._max_attempts, self._max_backoff, opts, source)

Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.

Parameters
  • source: An EventSource or a Query that returns an EventSource.
  • opts: (Optional) Event Feed options.
Returns

a FeedIterator

Raises
  • ClientError: Invalid options provided
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
class StreamIterator:
539class StreamIterator:
540  """A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""
541
542  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
543               endpoint: str, max_attempts: int, max_backoff: int,
544               opts: StreamOptions, source: EventSource):
545    self._http_client = http_client
546    self._headers = headers
547    self._endpoint = endpoint
548    self._max_attempts = max_attempts
549    self._max_backoff = max_backoff
550    self._opts = opts
551    self._source = source
552    self._stream = None
553    self.last_ts = None
554    self.last_cursor = None
555    self._ctx = self._create_stream()
556
557    if opts.start_ts is not None and opts.cursor is not None:
558      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
559      raise TypeError(err_msg)
560
561  def __enter__(self):
562    return self
563
564  def __exit__(self, exc_type, exc_value, exc_traceback):
565    if self._stream is not None:
566      self._stream.close()
567
568    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
569    return False
570
571  def __iter__(self):
572    return self
573
574  def __next__(self):
575    if self._opts.max_attempts is not None:
576      max_attempts = self._opts.max_attempts
577    else:
578      max_attempts = self._max_attempts
579
580    if self._opts.max_backoff is not None:
581      max_backoff = self._opts.max_backoff
582    else:
583      max_backoff = self._max_backoff
584
585    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
586    return retryable.run().response
587
588  def _next_element(self):
589    try:
590      if self._stream is None:
591        try:
592          self._stream = self._ctx.__enter__()
593        except Exception:
594          self._retry_stream()
595
596      if self._stream is not None:
597        event: Any = FaunaDecoder.decode(next(self._stream))
598
599        if event["type"] == "error":
600          FaunaError.parse_error_and_throw(event, 400)
601
602        self.last_ts = event["txn_ts"]
603        self.last_cursor = event.get('cursor')
604
605        if event["type"] == "start":
606          return self._next_element()
607
608        if not self._opts.status_events and event["type"] == "status":
609          return self._next_element()
610
611        return event
612
613      raise StopIteration
614    except NetworkError:
615      self._retry_stream()
616
617  def _retry_stream(self):
618    if self._stream is not None:
619      self._stream.close()
620
621    self._stream = None
622
623    try:
624      self._ctx = self._create_stream()
625    except Exception:
626      pass
627    raise RetryableFaunaException
628
629  def _create_stream(self):
630    data: Dict[str, Any] = {"token": self._source.token}
631    if self.last_cursor is not None:
632      data["cursor"] = self.last_cursor
633    elif self._opts.cursor is not None:
634      data["cursor"] = self._opts.cursor
635    elif self._opts.start_ts is not None:
636      data["start_ts"] = self._opts.start_ts
637
638    return self._http_client.stream(
639        url=self._endpoint, headers=self._headers, data=data)
640
641  def close(self):
642    if self._stream is not None:
643      self._stream.close()

A class that mixes a ContextManager and an Iterator so we can detect retryable errors.

StreamIterator( http_client: fauna.http.http_client.HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: StreamOptions, source: fauna.query.models.EventSource)
542  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
543               endpoint: str, max_attempts: int, max_backoff: int,
544               opts: StreamOptions, source: EventSource):
545    self._http_client = http_client
546    self._headers = headers
547    self._endpoint = endpoint
548    self._max_attempts = max_attempts
549    self._max_backoff = max_backoff
550    self._opts = opts
551    self._source = source
552    self._stream = None
553    self.last_ts = None
554    self.last_cursor = None
555    self._ctx = self._create_stream()
556
557    if opts.start_ts is not None and opts.cursor is not None:
558      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
559      raise TypeError(err_msg)
last_ts
last_cursor
def close(self):
641  def close(self):
642    if self._stream is not None:
643      self._stream.close()
class FeedPage:
646class FeedPage:
647
648  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
649    self._events = events
650    self.cursor = cursor
651    self.stats = stats
652
653  def __len__(self):
654    return len(self._events)
655
656  def __iter__(self) -> Iterator[Any]:
657    for event in self._events:
658      if event["type"] == "error":
659        FaunaError.parse_error_and_throw(event, 400)
660      yield event
FeedPage( events: List[Any], cursor: str, stats: fauna.encoding.wire_protocol.QueryStats)
648  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
649    self._events = events
650    self.cursor = cursor
651    self.stats = stats
cursor
stats
class FeedIterator:
663class FeedIterator:
664  """A class to provide an iterator on top of Event Feed pages."""
665
666  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
667               max_attempts: int, max_backoff: int, opts: FeedOptions,
668               source: EventSource):
669    self._http = http
670    self._headers = headers
671    self._endpoint = endpoint
672    self._max_attempts = opts.max_attempts or max_attempts
673    self._max_backoff = opts.max_backoff or max_backoff
674    self._request: Dict[str, Any] = {"token": source.token}
675    self._is_done = False
676
677    if opts.start_ts is not None and opts.cursor is not None:
678      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
679      raise TypeError(err_msg)
680
681    if opts.page_size is not None:
682      self._request["page_size"] = opts.page_size
683
684    if opts.cursor is not None:
685      self._request["cursor"] = opts.cursor
686    elif opts.start_ts is not None:
687      self._request["start_ts"] = opts.start_ts
688
689  def __iter__(self) -> Iterator[FeedPage]:
690    self._is_done = False
691    return self
692
693  def __next__(self) -> FeedPage:
694    if self._is_done:
695      raise StopIteration
696
697    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
698                               self._next_page)
699    return retryable.run().response
700
701  def _next_page(self) -> FeedPage:
702    with self._http.request(
703        method="POST",
704        url=self._endpoint,
705        headers=self._headers,
706        data=self._request,
707    ) as response:
708      status_code = response.status_code()
709      decoded: Any = FaunaDecoder.decode(response.json())
710
711      if status_code > 399:
712        FaunaError.parse_error_and_throw(decoded, status_code)
713
714      self._is_done = not decoded["has_next"]
715      self._request["cursor"] = decoded["cursor"]
716
717      if "start_ts" in self._request:
718        del self._request["start_ts"]
719
720      return FeedPage(decoded["events"], decoded["cursor"],
721                      QueryStats(decoded["stats"]))
722
723  def flatten(self) -> Iterator:
724    """A generator that yields events instead of pages of events."""
725    for page in self:
726      for event in page:
727        yield event

A class to provide an iterator on top of Event Feed pages.

FeedIterator( http: fauna.http.http_client.HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: FeedOptions, source: fauna.query.models.EventSource)
666  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
667               max_attempts: int, max_backoff: int, opts: FeedOptions,
668               source: EventSource):
669    self._http = http
670    self._headers = headers
671    self._endpoint = endpoint
672    self._max_attempts = opts.max_attempts or max_attempts
673    self._max_backoff = opts.max_backoff or max_backoff
674    self._request: Dict[str, Any] = {"token": source.token}
675    self._is_done = False
676
677    if opts.start_ts is not None and opts.cursor is not None:
678      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
679      raise TypeError(err_msg)
680
681    if opts.page_size is not None:
682      self._request["page_size"] = opts.page_size
683
684    if opts.cursor is not None:
685      self._request["cursor"] = opts.cursor
686    elif opts.start_ts is not None:
687      self._request["start_ts"] = opts.start_ts
def flatten(self) -> Iterator:
723  def flatten(self) -> Iterator:
724    """A generator that yields events instead of pages of events."""
725    for page in self:
726      for event in page:
727        yield event

A generator that yields events instead of pages of events.

class QueryIterator:
730class QueryIterator:
731  """A class to provider an iterator on top of Fauna queries."""
732
733  def __init__(self,
734               client: Client,
735               fql: Query,
736               opts: Optional[QueryOptions] = None):
737    """Initializes the QueryIterator
738
739        :param fql: A Query
740        :param opts: (Optional) Query Options
741
742        :raises TypeError: Invalid param types
743        """
744    if not isinstance(client, Client):
745      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
746                  f"Client by calling fauna.client.Client()"
747      raise TypeError(err_msg)
748
749    if not isinstance(fql, Query):
750      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
751                 f"Query by calling fauna.fql()"
752      raise TypeError(err_msg)
753
754    self.client = client
755    self.fql = fql
756    self.opts = opts
757
758  def __iter__(self) -> Iterator:
759    return self.iter()
760
761  def iter(self) -> Iterator:
762    """
763        A generator function that immediately fetches and yields the results of
764        the stored query. Yields additional pages on subsequent iterations if
765        they exist
766        """
767
768    cursor = None
769    initial_response = self.client.query(self.fql, self.opts)
770
771    if isinstance(initial_response.data, Page):
772      cursor = initial_response.data.after
773      yield initial_response.data.data
774
775      while cursor is not None:
776        next_response = self.client.query(
777            fql("Set.paginate(${after})", after=cursor), self.opts)
778        # TODO: `Set.paginate` does not yet return a `@set` tagged value
779        #       so we will get back a plain object that might not have
780        #       an after property.
781        cursor = next_response.data.get("after")
782        yield next_response.data.get("data")
783
784    else:
785      yield [initial_response.data]
786
787  def flatten(self) -> Iterator:
788    """
789        A generator function that immediately fetches and yields the results of
790        the stored query. Yields each item individually, rather than a whole
791        Page at a time. Fetches additional pages as required if they exist.
792        """
793
794    for page in self.iter():
795      for item in page:
796        yield item

A class to provide an iterator on top of Fauna queries.

QueryIterator( client: Client, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None)
733  def __init__(self,
734               client: Client,
735               fql: Query,
736               opts: Optional[QueryOptions] = None):
737    """Initializes the QueryIterator
738
739        :param fql: A Query
740        :param opts: (Optional) Query Options
741
742        :raises TypeError: Invalid param types
743        """
744    if not isinstance(client, Client):
745      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
746                  f"Client by calling fauna.client.Client()"
747      raise TypeError(err_msg)
748
749    if not isinstance(fql, Query):
750      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
751                 f"Query by calling fauna.fql()"
752      raise TypeError(err_msg)
753
754    self.client = client
755    self.fql = fql
756    self.opts = opts

Initializes the QueryIterator

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Raises
  • TypeError: Invalid param types
client
fql
opts
def iter(self) -> Iterator:
761  def iter(self) -> Iterator:
762    """
763        A generator function that immediately fetches and yields the results of
764        the stored query. Yields additional pages on subsequent iterations if
765        they exist
766        """
767
768    cursor = None
769    initial_response = self.client.query(self.fql, self.opts)
770
771    if isinstance(initial_response.data, Page):
772      cursor = initial_response.data.after
773      yield initial_response.data.data
774
775      while cursor is not None:
776        next_response = self.client.query(
777            fql("Set.paginate(${after})", after=cursor), self.opts)
778        # TODO: `Set.paginate` does not yet return a `@set` tagged value
779        #       so we will get back a plain object that might not have
780        #       an after property.
781        cursor = next_response.data.get("after")
782        yield next_response.data.get("data")
783
784    else:
785      yield [initial_response.data]

A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist.

def flatten(self) -> Iterator:
787  def flatten(self) -> Iterator:
788    """
789        A generator function that immediately fetches and yields the results of
790        the stored query. Yields each item individually, rather than a whole
791        Page at a time. Fetches additional pages as required if they exist.
792        """
793
794    for page in self.iter():
795      for item in page:
796        yield item

A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist.