/// <summary>
/// Internal <see cref="IConnection"/> implementation. The members visible in this excerpt send HTTP POST
/// requests through the configured HttpClient under the configured retry policy, and open event streams.
/// </summary>
18internal class Connection : IConnection
// Checked at the top of Dispose(bool) so that a repeated dispose returns early.
// NOTE(review): the place where this flag is set is not visible in this excerpt.
21 private bool _disposed;
/// <summary>
/// Init-only timeout value. In this file it is assigned the default query timeout plus the configured
/// client buffer timeout.
/// </summary>
23 public TimeSpan BufferedRequestTimeout {
get; init; }
// Default query timeout extended by the client buffer timeout.
// NOTE(review): the header of the enclosing member is not visible in this excerpt - presumably the
// constructor, since the property is init-only; confirm against the full file.
32 BufferedRequestTimeout = _cfg.
DefaultQueryOptions.QueryTimeout.Add(_cfg.ClientBufferTimeout);
/// <summary>
/// Sends a request built by CreateHttpRequest through the configured HttpClient, executed under the
/// configured retry policy, then logs the response status, URI and headers (debug) and the response
/// body (trace).
/// </summary>
/// <param name="headers">Headers passed to CreateHttpRequest for every attempt.</param>
/// <param name="requestTimeout">Time after which the internally created cancellation source cancels the send.</param>
/// <param name="cancel">Caller token; linked with the timeout token for the send, and used directly for the trace body read.</param>
/// <remarks>
/// NOTE(review): the declarations of <c>path</c> and <c>body</c>, one branch of the outcome expression and the
/// return statement are not visible in this excerpt; presumably the method returns <c>response</c> - confirm.
/// </remarks>
35 public async Task<HttpResponseMessage> DoPostAsync(
38 Dictionary<string, string> headers,
39 TimeSpan requestTimeout,
40 CancellationToken cancel =
default)
42 HttpResponseMessage response;
// The send is cancelled when either the request timeout elapses or the caller's token fires.
44 using var timeboundCts =
new CancellationTokenSource(requestTimeout);
45 using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(timeboundCts.Token, cancel);
// A new HttpRequestMessage is created on each invocation of the delegate by the retry policy.
47 var policyResult = await _cfg.RetryConfiguration.RetryPolicy
48 .ExecuteAndCaptureAsync(() =>
49 _cfg.HttpClient.SendAsync(CreateHttpRequest(path, body, headers), combinedCts.Token))
50 .ConfigureAwait(
false);
// When the outcome is not successful, use the final handled result if there is one; otherwise
// rethrow the captured exception. (The successful branch is not visible in this excerpt.)
51 response = policyResult.Outcome == OutcomeType.Successful
53 : policyResult.FinalHandledResult ??
throw policyResult.FinalException;
55 Logger.Instance.LogDebug(
56 "Fauna HTTP Response {status} from {uri}, headers: {headers}",
57 response.StatusCode.ToString(),
58 response.RequestMessage?.RequestUri?.ToString() ??
"UNKNOWN",
59 JsonSerializer.Serialize(
60 response.Headers.ToDictionary(kv => kv.Key, kv => kv.Value.ToList()))
// NOTE(review): the body is read as an argument, so it is read before LogTrace is called, whatever the
// log level; it also uses `cancel` rather than the combined (timeout-bound) token.
63 Logger.Instance.LogTrace(
"Response body: {body}", await response.Content.ReadAsStringAsync(cancel));
/// <summary>
/// Async iterator over <c>Event</c> values for an event source. While the token is not cancelled, each
/// iteration starts a retry-policy-wrapped request that serializes the event source, reads the response
/// stream line by line and hands events to the consumer through a blocking collection.
/// </summary>
/// <param name="eventSource">Serialized as the request body; its <c>Options.Cursor</c> is updated from received events.</param>
/// <param name="headers">Headers passed to CreateHttpRequest.</param>
/// <param name="cancellationToken">Checked before starting and in both loops; also passed to the reads and to the collection.</param>
/// <remarks>
/// NOTE(review): the <c>path</c> parameter, the event-parsing code, the branch bodies and the yield
/// statement are not visible in this excerpt.
/// </remarks>
68 public async IAsyncEnumerable<Event<T>> OpenStream<T>(
70 Types.EventSource eventSource,
71 Dictionary<string, string> headers,
73 [EnumeratorCancellation] CancellationToken cancellationToken =
default) where T : notnull
75 cancellationToken.ThrowIfCancellationRequested();
77 while (!cancellationToken.IsCancellationRequested)
// Queue-backed collection used to pass events from the request task to the enumerating consumer.
79 using var bc =
new BlockingCollection<Event<T>>(
new ConcurrentQueue<Event<T>>());
80 Task<PolicyResult<HttpResponseMessage>> streamTask =
81 _cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () =>
// The event source is serialized into a fresh MemoryStream on each invocation of this delegate.
83 var streamData =
new MemoryStream();
84 eventSource.Serialize(streamData);
// ResponseHeadersRead: the call completes once headers are available, without buffering the body.
86 var response = await _cfg.HttpClient
88 CreateHttpRequest(path, streamData, headers),
89 HttpCompletionOption.ResponseHeadersRead,
91 .ConfigureAwait(
false);
// NOTE(review): the body of this branch is not visible in this excerpt.
93 if (!response.IsSuccessStatusCode)
99 await
using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken);
100 using var streamReader =
new StreamReader(streamAsync);
102 while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested)
// WaitAsync lets the wait be abandoned on cancellation.
104 string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken);
// NOTE(review): the body of this branch is not visible; presumably blank lines are skipped - confirm.
105 if (
string.IsNullOrWhiteSpace(line))
// Records the cursor of the received event on the event source options.
// NOTE(review): presumably so a later request resumes from it - confirm. The creation of `evt` is not visible here.
111 eventSource.Options.Cursor = evt.Cursor;
113 bc.Add(evt, cancellationToken);
// NOTE(review): GetConsumingEnumerable is a blocking enumeration used inside an async iterator.
119 foreach (var evt
in bc.GetConsumingEnumerable(cancellationToken))
// NOTE(review): Task.Result blocks if the task has not completed; the body of this branch is not visible.
126 if (streamTask.Result.Result.IsSuccessStatusCode)
// Reads the response body and throws the exception built by ExceptionHandler.FromRawResponse.
131 var httpResponse = streamTask.Result.Result;
132 string body = await httpResponse.Content.ReadAsStringAsync(cancellationToken);
134 throw ExceptionHandler.FromRawResponse(body, httpResponse);
/// <summary>
/// Builds a POST <see cref="HttpRequestMessage"/> for <paramref name="path"/> relative to the configured
/// endpoint, with <paramref name="body"/> as stream content, JSON accept and gzip accept-encoding
/// headers plus the supplied <paramref name="headers"/>, and logs the request.
/// </summary>
/// <param name="path">Combined with <c>_cfg.Endpoint</c> to form the request URI.</param>
/// <param name="body">Stream wrapped in a <see cref="StreamContent"/> as the request content.</param>
/// <param name="headers">Each entry is added to the request headers.</param>
/// <remarks>
/// NOTE(review): the return statement is not visible in this excerpt; presumably the built request is returned.
/// </remarks>
138 private HttpRequestMessage CreateHttpRequest(
string path,
Stream body, Dictionary<string, string> headers)
141 var request =
new HttpRequestMessage
143 Content =
new StreamContent(body),
144 Method = HttpMethod.Post,
145 RequestUri =
new Uri(_cfg.Endpoint, path)
148 request.Headers.Accept.Add(
new MediaTypeWithQualityHeaderValue(
"application/json"));
149 request.Headers.AcceptEncoding.Add(
new StringWithQualityHeaderValue(
"gzip"));
151 foreach (var header
in headers)
153 request.Headers.Add(header.Key, header.Value);
// NOTE(review): the timeout logged here is the HttpClient's own Timeout, not a per-request timeout.
156 Logger.Instance.LogDebug(
157 "Fauna HTTP {method} Request to {uri} (timeout {timeout}ms), headers: {headers}",
158 HttpMethod.Post.ToString(),
159 request.RequestUri.ToString(),
160 _cfg.HttpClient.Timeout.TotalMilliseconds,
161 JsonSerializer.Serialize(
// Headers whose name starts with "Authorization" (case-insensitive prefix match) are logged as "hidden".
166 if (header.Key.StartsWith(
"Authorization", StringComparison.InvariantCultureIgnoreCase))
168 return KeyValuePair.Create(header.Key, new[] {
"hidden" }.AsEnumerable());
173 .ToDictionary(kv => kv.Key, kv => kv.Value.ToList()))
// NOTE(review): this passes the unredacted Authorization header value to the trace log - confirm this is intended.
177 Logger.Instance.LogTrace(
"Unredacted Authorization header: {value}", request.Headers.Authorization?.ToString() ??
"null");
// NOTE(review): .Result blocks synchronously, and the body is read before LogTrace is called, whatever the log level.
178 Logger.Instance.LogTrace(
"Request body: {body}", request.Content.ReadAsStringAsync().Result);
/// <summary>
/// Dispose helper: returns immediately when already disposed; when <paramref name="disposing"/> is true
/// and the configuration's <c>DisposeHttpClient</c> is set, disposes the configured HttpClient.
/// </summary>
/// <param name="disposing">Must be true for the HttpClient to be disposed.</param>
/// <remarks>
/// NOTE(review): braces and the statement that sets <c>_disposed</c> are not visible in this excerpt, so
/// whether GC.SuppressFinalize is inside the conditional block cannot be confirmed from here.
/// </remarks>
183 private void Dispose(
bool disposing)
185 if (_disposed)
return;
// The HttpClient is only disposed when the configuration says this connection should dispose it.
187 if (disposing && _cfg.DisposeHttpClient)
189 _cfg.HttpClient.Dispose();
190 GC.SuppressFinalize(
this);
/// <summary>
/// Public dispose entry point.
/// </summary>
/// <remarks>
/// NOTE(review): the body is not visible in this excerpt; presumably it delegates to Dispose(bool) - confirm.
/// </remarks>
199 public void Dispose()
A class representing the mapping context to be used during serialization and deserialization.