Fauna v10 .NET/C# Driver 1.0.1
 
Loading...
Searching...
No Matches
Connection.cs
Go to the documentation of this file.
1using System.Collections.Concurrent;
2using System.Net.Http.Headers;
3using System.Runtime.CompilerServices;
4using System.Text.Json;
6using Fauna.Mapping;
7using Fauna.Types;
8using Fauna.Util;
9using Microsoft.Extensions.Logging;
10using Polly;
11using Stream = System.IO.Stream;
12
13namespace Fauna.Core;
14
/// <summary>
/// <see cref="IConnection"/> implementation that sends HTTP POST requests to the configured endpoint
/// using the <see cref="HttpClient"/> and retry policy supplied by the <see cref="Configuration"/>.
/// </summary>
internal class Connection : IConnection
{
    private readonly Configuration _cfg;
    private bool _disposed;

    /// <summary>
    /// The default query timeout plus the client buffer timeout, both taken from the configuration
    /// at construction time.
    /// </summary>
    public TimeSpan BufferedRequestTimeout { get; init; }

    /// <summary>
    /// Creates a connection bound to <paramref name="configuration"/>.
    /// </summary>
    /// <param name="configuration">Supplies the HTTP client, endpoint, retry policy and timeouts.</param>
    /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
    public Connection(Configuration configuration)
    {
        _cfg = configuration ?? throw new ArgumentNullException(nameof(configuration));
        BufferedRequestTimeout = _cfg.DefaultQueryOptions.QueryTimeout.Add(_cfg.ClientBufferTimeout);
    }

    /// <summary>
    /// Posts <paramref name="body"/> to <paramref name="path"/> through the configured retry policy.
    /// </summary>
    /// <param name="path">Path resolved against the configured endpoint.</param>
    /// <param name="body">Seekable request body; it is rewound before every attempt.</param>
    /// <param name="headers">Headers added to the request.</param>
    /// <param name="requestTimeout">Time after which the send is cancelled.</param>
    /// <param name="cancel">Caller-supplied cancellation token, combined with the timeout.</param>
    /// <returns>
    /// The response of a successful execution, or the last response the policy handled when it gave up.
    /// </returns>
    /// <remarks>Rethrows the policy's final exception when no response is available.</remarks>
    public async Task<HttpResponseMessage> DoPostAsync(
        string path,
        Stream body,
        Dictionary<string, string> headers,
        TimeSpan requestTimeout,
        CancellationToken cancel = default)
    {
        using var timeboundCts = new CancellationTokenSource(requestTimeout);
        using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(timeboundCts.Token, cancel);

        var policyResult = await _cfg.RetryConfiguration.RetryPolicy
            .ExecuteAndCaptureAsync(() =>
                _cfg.HttpClient.SendAsync(CreateHttpRequest(path, body, headers), combinedCts.Token))
            .ConfigureAwait(false);
        HttpResponseMessage response = GetFinalResponse(policyResult);

        // Only pay for header serialization when the message will actually be written.
        if (Logger.Instance.IsEnabled(LogLevel.Debug))
        {
            Logger.Instance.LogDebug(
                "Fauna HTTP Response {status} from {uri}, headers: {headers}",
                response.StatusCode.ToString(),
                response.RequestMessage?.RequestUri?.ToString() ?? "UNKNOWN",
                JsonSerializer.Serialize(
                    response.Headers.ToDictionary(kv => kv.Key, kv => kv.Value.ToList()))
            );
        }

        // Materializing the body as a string is only worthwhile when trace logging is on.
        if (Logger.Instance.IsEnabled(LogLevel.Trace))
        {
            string responseBody = await response.Content.ReadAsStringAsync(cancel).ConfigureAwait(false);
            Logger.Instance.LogTrace("Response body: {body}", responseBody);
        }

        return response;
    }

    /// <summary>
    /// Opens an event stream at <paramref name="path"/> and yields each event as it is received.
    /// When a successful response's stream ends, a new request is issued, until
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <typeparam name="T">Type passed to <see cref="Event{T}"/> when parsing each line.</typeparam>
    /// <param name="path">Path resolved against the configured endpoint.</param>
    /// <param name="eventSource">
    /// Serialized as the body of each request; its <c>Options.Cursor</c> is updated with every event's cursor.
    /// </param>
    /// <param name="headers">Headers added to each request.</param>
    /// <param name="ctx">Mapping context passed to <c>Event&lt;T&gt;.From</c>.</param>
    /// <param name="cancellationToken">Stops the stream; observed by both the reader and the consumer side.</param>
    /// <returns>An asynchronous sequence of events.</returns>
    /// <remarks>
    /// Throws the exception built by <c>ExceptionHandler.FromRawResponse</c> when the final response
    /// is not a success status, or the policy's final exception when no response is available.
    /// </remarks>
    // NOTE(review): the "MappingContext ctx" parameter is not present in the listing this block was
    // reviewed from, although the body uses "ctx"; it is restored here where the listing's embedded
    // line numbering skips a line. Confirm against IConnection.
    public async IAsyncEnumerable<Event<T>> OpenStream<T>(
        string path,
        Types.EventSource eventSource,
        Dictionary<string, string> headers,
        MappingContext ctx,
        [EnumeratorCancellation] CancellationToken cancellationToken = default) where T : notnull
    {
        cancellationToken.ThrowIfCancellationRequested();

        while (!cancellationToken.IsCancellationRequested)
        {
            // Lets us stop the background reader if the consumer abandons the enumeration early.
            using var readerCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            var events = System.Threading.Channels.Channel.CreateUnbounded<Event<T>>();

            Task<PolicyResult<HttpResponseMessage>> streamTask =
                PumpEventsAsync(path, eventSource, headers, ctx, events.Writer, readerCts.Token);

            try
            {
                // Ends when the pump completes the channel, i.e. when the whole policy execution is over.
                await foreach (var evt in events.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
                {
                    yield return evt;
                }
            }
            finally
            {
                readerCts.Cancel();
            }

            HttpResponseMessage httpResponse = GetFinalResponse(await streamTask.ConfigureAwait(false));
            if (httpResponse.IsSuccessStatusCode)
            {
                continue;
            }

            string body = await httpResponse.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);

            throw ExceptionHandler.FromRawResponse(body, httpResponse);
        }
    }

    /// <summary>
    /// Runs the streaming request through the retry policy, writing every parsed event to
    /// <paramref name="writer"/>. The writer is completed exactly once, after the policy execution has
    /// finished (successfully or not), so that later attempts made by the policy can still publish events.
    /// </summary>
    private async Task<PolicyResult<HttpResponseMessage>> PumpEventsAsync<T>(
        string path,
        Types.EventSource eventSource,
        Dictionary<string, string> headers,
        MappingContext ctx,
        System.Threading.Channels.ChannelWriter<Event<T>> writer,
        CancellationToken cancellationToken) where T : notnull
    {
        try
        {
            return await _cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () =>
            {
                // Serialized on every attempt, so each request reflects the event source's current state.
                var streamData = new MemoryStream();
                eventSource.Serialize(streamData);

                var response = await _cfg.HttpClient
                    .SendAsync(
                        CreateHttpRequest(path, streamData, headers),
                        HttpCompletionOption.ResponseHeadersRead,
                        cancellationToken)
                    .ConfigureAwait(false);

                if (!response.IsSuccessStatusCode)
                {
                    return response;
                }

                await using var streamAsync = await response.Content
                    .ReadAsStreamAsync(cancellationToken)
                    .ConfigureAwait(false);
                using var streamReader = new StreamReader(streamAsync);

                while (!cancellationToken.IsCancellationRequested)
                {
                    string? line = await streamReader.ReadLineAsync()
                        .WaitAsync(cancellationToken)
                        .ConfigureAwait(false);

                    // A null line means end of stream; checking EndOfStream instead would block synchronously.
                    if (line is null)
                    {
                        break;
                    }

                    if (string.IsNullOrWhiteSpace(line))
                    {
                        continue;
                    }

                    var evt = Event<T>.From(line, ctx);
                    // Remember the latest cursor on the event source for subsequent requests.
                    eventSource.Options.Cursor = evt.Cursor;

                    await writer.WriteAsync(evt, cancellationToken).ConfigureAwait(false);
                }

                return response;
            }).ConfigureAwait(false);
        }
        finally
        {
            writer.TryComplete();
        }
    }

    /// <summary>
    /// Picks the response out of a captured policy result: the result of a successful execution,
    /// otherwise the last handled response, otherwise rethrows the final exception.
    /// </summary>
    private static HttpResponseMessage GetFinalResponse(PolicyResult<HttpResponseMessage> policyResult) =>
        policyResult.Outcome == OutcomeType.Successful
            ? policyResult.Result
            : policyResult.FinalHandledResult ?? throw policyResult.FinalException;

    /// <summary>
    /// Builds a POST request for <paramref name="path"/> with <paramref name="body"/> as its content,
    /// JSON/gzip accept headers and the supplied <paramref name="headers"/>, and logs it.
    /// </summary>
    /// <param name="path">Path resolved against the configured endpoint.</param>
    /// <param name="body">Seekable stream used as the request content; it is rewound to position 0.</param>
    /// <param name="headers">Headers added to the request.</param>
    /// <returns>The request, ready to be sent.</returns>
    private HttpRequestMessage CreateHttpRequest(string path, Stream body, Dictionary<string, string> headers)
    {
        // Callers invoke this once per send attempt with the same stream, so always start from the beginning.
        body.Position = 0;
        var request = new HttpRequestMessage
        {
            Content = new StreamContent(body),
            Method = HttpMethod.Post,
            RequestUri = new Uri(_cfg.Endpoint, path)
        };

        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        request.Headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));

        foreach (var header in headers)
        {
            request.Headers.Add(header.Key, header.Value);
        }

        if (Logger.Instance.IsEnabled(LogLevel.Debug))
        {
            Logger.Instance.LogDebug(
                "Fauna HTTP {method} Request to {uri} (timeout {timeout}ms), headers: {headers}",
                HttpMethod.Post.ToString(),
                request.RequestUri.ToString(),
                _cfg.HttpClient.Timeout.TotalMilliseconds,
                JsonSerializer.Serialize(
                    request.Headers
                        // Redact Auth header in debug logs
                        .Select(header =>
                            header.Key.StartsWith("Authorization", StringComparison.OrdinalIgnoreCase)
                                ? KeyValuePair.Create(header.Key, new[] { "hidden" }.AsEnumerable())
                                : header)
                        .ToDictionary(kv => kv.Key, kv => kv.Value.ToList()))
            );
        }

        // Emit unredacted Auth header and request body in trace logs.
        // NOTE(review): this writes the credential to the log sink whenever trace logging is enabled.
        if (Logger.Instance.IsEnabled(LogLevel.Trace))
        {
            Logger.Instance.LogTrace(
                "Unredacted Authorization header: {value}",
                request.Headers.Authorization?.ToString() ?? "null");

            // Read the body straight from the stream instead of blocking on an async content read.
            using (var bodyReader = new StreamReader(body, leaveOpen: true))
            {
                Logger.Instance.LogTrace("Request body: {body}", bodyReader.ReadToEnd());
            }

            // Rewind so the content is sent from the start.
            body.Position = 0;
        }

        return request;
    }

    /// <summary>
    /// Releases resources once; the HTTP client is disposed only when called from <see cref="Dispose()"/>
    /// and the configuration's <c>DisposeHttpClient</c> flag is set.
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>, false from the finalizer.</param>
    private void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing && _cfg.DisposeHttpClient)
        {
            _cfg.HttpClient.Dispose();
        }

        _disposed = true;
    }

    /// <summary>
    /// Disposes the connection and suppresses finalization.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        // Always suppress the finalizer after an explicit dispose, whether or not the client was disposed.
        GC.SuppressFinalize(this);
    }

    // A finalizer: https://stackoverflow.com/questions/151051/when-should-i-use-gc-suppressfinalize
    ~Connection()
    {
        Dispose(false);
    }
}
System.IO.Stream Stream
Definition Client.cs:8
Configuration is a class used to configure a Fauna Client. It encapsulates various settings such as t...
QueryOptions DefaultQueryOptions
Default options for queries sent to Fauna.
A class representing the mapping context to be used during serialization and deserialization.
A class representing an event from an Event Feed or Event Stream.
Definition Event.cs:39