fauna.client.client
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union, List

import fauna
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
from fauna.client.retryable import Retryable
from fauna.client.utils import _Environment, LastTxnTs
from fauna.encoding import FaunaEncoder, FaunaDecoder
from fauna.encoding import QuerySuccess, QueryTags, QueryStats
from fauna.errors import FaunaError, ClientError, ProtocolError, \
    RetryableFaunaException, NetworkError
from fauna.http.http_client import HTTPClient
from fauna.query import EventSource, Query, Page, fql

logger = logging.getLogger("fauna")

DefaultHttpConnectTimeout = timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[timedelta] = None
DefaultHttpWriteTimeout = timedelta(seconds=5)
DefaultHttpPoolTimeout = timedelta(seconds=5)
DefaultIdleConnectionTimeout = timedelta(seconds=5)
DefaultQueryTimeout = timedelta(seconds=5)
DefaultClientBufferTimeout = timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20


@dataclass
class QueryOptions:
  """
  A dataclass representing options available for a query.

  * linearized - If true, unconditionally run the query as strictly serialized.
    This affects read-only transactions. Transactions which write will always
    be strictly serialized.
  * max_contention_retries - The max number of times to retry the query if
    contention is encountered.
  * query_timeout - Controls the maximum amount of time Fauna will execute your
    query before marking it failed.
  * query_tags - Tags to associate with the query. See `logging
    <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
  * traceparent - A traceparent to associate with the query. See `logging
    <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_.
    Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  * typecheck - Enable or disable typechecking of the query before evaluation.
    If not set, the value configured on the Client will be used. If neither is
    set, Fauna will use the value of the "typechecked" flag on the database
    configuration.
  * additional_headers - Add/update HTTP request headers for the query. In
    general, this should not be necessary.
  """

  linearized: Optional[bool] = None
  max_contention_retries: Optional[int] = None
  query_timeout: Optional[timedelta] = DefaultQueryTimeout
  query_tags: Optional[Mapping[str, str]] = None
  traceparent: Optional[str] = None
  typecheck: Optional[bool] = None
  additional_headers: Optional[Dict[str, str]] = None


@dataclass
class StreamOptions:
  """
  A dataclass representing options available for a stream.

  * max_attempts - The maximum number of times to attempt a stream query when
    a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna
    will return events starting after the timestamp.
  * cursor - The starting event cursor, exclusive. If set, Fauna will return
    events starting after the cursor.
  * status_events - Indicates if the stream should include status events.
    Status events are periodic events that update the client with the latest
    valid timestamp (in the event of a dropped connection) as well as metrics
    about the cost of maintaining the stream other than the cost of the
    received events.
  """

  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None
  status_events: bool = False


@dataclass
class FeedOptions:
  """
  A dataclass representing options available for an Event Feed.

  * max_attempts - The maximum number of times to attempt an Event Feed query
    when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * query_timeout - Controls the maximum amount of time Fauna will execute a
    query before returning a page of events.
  * start_ts - The starting timestamp of the Event Feed, exclusive. If set,
    Fauna will return events starting after the timestamp.
  * cursor - The starting event cursor, exclusive. If set, Fauna will return
    events starting after the cursor.
  * page_size - The desired number of events per page.
  """
  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  query_timeout: Optional[timedelta] = None
  page_size: Optional[int] = None
  start_ts: Optional[int] = None
  cursor: Optional[str] = None


class Client:

  def __init__(
      self,
      endpoint: Optional[str] = None,
      secret: Optional[str] = None,
      http_client: Optional[HTTPClient] = None,
      query_tags: Optional[Mapping[str, str]] = None,
      linearized: Optional[bool] = None,
      max_contention_retries: Optional[int] = None,
      typecheck: Optional[bool] = None,
      additional_headers: Optional[Dict[str, str]] = None,
      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
      max_attempts: int = 3,
      max_backoff: int = 20,
  ):
    """Initializes a Client.

    :param endpoint: The Fauna endpoint to use. Defaults to the
        `FAUNA_ENDPOINT` env variable, or https://db.fauna.com if the
        variable is not set.
    :param secret: The Fauna secret to use. Defaults to the `FAUNA_SECRET`
        env variable, or empty if the variable is not set.
    :param http_client: An :class:`HTTPClient` implementation. Defaults to a
        global :class:`HTTPXClient`.
    :param query_tags: Tags to associate with the query. See `logging
        <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
    :param linearized: If true, unconditionally run the query as strictly
        serialized. This affects read-only transactions. Transactions which
        write will always be strictly serialized.
    :param max_contention_retries: The max number of times to retry the query
        if contention is encountered.
    :param typecheck: Enable or disable typechecking of the query before
        evaluation. If not set, Fauna will use the value of the "typechecked"
        flag on the database configuration.
    :param additional_headers: Add/update HTTP request headers for the query.
        In general, this should not be necessary.
    :param query_timeout: Controls the maximum amount of time Fauna will
        execute your query before marking it failed. Default is
        :py:data:`DefaultQueryTimeout`.
    :param client_buffer_timeout: Time beyond query_timeout at which the
        client will abort a request if it has not received a response. The
        default is :py:data:`DefaultClientBufferTimeout`, which should account
        for network latency for most clients. The value must be greater than
        zero. The closer to zero the value is, the more likely the client is
        to abort the request before the server can report a legitimate
        response or error.
    :param http_read_timeout: Set HTTP Read timeout. Default is
        :py:data:`DefaultHttpReadTimeout`.
    :param http_write_timeout: Set HTTP Write timeout. Default is
        :py:data:`DefaultHttpWriteTimeout`.
    :param http_connect_timeout: Set HTTP Connect timeout. Default is
        :py:data:`DefaultHttpConnectTimeout`.
    :param http_pool_timeout: Set HTTP Pool timeout. Default is
        :py:data:`DefaultHttpPoolTimeout`.
    :param http_idle_timeout: Set HTTP Idle timeout. Default is
        :py:data:`DefaultIdleConnectionTimeout`.
    :param max_attempts: The maximum number of times to attempt a query when
        a retryable exception is thrown. Defaults to 3.
    :param max_backoff: The maximum backoff in seconds for an individual
        retry. Defaults to 20.
    """

    self._set_endpoint(endpoint)
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff

    if secret is None:
      self._auth = _Auth(_Environment.EnvFaunaSecret())
    else:
      self._auth = _Auth(secret)

    self._last_txn_ts = LastTxnTs()

    self._query_tags = {}
    if query_tags is not None:
      self._query_tags.update(query_tags)

    if query_timeout is not None:
      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
    else:
      self._query_timeout_ms = None

    self._headers: Dict[str, str] = {
        _Header.AcceptEncoding: "gzip",
        _Header.ContentType: "application/json;charset=utf-8",
        _Header.Driver: "python",
        _Header.DriverEnv: str(_DriverEnvironment()),
    }

    if typecheck is not None:
      self._headers[Header.Typecheck] = str(typecheck).lower()

    if linearized is not None:
      self._headers[Header.Linearized] = str(linearized).lower()

    if max_contention_retries is not None and max_contention_retries > 0:
      self._headers[Header.MaxContentionRetries] = \
          f"{max_contention_retries}"

    if additional_headers is not None:
      self._headers = {
          **self._headers,
          **additional_headers,
      }

    self._session: HTTPClient

    if http_client is not None:
      self._session = http_client
    else:
      # Lazily build a single HTTPX-backed client shared by the process.
      if fauna.global_http_client is None:
        timeout_s: Optional[float] = None
        if query_timeout is not None and client_buffer_timeout is not None:
          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
        read_timeout_s: Optional[float] = None
        if http_read_timeout is not None:
          read_timeout_s = http_read_timeout.total_seconds()

        write_timeout_s: Optional[float] = (
            http_write_timeout.total_seconds()
            if http_write_timeout is not None else None)
        connect_timeout_s: Optional[float] = (
            http_connect_timeout.total_seconds()
            if http_connect_timeout is not None else None)
        pool_timeout_s: Optional[float] = (
            http_pool_timeout.total_seconds()
            if http_pool_timeout is not None else None)
        idle_timeout_s: Optional[float] = (
            http_idle_timeout.total_seconds()
            if http_idle_timeout is not None else None)

        import httpx
        from fauna.http.httpx_client import HTTPXClient
        c = HTTPXClient(
            httpx.Client(
                http1=True,
                http2=False,
                timeout=httpx.Timeout(
                    timeout=timeout_s,
                    connect=connect_timeout_s,
                    read=read_timeout_s,
                    write=write_timeout_s,
                    pool=pool_timeout_s,
                ),
                limits=httpx.Limits(
                    max_connections=DefaultMaxConnections,
                    max_keepalive_connections=DefaultMaxIdleConnections,
                    keepalive_expiry=idle_timeout_s,
                ),
            ), logger)
        fauna.global_http_client = c

      self._session = fauna.global_http_client

  def close(self):
    self._session.close()
    if self._session == fauna.global_http_client:
      fauna.global_http_client = None

  def set_last_txn_ts(self, txn_ts: int):
    """
    Set the last timestamp seen by this client.
    This has no effect if earlier than the stored timestamp.

    .. WARNING:: This should be used only when coordinating timestamps across
       multiple clients. Moving the timestamp arbitrarily forward into
       the future will cause transactions to stall.

    :param txn_ts: the new transaction time.
    """
    self._last_txn_ts.update_txn_time(txn_ts)

  def get_last_txn_ts(self) -> Optional[int]:
    """
    Get the last timestamp seen by this client.

    :return: the last transaction timestamp, or None if no query has
        completed yet.
    """
    return self._last_txn_ts.time

  def get_query_timeout(self) -> Optional[timedelta]:
    """
    Get the query timeout for all queries.
    """
    if self._query_timeout_ms is not None:
      return timedelta(milliseconds=self._query_timeout_ms)
    else:
      return None

  def paginate(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> "QueryIterator":
    """
    Run a query on Fauna and return an iterator of results. If the query
    returns a Page, the iterator will fetch additional Pages until the
    after token is null. Each call for a page will be retried with exponential
    backoff up to the max_attempts set in the client's retry policy in the
    event of a 429 or 502.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QueryIterator`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    return QueryIterator(self, fql, opts)

  def query(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:
    """
    Run a query on Fauna. A query will be retried max_attempts times with
    exponential backoff up to max_backoff in the event of a 429.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QuerySuccess`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    try:
      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
    except Exception as e:
      raise ClientError("Failed to encode Query") from e

    retryable = Retryable[QuerySuccess](
        self._max_attempts,
        self._max_backoff,
        self._query,
        "/query/1",
        fql=encoded_query,
        opts=opts,
    )

    r = retryable.run()
    r.response.stats.attempts = r.attempts
    return r.response

  def _query(
      self,
      path: str,
      fql: Mapping[str, Any],
      arguments: Optional[Mapping[str, Any]] = None,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    if self._query_timeout_ms is not None:
      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

    headers.update(self._last_txn_ts.request_header)

    query_tags = {}
    if self._query_tags is not None:
      query_tags.update(self._query_tags)

    # Per-query options override the client-level configuration.
    if opts is not None:
      if opts.linearized is not None:
        headers[Header.Linearized] = str(opts.linearized).lower()
      if opts.max_contention_retries is not None:
        headers[Header.MaxContentionRetries] = \
            f"{opts.max_contention_retries}"
      if opts.traceparent is not None:
        headers[Header.Traceparent] = opts.traceparent
      if opts.query_timeout is not None:
        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
        headers[Header.QueryTimeoutMs] = timeout_ms
      if opts.query_tags is not None:
        query_tags.update(opts.query_tags)
      if opts.typecheck is not None:
        headers[Header.Typecheck] = str(opts.typecheck).lower()
      if opts.additional_headers is not None:
        headers.update(opts.additional_headers)

    if len(query_tags) > 0:
      headers[Header.Tags] = QueryTags.encode(query_tags)

    data: dict[str, Any] = {
        "query": fql,
        "arguments": arguments or {},
    }

    with self._session.request(
        method="POST",
        url=self._endpoint + path,
        headers=headers,
        data=data,
    ) as response:
      status_code = response.status_code()
      response_json = response.json()
      headers = response.headers()

      self._check_protocol(response_json, status_code)

      dec: Any = FaunaDecoder.decode(response_json)

      if status_code > 399:
        FaunaError.parse_error_and_throw(dec, status_code)

      if "txn_ts" in dec:
        self.set_last_txn_ts(int(response_json["txn_ts"]))

      stats = QueryStats(dec["stats"]) if "stats" in dec else None
      summary = dec["summary"] if "summary" in dec else None
      query_tags = QueryTags.decode(
          dec["query_tags"]) if "query_tags" in dec else None
      txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
      schema_version = dec["schema_version"] if "schema_version" in dec else None
      traceparent = headers.get("traceparent", None)
      static_type = dec["static_type"] if "static_type" in dec else None

      return QuerySuccess(
          data=dec["data"],
          query_tags=query_tags,
          static_type=static_type,
          stats=stats,
          summary=summary,
          traceparent=traceparent,
          txn_ts=txn_ts,
          schema_version=schema_version,
      )

  def stream(
      self,
      fql: Union[EventSource, Query],
      opts: StreamOptions = StreamOptions()
  ) -> "StreamIterator":
    """
    Opens a Stream in Fauna and returns an iterator that consumes Fauna events.

    :param fql: An EventSource or a Query that returns an EventSource.
    :param opts: (Optional) Stream Options.

    :return: a :class:`StreamIterator`

    :raises ClientError: Invalid options provided
    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if isinstance(fql, Query):
      if opts.cursor is not None:
        raise ClientError(
            "The 'cursor' configuration can only be used with an event source.")

      source = self.query(fql).data
    else:
      source = fql

    if not isinstance(source, EventSource):
      err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
                          self._max_attempts, self._max_backoff, opts, source)

  def feed(
      self,
      source: Union[EventSource, Query],
      opts: FeedOptions = FeedOptions(),
  ) -> "FeedIterator":
    """
    Opens an Event Feed in Fauna and returns an iterator that consumes Fauna
    events.

    :param source: An EventSource or a Query that returns an EventSource.
    :param opts: (Optional) Event Feed options.

    :return: a :class:`FeedIterator`

    :raises ClientError: Invalid options provided
    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if isinstance(source, Query):
      source = self.query(source).data

    if not isinstance(source, EventSource):
      err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    if opts.query_timeout is not None:
      query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
      headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
    elif self._query_timeout_ms is not None:
      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

    return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
                        self._max_attempts, self._max_backoff, opts, source)

  def _check_protocol(self, response_json: Any, status_code):
    # TODO: Logic to validate wire protocol belongs elsewhere.
    should_raise = False

    # check for QuerySuccess
    if status_code <= 399 and "data" not in response_json:
      should_raise = True

    # check for QueryFailure
    if status_code > 399:
      if "error" not in response_json:
        should_raise = True
      else:
        e = response_json["error"]
        if "code" not in e or "message" not in e:
          should_raise = True

    if should_raise:
      raise ProtocolError(
          status_code,
          f"Response is in an unknown format: \n{response_json}",
      )

  def _set_endpoint(self, endpoint):
    if endpoint is None:
      endpoint = _Environment.EnvFaunaEndpoint()

    if endpoint[-1:] == "/":
      endpoint = endpoint[:-1]

    self._endpoint = endpoint


class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, source: EventSource):
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._source = source
    self._stream = None
    self.last_ts = None
    self.last_cursor = None

    # Validate the options before opening the stream.
    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
      raise TypeError(err_msg)

    self._ctx = self._create_stream()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]
        self.last_cursor = event.get('cursor')

        if event["type"] == "start":
          return self._next_element()

        if not self._opts.status_events and event["type"] == "status":
          return self._next_element()

        return event

      raise StopIteration
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    # Recreate the stream context, then signal the caller to retry.
    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    data: Dict[str, Any] = {"token": self._source.token}
    if self.last_cursor is not None:
      data["cursor"] = self.last_cursor
    elif self._opts.cursor is not None:
      data["cursor"] = self._opts.cursor
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    if self._stream is not None:
      self._stream.close()


class FeedPage:

  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
    self._events = events
    self.cursor = cursor
    self.stats = stats

  def __len__(self):
    return len(self._events)

  def __iter__(self) -> Iterator[Any]:
    for event in self._events:
      if event["type"] == "error":
        FaunaError.parse_error_and_throw(event, 400)
      yield event


class FeedIterator:
  """A class to provide an iterator on top of Event Feed pages."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
               max_attempts: int, max_backoff: int, opts: FeedOptions,
               source: EventSource):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    self._request: Dict[str, Any] = {"token": source.token}
    self._is_done = False

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
      raise TypeError(err_msg)

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[FeedPage]:
    self._is_done = False
    return self

  def __next__(self) -> FeedPage:
    if self._is_done:
      raise StopIteration

    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
                               self._next_page)
    return retryable.run().response

  def _next_page(self) -> FeedPage:
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      status_code = response.status_code()
      decoded: Any = FaunaDecoder.decode(response.json())

      if status_code > 399:
        FaunaError.parse_error_and_throw(decoded, status_code)

      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      # After the first page, resume from the cursor rather than start_ts.
      if "start_ts" in self._request:
        del self._request["start_ts"]

      return FeedPage(decoded["events"], decoded["cursor"],
                      QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields events instead of pages of events."""
    for page in self:
      for event in page:
        yield event


class QueryIterator:
  """A class to provide an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

    :param client: A Client
    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist.
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        #       so we will get back a plain object that might not have
        #       an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      yield [initial_response.data]

  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
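For orientation, here is a minimal usage sketch of the query and pagination APIs defined above. It is not part of the module: the "Product" collection and the environment-variable configuration are assumptions for illustration, and the import paths assume the driver's public re-exports (fauna.fql, fauna.client.Client).

# Usage sketch (illustrative, not part of fauna.client.client).
# Assumes FAUNA_ENDPOINT/FAUNA_SECRET are set in the environment and that a
# "Product" collection exists -- both are assumptions for this example.
from fauna import fql
from fauna.client import Client, QueryOptions
from fauna.errors import FaunaError

client = Client()  # picks up the endpoint and secret from the environment

try:
  # Client.query: one request, retried up to max_attempts times on a 429.
  success = client.query(
      fql("Product.all().take(3)"),
      QueryOptions(typecheck=True))
  print(success.data, success.stats)

  # Client.paginate: QueryIterator fetches follow-up pages on demand;
  # flatten() yields individual documents instead of whole pages.
  for product in client.paginate(fql("Product.all()")).flatten():
    print(product)
except FaunaError as e:
  print("query failed:", e)
finally:
  client.close()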
31@dataclass 32class QueryOptions: 33 """ 34 A dataclass representing options available for a query. 35 36 * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 37 * max_contention_retries - The max number of times to retry the query if contention is encountered. 38 * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed. 39 * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 40 * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header 41 * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration. 42 * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary. 43 """ 44 45 linearized: Optional[bool] = None 46 max_contention_retries: Optional[int] = None 47 query_timeout: Optional[timedelta] = DefaultQueryTimeout 48 query_tags: Optional[Mapping[str, str]] = None 49 traceparent: Optional[str] = None 50 typecheck: Optional[bool] = None 51 additional_headers: Optional[Dict[str, str]] = None
A dataclass representing options available for a query.
- linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
- max_contention_retries - The max number of times to retry the query if contention is encountered.
- query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
- query_tags - Tags to associate with the query. See logging
- traceparent - A traceparent to associate with the query. See logging Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
- typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
- additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
54@dataclass 55class StreamOptions: 56 """ 57 A dataclass representing options available for a stream. 58 59 * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown. 60 * max_backoff - The maximum backoff in seconds for an individual retry. 61 * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after 62 the timestamp. 63 * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. 64 * status_events - Indicates if stream should include status events. Status events are periodic events that 65 update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics 66 about the cost of maintaining the stream other than the cost of the received events. 67 """ 68 69 max_attempts: Optional[int] = None 70 max_backoff: Optional[int] = None 71 start_ts: Optional[int] = None 72 cursor: Optional[str] = None 73 status_events: bool = False
A dataclass representing options available for a stream.
- max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
- max_backoff - The maximum backoff in seconds for an individual retry.
- start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp.
- cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
- status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events.
76@dataclass 77class FeedOptions: 78 """ 79 A dataclass representing options available for an Event Feed. 80 81 * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown. 82 * max_backoff - The maximum backoff in seconds for an individual retry. 83 * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events. 84 * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after 85 the timestamp. 86 * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. 87 * page_size - The desired number of events per page. 88 """ 89 max_attempts: Optional[int] = None 90 max_backoff: Optional[int] = None 91 query_timeout: Optional[timedelta] = None 92 page_size: Optional[int] = None 93 start_ts: Optional[int] = None 94 cursor: Optional[str] = None
A dataclass representing options available for an Event Feed.
- max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
- max_backoff - The maximum backoff in seconds for an individual retry.
- query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
- start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after the timestamp.
- cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
- page_size - The desired number of events per page.
97class Client: 98 99 def __init__( 100 self, 101 endpoint: Optional[str] = None, 102 secret: Optional[str] = None, 103 http_client: Optional[HTTPClient] = None, 104 query_tags: Optional[Mapping[str, str]] = None, 105 linearized: Optional[bool] = None, 106 max_contention_retries: Optional[int] = None, 107 typecheck: Optional[bool] = None, 108 additional_headers: Optional[Dict[str, str]] = None, 109 query_timeout: Optional[timedelta] = DefaultQueryTimeout, 110 client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout, 111 http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout, 112 http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout, 113 http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout, 114 http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout, 115 http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout, 116 max_attempts: int = 3, 117 max_backoff: int = 20, 118 ): 119 """Initializes a Client. 120 121 :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable. 122 :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable. 123 :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`. 124 :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 125 :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 126 :param max_contention_retries: The max number of times to retry the query if contention is encountered. 127 :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration. 128 :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary. 129 :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`. 130 :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. 131 :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`. 132 :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`. 133 :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`. 134 :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`. 135 :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`. 136 :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3. 137 :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20. 
138 """ 139 140 self._set_endpoint(endpoint) 141 self._max_attempts = max_attempts 142 self._max_backoff = max_backoff 143 144 if secret is None: 145 self._auth = _Auth(_Environment.EnvFaunaSecret()) 146 else: 147 self._auth = _Auth(secret) 148 149 self._last_txn_ts = LastTxnTs() 150 151 self._query_tags = {} 152 if query_tags is not None: 153 self._query_tags.update(query_tags) 154 155 if query_timeout is not None: 156 self._query_timeout_ms = int(query_timeout.total_seconds() * 1000) 157 else: 158 self._query_timeout_ms = None 159 160 self._headers: Dict[str, str] = { 161 _Header.AcceptEncoding: "gzip", 162 _Header.ContentType: "application/json;charset=utf-8", 163 _Header.Driver: "python", 164 _Header.DriverEnv: str(_DriverEnvironment()), 165 } 166 167 if typecheck is not None: 168 self._headers[Header.Typecheck] = str(typecheck).lower() 169 170 if linearized is not None: 171 self._headers[Header.Linearized] = str(linearized).lower() 172 173 if max_contention_retries is not None and max_contention_retries > 0: 174 self._headers[Header.MaxContentionRetries] = \ 175 f"{max_contention_retries}" 176 177 if additional_headers is not None: 178 self._headers = { 179 **self._headers, 180 **additional_headers, 181 } 182 183 self._session: HTTPClient 184 185 if http_client is not None: 186 self._session = http_client 187 else: 188 if fauna.global_http_client is None: 189 timeout_s: Optional[float] = None 190 if query_timeout is not None and client_buffer_timeout is not None: 191 timeout_s = (query_timeout + client_buffer_timeout).total_seconds() 192 read_timeout_s: Optional[float] = None 193 if http_read_timeout is not None: 194 read_timeout_s = http_read_timeout.total_seconds() 195 196 write_timeout_s: Optional[float] = http_write_timeout.total_seconds( 197 ) if http_write_timeout is not None else None 198 connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds( 199 ) if http_connect_timeout is not None else None 200 pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds( 201 ) if http_pool_timeout is not None else None 202 idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds( 203 ) if http_idle_timeout is not None else None 204 205 import httpx 206 from fauna.http.httpx_client import HTTPXClient 207 c = HTTPXClient( 208 httpx.Client( 209 http1=True, 210 http2=False, 211 timeout=httpx.Timeout( 212 timeout=timeout_s, 213 connect=connect_timeout_s, 214 read=read_timeout_s, 215 write=write_timeout_s, 216 pool=pool_timeout_s, 217 ), 218 limits=httpx.Limits( 219 max_connections=DefaultMaxConnections, 220 max_keepalive_connections=DefaultMaxIdleConnections, 221 keepalive_expiry=idle_timeout_s, 222 ), 223 ), logger) 224 fauna.global_http_client = c 225 226 self._session = fauna.global_http_client 227 228 def close(self): 229 self._session.close() 230 if self._session == fauna.global_http_client: 231 fauna.global_http_client = None 232 233 def set_last_txn_ts(self, txn_ts: int): 234 """ 235 Set the last timestamp seen by this client. 236 This has no effect if earlier than stored timestamp. 237 238 .. WARNING:: This should be used only when coordinating timestamps across 239 multiple clients. Moving the timestamp arbitrarily forward into 240 the future will cause transactions to stall. 241 242 :param txn_ts: the new transaction time. 243 """ 244 self._last_txn_ts.update_txn_time(txn_ts) 245 246 def get_last_txn_ts(self) -> Optional[int]: 247 """ 248 Get the last timestamp seen by this client. 
249 :return: 250 """ 251 return self._last_txn_ts.time 252 253 def get_query_timeout(self) -> Optional[timedelta]: 254 """ 255 Get the query timeout for all queries. 256 """ 257 if self._query_timeout_ms is not None: 258 return timedelta(milliseconds=self._query_timeout_ms) 259 else: 260 return None 261 262 def paginate( 263 self, 264 fql: Query, 265 opts: Optional[QueryOptions] = None, 266 ) -> "QueryIterator": 267 """ 268 Run a query on Fauna and returning an iterator of results. If the query 269 returns a Page, the iterator will fetch additional Pages until the 270 after token is null. Each call for a page will be retried with exponential 271 backoff up to the max_attempts set in the client's retry policy in the 272 event of a 429 or 502. 273 274 :param fql: A Query 275 :param opts: (Optional) Query Options 276 277 :return: a :class:`QueryResponse` 278 279 :raises NetworkError: HTTP Request failed in transit 280 :raises ProtocolError: HTTP error not from Fauna 281 :raises ServiceError: Fauna returned an error 282 :raises ValueError: Encoding and decoding errors 283 :raises TypeError: Invalid param types 284 """ 285 286 if not isinstance(fql, Query): 287 err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \ 288 f"Query by calling fauna.fql()" 289 raise TypeError(err_msg) 290 291 return QueryIterator(self, fql, opts) 292 293 def query( 294 self, 295 fql: Query, 296 opts: Optional[QueryOptions] = None, 297 ) -> QuerySuccess: 298 """ 299 Run a query on Fauna. A query will be retried max_attempt times with exponential backoff 300 up to the max_backoff in the event of a 429. 301 302 :param fql: A Query 303 :param opts: (Optional) Query Options 304 305 :return: a :class:`QueryResponse` 306 307 :raises NetworkError: HTTP Request failed in transit 308 :raises ProtocolError: HTTP error not from Fauna 309 :raises ServiceError: Fauna returned an error 310 :raises ValueError: Encoding and decoding errors 311 :raises TypeError: Invalid param types 312 """ 313 314 if not isinstance(fql, Query): 315 err_msg = f"'fql' must be a Query but was a {type(fql)}. 
You can build a " \ 316 f"Query by calling fauna.fql()" 317 raise TypeError(err_msg) 318 319 try: 320 encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql) 321 except Exception as e: 322 raise ClientError("Failed to encode Query") from e 323 324 retryable = Retryable[QuerySuccess]( 325 self._max_attempts, 326 self._max_backoff, 327 self._query, 328 "/query/1", 329 fql=encoded_query, 330 opts=opts, 331 ) 332 333 r = retryable.run() 334 r.response.stats.attempts = r.attempts 335 return r.response 336 337 def _query( 338 self, 339 path: str, 340 fql: Mapping[str, Any], 341 arguments: Optional[Mapping[str, Any]] = None, 342 opts: Optional[QueryOptions] = None, 343 ) -> QuerySuccess: 344 345 headers = self._headers.copy() 346 headers[_Header.Format] = "tagged" 347 headers[_Header.Authorization] = self._auth.bearer() 348 349 if self._query_timeout_ms is not None: 350 headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) 351 352 headers.update(self._last_txn_ts.request_header) 353 354 query_tags = {} 355 if self._query_tags is not None: 356 query_tags.update(self._query_tags) 357 358 if opts is not None: 359 if opts.linearized is not None: 360 headers[Header.Linearized] = str(opts.linearized).lower() 361 if opts.max_contention_retries is not None: 362 headers[Header.MaxContentionRetries] = \ 363 f"{opts.max_contention_retries}" 364 if opts.traceparent is not None: 365 headers[Header.Traceparent] = opts.traceparent 366 if opts.query_timeout is not None: 367 timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}" 368 headers[Header.QueryTimeoutMs] = timeout_ms 369 if opts.query_tags is not None: 370 query_tags.update(opts.query_tags) 371 if opts.typecheck is not None: 372 headers[Header.Typecheck] = str(opts.typecheck).lower() 373 if opts.additional_headers is not None: 374 headers.update(opts.additional_headers) 375 376 if len(query_tags) > 0: 377 headers[Header.Tags] = QueryTags.encode(query_tags) 378 379 data: dict[str, Any] = { 380 "query": fql, 381 "arguments": arguments or {}, 382 } 383 384 with self._session.request( 385 method="POST", 386 url=self._endpoint + path, 387 headers=headers, 388 data=data, 389 ) as response: 390 status_code = response.status_code() 391 response_json = response.json() 392 headers = response.headers() 393 394 self._check_protocol(response_json, status_code) 395 396 dec: Any = FaunaDecoder.decode(response_json) 397 398 if status_code > 399: 399 FaunaError.parse_error_and_throw(dec, status_code) 400 401 if "txn_ts" in dec: 402 self.set_last_txn_ts(int(response_json["txn_ts"])) 403 404 stats = QueryStats(dec["stats"]) if "stats" in dec else None 405 summary = dec["summary"] if "summary" in dec else None 406 query_tags = QueryTags.decode( 407 dec["query_tags"]) if "query_tags" in dec else None 408 txn_ts = dec["txn_ts"] if "txn_ts" in dec else None 409 schema_version = dec["schema_version"] if "schema_version" in dec else None 410 traceparent = headers.get("traceparent", None) 411 static_type = dec["static_type"] if "static_type" in dec else None 412 413 return QuerySuccess( 414 data=dec["data"], 415 query_tags=query_tags, 416 static_type=static_type, 417 stats=stats, 418 summary=summary, 419 traceparent=traceparent, 420 txn_ts=txn_ts, 421 schema_version=schema_version, 422 ) 423 424 def stream( 425 self, 426 fql: Union[EventSource, Query], 427 opts: StreamOptions = StreamOptions() 428 ) -> "StreamIterator": 429 """ 430 Opens a Stream in Fauna and returns an iterator that consume Fauna events. 
431 432 :param fql: An EventSource or a Query that returns an EventSource. 433 :param opts: (Optional) Stream Options. 434 435 :return: a :class:`StreamIterator` 436 437 :raises ClientError: Invalid options provided 438 :raises NetworkError: HTTP Request failed in transit 439 :raises ProtocolError: HTTP error not from Fauna 440 :raises ServiceError: Fauna returned an error 441 :raises ValueError: Encoding and decoding errors 442 :raises TypeError: Invalid param types 443 """ 444 445 if isinstance(fql, Query): 446 if opts.cursor is not None: 447 raise ClientError( 448 "The 'cursor' configuration can only be used with an event source.") 449 450 source = self.query(fql).data 451 else: 452 source = fql 453 454 if not isinstance(source, EventSource): 455 err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." 456 raise TypeError(err_msg) 457 458 headers = self._headers.copy() 459 headers[_Header.Format] = "tagged" 460 headers[_Header.Authorization] = self._auth.bearer() 461 462 return StreamIterator(self._session, headers, self._endpoint + "/stream/1", 463 self._max_attempts, self._max_backoff, opts, source) 464 465 def feed( 466 self, 467 source: Union[EventSource, Query], 468 opts: FeedOptions = FeedOptions(), 469 ) -> "FeedIterator": 470 """ 471 Opens an Event Feed in Fauna and returns an iterator that consume Fauna events. 472 473 :param source: An EventSource or a Query that returns an EventSource. 474 :param opts: (Optional) Event Feed options. 475 476 :return: a :class:`FeedIterator` 477 478 :raises ClientError: Invalid options provided 479 :raises NetworkError: HTTP Request failed in transit 480 :raises ProtocolError: HTTP error not from Fauna 481 :raises ServiceError: Fauna returned an error 482 :raises ValueError: Encoding and decoding errors 483 :raises TypeError: Invalid param types 484 """ 485 486 if isinstance(source, Query): 487 source = self.query(source).data 488 489 if not isinstance(source, EventSource): 490 err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." 491 raise TypeError(err_msg) 492 493 headers = self._headers.copy() 494 headers[_Header.Format] = "tagged" 495 headers[_Header.Authorization] = self._auth.bearer() 496 497 if opts.query_timeout is not None: 498 query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000) 499 headers[Header.QueryTimeoutMs] = str(query_timeout_ms) 500 elif self._query_timeout_ms is not None: 501 headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) 502 503 return FeedIterator(self._session, headers, self._endpoint + "/feed/1", 504 self._max_attempts, self._max_backoff, opts, source) 505 506 def _check_protocol(self, response_json: Any, status_code): 507 # TODO: Logic to validate wire protocol belongs elsewhere. 508 should_raise = False 509 510 # check for QuerySuccess 511 if status_code <= 399 and "data" not in response_json: 512 should_raise = True 513 514 # check for QueryFailure 515 if status_code > 399: 516 if "error" not in response_json: 517 should_raise = True 518 else: 519 e = response_json["error"] 520 if "code" not in e or "message" not in e: 521 should_raise = True 522 523 if should_raise: 524 raise ProtocolError( 525 status_code, 526 f"Response is in an unknown format: \n{response_json}", 527 ) 528 529 def _set_endpoint(self, endpoint): 530 if endpoint is None: 531 endpoint = _Environment.EnvFaunaEndpoint() 532 533 if endpoint[-1:] == "/": 534 endpoint = endpoint[:-1] 535 536 self._endpoint = endpoint
99 def __init__( 100 self, 101 endpoint: Optional[str] = None, 102 secret: Optional[str] = None, 103 http_client: Optional[HTTPClient] = None, 104 query_tags: Optional[Mapping[str, str]] = None, 105 linearized: Optional[bool] = None, 106 max_contention_retries: Optional[int] = None, 107 typecheck: Optional[bool] = None, 108 additional_headers: Optional[Dict[str, str]] = None, 109 query_timeout: Optional[timedelta] = DefaultQueryTimeout, 110 client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout, 111 http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout, 112 http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout, 113 http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout, 114 http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout, 115 http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout, 116 max_attempts: int = 3, 117 max_backoff: int = 20, 118 ): 119 """Initializes a Client. 120 121 :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable. 122 :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable. 123 :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`. 124 :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 125 :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 126 :param max_contention_retries: The max number of times to retry the query if contention is encountered. 127 :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration. 128 :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary. 129 :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`. 130 :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. 131 :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`. 132 :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`. 133 :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`. 134 :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`. 135 :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`. 136 :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3. 137 :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20. 
138 """ 139 140 self._set_endpoint(endpoint) 141 self._max_attempts = max_attempts 142 self._max_backoff = max_backoff 143 144 if secret is None: 145 self._auth = _Auth(_Environment.EnvFaunaSecret()) 146 else: 147 self._auth = _Auth(secret) 148 149 self._last_txn_ts = LastTxnTs() 150 151 self._query_tags = {} 152 if query_tags is not None: 153 self._query_tags.update(query_tags) 154 155 if query_timeout is not None: 156 self._query_timeout_ms = int(query_timeout.total_seconds() * 1000) 157 else: 158 self._query_timeout_ms = None 159 160 self._headers: Dict[str, str] = { 161 _Header.AcceptEncoding: "gzip", 162 _Header.ContentType: "application/json;charset=utf-8", 163 _Header.Driver: "python", 164 _Header.DriverEnv: str(_DriverEnvironment()), 165 } 166 167 if typecheck is not None: 168 self._headers[Header.Typecheck] = str(typecheck).lower() 169 170 if linearized is not None: 171 self._headers[Header.Linearized] = str(linearized).lower() 172 173 if max_contention_retries is not None and max_contention_retries > 0: 174 self._headers[Header.MaxContentionRetries] = \ 175 f"{max_contention_retries}" 176 177 if additional_headers is not None: 178 self._headers = { 179 **self._headers, 180 **additional_headers, 181 } 182 183 self._session: HTTPClient 184 185 if http_client is not None: 186 self._session = http_client 187 else: 188 if fauna.global_http_client is None: 189 timeout_s: Optional[float] = None 190 if query_timeout is not None and client_buffer_timeout is not None: 191 timeout_s = (query_timeout + client_buffer_timeout).total_seconds() 192 read_timeout_s: Optional[float] = None 193 if http_read_timeout is not None: 194 read_timeout_s = http_read_timeout.total_seconds() 195 196 write_timeout_s: Optional[float] = http_write_timeout.total_seconds( 197 ) if http_write_timeout is not None else None 198 connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds( 199 ) if http_connect_timeout is not None else None 200 pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds( 201 ) if http_pool_timeout is not None else None 202 idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds( 203 ) if http_idle_timeout is not None else None 204 205 import httpx 206 from fauna.http.httpx_client import HTTPXClient 207 c = HTTPXClient( 208 httpx.Client( 209 http1=True, 210 http2=False, 211 timeout=httpx.Timeout( 212 timeout=timeout_s, 213 connect=connect_timeout_s, 214 read=read_timeout_s, 215 write=write_timeout_s, 216 pool=pool_timeout_s, 217 ), 218 limits=httpx.Limits( 219 max_connections=DefaultMaxConnections, 220 max_keepalive_connections=DefaultMaxIdleConnections, 221 keepalive_expiry=idle_timeout_s, 222 ), 223 ), logger) 224 fauna.global_http_client = c 225 226 self._session = fauna.global_http_client
Initializes a Client.
Parameters
- endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the FAUNA_ENDPOINT env variable.
- secret: The Fauna Secret to use. Defaults to empty, or the FAUNA_SECRET env variable.
- http_client: An HTTPClient implementation. Defaults to a global HTTPXClient.
- query_tags: Tags to associate with the query. See logging (https://docs.fauna.com/fauna/current/build/logs/query_log/).
- linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
- max_contention_retries: The max number of times to retry the query if contention is encountered.
- typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
- additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
- query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed. Default is DefaultQueryTimeout.
- client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is DefaultClientBufferTimeout, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
- http_read_timeout: Set HTTP Read timeout. Default is DefaultHttpReadTimeout.
- http_write_timeout: Set HTTP Write timeout. Default is DefaultHttpWriteTimeout.
- http_connect_timeout: Set HTTP Connect timeout. Default is DefaultHttpConnectTimeout.
- http_pool_timeout: Set HTTP Pool timeout. Default is DefaultHttpPoolTimeout.
- http_idle_timeout: Set HTTP Idle timeout. Default is DefaultIdleConnectionTimeout.
- max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
- max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
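For illustration, a minimal construction sketch. The timeout and retry values below are arbitrary examples; every argument is optional, and the secret falls back to the FAUNA_SECRET env variable when omitted:

from datetime import timedelta

from fauna.client import Client

# Arbitrary example values; omit any of these to accept the defaults.
client = Client(
    query_timeout=timedelta(seconds=10),  # server-side per-query timeout
    max_attempts=5,                       # retry budget for retryable errors
    max_backoff=30,                       # cap, in seconds, per retry
)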
def set_last_txn_ts(self, txn_ts: int):
  """
  Set the last timestamp seen by this client.
  This has no effect if it is earlier than the stored timestamp.

  .. WARNING:: This should be used only when coordinating timestamps across
      multiple clients. Moving the timestamp arbitrarily forward into
      the future will cause transactions to stall.

  :param txn_ts: the new transaction time.
  """
  self._last_txn_ts.update_txn_time(txn_ts)
Set the last timestamp seen by this client. This has no effect if it is earlier than the stored timestamp.
.. WARNING:: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.
Parameters
- txn_ts: the new transaction time.
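A short sketch of the coordination pattern this is meant for, assuming two hypothetical Client instances, client_a and client_b, pointed at the same database:

# Propagate the most recent transaction time from one client to another
# so reads through client_b observe writes made through client_a.
ts = client_a.get_last_txn_ts()
if ts is not None:
  client_b.set_last_txn_ts(ts)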
def get_last_txn_ts(self) -> Optional[int]:
  """
  Get the last timestamp seen by this client.

  :return: the last transaction timestamp, or None if no transaction has been seen yet.
  """
  return self._last_txn_ts.time
Get the last timestamp seen by this client.
Returns
the last transaction timestamp, or None if no transaction has been seen yet.
def get_query_timeout(self) -> Optional[timedelta]:
  """
  Get the query timeout for all queries.
  """
  if self._query_timeout_ms is not None:
    return timedelta(milliseconds=self._query_timeout_ms)
  else:
    return None
Get the query timeout for all queries.
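As a quick illustration (reusing the Client import from the sketch above), the getter round-trips the timedelta passed at construction:

from datetime import timedelta

client = Client(query_timeout=timedelta(seconds=10))
assert client.get_query_timeout() == timedelta(seconds=10)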
def paginate(
    self,
    fql: Query,
    opts: Optional[QueryOptions] = None,
) -> "QueryIterator":
  """
  Run a query on Fauna and return an iterator of results. If the query
  returns a Page, the iterator will fetch additional Pages until the
  after token is null. Each call for a page will be retried with exponential
  backoff up to the max_attempts set in the client's retry policy in the
  event of a 429 or 502.

  :param fql: A Query
  :param opts: (Optional) Query Options

  :return: a :class:`QueryIterator`

  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  if not isinstance(fql, Query):
    err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
              f"Query by calling fauna.fql()"
    raise TypeError(err_msg)

  return QueryIterator(self, fql, opts)
Run a query on Fauna and return an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502.
Parameters
- fql: A Query
- opts: (Optional) Query Options
Returns
a QueryIterator
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
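A usage sketch; the collection name Product is hypothetical:

from fauna import fql

# Each iteration yields one page (a list) of results; additional pages
# are fetched lazily until the "after" cursor is exhausted.
for page in client.paginate(fql("Product.all()")):
  for doc in page:
    print(doc)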
def query(
    self,
    fql: Query,
    opts: Optional[QueryOptions] = None,
) -> QuerySuccess:
  """
  Run a query on Fauna. A query will be retried max_attempts times with exponential backoff
  up to the max_backoff in the event of a 429.

  :param fql: A Query
  :param opts: (Optional) Query Options

  :return: a :class:`QuerySuccess`

  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  if not isinstance(fql, Query):
    err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
              f"Query by calling fauna.fql()"
    raise TypeError(err_msg)

  try:
    encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
  except Exception as e:
    raise ClientError("Failed to encode Query") from e

  retryable = Retryable[QuerySuccess](
      self._max_attempts,
      self._max_backoff,
      self._query,
      "/query/1",
      fql=encoded_query,
      opts=opts,
  )

  r = retryable.run()
  r.response.stats.attempts = r.attempts
  return r.response
Run a query on Fauna. A query will be retried max_attempts times with exponential backoff up to the max_backoff in the event of a 429.
Parameters
- fql: A Query
- opts: (Optional) Query Options
Returns
a QuerySuccess
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
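A usage sketch showing the ${} interpolation that fql() performs (the same mechanism the driver itself uses in QueryIterator.iter below):

from fauna import fql

# Values are interpolated safely via ${} placeholders, not string-formatted.
result = client.query(fql("${x} + ${y}", x=2, y=3))
print(result.data)            # 5
print(result.stats.attempts)  # attempts made, including retries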
def stream(
    self,
    fql: Union[EventSource, Query],
    opts: StreamOptions = StreamOptions()
) -> "StreamIterator":
  """
  Opens a Stream in Fauna and returns an iterator that consumes Fauna events.

  :param fql: An EventSource or a Query that returns an EventSource.
  :param opts: (Optional) Stream Options.

  :return: a :class:`StreamIterator`

  :raises ClientError: Invalid options provided
  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  if isinstance(fql, Query):
    if opts.cursor is not None:
      raise ClientError(
          "The 'cursor' configuration can only be used with an event source.")

    source = self.query(fql).data
  else:
    source = fql

  if not isinstance(source, EventSource):
    err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
    raise TypeError(err_msg)

  headers = self._headers.copy()
  headers[_Header.Format] = "tagged"
  headers[_Header.Authorization] = self._auth.bearer()

  return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
                        self._max_attempts, self._max_backoff, opts, source)
Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
Parameters
- fql: An EventSource or a Query that returns an EventSource.
- opts: (Optional) Stream Options.
Returns
a StreamIterator
Raises
- ClientError: Invalid options provided
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
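A consumption sketch. The FQL expression is hypothetical (it assumes a Product collection and an FQL method such as eventSource() for producing an event source); since StreamIterator is also a context manager, the connection is closed on exit:

from fauna import fql

with client.stream(fql("Product.all().eventSource()")) as events:
  for event in events:
    # Events are decoded dicts; "type" is e.g. "add", "remove", or "update".
    print(event["type"], event.get("data"))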
def feed(
    self,
    source: Union[EventSource, Query],
    opts: FeedOptions = FeedOptions(),
) -> "FeedIterator":
  """
  Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.

  :param source: An EventSource or a Query that returns an EventSource.
  :param opts: (Optional) Event Feed options.

  :return: a :class:`FeedIterator`

  :raises ClientError: Invalid options provided
  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  if isinstance(source, Query):
    source = self.query(source).data

  if not isinstance(source, EventSource):
    err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
    raise TypeError(err_msg)

  headers = self._headers.copy()
  headers[_Header.Format] = "tagged"
  headers[_Header.Authorization] = self._auth.bearer()

  if opts.query_timeout is not None:
    query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
    headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
  elif self._query_timeout_ms is not None:
    headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

  return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
                      self._max_attempts, self._max_backoff, opts, source)
Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.
Parameters
- source: An EventSource or a Query that returns an EventSource.
- opts: (Optional) Event Feed options.
Returns
a FeedIterator
Raises
- ClientError: Invalid options provided
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
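A consumption sketch, again with a hypothetical Product collection; this assumes FeedOptions is re-exported from fauna.client alongside Client (otherwise import it from fauna.client.client):

from fauna import fql
from fauna.client import FeedOptions

options = FeedOptions(page_size=25)
feed = client.feed(fql("Product.all().eventSource()"), options)

# flatten() yields individual events; iterating the feed directly
# yields FeedPage objects instead.
for event in feed.flatten():
  print(event["type"])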
class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, source: EventSource):
    # Validate options before opening the stream context.
    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
      raise TypeError(err_msg)

    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._source = source
    self._stream = None
    self.last_ts = None
    self.last_cursor = None
    self._ctx = self._create_stream()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]
        self.last_cursor = event.get('cursor')

        if event["type"] == "start":
          return self._next_element()

        if not self._opts.status_events and event["type"] == "status":
          return self._next_element()

        return event

      raise StopIteration
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    data: Dict[str, Any] = {"token": self._source.token}
    if self.last_cursor is not None:
      data["cursor"] = self.last_cursor
    elif self._opts.cursor is not None:
      data["cursor"] = self._opts.cursor
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    if self._stream is not None:
      self._stream.close()
A class that mixes a ContextManager and an Iterator so we can detect retryable errors.
def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
             endpoint: str, max_attempts: int, max_backoff: int,
             opts: StreamOptions, source: EventSource):
  # Validate options before opening the stream context.
  if opts.start_ts is not None and opts.cursor is not None:
    err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
    raise TypeError(err_msg)

  self._http_client = http_client
  self._headers = headers
  self._endpoint = endpoint
  self._max_attempts = max_attempts
  self._max_backoff = max_backoff
  self._opts = opts
  self._source = source
  self._stream = None
  self.last_ts = None
  self.last_cursor = None
  self._ctx = self._create_stream()
class FeedPage:

  def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
    self._events = events
    self.cursor = cursor
    self.stats = stats

  def __len__(self):
    return len(self._events)

  def __iter__(self) -> Iterator[Any]:
    for event in self._events:
      if event["type"] == "error":
        FaunaError.parse_error_and_throw(event, 400)
      yield event
class FeedIterator:
  """A class to provide an iterator on top of Event Feed pages."""

  def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
               max_attempts: int, max_backoff: int, opts: FeedOptions,
               source: EventSource):
    self._http = http
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = opts.max_attempts or max_attempts
    self._max_backoff = opts.max_backoff or max_backoff
    self._request: Dict[str, Any] = {"token": source.token}
    self._is_done = False

    if opts.start_ts is not None and opts.cursor is not None:
      err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
      raise TypeError(err_msg)

    if opts.page_size is not None:
      self._request["page_size"] = opts.page_size

    if opts.cursor is not None:
      self._request["cursor"] = opts.cursor
    elif opts.start_ts is not None:
      self._request["start_ts"] = opts.start_ts

  def __iter__(self) -> Iterator[FeedPage]:
    self._is_done = False
    return self

  def __next__(self) -> FeedPage:
    if self._is_done:
      raise StopIteration

    retryable = Retryable[Any](self._max_attempts, self._max_backoff,
                               self._next_page)
    return retryable.run().response

  def _next_page(self) -> FeedPage:
    with self._http.request(
        method="POST",
        url=self._endpoint,
        headers=self._headers,
        data=self._request,
    ) as response:
      status_code = response.status_code()
      decoded: Any = FaunaDecoder.decode(response.json())

      if status_code > 399:
        FaunaError.parse_error_and_throw(decoded, status_code)

      self._is_done = not decoded["has_next"]
      self._request["cursor"] = decoded["cursor"]

      if "start_ts" in self._request:
        del self._request["start_ts"]

      return FeedPage(decoded["events"], decoded["cursor"],
                      QueryStats(decoded["stats"]))

  def flatten(self) -> Iterator:
    """A generator that yields events instead of pages of events."""
    for page in self:
      for event in page:
        yield event
A class to provide an iterator on top of Event Feed pages.
def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
             max_attempts: int, max_backoff: int, opts: FeedOptions,
             source: EventSource):
  self._http = http
  self._headers = headers
  self._endpoint = endpoint
  self._max_attempts = opts.max_attempts or max_attempts
  self._max_backoff = opts.max_backoff or max_backoff
  self._request: Dict[str, Any] = {"token": source.token}
  self._is_done = False

  if opts.start_ts is not None and opts.cursor is not None:
    err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
    raise TypeError(err_msg)

  if opts.page_size is not None:
    self._request["page_size"] = opts.page_size

  if opts.cursor is not None:
    self._request["cursor"] = opts.cursor
  elif opts.start_ts is not None:
    self._request["start_ts"] = opts.start_ts
class QueryIterator:
  """A class to provide an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist.
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        #       so we will get back a plain object that might not have
        #       an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      yield [initial_response.data]

  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
A class to provide an iterator on top of Fauna queries.
def __init__(self,
             client: Client,
             fql: Query,
             opts: Optional[QueryOptions] = None):
  """Initializes the QueryIterator

  :param fql: A Query
  :param opts: (Optional) Query Options

  :raises TypeError: Invalid param types
  """
  if not isinstance(client, Client):
    err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
              f"Client by calling fauna.client.Client()"
    raise TypeError(err_msg)

  if not isinstance(fql, Query):
    err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
              f"Query by calling fauna.fql()"
    raise TypeError(err_msg)

  self.client = client
  self.fql = fql
  self.opts = opts
Initializes the QueryIterator
Parameters
- fql: A Query
- opts: (Optional) Query Options
Raises
- TypeError: Invalid param types
def iter(self) -> Iterator:
  """
  A generator function that immediately fetches and yields the results of
  the stored query. Yields additional pages on subsequent iterations if
  they exist.
  """

  cursor = None
  initial_response = self.client.query(self.fql, self.opts)

  if isinstance(initial_response.data, Page):
    cursor = initial_response.data.after
    yield initial_response.data.data

    while cursor is not None:
      next_response = self.client.query(
          fql("Set.paginate(${after})", after=cursor), self.opts)
      # TODO: `Set.paginate` does not yet return a `@set` tagged value
      #       so we will get back a plain object that might not have
      #       an after property.
      cursor = next_response.data.get("after")
      yield next_response.data.get("data")

  else:
    yield [initial_response.data]
A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist.
def flatten(self) -> Iterator:
  """
  A generator function that immediately fetches and yields the results of
  the stored query. Yields each item individually, rather than a whole
  Page at a time. Fetches additional pages as required if they exist.
  """

  for page in self.iter():
    for item in page:
      yield item
A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist.
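To make the difference concrete, a small sketch reusing the client and fql imports from the earlier examples (the collection name Product is hypothetical). iter() yields page-sized lists, while flatten() yields the items themselves; each call re-runs the stored query:

iterator = client.paginate(fql("Product.all()"))

for page in iterator.iter():     # one list per page
  print(len(page))

for item in iterator.flatten():  # one item at a time
  print(item)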