1using System.Diagnostics;
2using System.Reflection;
3using System.Runtime.CompilerServices;
11internal interface IPipelineExecutor
13 private static readonly MethodInfo _createEnumExec =
14 typeof(IPipelineExecutor).GetMethod(nameof(CreateEnumExec), BindingFlags.Public | BindingFlags.Static)!;
16 private static readonly MethodInfo _createScalarExec =
17 typeof(IPipelineExecutor).GetMethod(nameof(CreateScalarExec), BindingFlags.Public | BindingFlags.Static)!;
19 Type ElemType {
get; }
22 IAsyncEnumerable<Page<object?>> PagedResult(
QueryOptions? queryOptions, CancellationToken cancel =
default);
23 Task<object?> Result(
QueryOptions? queryOptions, CancellationToken cancel =
default);
25 IAsyncEnumerable<Page<T>> PagedResult<T>(
QueryOptions? queryOptions, CancellationToken cancel =
default);
26 Task<T> Result<T>(
QueryOptions? queryOptions, CancellationToken cancel =
default);
28 public static IPipelineExecutor Create(
37 var innerTy = ser.GetType()
39 .GetGenericArguments()[0];
41 var elemTy = proj is
null ?
43 proj.GetType().GetGenInst(typeof(Func<,>))!
44 .GetGenericArguments()[1];
46 var method = mode
switch
48 PipelineMode.Query or PipelineMode.Project => _createEnumExec,
49 PipelineMode.Scalar => _createScalarExec,
50 _ =>
throw new Exception(
"unreachable"),
53 var typeArgs =
new Type[] { innerTy, elemTy };
54 var args =
new object?[] { ctx, query, ser, proj };
55 var exec = method.MakeGenericMethod(typeArgs).Invoke(
null, args);
57 return (IPipelineExecutor)exec!;
60 public static EnumExecutor<E> CreateEnumExec<I, E>(
65 new EnumExecutor<E>(ctx, query,
new PageSerializer<E>(MapSer(ser, proj)));
67 public static ScalarExecutor<E> CreateScalarExec<I, E>(
72 new ScalarExecutor<E>(ctx, query, MapSer(ser, proj));
78 return new MappedDeserializer<I, E>(inner, proj);
81 Debug.Assert(typeof(I) == typeof(E));
85 public readonly record
struct EnumExecutor<E>(
88 PageSerializer<E> Ser) : IPipelineExecutor
90 public Type ElemType {
get => typeof(E); }
91 public Type ResType {
get => typeof(IEnumerable<E>); }
93 public IAsyncEnumerable<Page<T>> PagedResult<T>(
QueryOptions? queryOptions, CancellationToken cancel =
default)
95 var pages = Ctx.PaginateAsyncInternal(
Query, Ser, queryOptions, cancel);
96 if (pages is IAsyncEnumerable<
Page<T>> ret)
101 Debug.Assert(typeof(T) == ElemType);
102 throw new Exception(
"unreachable");
105 public async Task<T> Result<T>(
QueryOptions? queryOptions, CancellationToken cancel =
default)
107 var pages = PagedResult<E>(queryOptions, cancel);
108 var elems =
new List<E>();
112 await
foreach (var page
in pages)
114 cancel.ThrowIfCancellationRequested();
115 elems.AddRange(page.Data);
121 Debug.Assert(typeof(T) == ResType, $
"{typeof(T)} is not {ResType}");
122 throw new Exception(
"unreachable");
125 public async IAsyncEnumerable<Page<object?>> PagedResult(
QueryOptions? queryOptions, [EnumeratorCancellation] CancellationToken cancel =
default)
127 await
foreach (var page
in PagedResult<E>(queryOptions, cancel))
129 var data = page.Data.Select(e => (
object?)e).ToList();
130 yield
return new Page<object?>(data, page.After);
134 public async Task<object?> Result(
QueryOptions? queryOptions, CancellationToken cancel =
default) =>
135 await Result<IEnumerable<E>>(queryOptions, cancel);
139 public readonly record
struct ScalarExecutor<E>(
144 public Type ElemType {
get => typeof(E); }
145 public Type ResType {
get => typeof(E); }
147 public async Task<T> Result<T>(
QueryOptions? queryOptions, CancellationToken cancel =
default)
149 var qres = await Ctx.QueryAsync(
Query, Ser, queryOptions, cancel);
150 if (qres.Data is T ret)
155 if (qres.Data is
null)
160 Debug.Assert(typeof(T) == ResType, $
"{typeof(T)} is not {ResType}");
161 throw new Exception(
"unreachable");
164 public async IAsyncEnumerable<Page<T>> PagedResult<T>(
QueryOptions? queryOptions, [EnumeratorCancellation] CancellationToken cancel =
default)
166 if (await Result<E>(queryOptions, cancel) is T ret)
168 yield
return new Page<T>(
new List<T> { ret },
null);
171 Debug.Assert(typeof(T) == ElemType);
172 throw new Exception(
"unreachable");
175 public async Task<object?> Result(
QueryOptions? queryOptions, CancellationToken cancel =
default) =>
176 await Result<E>(queryOptions, cancel);
178 public async IAsyncEnumerable<Page<object?>> PagedResult(
QueryOptions? queryOptions, [EnumeratorCancellation] CancellationToken cancel =
default)
180 yield
return new Page<object?>(
new List<object?> { await Result(queryOptions, cancel) },
null);
Represents the options for customizing Fauna queries.
Represents the abstract base class for constructing FQL queries.
A generic interface that defines serialize and deserialize behavior for a specific type,...
PipelineMode
The mode of the query pipeline.
record Page< T >(IReadOnlyList< T > Data, string? After)
Represents a page in a dataset for pagination.