fauna.client.client

  1import logging
  2from dataclasses import dataclass
  3from datetime import timedelta
  4from typing import Any, Dict, Iterator, Mapping, Optional, Union, List
  5
  6import fauna
  7from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
  8from fauna.client.retryable import Retryable
  9from fauna.client.utils import _Environment, LastTxnTs
 10from fauna.encoding import FaunaEncoder, FaunaDecoder
 11from fauna.encoding import QuerySuccess, QueryTags, QueryStats
 12from fauna.errors import FaunaError, ClientError, ProtocolError, \
 13  RetryableFaunaException, NetworkError
 14from fauna.http.http_client import HTTPClient
 15from fauna.query import EventSource, Query, Page, fql
 16
 17logger = logging.getLogger("fauna")
 18
 19DefaultHttpConnectTimeout = timedelta(seconds=5)
 20DefaultHttpReadTimeout: Optional[timedelta] = None
 21DefaultHttpWriteTimeout = timedelta(seconds=5)
 22DefaultHttpPoolTimeout = timedelta(seconds=5)
 23DefaultIdleConnectionTimeout = timedelta(seconds=5)
 24DefaultQueryTimeout = timedelta(seconds=5)
 25DefaultClientBufferTimeout = timedelta(seconds=5)
 26DefaultMaxConnections = 20
 27DefaultMaxIdleConnections = 20
 28
 29
 30@dataclass
 31class QueryOptions:
 32  """
 33    A dataclass representing options available for a query.
 34
 35    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
 36    * max_contention_retries - The max number of times to retry the query if contention is encountered.
 37    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
 38    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
 39    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
 40    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
 41    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
 42    """
 43
 44  linearized: Optional[bool] = None
 45  max_contention_retries: Optional[int] = None
 46  query_timeout: Optional[timedelta] = DefaultQueryTimeout
 47  query_tags: Optional[Mapping[str, str]] = None
 48  traceparent: Optional[str] = None
 49  typecheck: Optional[bool] = None
 50  additional_headers: Optional[Dict[str, str]] = None
 51
 52
 53@dataclass
 54class StreamOptions:
 55  """
 56    A dataclass representing options available for a stream.
 57
 58    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
 59    * max_backoff - The maximum backoff in seconds for an individual retry.
 60    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
 61    the timestamp.
 62    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
 63    * status_events - Indicates if stream should include status events. Status events are periodic events that
 64    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
 65    about the cost of maintaining the stream other than the cost of the received events.
 66    """
 67
 68  max_attempts: Optional[int] = None
 69  max_backoff: Optional[int] = None
 70  start_ts: Optional[int] = None
 71  cursor: Optional[str] = None
 72  status_events: bool = False
 73
 74
 75@dataclass
 76class FeedOptions:
 77  """
 78    A dataclass representing options available for an Event Feed.
 79
 80    * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
 81    * max_backoff - The maximum backoff in seconds for an individual retry.
 82    * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
 83    * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
 84    the timestamp.
 85    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
 86    * page_size - Maximum number of events returned per page. Must be in the
 87    range 1 to 16000 (inclusive). Defaults to 16.
 88    """
 89  max_attempts: Optional[int] = None
 90  max_backoff: Optional[int] = None
 91  query_timeout: Optional[timedelta] = None
 92  page_size: Optional[int] = None
 93  start_ts: Optional[int] = None
 94  cursor: Optional[str] = None
 95
 96
 97class Client:
 98
 99  def __init__(
100      self,
101      endpoint: Optional[str] = None,
102      secret: Optional[str] = None,
103      http_client: Optional[HTTPClient] = None,
104      query_tags: Optional[Mapping[str, str]] = None,
105      linearized: Optional[bool] = None,
106      max_contention_retries: Optional[int] = None,
107      typecheck: Optional[bool] = None,
108      additional_headers: Optional[Dict[str, str]] = None,
109      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
110      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
111      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
112      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
113      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
114      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
115      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
116      max_attempts: int = 3,
117      max_backoff: int = 20,
118  ):
119    """Initializes a Client.
120
121        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
122        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
123        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
124        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
125        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
126        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
127        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
128        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
129        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
130        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
131        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
132        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
133        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
134        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
135        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
136        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
137        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
138        """
139
140    self._set_endpoint(endpoint)
141    self._max_attempts = max_attempts
142    self._max_backoff = max_backoff
143
144    if secret is None:
145      self._auth = _Auth(_Environment.EnvFaunaSecret())
146    else:
147      self._auth = _Auth(secret)
148
149    self._last_txn_ts = LastTxnTs()
150
151    self._query_tags = {}
152    if query_tags is not None:
153      self._query_tags.update(query_tags)
154
155    if query_timeout is not None:
156      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
157    else:
158      self._query_timeout_ms = None
159
160    self._headers: Dict[str, str] = {
161        _Header.AcceptEncoding: "gzip",
162        _Header.ContentType: "application/json;charset=utf-8",
163        _Header.Driver: "python",
164        _Header.DriverEnv: str(_DriverEnvironment()),
165    }
166
167    if typecheck is not None:
168      self._headers[Header.Typecheck] = str(typecheck).lower()
169
170    if linearized is not None:
171      self._headers[Header.Linearized] = str(linearized).lower()
172
173    if max_contention_retries is not None and max_contention_retries > 0:
174      self._headers[Header.MaxContentionRetries] = \
175          f"{max_contention_retries}"
176
177    if additional_headers is not None:
178      self._headers = {
179          **self._headers,
180          **additional_headers,
181      }
182
183    self._session: HTTPClient
184
185    if http_client is not None:
186      self._session = http_client
187    else:
188      if fauna.global_http_client is None:
189        timeout_s: Optional[float] = None
190        if query_timeout is not None and client_buffer_timeout is not None:
191          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
192        read_timeout_s: Optional[float] = None
193        if http_read_timeout is not None:
194          read_timeout_s = http_read_timeout.total_seconds()
195
196        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
197        ) if http_write_timeout is not None else None
198        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
199        ) if http_connect_timeout is not None else None
200        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
201        ) if http_pool_timeout is not None else None
202        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
203        ) if http_idle_timeout is not None else None
204
205        import httpx
206        from fauna.http.httpx_client import HTTPXClient
207        c = HTTPXClient(
208            httpx.Client(
209                http1=True,
210                http2=False,
211                timeout=httpx.Timeout(
212                    timeout=timeout_s,
213                    connect=connect_timeout_s,
214                    read=read_timeout_s,
215                    write=write_timeout_s,
216                    pool=pool_timeout_s,
217                ),
218                limits=httpx.Limits(
219                    max_connections=DefaultMaxConnections,
220                    max_keepalive_connections=DefaultMaxIdleConnections,
221                    keepalive_expiry=idle_timeout_s,
222                ),
223            ), logger)
224        fauna.global_http_client = c
225
226      self._session = fauna.global_http_client
227
228  def close(self):
229    self._session.close()
230    if self._session == fauna.global_http_client:
231      fauna.global_http_client = None
232
233  def set_last_txn_ts(self, txn_ts: int):
234    """
235        Set the last timestamp seen by this client.
236        This has no effect if earlier than stored timestamp.
237
238        .. WARNING:: This should be used only when coordinating timestamps across
239        multiple clients. Moving the timestamp arbitrarily forward into
240        the future will cause transactions to stall.
241
242        :param txn_ts: the new transaction time.
243        """
244    self._last_txn_ts.update_txn_time(txn_ts)
245
246  def get_last_txn_ts(self) -> Optional[int]:
247    """
248        Get the last timestamp seen by this client.
249        :return:
250        """
251    return self._last_txn_ts.time
252
253  def get_query_timeout(self) -> Optional[timedelta]:
254    """
255        Get the query timeout for all queries.
256        """
257    if self._query_timeout_ms is not None:
258      return timedelta(milliseconds=self._query_timeout_ms)
259    else:
260      return None
261
262  def paginate(
263      self,
264      fql: Query,
265      opts: Optional[QueryOptions] = None,
266  ) -> "QueryIterator":
267    """
268        Run a query on Fauna and returning an iterator of results. If the query
269        returns a Page, the iterator will fetch additional Pages until the
270        after token is null. Each call for a page will be retried with exponential
271        backoff up to the max_attempts set in the client's retry policy in the
272        event of a 429 or 502.
273
274        :param fql: A Query
275        :param opts: (Optional) Query Options
276
277        :return: a :class:`QueryResponse`
278
279        :raises NetworkError: HTTP Request failed in transit
280        :raises ProtocolError: HTTP error not from Fauna
281        :raises ServiceError: Fauna returned an error
282        :raises ValueError: Encoding and decoding errors
283        :raises TypeError: Invalid param types
284        """
285
286    if not isinstance(fql, Query):
287      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
288                 f"Query by calling fauna.fql()"
289      raise TypeError(err_msg)
290
291    return QueryIterator(self, fql, opts)
292
293  def query(
294      self,
295      fql: Query,
296      opts: Optional[QueryOptions] = None,
297  ) -> QuerySuccess:
298    """
299        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
300        up to the max_backoff in the event of a 429.
301
302        :param fql: A Query
303        :param opts: (Optional) Query Options
304
305        :return: a :class:`QueryResponse`
306
307        :raises NetworkError: HTTP Request failed in transit
308        :raises ProtocolError: HTTP error not from Fauna
309        :raises ServiceError: Fauna returned an error
310        :raises ValueError: Encoding and decoding errors
311        :raises TypeError: Invalid param types
312        """
313
314    if not isinstance(fql, Query):
315      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
316                 f"Query by calling fauna.fql()"
317      raise TypeError(err_msg)
318
319    try:
320      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
321    except Exception as e:
322      raise ClientError("Failed to encode Query") from e
323
324    retryable = Retryable[QuerySuccess](
325        self._max_attempts,
326        self._max_backoff,
327        self._query,
328        "/query/1",
329        fql=encoded_query,
330        opts=opts,
331    )
332
333    r = retryable.run()
334    r.response.stats.attempts = r.attempts
335    return r.response
336
337  def _query(
338      self,
339      path: str,
340      fql: Mapping[str, Any],
341      arguments: Optional[Mapping[str, Any]] = None,
342      opts: Optional[QueryOptions] = None,
343  ) -> QuerySuccess:
344
345    headers = self._headers.copy()
346    headers[_Header.Format] = "tagged"
347    headers[_Header.Authorization] = self._auth.bearer()
348
349    if self._query_timeout_ms is not None:
350      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
351
352    headers.update(self._last_txn_ts.request_header)
353
354    query_tags = {}
355    if self._query_tags is not None:
356      query_tags.update(self._query_tags)
357
358    if opts is not None:
359      if opts.linearized is not None:
360        headers[Header.Linearized] = str(opts.linearized).lower()
361      if opts.max_contention_retries is not None:
362        headers[Header.MaxContentionRetries] = \
363            f"{opts.max_contention_retries}"
364      if opts.traceparent is not None:
365        headers[Header.Traceparent] = opts.traceparent
366      if opts.query_timeout is not None:
367        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
368        headers[Header.QueryTimeoutMs] = timeout_ms
369      if opts.query_tags is not None:
370        query_tags.update(opts.query_tags)
371      if opts.typecheck is not None:
372        headers[Header.Typecheck] = str(opts.typecheck).lower()
373      if opts.additional_headers is not None:
374        headers.update(opts.additional_headers)
375
376    if len(query_tags) > 0:
377      headers[Header.Tags] = QueryTags.encode(query_tags)
378
379    data: dict[str, Any] = {
380        "query": fql,
381        "arguments": arguments or {},
382    }
383
384    with self._session.request(
385        method="POST",
386        url=self._endpoint + path,
387        headers=headers,
388        data=data,
389    ) as response:
390      status_code = response.status_code()
391      response_json = response.json()
392      headers = response.headers()
393
394      self._check_protocol(response_json, status_code)
395
396      dec: Any = FaunaDecoder.decode(response_json)
397
398      if status_code > 399:
399        FaunaError.parse_error_and_throw(dec, status_code)
400
401      if "txn_ts" in dec:
402        self.set_last_txn_ts(int(response_json["txn_ts"]))
403
404      stats = QueryStats(dec["stats"]) if "stats" in dec else None
405      summary = dec["summary"] if "summary" in dec else None
406      query_tags = QueryTags.decode(
407          dec["query_tags"]) if "query_tags" in dec else None
408      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
409      schema_version = dec["schema_version"] if "schema_version" in dec else None
410      traceparent = headers.get("traceparent", None)
411      static_type = dec["static_type"] if "static_type" in dec else None
412
413      return QuerySuccess(
414          data=dec["data"],
415          query_tags=query_tags,
416          static_type=static_type,
417          stats=stats,
418          summary=summary,
419          traceparent=traceparent,
420          txn_ts=txn_ts,
421          schema_version=schema_version,
422      )
423
424  def stream(
425      self,
426      fql: Union[EventSource, Query],
427      opts: StreamOptions = StreamOptions()
428  ) -> "StreamIterator":
429    """
430        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
431
432        :param fql: An EventSource or a Query that returns an EventSource.
433        :param opts: (Optional) Stream Options.
434
435        :return: a :class:`StreamIterator`
436
437        :raises ClientError: Invalid options provided
438        :raises NetworkError: HTTP Request failed in transit
439        :raises ProtocolError: HTTP error not from Fauna
440        :raises ServiceError: Fauna returned an error
441        :raises ValueError: Encoding and decoding errors
442        :raises TypeError: Invalid param types
443        """
444
445    if isinstance(fql, Query):
446      if opts.cursor is not None:
447        raise ClientError(
448            "The 'cursor' configuration can only be used with an event source.")
449
450      source = self.query(fql).data
451    else:
452      source = fql
453
454    if not isinstance(source, EventSource):
455      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
456      raise TypeError(err_msg)
457
458    headers = self._headers.copy()
459    headers[_Header.Format] = "tagged"
460    headers[_Header.Authorization] = self._auth.bearer()
461
462    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
463                          self._max_attempts, self._max_backoff, opts, source)
464
465  def feed(
466      self,
467      source: Union[EventSource, Query],
468      opts: FeedOptions = FeedOptions(),
469  ) -> "FeedIterator":
470    """
471        Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
472
473        :param source: An EventSource or a Query that returns an EventSource.
474        :param opts: (Optional) Event Feed options.
475
476        :return: a :class:`FeedIterator`
477
478        :raises ClientError: Invalid options provided
479        :raises NetworkError: HTTP Request failed in transit
480        :raises ProtocolError: HTTP error not from Fauna
481        :raises ServiceError: Fauna returned an error
482        :raises ValueError: Encoding and decoding errors
483        :raises TypeError: Invalid param types
484        """
485
486    if isinstance(source, Query):
487      source = self.query(source).data
488
489    if not isinstance(source, EventSource):
490      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
491      raise TypeError(err_msg)
492
493    headers = self._headers.copy()
494    headers[_Header.Format] = "tagged"
495    headers[_Header.Authorization] = self._auth.bearer()
496
497    if opts.query_timeout is not None:
498      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
499      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
500    elif self._query_timeout_ms is not None:
501      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
502
503    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
504                        self._max_attempts, self._max_backoff, opts, source)
505
506  def _check_protocol(self, response_json: Any, status_code):
507    # TODO: Logic to validate wire protocol belongs elsewhere.
508    should_raise = False
509
510    # check for QuerySuccess
511    if status_code <= 399 and "data" not in response_json:
512      should_raise = True
513
514    # check for QueryFailure
515    if status_code > 399:
516      if "error" not in response_json:
517        should_raise = True
518      else:
519        e = response_json["error"]
520        if "code" not in e or "message" not in e:
521          should_raise = True
522
523    if should_raise:
524      raise ProtocolError(
525          status_code,
526          f"Response is in an unknown format: \n{response_json}",
527      )
528
529  def _set_endpoint(self, endpoint):
530    if endpoint is None:
531      endpoint = _Environment.EnvFaunaEndpoint()
532
533    if endpoint[-1:] == "/":
534      endpoint = endpoint[:-1]
535
536    self._endpoint = endpoint
537
538
539class StreamIterator:
540  """A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""
541
542  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
543               endpoint: str, max_attempts: int, max_backoff: int,
544               opts: StreamOptions, source: EventSource):
545    self._http_client = http_client
546    self._headers = headers
547    self._endpoint = endpoint
548    self._max_attempts = max_attempts
549    self._max_backoff = max_backoff
550    self._opts = opts
551    self._source = source
552    self._stream = None
553    self.last_ts = None
554    self.last_cursor = None
555    self._ctx = self._create_stream()
556
557    if opts.start_ts is not None and opts.cursor is not None:
558      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
559      raise TypeError(err_msg)
560
561  def __enter__(self):
562    return self
563
564  def __exit__(self, exc_type, exc_value, exc_traceback):
565    if self._stream is not None:
566      self._stream.close()
567
568    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
569    return False
570
571  def __iter__(self):
572    return self
573
574  def __next__(self):
575    if self._opts.max_attempts is not None:
576      max_attempts = self._opts.max_attempts
577    else:
578      max_attempts = self._max_attempts
579
580    if self._opts.max_backoff is not None:
581      max_backoff = self._opts.max_backoff
582    else:
583      max_backoff = self._max_backoff
584
585    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
586    return retryable.run().response
587
588  def _next_element(self):
589    try:
590      if self._stream is None:
591        try:
592          self._stream = self._ctx.__enter__()
593        except Exception:
594          self._retry_stream()
595
596      if self._stream is not None:
597        event: Any = FaunaDecoder.decode(next(self._stream))
598
599        if event["type"] == "error":
600          FaunaError.parse_error_and_throw(event, 400)
601
602        self.last_ts = event["txn_ts"]
603        self.last_cursor = event.get('cursor')
604
605        if event["type"] == "start":
606          return self._next_element()
607
608        if not self._opts.status_events and event["type"] == "status":
609          return self._next_element()
610
611        return event
612
613      raise StopIteration
614    except NetworkError:
615      self._retry_stream()
616
617  def _retry_stream(self):
618    if self._stream is not None:
619      self._stream.close()
620
621    self._stream = None
622
623    try:
624      self._ctx = self._create_stream()
625    except Exception:
626      pass
627    raise RetryableFaunaException
628
629  def _create_stream(self):
630    data: Dict[str, Any] = {"token": self._source.token}
631    if self.last_cursor is not None:
632      data["cursor"] = self.last_cursor
633    elif self._opts.cursor is not None:
634      data["cursor"] = self._opts.cursor
635    elif self._opts.start_ts is not None:
636      data["start_ts"] = self._opts.start_ts
637
638    return self._http_client.stream(
639        url=self._endpoint, headers=self._headers, data=data)
640
641  def close(self):
642    if self._stream is not None:
643      self._stream.close()
644
645
646class FeedPage:
647
648  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
649    self._events = events
650    self.cursor = cursor
651    self.stats = stats
652
653  def __len__(self):
654    return len(self._events)
655
656  def __iter__(self) -> Iterator[Any]:
657    for event in self._events:
658      if event["type"] == "error":
659        FaunaError.parse_error_and_throw(event, 400)
660      yield event
661
662
663class FeedIterator:
664  """A class to provide an iterator on top of Event Feed pages."""
665
666  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
667               max_attempts: int, max_backoff: int, opts: FeedOptions,
668               source: EventSource):
669    self._http = http
670    self._headers = headers
671    self._endpoint = endpoint
672    self._max_attempts = opts.max_attempts or max_attempts
673    self._max_backoff = opts.max_backoff or max_backoff
674    self._request: Dict[str, Any] = {"token": source.token}
675    self._is_done = False
676
677    if opts.start_ts is not None and opts.cursor is not None:
678      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
679      raise TypeError(err_msg)
680
681    if opts.page_size is not None:
682      self._request["page_size"] = opts.page_size
683
684    if opts.cursor is not None:
685      self._request["cursor"] = opts.cursor
686    elif opts.start_ts is not None:
687      self._request["start_ts"] = opts.start_ts
688
689  def __iter__(self) -> Iterator[FeedPage]:
690    self._is_done = False
691    return self
692
693  def __next__(self) -> FeedPage:
694    if self._is_done:
695      raise StopIteration
696
697    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
698                               self._next_page)
699    return retryable.run().response
700
701  def _next_page(self) -> FeedPage:
702    with self._http.request(
703        method="POST",
704        url=self._endpoint,
705        headers=self._headers,
706        data=self._request,
707    ) as response:
708      status_code = response.status_code()
709      decoded: Any = FaunaDecoder.decode(response.json())
710
711      if status_code > 399:
712        FaunaError.parse_error_and_throw(decoded, status_code)
713
714      self._is_done = not decoded["has_next"]
715      self._request["cursor"] = decoded["cursor"]
716
717      if "start_ts" in self._request:
718        del self._request["start_ts"]
719
720      return FeedPage(decoded["events"], decoded["cursor"],
721                      QueryStats(decoded["stats"]))
722
723  def flatten(self) -> Iterator:
724    """A generator that yields events instead of pages of events."""
725    for page in self:
726      for event in page:
727        yield event
728
729
730class QueryIterator:
731  """A class to provider an iterator on top of Fauna queries."""
732
733  def __init__(self,
734               client: Client,
735               fql: Query,
736               opts: Optional[QueryOptions] = None):
737    """Initializes the QueryIterator
738
739        :param fql: A Query
740        :param opts: (Optional) Query Options
741
742        :raises TypeError: Invalid param types
743        """
744    if not isinstance(client, Client):
745      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
746                  f"Client by calling fauna.client.Client()"
747      raise TypeError(err_msg)
748
749    if not isinstance(fql, Query):
750      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
751                 f"Query by calling fauna.fql()"
752      raise TypeError(err_msg)
753
754    self.client = client
755    self.fql = fql
756    self.opts = opts
757
758  def __iter__(self) -> Iterator:
759    return self.iter()
760
761  def iter(self) -> Iterator:
762    """
763        A generator function that immediately fetches and yields the results of
764        the stored query. Yields additional pages on subsequent iterations if
765        they exist
766        """
767
768    cursor = None
769    initial_response = self.client.query(self.fql, self.opts)
770
771    if isinstance(initial_response.data, Page):
772      cursor = initial_response.data.after
773      yield initial_response.data.data
774
775      while cursor is not None:
776        next_response = self.client.query(
777            fql("Set.paginate(${after})", after=cursor), self.opts)
778        # TODO: `Set.paginate` does not yet return a `@set` tagged value
779        #       so we will get back a plain object that might not have
780        #       an after property.
781        cursor = next_response.data.get("after")
782        yield next_response.data.get("data")
783
784    else:
785      yield [initial_response.data]
786
787  def flatten(self) -> Iterator:
788    """
789        A generator function that immediately fetches and yields the results of
790        the stored query. Yields each item individually, rather than a whole
791        Page at a time. Fetches additional pages as required if they exist.
792        """
793
794    for page in self.iter():
795      for item in page:
796        yield item
logger = <Logger fauna (WARNING)>
DefaultHttpConnectTimeout = datetime.timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[datetime.timedelta] = None
DefaultHttpWriteTimeout = datetime.timedelta(seconds=5)
DefaultHttpPoolTimeout = datetime.timedelta(seconds=5)
DefaultIdleConnectionTimeout = datetime.timedelta(seconds=5)
DefaultQueryTimeout = datetime.timedelta(seconds=5)
DefaultClientBufferTimeout = datetime.timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20
@dataclass
class QueryOptions:
31@dataclass
32class QueryOptions:
33  """
34    A dataclass representing options available for a query.
35
36    * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
37    * max_contention_retries - The max number of times to retry the query if contention is encountered.
38    * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
39    * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
40    * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
41    * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
42    * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
43    """
44
45  linearized: Optional[bool] = None
46  max_contention_retries: Optional[int] = None
47  query_timeout: Optional[timedelta] = DefaultQueryTimeout
48  query_tags: Optional[Mapping[str, str]] = None
49  traceparent: Optional[str] = None
50  typecheck: Optional[bool] = None
51  additional_headers: Optional[Dict[str, str]] = None

A dataclass representing options available for a query.

  • linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  • max_contention_retries - The max number of times to retry the query if contention is encountered.
  • query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
  • query_tags - Tags to associate with the query. See logging
  • traceparent - A traceparent to associate with the query. See logging Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  • typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
  • additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
QueryOptions( linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), query_tags: Optional[Mapping[str, str]] = None, traceparent: Optional[str] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None)
linearized: Optional[bool] = None
max_contention_retries: Optional[int] = None
query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5)
query_tags: Optional[Mapping[str, str]] = None
traceparent: Optional[str] = None
typecheck: Optional[bool] = None
additional_headers: Optional[Dict[str, str]] = None
@dataclass
class StreamOptions:
54@dataclass
55class StreamOptions:
56  """
57    A dataclass representing options available for a stream.
58
59    * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
60    * max_backoff - The maximum backoff in seconds for an individual retry.
61    * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
62    the timestamp.
63    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
64    * status_events - Indicates if stream should include status events. Status events are periodic events that
65    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
66    about the cost of maintaining the stream other than the cost of the received events.
67    """
68
69  max_attempts: Optional[int] = None
70  max_backoff: Optional[int] = None
71  start_ts: Optional[int] = None
72  cursor: Optional[str] = None
73  status_events: bool = False

A dataclass representing options available for a stream.

  • max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
  • max_backoff - The maximum backoff in seconds for an individual retry.
  • start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp.
  • cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
  • status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events.
StreamOptions( max_attempts: Optional[int] = None, max_backoff: Optional[int] = None, start_ts: Optional[int] = None, cursor: Optional[str] = None, status_events: bool = False)
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
start_ts: Optional[int] = None
cursor: Optional[str] = None
status_events: bool = False
@dataclass
class FeedOptions:
76@dataclass
77class FeedOptions:
78  """
79    A dataclass representing options available for an Event Feed.
80
81    * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
82    * max_backoff - The maximum backoff in seconds for an individual retry.
83    * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
84    * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
85    the timestamp.
86    * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
87    * page_size - Maximum number of events returned per page. Must be in the
88    range 1 to 16000 (inclusive). Defaults to 16.
89    """
90  max_attempts: Optional[int] = None
91  max_backoff: Optional[int] = None
92  query_timeout: Optional[timedelta] = None
93  page_size: Optional[int] = None
94  start_ts: Optional[int] = None
95  cursor: Optional[str] = None

A dataclass representing options available for an Event Feed.

  • max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
  • max_backoff - The maximum backoff in seconds for an individual retry.
  • query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
  • start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after the timestamp.
  • cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
  • page_size - Maximum number of events returned per page. Must be in the range 1 to 16000 (inclusive). Defaults to 16.
FeedOptions( max_attempts: Optional[int] = None, max_backoff: Optional[int] = None, query_timeout: Optional[datetime.timedelta] = None, page_size: Optional[int] = None, start_ts: Optional[int] = None, cursor: Optional[str] = None)
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
query_timeout: Optional[datetime.timedelta] = None
page_size: Optional[int] = None
start_ts: Optional[int] = None
cursor: Optional[str] = None
class Client:
 98class Client:
 99
100  def __init__(
101      self,
102      endpoint: Optional[str] = None,
103      secret: Optional[str] = None,
104      http_client: Optional[HTTPClient] = None,
105      query_tags: Optional[Mapping[str, str]] = None,
106      linearized: Optional[bool] = None,
107      max_contention_retries: Optional[int] = None,
108      typecheck: Optional[bool] = None,
109      additional_headers: Optional[Dict[str, str]] = None,
110      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
111      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
112      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
113      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
114      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
115      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
116      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
117      max_attempts: int = 3,
118      max_backoff: int = 20,
119  ):
120    """Initializes a Client.
121
122        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
123        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
124        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
125        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
126        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
127        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
128        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
129        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
130        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
131        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
132        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
133        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
134        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
135        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
136        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
137        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
138        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
139        """
140
141    self._set_endpoint(endpoint)
142    self._max_attempts = max_attempts
143    self._max_backoff = max_backoff
144
145    if secret is None:
146      self._auth = _Auth(_Environment.EnvFaunaSecret())
147    else:
148      self._auth = _Auth(secret)
149
150    self._last_txn_ts = LastTxnTs()
151
152    self._query_tags = {}
153    if query_tags is not None:
154      self._query_tags.update(query_tags)
155
156    if query_timeout is not None:
157      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
158    else:
159      self._query_timeout_ms = None
160
161    self._headers: Dict[str, str] = {
162        _Header.AcceptEncoding: "gzip",
163        _Header.ContentType: "application/json;charset=utf-8",
164        _Header.Driver: "python",
165        _Header.DriverEnv: str(_DriverEnvironment()),
166    }
167
168    if typecheck is not None:
169      self._headers[Header.Typecheck] = str(typecheck).lower()
170
171    if linearized is not None:
172      self._headers[Header.Linearized] = str(linearized).lower()
173
174    if max_contention_retries is not None and max_contention_retries > 0:
175      self._headers[Header.MaxContentionRetries] = \
176          f"{max_contention_retries}"
177
178    if additional_headers is not None:
179      self._headers = {
180          **self._headers,
181          **additional_headers,
182      }
183
184    self._session: HTTPClient
185
186    if http_client is not None:
187      self._session = http_client
188    else:
189      if fauna.global_http_client is None:
190        timeout_s: Optional[float] = None
191        if query_timeout is not None and client_buffer_timeout is not None:
192          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
193        read_timeout_s: Optional[float] = None
194        if http_read_timeout is not None:
195          read_timeout_s = http_read_timeout.total_seconds()
196
197        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
198        ) if http_write_timeout is not None else None
199        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
200        ) if http_connect_timeout is not None else None
201        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
202        ) if http_pool_timeout is not None else None
203        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
204        ) if http_idle_timeout is not None else None
205
206        import httpx
207        from fauna.http.httpx_client import HTTPXClient
208        c = HTTPXClient(
209            httpx.Client(
210                http1=True,
211                http2=False,
212                timeout=httpx.Timeout(
213                    timeout=timeout_s,
214                    connect=connect_timeout_s,
215                    read=read_timeout_s,
216                    write=write_timeout_s,
217                    pool=pool_timeout_s,
218                ),
219                limits=httpx.Limits(
220                    max_connections=DefaultMaxConnections,
221                    max_keepalive_connections=DefaultMaxIdleConnections,
222                    keepalive_expiry=idle_timeout_s,
223                ),
224            ), logger)
225        fauna.global_http_client = c
226
227      self._session = fauna.global_http_client
228
229  def close(self):
230    self._session.close()
231    if self._session == fauna.global_http_client:
232      fauna.global_http_client = None
233
234  def set_last_txn_ts(self, txn_ts: int):
235    """
236        Set the last timestamp seen by this client.
237        This has no effect if earlier than stored timestamp.
238
239        .. WARNING:: This should be used only when coordinating timestamps across
240        multiple clients. Moving the timestamp arbitrarily forward into
241        the future will cause transactions to stall.
242
243        :param txn_ts: the new transaction time.
244        """
245    self._last_txn_ts.update_txn_time(txn_ts)
246
247  def get_last_txn_ts(self) -> Optional[int]:
248    """
249        Get the last timestamp seen by this client.
250        :return:
251        """
252    return self._last_txn_ts.time
253
254  def get_query_timeout(self) -> Optional[timedelta]:
255    """
256        Get the query timeout for all queries.
257        """
258    if self._query_timeout_ms is not None:
259      return timedelta(milliseconds=self._query_timeout_ms)
260    else:
261      return None
262
263  def paginate(
264      self,
265      fql: Query,
266      opts: Optional[QueryOptions] = None,
267  ) -> "QueryIterator":
268    """
269        Run a query on Fauna and returning an iterator of results. If the query
270        returns a Page, the iterator will fetch additional Pages until the
271        after token is null. Each call for a page will be retried with exponential
272        backoff up to the max_attempts set in the client's retry policy in the
273        event of a 429 or 502.
274
275        :param fql: A Query
276        :param opts: (Optional) Query Options
277
278        :return: a :class:`QueryResponse`
279
280        :raises NetworkError: HTTP Request failed in transit
281        :raises ProtocolError: HTTP error not from Fauna
282        :raises ServiceError: Fauna returned an error
283        :raises ValueError: Encoding and decoding errors
284        :raises TypeError: Invalid param types
285        """
286
287    if not isinstance(fql, Query):
288      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
289                 f"Query by calling fauna.fql()"
290      raise TypeError(err_msg)
291
292    return QueryIterator(self, fql, opts)
293
294  def query(
295      self,
296      fql: Query,
297      opts: Optional[QueryOptions] = None,
298  ) -> QuerySuccess:
299    """
300        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
301        up to the max_backoff in the event of a 429.
302
303        :param fql: A Query
304        :param opts: (Optional) Query Options
305
306        :return: a :class:`QueryResponse`
307
308        :raises NetworkError: HTTP Request failed in transit
309        :raises ProtocolError: HTTP error not from Fauna
310        :raises ServiceError: Fauna returned an error
311        :raises ValueError: Encoding and decoding errors
312        :raises TypeError: Invalid param types
313        """
314
315    if not isinstance(fql, Query):
316      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
317                 f"Query by calling fauna.fql()"
318      raise TypeError(err_msg)
319
320    try:
321      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
322    except Exception as e:
323      raise ClientError("Failed to encode Query") from e
324
325    retryable = Retryable[QuerySuccess](
326        self._max_attempts,
327        self._max_backoff,
328        self._query,
329        "/query/1",
330        fql=encoded_query,
331        opts=opts,
332    )
333
334    r = retryable.run()
335    r.response.stats.attempts = r.attempts
336    return r.response
337
338  def _query(
339      self,
340      path: str,
341      fql: Mapping[str, Any],
342      arguments: Optional[Mapping[str, Any]] = None,
343      opts: Optional[QueryOptions] = None,
344  ) -> QuerySuccess:
345
346    headers = self._headers.copy()
347    headers[_Header.Format] = "tagged"
348    headers[_Header.Authorization] = self._auth.bearer()
349
350    if self._query_timeout_ms is not None:
351      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
352
353    headers.update(self._last_txn_ts.request_header)
354
355    query_tags = {}
356    if self._query_tags is not None:
357      query_tags.update(self._query_tags)
358
359    if opts is not None:
360      if opts.linearized is not None:
361        headers[Header.Linearized] = str(opts.linearized).lower()
362      if opts.max_contention_retries is not None:
363        headers[Header.MaxContentionRetries] = \
364            f"{opts.max_contention_retries}"
365      if opts.traceparent is not None:
366        headers[Header.Traceparent] = opts.traceparent
367      if opts.query_timeout is not None:
368        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
369        headers[Header.QueryTimeoutMs] = timeout_ms
370      if opts.query_tags is not None:
371        query_tags.update(opts.query_tags)
372      if opts.typecheck is not None:
373        headers[Header.Typecheck] = str(opts.typecheck).lower()
374      if opts.additional_headers is not None:
375        headers.update(opts.additional_headers)
376
377    if len(query_tags) > 0:
378      headers[Header.Tags] = QueryTags.encode(query_tags)
379
380    data: dict[str, Any] = {
381        "query": fql,
382        "arguments": arguments or {},
383    }
384
385    with self._session.request(
386        method="POST",
387        url=self._endpoint + path,
388        headers=headers,
389        data=data,
390    ) as response:
391      status_code = response.status_code()
392      response_json = response.json()
393      headers = response.headers()
394
395      self._check_protocol(response_json, status_code)
396
397      dec: Any = FaunaDecoder.decode(response_json)
398
399      if status_code > 399:
400        FaunaError.parse_error_and_throw(dec, status_code)
401
402      if "txn_ts" in dec:
403        self.set_last_txn_ts(int(response_json["txn_ts"]))
404
405      stats = QueryStats(dec["stats"]) if "stats" in dec else None
406      summary = dec["summary"] if "summary" in dec else None
407      query_tags = QueryTags.decode(
408          dec["query_tags"]) if "query_tags" in dec else None
409      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
410      schema_version = dec["schema_version"] if "schema_version" in dec else None
411      traceparent = headers.get("traceparent", None)
412      static_type = dec["static_type"] if "static_type" in dec else None
413
414      return QuerySuccess(
415          data=dec["data"],
416          query_tags=query_tags,
417          static_type=static_type,
418          stats=stats,
419          summary=summary,
420          traceparent=traceparent,
421          txn_ts=txn_ts,
422          schema_version=schema_version,
423      )
424
425  def stream(
426      self,
427      fql: Union[EventSource, Query],
428      opts: StreamOptions = StreamOptions()
429  ) -> "StreamIterator":
430    """
431        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
432
433        :param fql: An EventSource or a Query that returns an EventSource.
434        :param opts: (Optional) Stream Options.
435
436        :return: a :class:`StreamIterator`
437
438        :raises ClientError: Invalid options provided
439        :raises NetworkError: HTTP Request failed in transit
440        :raises ProtocolError: HTTP error not from Fauna
441        :raises ServiceError: Fauna returned an error
442        :raises ValueError: Encoding and decoding errors
443        :raises TypeError: Invalid param types
444        """
445
446    if isinstance(fql, Query):
447      if opts.cursor is not None:
448        raise ClientError(
449            "The 'cursor' configuration can only be used with an event source.")
450
451      source = self.query(fql).data
452    else:
453      source = fql
454
455    if not isinstance(source, EventSource):
456      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
457      raise TypeError(err_msg)
458
459    headers = self._headers.copy()
460    headers[_Header.Format] = "tagged"
461    headers[_Header.Authorization] = self._auth.bearer()
462
463    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
464                          self._max_attempts, self._max_backoff, opts, source)
465
466  def feed(
467      self,
468      source: Union[EventSource, Query],
469      opts: FeedOptions = FeedOptions(),
470  ) -> "FeedIterator":
471    """
472        Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
473
474        :param source: An EventSource or a Query that returns an EventSource.
475        :param opts: (Optional) Event Feed options.
476
477        :return: a :class:`FeedIterator`
478
479        :raises ClientError: Invalid options provided
480        :raises NetworkError: HTTP Request failed in transit
481        :raises ProtocolError: HTTP error not from Fauna
482        :raises ServiceError: Fauna returned an error
483        :raises ValueError: Encoding and decoding errors
484        :raises TypeError: Invalid param types
485        """
486
487    if isinstance(source, Query):
488      source = self.query(source).data
489
490    if not isinstance(source, EventSource):
491      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
492      raise TypeError(err_msg)
493
494    headers = self._headers.copy()
495    headers[_Header.Format] = "tagged"
496    headers[_Header.Authorization] = self._auth.bearer()
497
498    if opts.query_timeout is not None:
499      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
500      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
501    elif self._query_timeout_ms is not None:
502      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
503
504    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
505                        self._max_attempts, self._max_backoff, opts, source)
506
507  def _check_protocol(self, response_json: Any, status_code):
508    # TODO: Logic to validate wire protocol belongs elsewhere.
509    should_raise = False
510
511    # check for QuerySuccess
512    if status_code <= 399 and "data" not in response_json:
513      should_raise = True
514
515    # check for QueryFailure
516    if status_code > 399:
517      if "error" not in response_json:
518        should_raise = True
519      else:
520        e = response_json["error"]
521        if "code" not in e or "message" not in e:
522          should_raise = True
523
524    if should_raise:
525      raise ProtocolError(
526          status_code,
527          f"Response is in an unknown format: \n{response_json}",
528      )
529
530  def _set_endpoint(self, endpoint):
531    if endpoint is None:
532      endpoint = _Environment.EnvFaunaEndpoint()
533
534    if endpoint[-1:] == "/":
535      endpoint = endpoint[:-1]
536
537    self._endpoint = endpoint
Client( endpoint: Optional[str] = None, secret: Optional[str] = None, http_client: Optional[fauna.http.http_client.HTTPClient] = None, query_tags: Optional[Mapping[str, str]] = None, linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None, query_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), client_buffer_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_read_timeout: Optional[datetime.timedelta] = None, http_write_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_connect_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_pool_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), http_idle_timeout: Optional[datetime.timedelta] = datetime.timedelta(seconds=5), max_attempts: int = 3, max_backoff: int = 20)
100  def __init__(
101      self,
102      endpoint: Optional[str] = None,
103      secret: Optional[str] = None,
104      http_client: Optional[HTTPClient] = None,
105      query_tags: Optional[Mapping[str, str]] = None,
106      linearized: Optional[bool] = None,
107      max_contention_retries: Optional[int] = None,
108      typecheck: Optional[bool] = None,
109      additional_headers: Optional[Dict[str, str]] = None,
110      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
111      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
112      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
113      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
114      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
115      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
116      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
117      max_attempts: int = 3,
118      max_backoff: int = 20,
119  ):
120    """Initializes a Client.
121
122        :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
123        :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
124        :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
125        :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
126        :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
127        :param max_contention_retries: The max number of times to retry the query if contention is encountered.
128        :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
129        :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
130        :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
131        :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
132        :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
133        :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
134        :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
135        :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
136        :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
137        :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
138        :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
139        """
140
141    self._set_endpoint(endpoint)
142    self._max_attempts = max_attempts
143    self._max_backoff = max_backoff
144
145    if secret is None:
146      self._auth = _Auth(_Environment.EnvFaunaSecret())
147    else:
148      self._auth = _Auth(secret)
149
150    self._last_txn_ts = LastTxnTs()
151
152    self._query_tags = {}
153    if query_tags is not None:
154      self._query_tags.update(query_tags)
155
156    if query_timeout is not None:
157      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
158    else:
159      self._query_timeout_ms = None
160
161    self._headers: Dict[str, str] = {
162        _Header.AcceptEncoding: "gzip",
163        _Header.ContentType: "application/json;charset=utf-8",
164        _Header.Driver: "python",
165        _Header.DriverEnv: str(_DriverEnvironment()),
166    }
167
168    if typecheck is not None:
169      self._headers[Header.Typecheck] = str(typecheck).lower()
170
171    if linearized is not None:
172      self._headers[Header.Linearized] = str(linearized).lower()
173
174    if max_contention_retries is not None and max_contention_retries > 0:
175      self._headers[Header.MaxContentionRetries] = \
176          f"{max_contention_retries}"
177
178    if additional_headers is not None:
179      self._headers = {
180          **self._headers,
181          **additional_headers,
182      }
183
184    self._session: HTTPClient
185
186    if http_client is not None:
187      self._session = http_client
188    else:
189      if fauna.global_http_client is None:
190        timeout_s: Optional[float] = None
191        if query_timeout is not None and client_buffer_timeout is not None:
192          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
193        read_timeout_s: Optional[float] = None
194        if http_read_timeout is not None:
195          read_timeout_s = http_read_timeout.total_seconds()
196
197        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
198        ) if http_write_timeout is not None else None
199        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
200        ) if http_connect_timeout is not None else None
201        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
202        ) if http_pool_timeout is not None else None
203        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
204        ) if http_idle_timeout is not None else None
205
206        import httpx
207        from fauna.http.httpx_client import HTTPXClient
208        c = HTTPXClient(
209            httpx.Client(
210                http1=True,
211                http2=False,
212                timeout=httpx.Timeout(
213                    timeout=timeout_s,
214                    connect=connect_timeout_s,
215                    read=read_timeout_s,
216                    write=write_timeout_s,
217                    pool=pool_timeout_s,
218                ),
219                limits=httpx.Limits(
220                    max_connections=DefaultMaxConnections,
221                    max_keepalive_connections=DefaultMaxIdleConnections,
222                    keepalive_expiry=idle_timeout_s,
223                ),
224            ), logger)
225        fauna.global_http_client = c
226
227      self._session = fauna.global_http_client

Initializes a Client.

Parameters
  • endpoint: The Fauna Endpoint to use. Defaults to https: //db.fauna.com, or the FAUNA_ENDPOINT env variable.
  • secret: The Fauna Secret to use. Defaults to empty, or the FAUNA_SECRET env variable.
  • http_client: An HTTPClient implementation. Defaults to a global HTTPXClient.
  • **query_tags: Tags to associate with the query. See logging
  • linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  • max_contention_retries: The max number of times to retry the query if contention is encountered.
  • typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
  • additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
  • query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is DefaultQueryTimeout.
  • client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is DefaultClientBufferTimeout, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
  • http_read_timeout: Set HTTP Read timeout, default is DefaultHttpReadTimeout.
  • http_write_timeout: Set HTTP Write timeout, default is DefaultHttpWriteTimeout.
  • http_connect_timeout: Set HTTP Connect timeout, default is DefaultHttpConnectTimeout.
  • http_pool_timeout: Set HTTP Pool timeout, default is DefaultHttpPoolTimeout.
  • http_idle_timeout: Set HTTP Idle timeout, default is DefaultIdleConnectionTimeout.
  • max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
  • max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
def close(self):
229  def close(self):
230    self._session.close()
231    if self._session == fauna.global_http_client:
232      fauna.global_http_client = None
def set_last_txn_ts(self, txn_ts: int):
234  def set_last_txn_ts(self, txn_ts: int):
235    """
236        Set the last timestamp seen by this client.
237        This has no effect if earlier than stored timestamp.
238
239        .. WARNING:: This should be used only when coordinating timestamps across
240        multiple clients. Moving the timestamp arbitrarily forward into
241        the future will cause transactions to stall.
242
243        :param txn_ts: the new transaction time.
244        """
245    self._last_txn_ts.update_txn_time(txn_ts)

Set the last timestamp seen by this client. This has no effect if earlier than stored timestamp.

.. WARNING:: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.

Parameters
  • txn_ts: the new transaction time.
def get_last_txn_ts(self) -> Optional[int]:
247  def get_last_txn_ts(self) -> Optional[int]:
248    """
249        Get the last timestamp seen by this client.
250        :return:
251        """
252    return self._last_txn_ts.time

Get the last timestamp seen by this client.

Returns
def get_query_timeout(self) -> Optional[datetime.timedelta]:
254  def get_query_timeout(self) -> Optional[timedelta]:
255    """
256        Get the query timeout for all queries.
257        """
258    if self._query_timeout_ms is not None:
259      return timedelta(milliseconds=self._query_timeout_ms)
260    else:
261      return None

Get the query timeout for all queries.

def paginate( self, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None) -> QueryIterator:
263  def paginate(
264      self,
265      fql: Query,
266      opts: Optional[QueryOptions] = None,
267  ) -> "QueryIterator":
268    """
269        Run a query on Fauna and returning an iterator of results. If the query
270        returns a Page, the iterator will fetch additional Pages until the
271        after token is null. Each call for a page will be retried with exponential
272        backoff up to the max_attempts set in the client's retry policy in the
273        event of a 429 or 502.
274
275        :param fql: A Query
276        :param opts: (Optional) Query Options
277
278        :return: a :class:`QueryResponse`
279
280        :raises NetworkError: HTTP Request failed in transit
281        :raises ProtocolError: HTTP error not from Fauna
282        :raises ServiceError: Fauna returned an error
283        :raises ValueError: Encoding and decoding errors
284        :raises TypeError: Invalid param types
285        """
286
287    if not isinstance(fql, Query):
288      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
289                 f"Query by calling fauna.fql()"
290      raise TypeError(err_msg)
291
292    return QueryIterator(self, fql, opts)

Run a query on Fauna and returning an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502.

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Returns

a QueryResponse

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def query( self, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None) -> fauna.encoding.wire_protocol.QuerySuccess:
294  def query(
295      self,
296      fql: Query,
297      opts: Optional[QueryOptions] = None,
298  ) -> QuerySuccess:
299    """
300        Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
301        up to the max_backoff in the event of a 429.
302
303        :param fql: A Query
304        :param opts: (Optional) Query Options
305
306        :return: a :class:`QueryResponse`
307
308        :raises NetworkError: HTTP Request failed in transit
309        :raises ProtocolError: HTTP error not from Fauna
310        :raises ServiceError: Fauna returned an error
311        :raises ValueError: Encoding and decoding errors
312        :raises TypeError: Invalid param types
313        """
314
315    if not isinstance(fql, Query):
316      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
317                 f"Query by calling fauna.fql()"
318      raise TypeError(err_msg)
319
320    try:
321      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
322    except Exception as e:
323      raise ClientError("Failed to encode Query") from e
324
325    retryable = Retryable[QuerySuccess](
326        self._max_attempts,
327        self._max_backoff,
328        self._query,
329        "/query/1",
330        fql=encoded_query,
331        opts=opts,
332    )
333
334    r = retryable.run()
335    r.response.stats.attempts = r.attempts
336    return r.response

Run a query on Fauna. A query will be retried max_attempt times with exponential backoff up to the max_backoff in the event of a 429.

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Returns

a QueryResponse

Raises
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def stream( self, fql: Union[fauna.query.models.EventSource, fauna.query.query_builder.Query], opts: StreamOptions = StreamOptions(max_attempts=None, max_backoff=None, start_ts=None, cursor=None, status_events=False)) -> StreamIterator:
425  def stream(
426      self,
427      fql: Union[EventSource, Query],
428      opts: StreamOptions = StreamOptions()
429  ) -> "StreamIterator":
430    """
431        Opens a Stream in Fauna and returns an iterator that consume Fauna events.
432
433        :param fql: An EventSource or a Query that returns an EventSource.
434        :param opts: (Optional) Stream Options.
435
436        :return: a :class:`StreamIterator`
437
438        :raises ClientError: Invalid options provided
439        :raises NetworkError: HTTP Request failed in transit
440        :raises ProtocolError: HTTP error not from Fauna
441        :raises ServiceError: Fauna returned an error
442        :raises ValueError: Encoding and decoding errors
443        :raises TypeError: Invalid param types
444        """
445
446    if isinstance(fql, Query):
447      if opts.cursor is not None:
448        raise ClientError(
449            "The 'cursor' configuration can only be used with an event source.")
450
451      source = self.query(fql).data
452    else:
453      source = fql
454
455    if not isinstance(source, EventSource):
456      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
457      raise TypeError(err_msg)
458
459    headers = self._headers.copy()
460    headers[_Header.Format] = "tagged"
461    headers[_Header.Authorization] = self._auth.bearer()
462
463    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
464                          self._max_attempts, self._max_backoff, opts, source)

Opens a Stream in Fauna and returns an iterator that consume Fauna events.

Parameters
  • fql: An EventSource or a Query that returns an EventSource.
  • opts: (Optional) Stream Options.
Returns

a StreamIterator

Raises
  • ClientError: Invalid options provided
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
def feed( self, source: Union[fauna.query.models.EventSource, fauna.query.query_builder.Query], opts: FeedOptions = FeedOptions(max_attempts=None, max_backoff=None, query_timeout=None, page_size=None, start_ts=None, cursor=None)) -> FeedIterator:
466  def feed(
467      self,
468      source: Union[EventSource, Query],
469      opts: FeedOptions = FeedOptions(),
470  ) -> "FeedIterator":
471    """
472        Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.
473
474        :param source: An EventSource or a Query that returns an EventSource.
475        :param opts: (Optional) Event Feed options.
476
477        :return: a :class:`FeedIterator`
478
479        :raises ClientError: Invalid options provided
480        :raises NetworkError: HTTP Request failed in transit
481        :raises ProtocolError: HTTP error not from Fauna
482        :raises ServiceError: Fauna returned an error
483        :raises ValueError: Encoding and decoding errors
484        :raises TypeError: Invalid param types
485        """
486
487    if isinstance(source, Query):
488      source = self.query(source).data
489
490    if not isinstance(source, EventSource):
491      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
492      raise TypeError(err_msg)
493
494    headers = self._headers.copy()
495    headers[_Header.Format] = "tagged"
496    headers[_Header.Authorization] = self._auth.bearer()
497
498    if opts.query_timeout is not None:
499      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
500      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
501    elif self._query_timeout_ms is not None:
502      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
503
504    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
505                        self._max_attempts, self._max_backoff, opts, source)

Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.

Parameters
  • source: An EventSource or a Query that returns an EventSource.
  • opts: (Optional) Event Feed options.
Returns

a FeedIterator

Raises
  • ClientError: Invalid options provided
  • NetworkError: HTTP Request failed in transit
  • ProtocolError: HTTP error not from Fauna
  • ServiceError: Fauna returned an error
  • ValueError: Encoding and decoding errors
  • TypeError: Invalid param types
class StreamIterator:
540class StreamIterator:
541  """A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""
542
543  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
544               endpoint: str, max_attempts: int, max_backoff: int,
545               opts: StreamOptions, source: EventSource):
546    self._http_client = http_client
547    self._headers = headers
548    self._endpoint = endpoint
549    self._max_attempts = max_attempts
550    self._max_backoff = max_backoff
551    self._opts = opts
552    self._source = source
553    self._stream = None
554    self.last_ts = None
555    self.last_cursor = None
556    self._ctx = self._create_stream()
557
558    if opts.start_ts is not None and opts.cursor is not None:
559      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
560      raise TypeError(err_msg)
561
562  def __enter__(self):
563    return self
564
565  def __exit__(self, exc_type, exc_value, exc_traceback):
566    if self._stream is not None:
567      self._stream.close()
568
569    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
570    return False
571
572  def __iter__(self):
573    return self
574
575  def __next__(self):
576    if self._opts.max_attempts is not None:
577      max_attempts = self._opts.max_attempts
578    else:
579      max_attempts = self._max_attempts
580
581    if self._opts.max_backoff is not None:
582      max_backoff = self._opts.max_backoff
583    else:
584      max_backoff = self._max_backoff
585
586    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
587    return retryable.run().response
588
589  def _next_element(self):
590    try:
591      if self._stream is None:
592        try:
593          self._stream = self._ctx.__enter__()
594        except Exception:
595          self._retry_stream()
596
597      if self._stream is not None:
598        event: Any = FaunaDecoder.decode(next(self._stream))
599
600        if event["type"] == "error":
601          FaunaError.parse_error_and_throw(event, 400)
602
603        self.last_ts = event["txn_ts"]
604        self.last_cursor = event.get('cursor')
605
606        if event["type"] == "start":
607          return self._next_element()
608
609        if not self._opts.status_events and event["type"] == "status":
610          return self._next_element()
611
612        return event
613
614      raise StopIteration
615    except NetworkError:
616      self._retry_stream()
617
618  def _retry_stream(self):
619    if self._stream is not None:
620      self._stream.close()
621
622    self._stream = None
623
624    try:
625      self._ctx = self._create_stream()
626    except Exception:
627      pass
628    raise RetryableFaunaException
629
630  def _create_stream(self):
631    data: Dict[str, Any] = {"token": self._source.token}
632    if self.last_cursor is not None:
633      data["cursor"] = self.last_cursor
634    elif self._opts.cursor is not None:
635      data["cursor"] = self._opts.cursor
636    elif self._opts.start_ts is not None:
637      data["start_ts"] = self._opts.start_ts
638
639    return self._http_client.stream(
640        url=self._endpoint, headers=self._headers, data=data)
641
642  def close(self):
643    if self._stream is not None:
644      self._stream.close()

A class that mixes a ContextManager and an Iterator so we can detected retryable errors.

StreamIterator( http_client: fauna.http.http_client.HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: StreamOptions, source: fauna.query.models.EventSource)
543  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
544               endpoint: str, max_attempts: int, max_backoff: int,
545               opts: StreamOptions, source: EventSource):
546    self._http_client = http_client
547    self._headers = headers
548    self._endpoint = endpoint
549    self._max_attempts = max_attempts
550    self._max_backoff = max_backoff
551    self._opts = opts
552    self._source = source
553    self._stream = None
554    self.last_ts = None
555    self.last_cursor = None
556    self._ctx = self._create_stream()
557
558    if opts.start_ts is not None and opts.cursor is not None:
559      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
560      raise TypeError(err_msg)
last_ts
last_cursor
def close(self):
642  def close(self):
643    if self._stream is not None:
644      self._stream.close()
class FeedPage:
647class FeedPage:
648
649  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
650    self._events = events
651    self.cursor = cursor
652    self.stats = stats
653
654  def __len__(self):
655    return len(self._events)
656
657  def __iter__(self) -> Iterator[Any]:
658    for event in self._events:
659      if event["type"] == "error":
660        FaunaError.parse_error_and_throw(event, 400)
661      yield event
FeedPage( events: List[Any], cursor: str, stats: fauna.encoding.wire_protocol.QueryStats)
649  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
650    self._events = events
651    self.cursor = cursor
652    self.stats = stats
cursor
stats
class FeedIterator:
664class FeedIterator:
665  """A class to provide an iterator on top of Event Feed pages."""
666
667  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
668               max_attempts: int, max_backoff: int, opts: FeedOptions,
669               source: EventSource):
670    self._http = http
671    self._headers = headers
672    self._endpoint = endpoint
673    self._max_attempts = opts.max_attempts or max_attempts
674    self._max_backoff = opts.max_backoff or max_backoff
675    self._request: Dict[str, Any] = {"token": source.token}
676    self._is_done = False
677
678    if opts.start_ts is not None and opts.cursor is not None:
679      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
680      raise TypeError(err_msg)
681
682    if opts.page_size is not None:
683      self._request["page_size"] = opts.page_size
684
685    if opts.cursor is not None:
686      self._request["cursor"] = opts.cursor
687    elif opts.start_ts is not None:
688      self._request["start_ts"] = opts.start_ts
689
690  def __iter__(self) -> Iterator[FeedPage]:
691    self._is_done = False
692    return self
693
694  def __next__(self) -> FeedPage:
695    if self._is_done:
696      raise StopIteration
697
698    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
699                               self._next_page)
700    return retryable.run().response
701
702  def _next_page(self) -> FeedPage:
703    with self._http.request(
704        method="POST",
705        url=self._endpoint,
706        headers=self._headers,
707        data=self._request,
708    ) as response:
709      status_code = response.status_code()
710      decoded: Any = FaunaDecoder.decode(response.json())
711
712      if status_code > 399:
713        FaunaError.parse_error_and_throw(decoded, status_code)
714
715      self._is_done = not decoded["has_next"]
716      self._request["cursor"] = decoded["cursor"]
717
718      if "start_ts" in self._request:
719        del self._request["start_ts"]
720
721      return FeedPage(decoded["events"], decoded["cursor"],
722                      QueryStats(decoded["stats"]))
723
724  def flatten(self) -> Iterator:
725    """A generator that yields events instead of pages of events."""
726    for page in self:
727      for event in page:
728        yield event

A class to provide an iterator on top of Event Feed pages.

FeedIterator( http: fauna.http.http_client.HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: FeedOptions, source: fauna.query.models.EventSource)
667  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
668               max_attempts: int, max_backoff: int, opts: FeedOptions,
669               source: EventSource):
670    self._http = http
671    self._headers = headers
672    self._endpoint = endpoint
673    self._max_attempts = opts.max_attempts or max_attempts
674    self._max_backoff = opts.max_backoff or max_backoff
675    self._request: Dict[str, Any] = {"token": source.token}
676    self._is_done = False
677
678    if opts.start_ts is not None and opts.cursor is not None:
679      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
680      raise TypeError(err_msg)
681
682    if opts.page_size is not None:
683      self._request["page_size"] = opts.page_size
684
685    if opts.cursor is not None:
686      self._request["cursor"] = opts.cursor
687    elif opts.start_ts is not None:
688      self._request["start_ts"] = opts.start_ts
def flatten(self) -> Iterator:
724  def flatten(self) -> Iterator:
725    """A generator that yields events instead of pages of events."""
726    for page in self:
727      for event in page:
728        yield event

A generator that yields events instead of pages of events.

class QueryIterator:
731class QueryIterator:
732  """A class to provider an iterator on top of Fauna queries."""
733
734  def __init__(self,
735               client: Client,
736               fql: Query,
737               opts: Optional[QueryOptions] = None):
738    """Initializes the QueryIterator
739
740        :param fql: A Query
741        :param opts: (Optional) Query Options
742
743        :raises TypeError: Invalid param types
744        """
745    if not isinstance(client, Client):
746      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
747                  f"Client by calling fauna.client.Client()"
748      raise TypeError(err_msg)
749
750    if not isinstance(fql, Query):
751      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
752                 f"Query by calling fauna.fql()"
753      raise TypeError(err_msg)
754
755    self.client = client
756    self.fql = fql
757    self.opts = opts
758
759  def __iter__(self) -> Iterator:
760    return self.iter()
761
762  def iter(self) -> Iterator:
763    """
764        A generator function that immediately fetches and yields the results of
765        the stored query. Yields additional pages on subsequent iterations if
766        they exist
767        """
768
769    cursor = None
770    initial_response = self.client.query(self.fql, self.opts)
771
772    if isinstance(initial_response.data, Page):
773      cursor = initial_response.data.after
774      yield initial_response.data.data
775
776      while cursor is not None:
777        next_response = self.client.query(
778            fql("Set.paginate(${after})", after=cursor), self.opts)
779        # TODO: `Set.paginate` does not yet return a `@set` tagged value
780        #       so we will get back a plain object that might not have
781        #       an after property.
782        cursor = next_response.data.get("after")
783        yield next_response.data.get("data")
784
785    else:
786      yield [initial_response.data]
787
788  def flatten(self) -> Iterator:
789    """
790        A generator function that immediately fetches and yields the results of
791        the stored query. Yields each item individually, rather than a whole
792        Page at a time. Fetches additional pages as required if they exist.
793        """
794
795    for page in self.iter():
796      for item in page:
797        yield item

A class to provider an iterator on top of Fauna queries.

QueryIterator( client: Client, fql: fauna.query.query_builder.Query, opts: Optional[QueryOptions] = None)
734  def __init__(self,
735               client: Client,
736               fql: Query,
737               opts: Optional[QueryOptions] = None):
738    """Initializes the QueryIterator
739
740        :param fql: A Query
741        :param opts: (Optional) Query Options
742
743        :raises TypeError: Invalid param types
744        """
745    if not isinstance(client, Client):
746      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
747                  f"Client by calling fauna.client.Client()"
748      raise TypeError(err_msg)
749
750    if not isinstance(fql, Query):
751      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
752                 f"Query by calling fauna.fql()"
753      raise TypeError(err_msg)
754
755    self.client = client
756    self.fql = fql
757    self.opts = opts

Initializes the QueryIterator

Parameters
  • fql: A Query
  • opts: (Optional) Query Options
Raises
  • TypeError: Invalid param types
client
fql
opts
def iter(self) -> Iterator:
762  def iter(self) -> Iterator:
763    """
764        A generator function that immediately fetches and yields the results of
765        the stored query. Yields additional pages on subsequent iterations if
766        they exist
767        """
768
769    cursor = None
770    initial_response = self.client.query(self.fql, self.opts)
771
772    if isinstance(initial_response.data, Page):
773      cursor = initial_response.data.after
774      yield initial_response.data.data
775
776      while cursor is not None:
777        next_response = self.client.query(
778            fql("Set.paginate(${after})", after=cursor), self.opts)
779        # TODO: `Set.paginate` does not yet return a `@set` tagged value
780        #       so we will get back a plain object that might not have
781        #       an after property.
782        cursor = next_response.data.get("after")
783        yield next_response.data.get("data")
784
785    else:
786      yield [initial_response.data]

A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist

def flatten(self) -> Iterator:
788  def flatten(self) -> Iterator:
789    """
790        A generator function that immediately fetches and yields the results of
791        the stored query. Yields each item individually, rather than a whole
792        Page at a time. Fetches additional pages as required if they exist.
793        """
794
795    for page in self.iter():
796      for item in page:
797        yield item

A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist.