"""fauna.client.client

Synchronous Fauna client: runs queries, paginates result sets, and opens
event streams over HTTP.
"""

from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union

import fauna
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
from fauna.client.retryable import Retryable
from fauna.client.utils import _Environment, LastTxnTs
from fauna.encoding import FaunaEncoder, FaunaDecoder
from fauna.encoding import QuerySuccess, QueryTags, QueryStats
from fauna.errors import FaunaError, ClientError, ProtocolError, \
    RetryableFaunaException, NetworkError
from fauna.http.http_client import HTTPClient
from fauna.query import Query, Page, fql
from fauna.query.models import StreamToken

# Defaults applied when the caller does not supply explicit timeouts or
# connection-pool limits.
DefaultHttpConnectTimeout = timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[timedelta] = None
DefaultHttpWriteTimeout = timedelta(seconds=5)
DefaultHttpPoolTimeout = timedelta(seconds=5)
DefaultIdleConnectionTimeout = timedelta(seconds=5)
DefaultQueryTimeout = timedelta(seconds=5)
DefaultClientBufferTimeout = timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20


@dataclass
class QueryOptions:
  """
  A dataclass representing options available for a query.

  * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  * max_contention_retries - The max number of times to retry the query if contention is encountered.
  * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
  * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
  * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
  * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
  """

  linearized: Optional[bool] = None
  max_contention_retries: Optional[int] = None
  query_timeout: Optional[timedelta] = DefaultQueryTimeout
  query_tags: Optional[Mapping[str, str]] = None
  traceparent: Optional[str] = None
  typecheck: Optional[bool] = None
  additional_headers: Optional[Dict[str, str]] = None


@dataclass
class StreamOptions:
  """
  A dataclass representing options available for a stream.

  * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
  the timestamp.
  * status_events - Indicates if stream should include status events. Status events are periodic events that
  update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
  about the cost of maintaining the stream other than the cost of the received events.
  """

  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  start_ts: Optional[int] = None
  status_events: bool = False


class Client:

  def __init__(
      self,
      endpoint: Optional[str] = None,
      secret: Optional[str] = None,
      http_client: Optional[HTTPClient] = None,
      query_tags: Optional[Mapping[str, str]] = None,
      linearized: Optional[bool] = None,
      max_contention_retries: Optional[int] = None,
      typecheck: Optional[bool] = None,
      additional_headers: Optional[Dict[str, str]] = None,
      query_timeout: Optional[timedelta] = DefaultQueryTimeout,
      client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
      http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
      http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
      http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
      http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
      http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
      max_attempts: int = 3,
      max_backoff: int = 20,
  ):
    """Initializes a Client.

    :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
    :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
    :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
    :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
    :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
    :param max_contention_retries: The max number of times to retry the query if contention is encountered.
    :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
    :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
    :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
    :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
    :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
    :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
    :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
    :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
    :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
    :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
    :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
    """

    self._set_endpoint(endpoint)
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff

    # Fall back to the FAUNA_SECRET environment variable when no secret is
    # given explicitly.
    if secret is None:
      self._auth = _Auth(_Environment.EnvFaunaSecret())
    else:
      self._auth = _Auth(secret)

    self._last_txn_ts = LastTxnTs()

    self._query_tags = {}
    if query_tags is not None:
      self._query_tags.update(query_tags)

    if query_timeout is not None:
      self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
    else:
      self._query_timeout_ms = None

    # Base headers sent with every request; per-query options may extend or
    # override these copies in _query().
    self._headers: Dict[str, str] = {
        _Header.AcceptEncoding: "gzip",
        _Header.ContentType: "application/json;charset=utf-8",
        _Header.Driver: "python",
        _Header.DriverEnv: str(_DriverEnvironment()),
    }

    if typecheck is not None:
      self._headers[Header.Typecheck] = str(typecheck).lower()

    if linearized is not None:
      self._headers[Header.Linearized] = str(linearized).lower()

    if max_contention_retries is not None and max_contention_retries > 0:
      self._headers[Header.MaxContentionRetries] = \
          f"{max_contention_retries}"

    if additional_headers is not None:
      self._headers = {
          **self._headers,
          **additional_headers,
      }

    self._session: HTTPClient

    if http_client is not None:
      self._session = http_client
    else:
      # Lazily build one shared HTTPX-backed session for the whole process.
      if fauna.global_http_client is None:
        # Overall client-side deadline: query time plus a buffer for network
        # latency, when both are configured.
        timeout_s: Optional[float] = None
        if query_timeout is not None and client_buffer_timeout is not None:
          timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
        read_timeout_s: Optional[float] = None
        if http_read_timeout is not None:
          read_timeout_s = http_read_timeout.total_seconds()

        write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
        ) if http_write_timeout is not None else None
        connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
        ) if http_connect_timeout is not None else None
        pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
        ) if http_pool_timeout is not None else None
        idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
        ) if http_idle_timeout is not None else None

        import httpx
        from fauna.http.httpx_client import HTTPXClient
        c = HTTPXClient(
            httpx.Client(
                http1=True,
                http2=False,
                timeout=httpx.Timeout(
                    timeout=timeout_s,
                    connect=connect_timeout_s,
                    read=read_timeout_s,
                    write=write_timeout_s,
                    pool=pool_timeout_s,
                ),
                limits=httpx.Limits(
                    max_connections=DefaultMaxConnections,
                    max_keepalive_connections=DefaultMaxIdleConnections,
                    keepalive_expiry=idle_timeout_s,
                ),
            ))
        fauna.global_http_client = c

      self._session = fauna.global_http_client

  def close(self):
    """Closes the underlying HTTP session.

    If this client owns the shared global session, the global reference is
    cleared so a fresh one can be created by the next Client.
    """
    self._session.close()
    # Identity check, not equality: only drop the global when this client's
    # session *is* the shared singleton.
    if self._session is fauna.global_http_client:
      fauna.global_http_client = None

  def set_last_txn_ts(self, txn_ts: int):
    """
    Set the last timestamp seen by this client.
    This has no effect if earlier than stored timestamp.

    .. WARNING:: This should be used only when coordinating timestamps across
        multiple clients. Moving the timestamp arbitrarily forward into
        the future will cause transactions to stall.

    :param txn_ts: the new transaction time.
    """
    self._last_txn_ts.update_txn_time(txn_ts)

  def get_last_txn_ts(self) -> Optional[int]:
    """
    Get the last timestamp seen by this client.
    :return: the last transaction timestamp, or None if none seen yet.
    """
    return self._last_txn_ts.time

  def get_query_timeout(self) -> Optional[timedelta]:
    """
    Get the query timeout for all queries.
    """
    if self._query_timeout_ms is not None:
      return timedelta(milliseconds=self._query_timeout_ms)
    else:
      return None

  def paginate(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> "QueryIterator":
    """
    Run a query on Fauna and returning an iterator of results. If the query
    returns a Page, the iterator will fetch additional Pages until the
    after token is null. Each call for a page will be retried with exponential
    backoff up to the max_attempts set in the client's retry policy in the
    event of a 429 or 502.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QueryResponse`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                 f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    return QueryIterator(self, fql, opts)

  def query(
      self,
      fql: Query,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:
    """
    Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
    up to the max_backoff in the event of a 429.

    :param fql: A Query
    :param opts: (Optional) Query Options

    :return: a :class:`QueryResponse`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                 f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    try:
      encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
    except Exception as e:
      raise ClientError("Failed to encode Query") from e

    retryable = Retryable[QuerySuccess](
        self._max_attempts,
        self._max_backoff,
        self._query,
        "/query/1",
        fql=encoded_query,
        opts=opts,
    )

    r = retryable.run()
    # Surface how many attempts the retry loop actually made.
    r.response.stats.attempts = r.attempts
    return r.response

  def _query(
      self,
      path: str,
      fql: Mapping[str, Any],
      arguments: Optional[Mapping[str, Any]] = None,
      opts: Optional[QueryOptions] = None,
  ) -> QuerySuccess:
    """Performs a single HTTP query request; called by the retry loop."""

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    if self._query_timeout_ms is not None:
      headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)

    headers.update(self._last_txn_ts.request_header)

    query_tags = {}
    if self._query_tags is not None:
      query_tags.update(self._query_tags)

    # Per-query options override client-level settings.
    if opts is not None:
      if opts.linearized is not None:
        headers[Header.Linearized] = str(opts.linearized).lower()
      if opts.max_contention_retries is not None:
        headers[Header.MaxContentionRetries] = \
            f"{opts.max_contention_retries}"
      if opts.traceparent is not None:
        headers[Header.Traceparent] = opts.traceparent
      if opts.query_timeout is not None:
        timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
        headers[Header.QueryTimeoutMs] = timeout_ms
      if opts.query_tags is not None:
        query_tags.update(opts.query_tags)
      if opts.typecheck is not None:
        headers[Header.Typecheck] = str(opts.typecheck).lower()
      if opts.additional_headers is not None:
        headers.update(opts.additional_headers)

    if len(query_tags) > 0:
      headers[Header.Tags] = QueryTags.encode(query_tags)

    data: Dict[str, Any] = {
        "query": fql,
        "arguments": arguments or {},
    }

    with self._session.request(
        method="POST",
        url=self._endpoint + path,
        headers=headers,
        data=data,
    ) as response:
      status_code = response.status_code()
      response_json = response.json()
      headers = response.headers()

    self._check_protocol(response_json, status_code)

    dec: Any = FaunaDecoder.decode(response_json)

    if status_code > 399:
      FaunaError.parse_error_and_throw(dec, status_code)

    # Track the raw wire txn_ts so subsequent requests read-your-writes.
    if "txn_ts" in dec:
      self.set_last_txn_ts(int(response_json["txn_ts"]))

    stats = QueryStats(dec["stats"]) if "stats" in dec else None
    summary = dec["summary"] if "summary" in dec else None
    query_tags = QueryTags.decode(
        dec["query_tags"]) if "query_tags" in dec else None
    txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
    schema_version = dec["schema_version"] if "schema_version" in dec else None
    traceparent = headers.get("traceparent", None)
    static_type = dec["static_type"] if "static_type" in dec else None

    return QuerySuccess(
        data=dec["data"],
        query_tags=query_tags,
        static_type=static_type,
        stats=stats,
        summary=summary,
        traceparent=traceparent,
        txn_ts=txn_ts,
        schema_version=schema_version,
    )

  def stream(
      self,
      fql: Union[StreamToken, Query],
      opts: StreamOptions = StreamOptions()
  ) -> "StreamIterator":
    """
    Opens a Stream in Fauna and returns an iterator that consume Fauna events.

    :param fql: A Query that returns a StreamToken or a StreamToken.
    :param opts: (Optional) Stream Options.

    :return: a :class:`StreamIterator`

    :raises NetworkError: HTTP Request failed in transit
    :raises ProtocolError: HTTP error not from Fauna
    :raises ServiceError: Fauna returned an error
    :raises ValueError: Encoding and decoding errors
    :raises TypeError: Invalid param types
    """

    # Resolve a Query to its StreamToken by running it first.
    if isinstance(fql, Query):
      token = self.query(fql).data
    else:
      token = fql

    if not isinstance(token, StreamToken):
      err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
      raise TypeError(err_msg)

    headers = self._headers.copy()
    headers[_Header.Format] = "tagged"
    headers[_Header.Authorization] = self._auth.bearer()

    return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
                          self._max_attempts, self._max_backoff, opts, token)

  def _check_protocol(self, response_json: Any, status_code):
    # TODO: Logic to validate wire protocol belongs elsewhere.
    should_raise = False

    # check for QuerySuccess
    if status_code <= 399 and "data" not in response_json:
      should_raise = True

    # check for QueryFailure
    if status_code > 399:
      if "error" not in response_json:
        should_raise = True
      else:
        e = response_json["error"]
        if "code" not in e or "message" not in e:
          should_raise = True

    if should_raise:
      raise ProtocolError(
          status_code,
          f"Response is in an unknown format: \n{response_json}",
      )

  def _set_endpoint(self, endpoint):
    """Resolves the endpoint (argument or env) and strips one trailing '/'."""
    if endpoint is None:
      endpoint = _Environment.EnvFaunaEndpoint()

    if endpoint[-1:] == "/":
      endpoint = endpoint[:-1]

    self._endpoint = endpoint


class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, token: StreamToken):
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._token = token
    self._stream = None
    # Last event timestamp seen; used to resume after reconnects.
    self.last_ts = None
    self._ctx = self._create_stream()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    # Stream-level options override the client's retry policy.
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    try:
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]

        # "start" events are bookkeeping only; fetch the next real event.
        if event["type"] == "start":
          return self._next_element()

        if not self._opts.status_events and event["type"] == "status":
          return self._next_element()

        return event

      raise StopIteration
    except NetworkError:
      self._retry_stream()

  def _retry_stream(self):
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    # Best effort: rebuild the connection context; the Retryable loop handles
    # the actual retry via the exception below.
    try:
      self._ctx = self._create_stream()
    except Exception:
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    data: Dict[str, Any] = {"token": self._token.token}
    # Prefer the last seen timestamp so resumed streams don't replay events.
    if self.last_ts is not None:
      data["start_ts"] = self.last_ts
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    if self._stream is not None:
      self._stream.close()


class QueryIterator:
  """A class to provider an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                 f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                 f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        #       so we will get back a plain object that might not have
        #       an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      yield [initial_response.data]

  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
@dataclass
class QueryOptions:
  """
  A dataclass representing options available for a query.

  * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  * max_contention_retries - The max number of times to retry the query if contention is encountered.
  * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
  * query_tags - Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
  * traceparent - A traceparent to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
  * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
  * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
  """

  linearized: Optional[bool] = None
  max_contention_retries: Optional[int] = None
  query_timeout: Optional[timedelta] = DefaultQueryTimeout
  query_tags: Optional[Mapping[str, str]] = None
  traceparent: Optional[str] = None
  typecheck: Optional[bool] = None
  additional_headers: Optional[Dict[str, str]] = None
A dataclass representing options available for a query.
- linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
- max_contention_retries - The max number of times to retry the query if contention is encountered.
- query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
- query_tags - Tags to associate with the query. See the query logging documentation: https://docs.fauna.com/fauna/current/build/logs/query_log/
- traceparent - A traceparent to associate with the query. See the query logging documentation. Must match the W3C format: https://www.w3.org/TR/trace-context/#traceparent-header
- typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
- additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
@dataclass
class StreamOptions:
  """
  A dataclass representing options available for a stream.

  * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
  * max_backoff - The maximum backoff in seconds for an individual retry.
  * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
  the timestamp.
  * status_events - Indicates if stream should include status events. Status events are periodic events that
  update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
  about the cost of maintaining the stream other than the cost of the received events.
  """

  max_attempts: Optional[int] = None
  max_backoff: Optional[int] = None
  start_ts: Optional[int] = None
  status_events: bool = False
A dataclass representing options available for a stream.
- max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
- max_backoff - The maximum backoff in seconds for an individual retry.
- start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp.
- status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events.
72class Client: 73 74 def __init__( 75 self, 76 endpoint: Optional[str] = None, 77 secret: Optional[str] = None, 78 http_client: Optional[HTTPClient] = None, 79 query_tags: Optional[Mapping[str, str]] = None, 80 linearized: Optional[bool] = None, 81 max_contention_retries: Optional[int] = None, 82 typecheck: Optional[bool] = None, 83 additional_headers: Optional[Dict[str, str]] = None, 84 query_timeout: Optional[timedelta] = DefaultQueryTimeout, 85 client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout, 86 http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout, 87 http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout, 88 http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout, 89 http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout, 90 http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout, 91 max_attempts: int = 3, 92 max_backoff: int = 20, 93 ): 94 """Initializes a Client. 95 96 :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable. 97 :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable. 98 :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`. 99 :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_ 100 :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. 101 :param max_contention_retries: The max number of times to retry the query if contention is encountered. 102 :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration. 103 :param additional_headers: Add/update HTTP request headers for the query. 
In general, this should not be necessary. 104 :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`. 105 :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. 106 :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`. 107 :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`. 108 :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`. 109 :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`. 110 :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`. 111 :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3. 112 :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20. 
113 """ 114 115 self._set_endpoint(endpoint) 116 self._max_attempts = max_attempts 117 self._max_backoff = max_backoff 118 119 if secret is None: 120 self._auth = _Auth(_Environment.EnvFaunaSecret()) 121 else: 122 self._auth = _Auth(secret) 123 124 self._last_txn_ts = LastTxnTs() 125 126 self._query_tags = {} 127 if query_tags is not None: 128 self._query_tags.update(query_tags) 129 130 if query_timeout is not None: 131 self._query_timeout_ms = int(query_timeout.total_seconds() * 1000) 132 else: 133 self._query_timeout_ms = None 134 135 self._headers: Dict[str, str] = { 136 _Header.AcceptEncoding: "gzip", 137 _Header.ContentType: "application/json;charset=utf-8", 138 _Header.Driver: "python", 139 _Header.DriverEnv: str(_DriverEnvironment()), 140 } 141 142 if typecheck is not None: 143 self._headers[Header.Typecheck] = str(typecheck).lower() 144 145 if linearized is not None: 146 self._headers[Header.Linearized] = str(linearized).lower() 147 148 if max_contention_retries is not None and max_contention_retries > 0: 149 self._headers[Header.MaxContentionRetries] = \ 150 f"{max_contention_retries}" 151 152 if additional_headers is not None: 153 self._headers = { 154 **self._headers, 155 **additional_headers, 156 } 157 158 self._session: HTTPClient 159 160 if http_client is not None: 161 self._session = http_client 162 else: 163 if fauna.global_http_client is None: 164 timeout_s: Optional[float] = None 165 if query_timeout is not None and client_buffer_timeout is not None: 166 timeout_s = (query_timeout + client_buffer_timeout).total_seconds() 167 read_timeout_s: Optional[float] = None 168 if http_read_timeout is not None: 169 read_timeout_s = http_read_timeout.total_seconds() 170 171 write_timeout_s: Optional[float] = http_write_timeout.total_seconds( 172 ) if http_write_timeout is not None else None 173 connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds( 174 ) if http_connect_timeout is not None else None 175 pool_timeout_s: Optional[float] = 
http_pool_timeout.total_seconds( 176 ) if http_pool_timeout is not None else None 177 idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds( 178 ) if http_idle_timeout is not None else None 179 180 import httpx 181 from fauna.http.httpx_client import HTTPXClient 182 c = HTTPXClient( 183 httpx.Client( 184 http1=True, 185 http2=False, 186 timeout=httpx.Timeout( 187 timeout=timeout_s, 188 connect=connect_timeout_s, 189 read=read_timeout_s, 190 write=write_timeout_s, 191 pool=pool_timeout_s, 192 ), 193 limits=httpx.Limits( 194 max_connections=DefaultMaxConnections, 195 max_keepalive_connections=DefaultMaxIdleConnections, 196 keepalive_expiry=idle_timeout_s, 197 ), 198 )) 199 fauna.global_http_client = c 200 201 self._session = fauna.global_http_client 202 203 def close(self): 204 self._session.close() 205 if self._session == fauna.global_http_client: 206 fauna.global_http_client = None 207 208 def set_last_txn_ts(self, txn_ts: int): 209 """ 210 Set the last timestamp seen by this client. 211 This has no effect if earlier than stored timestamp. 212 213 .. WARNING:: This should be used only when coordinating timestamps across 214 multiple clients. Moving the timestamp arbitrarily forward into 215 the future will cause transactions to stall. 216 217 :param txn_ts: the new transaction time. 218 """ 219 self._last_txn_ts.update_txn_time(txn_ts) 220 221 def get_last_txn_ts(self) -> Optional[int]: 222 """ 223 Get the last timestamp seen by this client. 224 :return: 225 """ 226 return self._last_txn_ts.time 227 228 def get_query_timeout(self) -> Optional[timedelta]: 229 """ 230 Get the query timeout for all queries. 231 """ 232 if self._query_timeout_ms is not None: 233 return timedelta(milliseconds=self._query_timeout_ms) 234 else: 235 return None 236 237 def paginate( 238 self, 239 fql: Query, 240 opts: Optional[QueryOptions] = None, 241 ) -> "QueryIterator": 242 """ 243 Run a query on Fauna and returning an iterator of results. 
If the query 244 returns a Page, the iterator will fetch additional Pages until the 245 after token is null. Each call for a page will be retried with exponential 246 backoff up to the max_attempts set in the client's retry policy in the 247 event of a 429 or 502. 248 249 :param fql: A Query 250 :param opts: (Optional) Query Options 251 252 :return: a :class:`QueryResponse` 253 254 :raises NetworkError: HTTP Request failed in transit 255 :raises ProtocolError: HTTP error not from Fauna 256 :raises ServiceError: Fauna returned an error 257 :raises ValueError: Encoding and decoding errors 258 :raises TypeError: Invalid param types 259 """ 260 261 if not isinstance(fql, Query): 262 err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \ 263 f"Query by calling fauna.fql()" 264 raise TypeError(err_msg) 265 266 return QueryIterator(self, fql, opts) 267 268 def query( 269 self, 270 fql: Query, 271 opts: Optional[QueryOptions] = None, 272 ) -> QuerySuccess: 273 """ 274 Run a query on Fauna. A query will be retried max_attempt times with exponential backoff 275 up to the max_backoff in the event of a 429. 276 277 :param fql: A Query 278 :param opts: (Optional) Query Options 279 280 :return: a :class:`QueryResponse` 281 282 :raises NetworkError: HTTP Request failed in transit 283 :raises ProtocolError: HTTP error not from Fauna 284 :raises ServiceError: Fauna returned an error 285 :raises ValueError: Encoding and decoding errors 286 :raises TypeError: Invalid param types 287 """ 288 289 if not isinstance(fql, Query): 290 err_msg = f"'fql' must be a Query but was a {type(fql)}. 
You can build a " \ 291 f"Query by calling fauna.fql()" 292 raise TypeError(err_msg) 293 294 try: 295 encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql) 296 except Exception as e: 297 raise ClientError("Failed to encode Query") from e 298 299 retryable = Retryable[QuerySuccess]( 300 self._max_attempts, 301 self._max_backoff, 302 self._query, 303 "/query/1", 304 fql=encoded_query, 305 opts=opts, 306 ) 307 308 r = retryable.run() 309 r.response.stats.attempts = r.attempts 310 return r.response 311 312 def _query( 313 self, 314 path: str, 315 fql: Mapping[str, Any], 316 arguments: Optional[Mapping[str, Any]] = None, 317 opts: Optional[QueryOptions] = None, 318 ) -> QuerySuccess: 319 320 headers = self._headers.copy() 321 headers[_Header.Format] = "tagged" 322 headers[_Header.Authorization] = self._auth.bearer() 323 324 if self._query_timeout_ms is not None: 325 headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) 326 327 headers.update(self._last_txn_ts.request_header) 328 329 query_tags = {} 330 if self._query_tags is not None: 331 query_tags.update(self._query_tags) 332 333 if opts is not None: 334 if opts.linearized is not None: 335 headers[Header.Linearized] = str(opts.linearized).lower() 336 if opts.max_contention_retries is not None: 337 headers[Header.MaxContentionRetries] = \ 338 f"{opts.max_contention_retries}" 339 if opts.traceparent is not None: 340 headers[Header.Traceparent] = opts.traceparent 341 if opts.query_timeout is not None: 342 timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}" 343 headers[Header.QueryTimeoutMs] = timeout_ms 344 if opts.query_tags is not None: 345 query_tags.update(opts.query_tags) 346 if opts.typecheck is not None: 347 headers[Header.Typecheck] = str(opts.typecheck).lower() 348 if opts.additional_headers is not None: 349 headers.update(opts.additional_headers) 350 351 if len(query_tags) > 0: 352 headers[Header.Tags] = QueryTags.encode(query_tags) 353 354 data: dict[str, Any] = { 355 "query": fql, 
356 "arguments": arguments or {}, 357 } 358 359 with self._session.request( 360 method="POST", 361 url=self._endpoint + path, 362 headers=headers, 363 data=data, 364 ) as response: 365 status_code = response.status_code() 366 response_json = response.json() 367 headers = response.headers() 368 369 self._check_protocol(response_json, status_code) 370 371 dec: Any = FaunaDecoder.decode(response_json) 372 373 if status_code > 399: 374 FaunaError.parse_error_and_throw(dec, status_code) 375 376 if "txn_ts" in dec: 377 self.set_last_txn_ts(int(response_json["txn_ts"])) 378 379 stats = QueryStats(dec["stats"]) if "stats" in dec else None 380 summary = dec["summary"] if "summary" in dec else None 381 query_tags = QueryTags.decode( 382 dec["query_tags"]) if "query_tags" in dec else None 383 txn_ts = dec["txn_ts"] if "txn_ts" in dec else None 384 schema_version = dec["schema_version"] if "schema_version" in dec else None 385 traceparent = headers.get("traceparent", None) 386 static_type = dec["static_type"] if "static_type" in dec else None 387 388 return QuerySuccess( 389 data=dec["data"], 390 query_tags=query_tags, 391 static_type=static_type, 392 stats=stats, 393 summary=summary, 394 traceparent=traceparent, 395 txn_ts=txn_ts, 396 schema_version=schema_version, 397 ) 398 399 def stream( 400 self, 401 fql: Union[StreamToken, Query], 402 opts: StreamOptions = StreamOptions() 403 ) -> "StreamIterator": 404 """ 405 Opens a Stream in Fauna and returns an iterator that consume Fauna events. 406 407 :param fql: A Query that returns a StreamToken or a StreamToken. 408 :param opts: (Optional) Stream Options. 
409 410 :return: a :class:`StreamIterator` 411 412 :raises NetworkError: HTTP Request failed in transit 413 :raises ProtocolError: HTTP error not from Fauna 414 :raises ServiceError: Fauna returned an error 415 :raises ValueError: Encoding and decoding errors 416 :raises TypeError: Invalid param types 417 """ 418 419 if isinstance(fql, Query): 420 token = self.query(fql).data 421 else: 422 token = fql 423 424 if not isinstance(token, StreamToken): 425 err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." 426 raise TypeError(err_msg) 427 428 headers = self._headers.copy() 429 headers[_Header.Format] = "tagged" 430 headers[_Header.Authorization] = self._auth.bearer() 431 432 return StreamIterator(self._session, headers, self._endpoint + "/stream/1", 433 self._max_attempts, self._max_backoff, opts, token) 434 435 def _check_protocol(self, response_json: Any, status_code): 436 # TODO: Logic to validate wire protocol belongs elsewhere. 437 should_raise = False 438 439 # check for QuerySuccess 440 if status_code <= 399 and "data" not in response_json: 441 should_raise = True 442 443 # check for QueryFailure 444 if status_code > 399: 445 if "error" not in response_json: 446 should_raise = True 447 else: 448 e = response_json["error"] 449 if "code" not in e or "message" not in e: 450 should_raise = True 451 452 if should_raise: 453 raise ProtocolError( 454 status_code, 455 f"Response is in an unknown format: \n{response_json}", 456 ) 457 458 def _set_endpoint(self, endpoint): 459 if endpoint is None: 460 endpoint = _Environment.EnvFaunaEndpoint() 461 462 if endpoint[-1:] == "/": 463 endpoint = endpoint[:-1] 464 465 self._endpoint = endpoint
def __init__(
    self,
    endpoint: Optional[str] = None,
    secret: Optional[str] = None,
    http_client: Optional[HTTPClient] = None,
    query_tags: Optional[Mapping[str, str]] = None,
    linearized: Optional[bool] = None,
    max_contention_retries: Optional[int] = None,
    typecheck: Optional[bool] = None,
    additional_headers: Optional[Dict[str, str]] = None,
    query_timeout: Optional[timedelta] = DefaultQueryTimeout,
    client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
    http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
    http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
    http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
    http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
    http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
    max_attempts: int = 3,
    max_backoff: int = 20,
):
  """Initializes a Client.

  :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
  :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
  :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
  :param query_tags: Tags to associate with the query. See `logging <https://docs.fauna.com/fauna/current/build/logs/query_log/>`_
  :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
  :param max_contention_retries: The max number of times to retry the query if contention is encountered.
  :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
  :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
  :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
  :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
  :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
  :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
  :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
  :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
  :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
  :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
  :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
  """

  self._set_endpoint(endpoint)
  self._max_attempts = max_attempts
  self._max_backoff = max_backoff

  # Fall back to the FAUNA_SECRET env variable when no secret is supplied.
  if secret is None:
    self._auth = _Auth(_Environment.EnvFaunaSecret())
  else:
    self._auth = _Auth(secret)

  self._last_txn_ts = LastTxnTs()

  self._query_tags = {}
  if query_tags is not None:
    self._query_tags.update(query_tags)

  # The timeout is stored in milliseconds and sent per-request as a header.
  if query_timeout is not None:
    self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
  else:
    self._query_timeout_ms = None

  self._headers: Dict[str, str] = {
      _Header.AcceptEncoding: "gzip",
      _Header.ContentType: "application/json;charset=utf-8",
      _Header.Driver: "python",
      _Header.DriverEnv: str(_DriverEnvironment()),
  }

  if typecheck is not None:
    self._headers[Header.Typecheck] = str(typecheck).lower()

  if linearized is not None:
    self._headers[Header.Linearized] = str(linearized).lower()

  if max_contention_retries is not None and max_contention_retries > 0:
    self._headers[Header.MaxContentionRetries] = \
        f"{max_contention_retries}"

  if additional_headers is not None:
    self._headers = {
        **self._headers,
        **additional_headers,
    }

  self._session: HTTPClient

  if http_client is not None:
    self._session = http_client
  else:
    # Lazily build one process-wide HTTPX client so all Client instances
    # share a single connection pool.
    if fauna.global_http_client is None:
      # Overall request deadline = query timeout + client-side buffer.
      timeout_s: Optional[float] = None
      if query_timeout is not None and client_buffer_timeout is not None:
        timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
      read_timeout_s: Optional[float] = None
      if http_read_timeout is not None:
        read_timeout_s = http_read_timeout.total_seconds()

      write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
      ) if http_write_timeout is not None else None
      connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
      ) if http_connect_timeout is not None else None
      pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
      ) if http_pool_timeout is not None else None
      idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
      ) if http_idle_timeout is not None else None

      import httpx
      from fauna.http.httpx_client import HTTPXClient
      c = HTTPXClient(
          httpx.Client(
              http1=True,
              http2=False,
              timeout=httpx.Timeout(
                  timeout=timeout_s,
                  connect=connect_timeout_s,
                  read=read_timeout_s,
                  write=write_timeout_s,
                  pool=pool_timeout_s,
              ),
              limits=httpx.Limits(
                  max_connections=DefaultMaxConnections,
                  max_keepalive_connections=DefaultMaxIdleConnections,
                  keepalive_expiry=idle_timeout_s,
              ),
          ))
      fauna.global_http_client = c

    self._session = fauna.global_http_client
Initializes a Client.
Parameters
- endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the
FAUNA_ENDPOINT
env variable. - secret: The Fauna Secret to use. Defaults to empty, or the
FAUNA_SECRET
env variable. - http_client: An
HTTPClient
implementation. Defaults to a global HTTPXClient
. - query_tags: Tags to associate with the query. See logging
- linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
- max_contention_retries: The max number of times to retry the query if contention is encountered.
- typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
- additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
- query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is
DefaultQueryTimeout
. - client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is
DefaultClientBufferTimeout
, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. - http_read_timeout: Set HTTP Read timeout, default is
DefaultHttpReadTimeout
. - http_write_timeout: Set HTTP Write timeout, default is
DefaultHttpWriteTimeout
. - http_connect_timeout: Set HTTP Connect timeout, default is
DefaultHttpConnectTimeout
. - http_pool_timeout: Set HTTP Pool timeout, default is
DefaultHttpPoolTimeout
. - http_idle_timeout: Set HTTP Idle timeout, default is
DefaultIdleConnectionTimeout
. - max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
- max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
def set_last_txn_ts(self, txn_ts: int):
  """
  Set the last transaction timestamp seen by this client. Timestamps
  earlier than the currently stored one are ignored.

  .. WARNING:: This should be used only when coordinating timestamps across
      multiple clients. Moving the timestamp arbitrarily forward into
      the future will cause transactions to stall.

  :param txn_ts: the new transaction time.
  """
  self._last_txn_ts.update_txn_time(txn_ts)
Set the last timestamp seen by this client. This has no effect if earlier than stored timestamp.
.. WARNING:: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.
Parameters
- txn_ts: the new transaction time.
def get_last_txn_ts(self) -> Optional[int]:
  """
  Return the last transaction timestamp seen by this client, or None when
  no transaction has been observed yet.
  """
  return self._last_txn_ts.time
Get the last timestamp seen by this client.
Returns
def get_query_timeout(self) -> Optional[timedelta]:
  """
  Return the query timeout applied to all queries, or None when no
  timeout is configured.
  """
  ms = self._query_timeout_ms
  return None if ms is None else timedelta(milliseconds=ms)
Get the query timeout for all queries.
def paginate(
    self,
    fql: Query,
    opts: Optional[QueryOptions] = None,
) -> "QueryIterator":
  """
  Run a query on Fauna and return an iterator over the results. If the
  query returns a Page, the iterator fetches additional Pages until the
  after token is null. Each page fetch is retried with exponential
  backoff, up to the max_attempts configured in the client's retry
  policy, in the event of a 429 or 502.

  :param fql: A Query
  :param opts: (Optional) Query Options

  :return: a :class:`QueryResponse`

  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """
  # Guard clause: reject anything that is not a Query up front.
  if isinstance(fql, Query):
    return QueryIterator(self, fql, opts)

  err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
            f"Query by calling fauna.fql()"
  raise TypeError(err_msg)
Run a query on Fauna and return an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502.
Parameters
- fql: A Query
- opts: (Optional) Query Options
Returns
a
QueryResponse
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
def query(
    self,
    fql: Query,
    opts: Optional[QueryOptions] = None,
) -> QuerySuccess:
  """
  Run a query on Fauna. In the event of a 429, the query is retried up to
  max_attempt times with exponential backoff capped at max_backoff.

  :param fql: A Query
  :param opts: (Optional) Query Options

  :return: a :class:`QueryResponse`

  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  if not isinstance(fql, Query):
    err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
              f"Query by calling fauna.fql()"
    raise TypeError(err_msg)

  # Encoding failures are wrapped so callers see a driver-side ClientError.
  try:
    encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
  except Exception as e:
    raise ClientError("Failed to encode Query") from e

  retryable = Retryable[QuerySuccess](
      self._max_attempts,
      self._max_backoff,
      self._query,
      "/query/1",
      fql=encoded_query,
      opts=opts,
  )

  result = retryable.run()
  # Record how many attempts the retry loop used in the returned stats.
  result.response.stats.attempts = result.attempts
  return result.response
Run a query on Fauna. A query will be retried max_attempts times with exponential backoff up to the max_backoff in the event of a 429.
Parameters
- fql: A Query
- opts: (Optional) Query Options
Returns
a
QueryResponse
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
def stream(
    self,
    fql: Union[StreamToken, Query],
    opts: StreamOptions = StreamOptions()
) -> "StreamIterator":
  """
  Opens a Stream in Fauna and returns an iterator that consumes Fauna
  events.

  :param fql: A Query that returns a StreamToken or a StreamToken.
  :param opts: (Optional) Stream Options.

  :return: a :class:`StreamIterator`

  :raises NetworkError: HTTP Request failed in transit
  :raises ProtocolError: HTTP error not from Fauna
  :raises ServiceError: Fauna returned an error
  :raises ValueError: Encoding and decoding errors
  :raises TypeError: Invalid param types
  """

  # A Query is executed first to obtain its StreamToken.
  token = self.query(fql).data if isinstance(fql, Query) else fql

  if not isinstance(token, StreamToken):
    err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
    raise TypeError(err_msg)

  headers = dict(self._headers)
  headers[_Header.Format] = "tagged"
  headers[_Header.Authorization] = self._auth.bearer()

  return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
                        self._max_attempts, self._max_backoff, opts, token)
Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
Parameters
- fql: A Query that returns a StreamToken or a StreamToken.
- opts: (Optional) Stream Options.
Returns
Raises
- NetworkError: HTTP Request failed in transit
- ProtocolError: HTTP error not from Fauna
- ServiceError: Fauna returned an error
- ValueError: Encoding and decoding errors
- TypeError: Invalid param types
class StreamIterator:
  """A class that mixes a ContextManager and an Iterator so we can detect retryable errors."""

  def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
               endpoint: str, max_attempts: int, max_backoff: int,
               opts: StreamOptions, token: StreamToken):
    self._http_client = http_client
    self._headers = headers
    self._endpoint = endpoint
    self._max_attempts = max_attempts
    self._max_backoff = max_backoff
    self._opts = opts
    self._token = token
    # The live event iterator; opened lazily in _next_element.
    self._stream = None
    # Timestamp of the last received event, used to resume after reconnect.
    self.last_ts = None
    self._ctx = self._create_stream()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback):
    if self._stream is not None:
      self._stream.close()

    self._ctx.__exit__(exc_type, exc_value, exc_traceback)
    return False

  def __iter__(self):
    return self

  def __next__(self):
    # Per-stream options take precedence over the client-level retry policy.
    if self._opts.max_attempts is not None:
      max_attempts = self._opts.max_attempts
    else:
      max_attempts = self._max_attempts

    if self._opts.max_backoff is not None:
      max_backoff = self._opts.max_backoff
    else:
      max_backoff = self._max_backoff

    retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
    return retryable.run().response

  def _next_element(self):
    try:
      # Open the underlying stream on first use (or after a retry reset).
      if self._stream is None:
        try:
          self._stream = self._ctx.__enter__()
        except Exception:
          self._retry_stream()

      if self._stream is not None:
        event: Any = FaunaDecoder.decode(next(self._stream))

        if event["type"] == "error":
          FaunaError.parse_error_and_throw(event, 400)

        self.last_ts = event["txn_ts"]

        # "start" events and (unless requested) "status" events are
        # bookkeeping; recurse to fetch the next substantive event.
        if event["type"] == "start":
          return self._next_element()

        if not self._opts.status_events and event["type"] == "status":
          return self._next_element()

        return event

      raise StopIteration
    except NetworkError:
      # A dropped connection is retryable; reconnection resumes at last_ts.
      self._retry_stream()

  def _retry_stream(self):
    if self._stream is not None:
      self._stream.close()

    self._stream = None

    try:
      self._ctx = self._create_stream()
    except Exception:
      # Reconnect errors are swallowed here; the Retryable wrapper decides
      # whether another attempt is permitted.
      pass
    raise RetryableFaunaException

  def _create_stream(self):
    data: Dict[str, Any] = {"token": self._token.token}
    # Resume from the last seen event when reconnecting; otherwise honor an
    # explicit start_ts from the options.
    if self.last_ts is not None:
      data["start_ts"] = self.last_ts
    elif self._opts.start_ts is not None:
      data["start_ts"] = self._opts.start_ts

    return self._http_client.stream(
        url=self._endpoint, headers=self._headers, data=data)

  def close(self):
    if self._stream is not None:
      self._stream.close()
A class that mixes a ContextManager and an Iterator so we can detect retryable errors.
def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
             endpoint: str, max_attempts: int, max_backoff: int,
             opts: StreamOptions, token: StreamToken):
  """Capture the stream configuration and eagerly open the stream context."""
  self._http_client = http_client
  self._headers = headers
  self._endpoint = endpoint
  self._max_attempts = max_attempts
  self._max_backoff = max_backoff
  self._opts = opts
  self._token = token
  # The live event iterator; opened lazily by _next_element.
  self._stream = None
  # Timestamp of the last received event, used to resume after reconnect.
  self.last_ts = None
  self._ctx = self._create_stream()
class QueryIterator:
  """A class to provide an iterator on top of Fauna queries."""

  def __init__(self,
               client: Client,
               fql: Query,
               opts: Optional[QueryOptions] = None):
    """Initializes the QueryIterator

    :param fql: A Query
    :param opts: (Optional) Query Options

    :raises TypeError: Invalid param types
    """
    if not isinstance(client, Client):
      err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
                f"Client by calling fauna.client.Client()"
      raise TypeError(err_msg)

    if not isinstance(fql, Query):
      err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
                f"Query by calling fauna.fql()"
      raise TypeError(err_msg)

    self.client = client
    self.fql = fql
    self.opts = opts

  def __iter__(self) -> Iterator:
    return self.iter()

  def iter(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields additional pages on subsequent iterations if
    they exist
    """

    cursor = None
    initial_response = self.client.query(self.fql, self.opts)

    if isinstance(initial_response.data, Page):
      cursor = initial_response.data.after
      yield initial_response.data.data

      while cursor is not None:
        next_response = self.client.query(
            fql("Set.paginate(${after})", after=cursor), self.opts)
        # TODO: `Set.paginate` does not yet return a `@set` tagged value
        # so we will get back a plain object that might not have
        # an after property.
        cursor = next_response.data.get("after")
        yield next_response.data.get("data")

    else:
      # Non-paginated result: present it as a single one-element page.
      yield [initial_response.data]

  def flatten(self) -> Iterator:
    """
    A generator function that immediately fetches and yields the results of
    the stored query. Yields each item individually, rather than a whole
    Page at a time. Fetches additional pages as required if they exist.
    """

    for page in self.iter():
      for item in page:
        yield item
A class to provide an iterator on top of Fauna queries.
def __init__(self,
             client: Client,
             fql: Query,
             opts: Optional[QueryOptions] = None):
  """Initializes the QueryIterator

  :param fql: A Query
  :param opts: (Optional) Query Options

  :raises TypeError: Invalid param types
  """
  # Validate both arguments before storing any state.
  if not isinstance(client, Client):
    raise TypeError(
        f"'client' must be a Client but was a {type(client)}. You can build a Client by calling fauna.client.Client()"
    )

  if not isinstance(fql, Query):
    raise TypeError(
        f"'fql' must be a Query but was a {type(fql)}. You can build a Query by calling fauna.fql()"
    )

  self.client = client
  self.fql = fql
  self.opts = opts
Initializes the QueryIterator
Parameters
- fql: A Query
- opts: (Optional) Query Options
Raises
- TypeError: Invalid param types
def iter(self) -> Iterator:
  """
  A generator function that immediately fetches and yields the results of
  the stored query. Yields additional pages on subsequent iterations if
  they exist
  """

  first = self.client.query(self.fql, self.opts)

  # Non-paginated result: present it as a single one-element page.
  if not isinstance(first.data, Page):
    yield [first.data]
    return

  cursor = first.data.after
  yield first.data.data

  while cursor is not None:
    page_response = self.client.query(
        fql("Set.paginate(${after})", after=cursor), self.opts)
    # TODO: `Set.paginate` does not yet return a `@set` tagged value
    # so we will get back a plain object that might not have
    # an after property.
    cursor = page_response.data.get("after")
    yield page_response.data.get("data")
A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist
def flatten(self) -> Iterator:
  """
  A generator function that immediately fetches and yields the results of
  the stored query, one item at a time rather than a whole Page at a
  time. Additional pages are fetched as required if they exist.
  """

  for chunk in self.iter():
    yield from chunk
A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist.