17 private const string QueryUriPath =
"/query/1";
18 private const string StreamUriPath =
"/stream/1";
19 private const string FeedUriPath =
"/feed/1";
22 private readonly IConnection _connection;
25 private readonly Dictionary<Type, DataContext> _dbCtxs =
new();
27 private bool _disposed;
29 internal override MappingContext MappingCtx {
get => _defaultCtx; }
65 _connection =
new Connection(config);
75 var dbCtxType = typeof(DB);
79 if (!_dbCtxs.TryGetValue(dbCtxType, out ctx))
81 var builder =
new DataContextBuilder<DB>();
82 ctx = builder.Build(
this);
83 _dbCtxs[dbCtxType] = ctx;
96 GC.SuppressFinalize(
this);
109 internal override async Task<QuerySuccess<T>> QueryAsyncInternal<T>(
114 CancellationToken cancel)
118 throw new ArgumentNullException(nameof(query));
121 var finalOptions =
QueryOptions.GetFinalQueryOptions(_config.DefaultQueryOptions, queryOptions);
122 var headers = GetRequestHeaders(finalOptions);
124 using var stream =
new MemoryStream();
125 Serialize(stream, query, ctx);
127 using var httpResponse = await _connection.DoPostAsync(
131 GetRequestTimeoutWithBuffer(finalOptions.QueryTimeout),
133 var body = await httpResponse.Content.ReadAsStringAsync(cancel);
134 var res =
QueryResponse.GetFromResponseBody<T>(ctx, serializer, httpResponse.StatusCode, body);
143 throw ExceptionHandler.FromQueryFailure(ctx, failure);
145 throw ExceptionHandler.FromRawResponse(body, httpResponse);
149 internal override async IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
150 Types.EventSource eventSource,
152 CancellationToken cancel =
default)
154 var finalOptions =
QueryOptions.GetFinalQueryOptions(_config.DefaultQueryOptions,
null);
155 var headers = GetRequestHeaders(finalOptions);
157 await
foreach (var evt
in _connection.OpenStream<T>(
165 eventSource.Options.Cursor = evt.Cursor;
171 internal override async IAsyncEnumerator<FeedPage<T>> SubscribeFeedInternal<T>(
172 Types.EventSource eventSource,
174 CancellationToken cancel =
default)
176 cancel.ThrowIfCancellationRequested();
178 var finalOptions = _config.DefaultQueryOptions;
179 var headers = GetRequestHeaders(finalOptions);
181 while (!cancel.IsCancellationRequested)
183 var feedData =
new MemoryStream();
184 eventSource.Serialize(feedData);
186 using var httpResponse = await _connection.DoPostAsync(
190 GetRequestTimeoutWithBuffer(finalOptions.QueryTimeout),
192 string body = await httpResponse.Content.ReadAsStringAsync(cancel);
195 eventSource.Options.Cursor = res.Cursor;
209 writer.WriteStartObject();
210 writer.WriteFieldName(
"query");
212 writer.WriteEndObject();
216 private Dictionary<string, string> GetRequestHeaders(
QueryOptions queryOptions)
218 var headers =
new Dictionary<string, string>
220 { Headers.Authorization, $
"Bearer {_config.Secret}"},
221 { Headers.Format,
"tagged" },
222 { Headers.Driver,
"C#" }
227 headers.Add(Headers.LastTxnTs,
LastSeenTxn.ToString());
233 Headers.QueryTimeoutMs,
234 queryOptions.
QueryTimeout.TotalMilliseconds.ToString(CultureInfo.InvariantCulture));
239 headers.Add(Headers.QueryTags, EncodeQueryTags(queryOptions.
QueryTags));
242 if (!
string.IsNullOrEmpty(queryOptions.
TraceParent))
244 headers.Add(Headers.TraceParent, queryOptions.
TraceParent);
249 headers.Add(Headers.Linearized, queryOptions.
Linearized.ToString()!);
254 headers.Add(Headers.TypeCheck, queryOptions.
TypeCheck.ToString()!);
260 private TimeSpan GetRequestTimeoutWithBuffer(TimeSpan queryTimeout)
262 return queryTimeout.Add(_config.ClientBufferTimeout);
265 private static string EncodeQueryTags(Dictionary<string, string> tags)
267 return string.Join(
",", tags.Select(entry => entry.Key +
"=" + entry.Value));
270 private void Dispose(
bool disposing)
272 if (_disposed)
return;
276 _connection.Dispose();
277 GC.SuppressFinalize(
this);