fauna.client.client
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union, List

import fauna
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
from fauna.client.retryable import Retryable
from fauna.client.utils import _Environment, LastTxnTs
from fauna.encoding import FaunaEncoder, FaunaDecoder
from fauna.encoding import QuerySuccess, QueryTags, QueryStats
from fauna.errors import FaunaError, ClientError, ProtocolError, \
    RetryableFaunaException, NetworkError
from fauna.http.http_client import HTTPClient
from fauna.query import EventSource, Query, Page, fql

logger = logging.getLogger("fauna")

DefaultHttpConnectTimeout = timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[timedelta] = None
DefaultHttpWriteTimeout = timedelta(seconds=5)
DefaultHttpPoolTimeout = timedelta(seconds=5)
DefaultIdleConnectionTimeout = timedelta(seconds=5)
DefaultQueryTimeout = timedelta(seconds=5)
DefaultClientBufferTimeout = timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20


@dataclass
class QueryOptions:
  """
  A dataclass representing options available for a query.

  * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  * max_contention_retries - The max number of times to retry the query if contention is encountered.
  * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
  * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
  * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
  * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
  """

  linearized: Optional[bool] = None
  max_contention_retries: Optional[int] = None
  query_timeout: Optional[timedelta] = DefaultQueryTimeout
  query_tags: Optional[Mapping[str, str]] = None
  traceparent: Optional[str] = None
  typecheck: Optional[bool] = None
  additional_headers: Optional[Dict[str, str]] = None
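As a quick usage sketch (assuming `Client`, `QueryOptions`, and `fql` are importable from `fauna.client` and `fauna`, as the error messages later in this module suggest), per-query options are passed alongside the query:

from datetime import timedelta

from fauna import fql
from fauna.client import Client, QueryOptions

client = Client()  # falls back to the FAUNA_SECRET / FAUNA_ENDPOINT env variables
opts = QueryOptions(
    query_timeout=timedelta(seconds=10),   # allow a longer-running query
    query_tags={"source": "nightly_job"},  # surfaced in Fauna's query logs
)
result = client.query(fql("1 + 1"), opts)
print(result.data)  # 2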
@dataclass
class StreamOptions:
  """
  A dataclass representing options available for a stream.

  * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
    the timestamp.
  * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
  * status_events - Indicates if the stream should include status events. Status events are periodic events that
    update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
    about the cost of maintaining the stream other than the cost of the received events.
  """

  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None
  status_events: bool = False


@dataclass
class FeedOptions:
  """
  A dataclass representing options available for an Event Feed.

  * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
  * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
    the timestamp.
  * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
  * page_size - Maximum number of events returned per page. Must be in the
    range 1 to 16000 (inclusive). Defaults to 16.
  """
  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  query_timeout: Optional[timedelta] = None
  page_size: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None
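Note that the stream and feed iterators defined below raise `TypeError` if both `start_ts` and `cursor` are set, so configure at most one of them. A configuration sketch:

from datetime import timedelta

from fauna.client import FeedOptions, StreamOptions

stream_opts = StreamOptions(
    max_attempts=5,      # retry a dropped stream up to five times
    status_events=True,  # also surface periodic "status" events
)
feed_opts = FeedOptions(
    page_size=100,                        # events per page, 1 to 16000
    query_timeout=timedelta(seconds=10),  # per-page request timeout
)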
class Client:

  def __init__(
      self,
      endpoint: Optional[str] = None,
      secret: Optional[str] = None,
      http_client: Optional[HTTPClient] = None,
      query_tags: Optional[Mapping[str, str]] = None,
      linearized: Optional[bool] = None,
      max_contention_retries: Optional[int] = None,
      typecheck: Optional[bool] = None,
      additional_headers: Optional[Dict[str, str]] = None,
      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
      max_attempts: int = 3,
      max_backoff: int = 20,
  ):
    """Initializes a Client.

    :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
    :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
    :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
    :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
    :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
    :param max_contention_retries: The max number of times to retry the query if contention is encountered.
    :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
    :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
    :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
    :param client_buffer_timeout: Additional time beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
    :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
    :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
    :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
    :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
    :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
    :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
    :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
    """

    self._set_endpoint(endpoint)
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff

    if secret is None:
      self._auth = _Auth(_Environment.EnvFaunaSecret())
    else:
      self._auth = _Auth(secret)

    self._last_txn_ts = LastTxnTs()

    self._query_tags = {}
    if query_tags is not None:
      self._query_tags.update(query_tags)

    if query_timeout is not None:
      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
    else:
      self._query_timeout_ms = None

    self._headers: Dict[str, str] = {
        _Header.AcceptEncoding: "gzip",
        _Header.ContentType: "application/json;charset=utf-8",
        _Header.Driver: "python",
        _Header.DriverEnv: str(_DriverEnvironment()),
    }

    if typecheck is not None:
      self._headers[Header.Typecheck] = str(typecheck).lower()

    if linearized is not None:
      self._headers[Header.Linearized] = str(linearized).lower()

    if max_contention_retries is not None and max_contention_retries > 0:
      self._headers[Header.MaxContentionRetries] = \
          f"{max_contention_retries}"

    if additional_headers is not None:
      self._headers = {
          **self._headers,
          **additional_headers,
      }

    self._session: HTTPClient

    if http_client is not None:
      self._session = http_client
    else:
      if fauna.global_http_client is None:
        timeout_s: Optional[float] = None
        if query_timeout is not None and client_buffer_timeout is not None:
          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
        read_timeout_s: Optional[float] = None
        if http_read_timeout is not None:
          read_timeout_s = http_read_timeout.total_seconds()

        write_timeout_s: Optional[float] = http_write_timeout.total_seconds() \
            if http_write_timeout is not None else None
        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds() \
            if http_connect_timeout is not None else None
        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds() \
            if http_pool_timeout is not None else None
        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds() \
            if http_idle_timeout is not None else None

        import httpx
        from fauna.http.httpx_client import HTTPXClient
        c = HTTPXClient(
            httpx.Client(
                http1=True,
                http2=False,
                timeout=httpx.Timeout(
                    timeout=timeout_s,
                    connect=connect_timeout_s,
                    read=read_timeout_s,
                    write=write_timeout_s,
                    pool=pool_timeout_s,
                ),
                limits=httpx.Limits(
                    max_connections=DefaultMaxConnections,
                    max_keepalive_connections=DefaultMaxIdleConnections,
                    keepalive_expiry=idle_timeout_s,
                ),
            ), logger)
        fauna.global_http_client = c

      self._session = fauna.global_http_client

  def close(self):
    self._session.close()
    if self._session == fauna.global_http_client:
      fauna.global_http_client = None
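A construction sketch (the secret shown is a placeholder; omit it to fall back to the `FAUNA_SECRET` environment variable):

from datetime import timedelta

from fauna.client import Client

client = Client(
    secret="fn...",  # placeholder secret
    query_timeout=timedelta(seconds=10),
    max_attempts=5,
    max_backoff=10,
)
try:
    ...  # run queries
finally:
    client.close()  # releases the underlying HTTP session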
  def set_last_txn_ts(self, txn_ts: int):
    """
    Set the last timestamp seen by this client.
    This has no effect if it is earlier than the stored timestamp.

    .. WARNING:: This should be used only when coordinating timestamps across
       multiple clients. Moving the timestamp arbitrarily forward into
       the future will cause transactions to stall.

    :param txn_ts: the new transaction time.
    """
    self._last_txn_ts.update_txn_time(txn_ts)

  def get_last_txn_ts(self) -> Optional[int]:
    """
    Get the last timestamp seen by this client.

    :return: the last transaction timestamp, or None if no timestamp has been seen.
    """
    return self._last_txn_ts.time

  def get_query_timeout(self) -> Optional[timedelta]:
    """
    Get the query timeout for all queries.
    """
    if self._query_timeout_ms is not None:
      return timedelta(milliseconds=self._query_timeout_ms)
    else:
      return None

  def paginate(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> "QueryIterator":
    """
    Run a query on Fauna and return an iterator of results. If the query
    returns a Page, the iterator will fetch additional Pages until the
    after token is null. Each call for a page will be retried with exponential
    backoff up to the max_attempts set in the client's retry policy in the
    event of a 429 or 502.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QueryIterator`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    return QueryIterator(self, fql, opts)
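For example (the `Product` collection is hypothetical), each iteration of the returned `QueryIterator` yields one page of results as a list:

from fauna import fql
from fauna.client import Client

client = Client()
for page in client.paginate(fql("Product.all()")):
    for product in page:
        print(product)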
  def query(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:
    """
    Run a query on Fauna. A query will be retried max_attempts times with exponential backoff
    up to the max_backoff in the event of a 429.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QuerySuccess`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    try:
      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
    except Exception as e:
      raise ClientError("Failed to encode Query") from e

    retryable = Retryable[QuerySuccess](
        self._max_attempts,
        self._max_backoff,
        self._query,
        "/query/1",
        fql=encoded_query,
        opts=opts,
    )

    r = retryable.run()
    r.response.stats.attempts = r.attempts
    return r.response
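A minimal sketch of running a query and handling failures with the `FaunaError` base class imported at the top of this module (`Product` is again a hypothetical collection):

from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaError

client = Client()
try:
    success = client.query(fql("Product.all().first()"))
    print(success.data)
    print(success.stats)  # includes the attempt count set above
except FaunaError as e:
    print(f"query failed: {e}")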
  def _query(
      self,
      path: str,
      fql: Mapping[str, Any],
      arguments: Optional[Mapping[str, Any]] = None,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    if self._query_timeout_ms is not None:
      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

    headers.update(self._last_txn_ts.request_header)

    query_tags = {}
    if self._query_tags is not None:
      query_tags.update(self._query_tags)

    if opts is not None:
      if opts.linearized is not None:
        headers[Header.Linearized] = str(opts.linearized).lower()
      if opts.max_contention_retries is not None:
        headers[Header.MaxContentionRetries] = \
            f"{opts.max_contention_retries}"
      if opts.traceparent is not None:
        headers[Header.Traceparent] = opts.traceparent
      if opts.query_timeout is not None:
        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
        headers[Header.QueryTimeoutMs] = timeout_ms
      if opts.query_tags is not None:
        query_tags.update(opts.query_tags)
      if opts.typecheck is not None:
        headers[Header.Typecheck] = str(opts.typecheck).lower()
      if opts.additional_headers is not None:
        headers.update(opts.additional_headers)

    if len(query_tags) > 0:
      headers[Header.Tags] = QueryTags.encode(query_tags)

    data: dict[str, Any] = {
        "query": fql,
        "arguments": arguments or {},
    }

    with self._session.request(
        method="POST",
        url=self._endpoint + path,
        headers=headers,
        data=data,
    ) as response:
      status_code = response.status_code()
      response_json = response.json()
      headers = response.headers()

      self._check_protocol(response_json, status_code)

      dec: Any = FaunaDecoder.decode(response_json)

      if status_code > 399:
        FaunaError.parse_error_and_throw(dec, status_code)

      if "txn_ts" in dec:
        self.set_last_txn_ts(int(response_json["txn_ts"]))

      stats = QueryStats(dec["stats"]) if "stats" in dec else None
      summary = dec["summary"] if "summary" in dec else None
      query_tags = QueryTags.decode(
          dec["query_tags"]) if "query_tags" in dec else None
      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
      schema_version = dec["schema_version"] if "schema_version" in dec else None
      traceparent = headers.get("traceparent", None)
      static_type = dec["static_type"] if "static_type" in dec else None

      return QuerySuccess(
          data=dec["data"],
          query_tags=query_tags,
          static_type=static_type,
          stats=stats,
          summary=summary,
          traceparent=traceparent,
          txn_ts=txn_ts,
          schema_version=schema_version,
      )

  def stream(
      self,
      fql: Union[EventSource, Query],
      opts: StreamOptions = StreamOptions()
  ) -> "StreamIterator":
    """
    Opens a Stream in Fauna and returns an iterator that consumes Fauna events.

    :param fql: An EventSource or a Query that returns an EventSource.
    :param opts: (Optional) Stream Options.

    :return: a :class:`StreamIterator`

    :raises ClientError: Invalid options provided
    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if isinstance(fql, Query):
      if opts.cursor is not None:
        raise ClientError(
            "The 'cursor' configuration can only be used with an event source.")

      source = self.query(fql).data
    else:
      source = fql

    if not isinstance(source, EventSource):
      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
                          self._max_attempts, self._max_backoff, opts, source)
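Because `StreamIterator` (defined below) is both a context manager and an iterator, a typical consumption pattern looks like the following sketch, which assumes FQL's `eventSource()` and a hypothetical `Product` collection:

from fauna import fql
from fauna.client import Client, StreamOptions

client = Client()
with client.stream(fql("Product.all().eventSource()"), StreamOptions()) as events:
    for event in events:
        print(event["type"], event["txn_ts"])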
  def feed(
      self,
      source: Union[EventSource, Query],
      opts: FeedOptions = FeedOptions(),
  ) -> "FeedIterator":
    """
    Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.

    :param source: An EventSource or a Query that returns an EventSource.
    :param opts: (Optional) Event Feed options.

    :return: a :class:`FeedIterator`

    :raises ClientError: Invalid options provided
    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if isinstance(source, Query):
      source = self.query(source).data

    if not isinstance(source, EventSource):
      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    if opts.query_timeout is not None:
      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
    elif self._query_timeout_ms is not None:
      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
                        self._max_attempts, self._max_backoff, opts, source)

  def _check_protocol(self, response_json: Any, status_code):
    # TODO: Logic to validate wire protocol belongs elsewhere.
    should_raise = False

    # check for QuerySuccess
    if status_code <= 399 and "data" not in response_json:
      should_raise = True

    # check for QueryFailure
    if status_code > 399:
      if "error" not in response_json:
        should_raise = True
      else:
        e = response_json["error"]
        if "code" not in e or "message" not in e:
          should_raise = True

    if should_raise:
      raise ProtocolError(
          status_code,
          f"Response is in an unknown format: \n{response_json}",
      )

  def _set_endpoint(self, endpoint):
    if endpoint is None:
      endpoint = _Environment.EnvFaunaEndpoint()

    if endpoint[-1:] == "/":
      endpoint = endpoint[:-1]

    self._endpoint = endpoint
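A consumption sketch for `feed`: iterate pages directly, or call `flatten()` on the returned `FeedIterator` (defined below) to iterate individual events. The `Product` collection and `eventSource()` call are assumptions, as above:

from fauna import fql
from fauna.client import Client, FeedOptions

client = Client()
feed = client.feed(fql("Product.all().eventSource()"), FeedOptions(page_size=100))
for page in feed:
    for event in page:
        print(event["type"])
    last_cursor = page.cursor  # persist this to resume the feed later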
class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, source: EventSource):
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._source = source
    self._stream = None
    self.last_ts = None
    self.last_cursor = None
    self._ctx = self._create_stream()

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
      raise TypeError(err_msg)

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]
        self.last_cursor = event.get('cursor')

        if event["type"] == "start":
          return self._next_element()

        if not self._opts.status_events and event["type"] == "status":
          return self._next_element()

        return event

      raise StopIteration
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    data: Dict[str, Any] = {"token": self._source.token}
    if self.last_cursor is not None:
      data["cursor"] = self.last_cursor
    elif self._opts.cursor is not None:
      data["cursor"] = self._opts.cursor
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    if self._stream is not None:
      self._stream.close()


class FeedPage:

  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
    self._events = events
    self.cursor = cursor
    self.stats = stats

  def __len__(self):
    return len(self._events)

  def __iter__(self) -> Iterator[Any]:
    for event in self._events:
      if event["type"] == "error":
        FaunaError.parse_error_and_throw(event, 400)
      yield event


class FeedIterator:
  """A class to provide an iterator on top of Event Feed pages."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
               max_attempts: int, max_backoff: int, opts: FeedOptions,
               source: EventSource):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    self._request: Dict[str, Any] = {"token": source.token}
    self._is_done = False

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
      raise TypeError(err_msg)

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[FeedPage]:
    self._is_done = False
    return self

  def __next__(self) -> FeedPage:
    if self._is_done:
      raise StopIteration

    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
                               self._next_page)
    return retryable.run().response

  def _next_page(self) -> FeedPage:
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      status_code = response.status_code()
      decoded: Any = FaunaDecoder.decode(response.json())

      if status_code > 399:
        FaunaError.parse_error_and_throw(decoded, status_code)

      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      if "start_ts" in self._request:
        del self._request["start_ts"]

      return FeedPage(decoded["events"], decoded["cursor"],
                      QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields events instead of pages of events."""
    for page in self:
      for event in page:
        yield event
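Resuming a feed from a persisted cursor is then a matter of passing it back through `FeedOptions`; the `load_cursor` helper here is hypothetical:

from fauna import fql
from fauna.client import Client, FeedOptions

def load_cursor():
    ...  # hypothetical: return the last persisted cursor, or None

client = Client()
saved = load_cursor()
opts = FeedOptions(cursor=saved) if saved else FeedOptions()
for event in client.feed(fql("Product.all().eventSource()"), opts).flatten():
    print(event["type"])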
class QueryIterator:
  """A class to provide an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator.

    :param client: A Client
    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist.
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        #       so we will get back a plain object that might not have
        #       an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      yield [initial_response.data]

  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
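Finally, `flatten()` on the iterator returned by `Client.paginate` yields individual documents across pages rather than page lists (hypothetical `Product` collection again):

from fauna import fql
from fauna.client import Client

client = Client()
products = list(client.paginate(fql("Product.all()")).flatten())
print(len(products))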
31@dataclass 32class QueryOptions: 33 """ 34 A dataclass representing options available for a query. 35 36 * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 37 * max_contention_retries - The max number of times to retry the query if contention is encountered. 38 * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed. 39 * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 40 * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header 41 * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration. 42 * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary. 43 """ 44 45 linearized: Optional[bool] = None 46 max_contention_retries: Optional[int] = None 47 query_timeout: Optional[timedelta] = DefaultQueryTimeout 48 query_tags: Optional[Mapping[str, str]] = None 49 traceparent: Optional[str] = None 50 typecheck: Optional[bool] = None 51 additional_headers: Optional[Dict[str, str]] = None
A dataclass representing options available for a query.
- linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
- max_contention_retries - The max number of times to retry the query if contention is encountered.
- query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
- query_tags - Tags to associate with the query. See logging
- traceparent - A traceparent to associate with the query. See logging Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
- typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
- additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
54@dataclass 55class StreamOptions: 56 """ 57 A dataclass representing options available for a stream. 58 59 * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown. 60 * max_backoff - The maximum backoff in seconds for an individual retry. 61 * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after 62 the timestamp. 63 * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. 64 * status_events - Indicates if stream should include status events. Status events are periodic events that 65 update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics 66 about the cost of maintaining the stream other than the cost of the received events. 67 """ 68 69 max_attempts: Optional[int] = None 70 max_backoff: Optional[int] = None 71 start_ts: Optional[int] = None 72 cursor: Optional[str] = None 73 status_events: bool = False
A dataclass representing options available for a stream.
- max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
- max_backoff - The maximum backoff in seconds for an individual retry.
- start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp.
- cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
- status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events.
76@dataclass 77class FeedOptions: 78 """ 79 A dataclass representing options available for an Event Feed. 80 81 * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown. 82 * max_backoff - The maximum backoff in seconds for an individual retry. 83 * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events. 84 * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after 85 the timestamp. 86 * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. 87 * page_size - Maximum number of events returned per page. Must be in the 88 range 1 to 16000 (inclusive). Defaults to 16. 89 """ 90 max_attempts: Optional[int] = None 91 max_backoff: Optional[int] = None 92 query_timeout: Optional[timedelta] = None 93 page_size: Optional[int] = None 94 start_ts: Optional[int] = None 95 cursor: Optional[str] = None
A dataclass representing options available for an Event Feed.
- max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
- max_backoff - The maximum backoff in seconds for an individual retry.
- query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
- start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after the timestamp.
- cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
- page_size - Maximum number of events returned per page. Must be in the range 1 to 16000 (inclusive). Defaults to 16.
98class Client: 99 100 def __init__( 101 self, 102 endpoint: Optional[str] = None, 103 secret: Optional[str] = None, 104 http_client: Optional[HTTPClient] = None, 105 query_tags: Optional[Mapping[str, str]] = None, 106 linearized: Optional[bool] = None, 107 max_contention_retries: Optional[int] = None, 108 typecheck: Optional[bool] = None, 109 additional_headers: Optional[Dict[str, str]] = None, 110 query_timeout: Optional[timedelta] = DefaultQueryTimeout, 111 client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout, 112 http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout, 113 http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout, 114 http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout, 115 http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout, 116 http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout, 117 max_attempts: int = 3, 118 max_backoff: int = 20, 119 ): 120 """Initializes a Client. 121 122 :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable. 123 :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable. 124 :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`. 125 :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 126 :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 127 :param max_contention_retries: The max number of times to retry the query if contention is encountered. 128 :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration. 129 :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary. 130 :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`. 131 :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. 132 :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`. 133 :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`. 134 :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`. 135 :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`. 136 :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`. 137 :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3. 138 :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20. 
139 """ 140 141 self._set_endpoint(endpoint) 142 self._max_attempts = max_attempts 143 self._max_backoff = max_backoff 144 145 if secret is None: 146 self._auth = _Auth(_Environment.EnvFaunaSecret()) 147 else: 148 self._auth = _Auth(secret) 149 150 self._last_txn_ts = LastTxnTs() 151 152 self._query_tags = {} 153 if query_tags is not None: 154 self._query_tags.update(query_tags) 155 156 if query_timeout is not None: 157 self._query_timeout_ms = int(query_timeout.total_seconds() * 1000) 158 else: 159 self._query_timeout_ms = None 160 161 self._headers: Dict[str, str] = { 162 _Header.AcceptEncoding: "gzip", 163 _Header.ContentType: "application/json;charset=utf-8", 164 _Header.Driver: "python", 165 _Header.DriverEnv: str(_DriverEnvironment()), 166 } 167 168 if typecheck is not None: 169 self._headers[Header.Typecheck] = str(typecheck).lower() 170 171 if linearized is not None: 172 self._headers[Header.Linearized] = str(linearized).lower() 173 174 if max_contention_retries is not None and max_contention_retries > 0: 175 self._headers[Header.MaxContentionRetries] = \ 176 f"{max_contention_retries}" 177 178 if additional_headers is not None: 179 self._headers = { 180 **self._headers, 181 **additional_headers, 182 } 183 184 self._session: HTTPClient 185 186 if http_client is not None: 187 self._session = http_client 188 else: 189 if fauna.global_http_client is None: 190 timeout_s: Optional[float] = None 191 if query_timeout is not None and client_buffer_timeout is not None: 192 timeout_s = (query_timeout + client_buffer_timeout).total_seconds() 193 read_timeout_s: Optional[float] = None 194 if http_read_timeout is not None: 195 read_timeout_s = http_read_timeout.total_seconds() 196 197 write_timeout_s: Optional[float] = http_write_timeout.total_seconds( 198 ) if http_write_timeout is not None else None 199 connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds( 200 ) if http_connect_timeout is not None else None 201 pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds( 202 ) if http_pool_timeout is not None else None 203 idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds( 204 ) if http_idle_timeout is not None else None 205 206 import httpx 207 from fauna.http.httpx_client import HTTPXClient 208 c = HTTPXClient( 209 httpx.Client( 210 http1=True, 211 http2=False, 212 timeout=httpx.Timeout( 213 timeout=timeout_s, 214 connect=connect_timeout_s, 215 read=read_timeout_s, 216 write=write_timeout_s, 217 pool=pool_timeout_s, 218 ), 219 limits=httpx.Limits( 220 max_connections=DefaultMaxConnections, 221 max_keepalive_connections=DefaultMaxIdleConnections, 222 keepalive_expiry=idle_timeout_s, 223 ), 224 ), logger) 225 fauna.global_http_client = c 226 227 self._session = fauna.global_http_client 228 229 def close(self): 230 self._session.close() 231 if self._session == fauna.global_http_client: 232 fauna.global_http_client = None 233 234 def set_last_txn_ts(self, txn_ts: int): 235 """ 236 Set the last timestamp seen by this client. 237 This has no effect if earlier than stored timestamp. 238 239 .. WARNING:: This should be used only when coordinating timestamps across 240 multiple clients. Moving the timestamp arbitrarily forward into 241 the future will cause transactions to stall. 242 243 :param txn_ts: the new transaction time. 244 """ 245 self._last_txn_ts.update_txn_time(txn_ts) 246 247 def get_last_txn_ts(self) -> Optional[int]: 248 """ 249 Get the last timestamp seen by this client. 
250 :return: 251 """ 252 return self._last_txn_ts.time 253 254 def get_query_timeout(self) -> Optional[timedelta]: 255 """ 256 Get the query timeout for all queries. 257 """ 258 if self._query_timeout_ms is not None: 259 return timedelta(milliseconds=self._query_timeout_ms) 260 else: 261 return None 262 263 def paginate( 264 self, 265 fql: Query, 266 opts: Optional[QueryOptions] = None, 267 ) -> "QueryIterator": 268 """ 269 Run a query on Fauna and returning an iterator of results. If the query 270 returns a Page, the iterator will fetch additional Pages until the 271 after token is null. Each call for a page will be retried with exponential 272 backoff up to the max_attempts set in the client's retry policy in the 273 event of a 429 or 502. 274 275 :param fql: A Query 276 :param opts: (Optional) Query Options 277 278 :return: a :class:`QueryResponse` 279 280 :raises NetworkError: HTTP Request failed in transit 281 :raises ProtocolError: HTTP error not from Fauna 282 :raises ServiceError: Fauna returned an error 283 :raises ValueError: Encoding and decoding errors 284 :raises TypeError: Invalid param types 285 """ 286 287 if not isinstance(fql, Query): 288 err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \ 289 f"Query by calling fauna.fql()" 290 raise TypeError(err_msg) 291 292 return QueryIterator(self, fql, opts) 293 294 def query( 295 self, 296 fql: Query, 297 opts: Optional[QueryOptions] = None, 298 ) -> QuerySuccess: 299 """ 300 Run a query on Fauna. A query will be retried max_attempt times with exponential backoff 301 up to the max_backoff in the event of a 429. 302 303 :param fql: A Query 304 :param opts: (Optional) Query Options 305 306 :return: a :class:`QueryResponse` 307 308 :raises NetworkError: HTTP Request failed in transit 309 :raises ProtocolError: HTTP error not from Fauna 310 :raises ServiceError: Fauna returned an error 311 :raises ValueError: Encoding and decoding errors 312 :raises TypeError: Invalid param types 313 """ 314 315 if not isinstance(fql, Query): 316 err_msg = f"'fql' must be a Query but was a {type(fql)}. 
You can build a " \ 317 f"Query by calling fauna.fql()" 318 raise TypeError(err_msg) 319 320 try: 321 encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql) 322 except Exception as e: 323 raise ClientError("Failed to encode Query") from e 324 325 retryable = Retryable[QuerySuccess]( 326 self._max_attempts, 327 self._max_backoff, 328 self._query, 329 "/query/1", 330 fql=encoded_query, 331 opts=opts, 332 ) 333 334 r = retryable.run() 335 r.response.stats.attempts = r.attempts 336 return r.response 337 338 def _query( 339 self, 340 path: str, 341 fql: Mapping[str, Any], 342 arguments: Optional[Mapping[str, Any]] = None, 343 opts: Optional[QueryOptions] = None, 344 ) -> QuerySuccess: 345 346 headers = self._headers.copy() 347 headers[_Header.Format] = "tagged" 348 headers[_Header.Authorization] = self._auth.bearer() 349 350 if self._query_timeout_ms is not None: 351 headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) 352 353 headers.update(self._last_txn_ts.request_header) 354 355 query_tags = {} 356 if self._query_tags is not None: 357 query_tags.update(self._query_tags) 358 359 if opts is not None: 360 if opts.linearized is not None: 361 headers[Header.Linearized] = str(opts.linearized).lower() 362 if opts.max_contention_retries is not None: 363 headers[Header.MaxContentionRetries] = \ 364 f"{opts.max_contention_retries}" 365 if opts.traceparent is not None: 366 headers[Header.Traceparent] = opts.traceparent 367 if opts.query_timeout is not None: 368 timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}" 369 headers[Header.QueryTimeoutMs] = timeout_ms 370 if opts.query_tags is not None: 371 query_tags.update(opts.query_tags) 372 if opts.typecheck is not None: 373 headers[Header.Typecheck] = str(opts.typecheck).lower() 374 if opts.additional_headers is not None: 375 headers.update(opts.additional_headers) 376 377 if len(query_tags) > 0: 378 headers[Header.Tags] = QueryTags.encode(query_tags) 379 380 data: dict[str, Any] = { 381 "query": fql, 382 "arguments": arguments or {}, 383 } 384 385 with self._session.request( 386 method="POST", 387 url=self._endpoint + path, 388 headers=headers, 389 data=data, 390 ) as response: 391 status_code = response.status_code() 392 response_json = response.json() 393 headers = response.headers() 394 395 self._check_protocol(response_json, status_code) 396 397 dec: Any = FaunaDecoder.decode(response_json) 398 399 if status_code > 399: 400 FaunaError.parse_error_and_throw(dec, status_code) 401 402 if "txn_ts" in dec: 403 self.set_last_txn_ts(int(response_json["txn_ts"])) 404 405 stats = QueryStats(dec["stats"]) if "stats" in dec else None 406 summary = dec["summary"] if "summary" in dec else None 407 query_tags = QueryTags.decode( 408 dec["query_tags"]) if "query_tags" in dec else None 409 txn_ts = dec["txn_ts"] if "txn_ts" in dec else None 410 schema_version = dec["schema_version"] if "schema_version" in dec else None 411 traceparent = headers.get("traceparent", None) 412 static_type = dec["static_type"] if "static_type" in dec else None 413 414 return QuerySuccess( 415 data=dec["data"], 416 query_tags=query_tags, 417 static_type=static_type, 418 stats=stats, 419 summary=summary, 420 traceparent=traceparent, 421 txn_ts=txn_ts, 422 schema_version=schema_version, 423 ) 424 425 def stream( 426 self, 427 fql: Union[EventSource, Query], 428 opts: StreamOptions = StreamOptions() 429 ) -> "StreamIterator": 430 """ 431 Opens a Stream in Fauna and returns an iterator that consume Fauna events. 
432 433 :param fql: An EventSource or a Query that returns an EventSource. 434 :param opts: (Optional) Stream Options. 435 436 :return: a :class:`StreamIterator` 437 438 :raises ClientError: Invalid options provided 439 :raises NetworkError: HTTP Request failed in transit 440 :raises ProtocolError: HTTP error not from Fauna 441 :raises ServiceError: Fauna returned an error 442 :raises ValueError: Encoding and decoding errors 443 :raises TypeError: Invalid param types 444 """ 445 446 if isinstance(fql, Query): 447 if opts.cursor is not None: 448 raise ClientError( 449 "The 'cursor' configuration can only be used with an event source.") 450 451 source = self.query(fql).data 452 else: 453 source = fql 454 455 if not isinstance(source, EventSource): 456 err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." 457 raise TypeError(err_msg) 458 459 headers = self._headers.copy() 460 headers[_Header.Format] = "tagged" 461 headers[_Header.Authorization] = self._auth.bearer() 462 463 return StreamIterator(self._session, headers, self._endpoint + "/stream/1", 464 self._max_attempts, self._max_backoff, opts, source) 465 466 def feed( 467 self, 468 source: Union[EventSource, Query], 469 opts: FeedOptions = FeedOptions(), 470 ) -> "FeedIterator": 471 """ 472 Opens an Event Feed in Fauna and returns an iterator that consume Fauna events. 473 474 :param source: An EventSource or a Query that returns an EventSource. 475 :param opts: (Optional) Event Feed options. 476 477 :return: a :class:`FeedIterator` 478 479 :raises ClientError: Invalid options provided 480 :raises NetworkError: HTTP Request failed in transit 481 :raises ProtocolError: HTTP error not from Fauna 482 :raises ServiceError: Fauna returned an error 483 :raises ValueError: Encoding and decoding errors 484 :raises TypeError: Invalid param types 485 """ 486 487 if isinstance(source, Query): 488 source = self.query(source).data 489 490 if not isinstance(source, EventSource): 491 err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." 492 raise TypeError(err_msg) 493 494 headers = self._headers.copy() 495 headers[_Header.Format] = "tagged" 496 headers[_Header.Authorization] = self._auth.bearer() 497 498 if opts.query_timeout is not None: 499 query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000) 500 headers[Header.QueryTimeoutMs] = str(query_timeout_ms) 501 elif self._query_timeout_ms is not None: 502 headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) 503 504 return FeedIterator(self._session, headers, self._endpoint + "/feed/1", 505 self._max_attempts, self._max_backoff, opts, source) 506 507 def _check_protocol(self, response_json: Any, status_code): 508 # TODO: Logic to validate wire protocol belongs elsewhere. 509 should_raise = False 510 511 # check for QuerySuccess 512 if status_code <= 399 and "data" not in response_json: 513 should_raise = True 514 515 # check for QueryFailure 516 if status_code > 399: 517 if "error" not in response_json: 518 should_raise = True 519 else: 520 e = response_json["error"] 521 if "code" not in e or "message" not in e: 522 should_raise = True 523 524 if should_raise: 525 raise ProtocolError( 526 status_code, 527 f"Response is in an unknown format: \n{response_json}", 528 ) 529 530 def _set_endpoint(self, endpoint): 531 if endpoint is None: 532 endpoint = _Environment.EnvFaunaEndpoint() 533 534 if endpoint[-1:] == "/": 535 endpoint = endpoint[:-1] 536 537 self._endpoint = endpoint
100 def __init__( 101 self, 102 endpoint: Optional[str] = None, 103 secret: Optional[str] = None, 104 http_client: Optional[HTTPClient] = None, 105 query_tags: Optional[Mapping[str, str]] = None, 106 linearized: Optional[bool] = None, 107 max_contention_retries: Optional[int] = None, 108 typecheck: Optional[bool] = None, 109 additional_headers: Optional[Dict[str, str]] = None, 110 query_timeout: Optional[timedelta] = DefaultQueryTimeout, 111 client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout, 112 http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout, 113 http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout, 114 http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout, 115 http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout, 116 http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout, 117 max_attempts: int = 3, 118 max_backoff: int = 20, 119 ): 120 """Initializes a Client. 121 122 :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable. 123 :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable. 124 :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`. 125 :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 126 :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 127 :param max_contention_retries: The max number of times to retry the query if contention is encountered. 128 :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration. 129 :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary. 130 :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`. 131 :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. 132 :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`. 133 :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`. 134 :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`. 135 :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`. 136 :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`. 137 :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3. 138 :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20. 
139 """ 140 141 self._set_endpoint(endpoint) 142 self._max_attempts = max_attempts 143 self._max_backoff = max_backoff 144 145 if secret is None: 146 self._auth = _Auth(_Environment.EnvFaunaSecret()) 147 else: 148 self._auth = _Auth(secret) 149 150 self._last_txn_ts = LastTxnTs() 151 152 self._query_tags = {} 153 if query_tags is not None: 154 self._query_tags.update(query_tags) 155 156 if query_timeout is not None: 157 self._query_timeout_ms = int(query_timeout.total_seconds() * 1000) 158 else: 159 self._query_timeout_ms = None 160 161 self._headers: Dict[str, str] = { 162 _Header.AcceptEncoding: "gzip", 163 _Header.ContentType: "application/json;charset=utf-8", 164 _Header.Driver: "python", 165 _Header.DriverEnv: str(_DriverEnvironment()), 166 } 167 168 if typecheck is not None: 169 self._headers[Header.Typecheck] = str(typecheck).lower() 170 171 if linearized is not None: 172 self._headers[Header.Linearized] = str(linearized).lower() 173 174 if max_contention_retries is not None and max_contention_retries > 0: 175 self._headers[Header.MaxContentionRetries] = \ 176 f"{max_contention_retries}" 177 178 if additional_headers is not None: 179 self._headers = { 180 **self._headers, 181 **additional_headers, 182 } 183 184 self._session: HTTPClient 185 186 if http_client is not None: 187 self._session = http_client 188 else: 189 if fauna.global_http_client is None: 190 timeout_s: Optional[float] = None 191 if query_timeout is not None and client_buffer_timeout is not None: 192 timeout_s = (query_timeout + client_buffer_timeout).total_seconds() 193 read_timeout_s: Optional[float] = None 194 if http_read_timeout is not None: 195 read_timeout_s = http_read_timeout.total_seconds() 196 197 write_timeout_s: Optional[float] = http_write_timeout.total_seconds( 198 ) if http_write_timeout is not None else None 199 connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds( 200 ) if http_connect_timeout is not None else None 201 pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds( 202 ) if http_pool_timeout is not None else None 203 idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds( 204 ) if http_idle_timeout is not None else None 205 206 import httpx 207 from fauna.http.httpx_client import HTTPXClient 208 c = HTTPXClient( 209 httpx.Client( 210 http1=True, 211 http2=False, 212 timeout=httpx.Timeout( 213 timeout=timeout_s, 214 connect=connect_timeout_s, 215 read=read_timeout_s, 216 write=write_timeout_s, 217 pool=pool_timeout_s, 218 ), 219 limits=httpx.Limits( 220 max_connections=DefaultMaxConnections, 221 max_keepalive_connections=DefaultMaxIdleConnections, 222 keepalive_expiry=idle_timeout_s, 223 ), 224 ), logger) 225 fauna.global_http_client = c 226 227 self._session = fauna.global_http_client
Initializes a Client.
Parameters
- endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the FAUNA_ENDPOINT env variable.
- secret: The Fauna Secret to use. Defaults to empty, or the FAUNA_SECRET env variable.
- http_client: An HTTPClient implementation. Defaults to a global HTTPXClient.
- query_tags: Tags to associate with the query. See logging: https://docs.fauna.com/fauna/current/build/logs/query_log/
- linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
- max_contention_retries: The max number of times to retry the query if contention is encountered.
- typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
- additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
- query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is DefaultQueryTimeout.
- client_buffer_timeout: Time beyond query_timeout at which the client will abort a request if it has not received a response. The default is DefaultClientBufferTimeout, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
- http_read_timeout: Set HTTP Read timeout, default is DefaultHttpReadTimeout.
- http_write_timeout: Set HTTP Write timeout, default is DefaultHttpWriteTimeout.
- http_connect_timeout: Set HTTP Connect timeout, default is DefaultHttpConnectTimeout.
- http_pool_timeout: Set HTTP Pool timeout, default is DefaultHttpPoolTimeout.
- http_idle_timeout: Set HTTP Idle timeout, default is DefaultIdleConnectionTimeout.
- max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
- max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
  def set_last_txn_ts(self, txn_ts: int):
    """
    Set the last timestamp seen by this client.
    This has no effect if earlier than stored timestamp.

    .. WARNING:: This should be used only when coordinating timestamps across
       multiple clients. Moving the timestamp arbitrarily forward into
       the future will cause transactions to stall.

    :param txn_ts: the new transaction time.
    """
    self._last_txn_ts.update_txn_time(txn_ts)
Set the last timestamp seen by this client. This has no effect if the given timestamp is earlier than the stored timestamp.
Warning: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.
Parameters
- txn_ts: the new transaction time.
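A brief sketch of that coordination scenario; client_a and client_b are assumed to be two already-configured Client instances:

# Propagate the newest transaction timestamp seen by one client to another
# so the second client's reads reflect at least that point in time.
ts = client_a.get_last_txn_ts()
if ts is not None:
  client_b.set_last_txn_ts(ts)  # no effect if client_b already has a newer ts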
  def get_last_txn_ts(self) -> Optional[int]:
    """
    Get the last timestamp seen by this client.
    :return: The last transaction timestamp seen by this client, or None.
    """
    return self._last_txn_ts.time
Get the last timestamp seen by this client.
Returns
The last transaction timestamp seen by this client, or None.
  def get_query_timeout(self) -> Optional[timedelta]:
    """
    Get the query timeout for all queries.
    """
    if self._query_timeout_ms is not None:
      return timedelta(milliseconds=self._query_timeout_ms)
    else:
      return None
Get the query timeout for all queries.
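Both getters are side-effect free; for example:

timeout = client.get_query_timeout()  # a timedelta, or None if no timeout is set
last_ts = client.get_last_txn_ts()    # an int, or None before any query has run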
  def paginate(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> "QueryIterator":
    """
    Run a query on Fauna and return an iterator of results. If the query
    returns a Page, the iterator will fetch additional Pages until the
    after token is null. Each call for a page will be retried with exponential
    backoff up to the max_attempts set in the client's retry policy in the
    event of a 429 or 502.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QueryIterator`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    return QueryIterator(self, fql, opts)
Run a query on Fauna and return an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502.
Parameters
- fql: A Query
- opts: (Optional) Query Options
Returns
a QueryIterator
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
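A short usage sketch, assuming a Product collection exists in the target database:

from fauna import fql

# Each iteration yields one page (a list of results); additional pages are
# fetched lazily until the after cursor is exhausted.
for page in client.paginate(fql("Product.all()")):
  for product in page:
    print(product)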
  def query(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:
    """
    Run a query on Fauna. A query will be retried max_attempts times with exponential backoff
    up to the max_backoff in the event of a 429.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QuerySuccess`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    try:
      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
    except Exception as e:
      raise ClientError("Failed to encode Query") from e

    retryable = Retryable[QuerySuccess](
        self._max_attempts,
        self._max_backoff,
        self._query,
        "/query/1",
        fql=encoded_query,
        opts=opts,
    )

    r = retryable.run()
    r.response.stats.attempts = r.attempts
    return r.response
Run a query on Fauna. A query will be retried max_attempts times with exponential backoff up to the max_backoff in the event of a 429.
Parameters
- fql: A Query
- opts: (Optional) Query Options
Returns
a QuerySuccess
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
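For example, a minimal query with per-query options; this assumes QueryOptions is importable from fauna.client alongside Client:

from datetime import timedelta

from fauna import fql
from fauna.client import QueryOptions

res = client.query(
    fql("1 + 1"),
    QueryOptions(query_timeout=timedelta(seconds=2)),
)
print(res.data)            # 2
print(res.stats.attempts)  # attempts made, including retries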
  def stream(
      self,
      fql: Union[EventSource, Query],
      opts: StreamOptions = StreamOptions()
  ) -> "StreamIterator":
    """
    Opens a Stream in Fauna and returns an iterator that consumes Fauna events.

    :param fql: An EventSource or a Query that returns an EventSource.
    :param opts: (Optional) Stream Options.

    :return: a :class:`StreamIterator`

    :raises ClientError: Invalid options provided
    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if isinstance(fql, Query):
      if opts.cursor is not None:
        raise ClientError(
            "The 'cursor' configuration can only be used with an event source.")

      source = self.query(fql).data
    else:
      source = fql

    if not isinstance(source, EventSource):
      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
                          self._max_attempts, self._max_backoff, opts, source)
Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
Parameters
- fql: An EventSource or a Query that returns an EventSource.
- opts: (Optional) Stream Options.
Returns
a StreamIterator
Raises
- ClientError: Invalid options provided
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
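A usage sketch, assuming a Product collection whose event source is requested inline. The returned StreamIterator is also a context manager, so the underlying connection is closed when the block exits:

from fauna import fql

with client.stream(fql("Product.all().eventSource()")) as events:
  for event in events:
    print(event["type"], event.get("data"))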
  def feed(
      self,
      source: Union[EventSource, Query],
      opts: FeedOptions = FeedOptions(),
  ) -> "FeedIterator":
    """
    Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.

    :param source: An EventSource or a Query that returns an EventSource.
    :param opts: (Optional) Event Feed options.

    :return: a :class:`FeedIterator`

    :raises ClientError: Invalid options provided
    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if isinstance(source, Query):
      source = self.query(source).data

    if not isinstance(source, EventSource):
      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    if opts.query_timeout is not None:
      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
    elif self._query_timeout_ms is not None:
      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
                        self._max_attempts, self._max_backoff, opts, source)
Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.
Parameters
- source: An EventSource or a Query that returns an EventSource.
- opts: (Optional) Event Feed options.
Returns
a FeedIterator
Raises
- ClientError: Invalid options provided
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
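A usage sketch along the same lines, again assuming a Product collection and that FeedOptions is importable from fauna.client:

from fauna import fql
from fauna.client import FeedOptions

feed = client.feed(
    fql("Product.all().eventSource()"),
    FeedOptions(page_size=25),
)

# Iterate page by page, or use feed.flatten() to yield individual events.
for page in feed:
  for event in page:
    print(event["type"])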
class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, source: EventSource):
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._source = source
    self._stream = None
    self.last_ts = None
    self.last_cursor = None
    self._ctx = self._create_stream()

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
      raise TypeError(err_msg)

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]
        self.last_cursor = event.get('cursor')

        if event["type"] == "start":
          return self._next_element()

        if not self._opts.status_events and event["type"] == "status":
          return self._next_element()

        return event

      raise StopIteration
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    data: Dict[str, Any] = {"token": self._source.token}
    if self.last_cursor is not None:
      data["cursor"] = self.last_cursor
    elif self._opts.cursor is not None:
      data["cursor"] = self._opts.cursor
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    if self._stream is not None:
      self._stream.close()
A class that mixes a ContextManager and an Iterator so we can detect retryable errors.
  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, source: EventSource):
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._source = source
    self._stream = None
    self.last_ts = None
    self.last_cursor = None
    self._ctx = self._create_stream()

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
      raise TypeError(err_msg)
class FeedPage:

  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
    self._events = events
    self.cursor = cursor
    self.stats = stats

  def __len__(self):
    return len(self._events)

  def __iter__(self) -> Iterator[Any]:
    for event in self._events:
      if event["type"] == "error":
        FaunaError.parse_error_and_throw(event, 400)
      yield event
class FeedIterator:
  """A class to provide an iterator on top of Event Feed pages."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
               max_attempts: int, max_backoff: int, opts: FeedOptions,
               source: EventSource):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    self._request: Dict[str, Any] = {"token": source.token}
    self._is_done = False

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
      raise TypeError(err_msg)

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[FeedPage]:
    self._is_done = False
    return self

  def __next__(self) -> FeedPage:
    if self._is_done:
      raise StopIteration

    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
                               self._next_page)
    return retryable.run().response

  def _next_page(self) -> FeedPage:
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      status_code = response.status_code()
      decoded: Any = FaunaDecoder.decode(response.json())

      if status_code > 399:
        FaunaError.parse_error_and_throw(decoded, status_code)

      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      if "start_ts" in self._request:
        del self._request["start_ts"]

      return FeedPage(decoded["events"], decoded["cursor"],
                      QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields events instead of pages of events."""
    for page in self:
      for event in page:
        yield event
A class to provide an iterator on top of Event Feed pages.
  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
               max_attempts: int, max_backoff: int, opts: FeedOptions,
               source: EventSource):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    self._request: Dict[str, Any] = {"token": source.token}
    self._is_done = False

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
      raise TypeError(err_msg)

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts
class QueryIterator:
  """A class to provide an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist.
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        #       so we will get back a plain object that might not have
        #       an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      yield [initial_response.data]

  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
A class to provide an iterator on top of Fauna queries.
  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts
Initializes the QueryIterator
Parameters
- fql: A Query
- opts: (Optional) Query Options
Raises
- TypeError: Invalid param types
  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist.
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        #       so we will get back a plain object that might not have
        #       an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      yield [initial_response.data]
A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist.
  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist.
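To illustrate the difference between the two generators (again assuming a Product collection):

from fauna import fql

iterator = client.paginate(fql("Product.all()"))

for page in iterator:  # iter(): one list of items per page
  print(len(page))

# flatten(): individual items across pages. Note that each new loop over the
# iterator re-runs the query from the first page.
for item in iterator.flatten():
  print(item)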