Skip to content

Commit 724839a

Browse files
Per Kopsperkops
authored andcommitted
docs(sample): add streaming queries with decimal conversion examples
Add CustomerSalesStreamingQuery demonstrating both direct and buffered streaming with decimal fields. Validates SqlDecimal conversion end-to-end.
1 parent ef9d846 commit 724839a

File tree

5 files changed

+77
-2
lines changed

5 files changed

+77
-2
lines changed

sample/Atc.Kusto.Sample/Atc.Kusto.Sample.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<None Remove="Queries\CustomersSplitByGenderQuery.kusto" />
1313
<None Remove="Queries\StormEventsQuery.kusto" />
1414
<None Remove="Queries\CustomersStreamingQuery.kusto" />
15+
<None Remove="Queries\CustomerSalesStreamingQuery.kusto" />
1516
</ItemGroup>
1617

1718
<ItemGroup>
@@ -20,6 +21,7 @@
2021
<EmbeddedResource Include="Queries\CustomersSplitByGenderQuery.kusto" />
2122
<EmbeddedResource Include="Queries\StormEventsQuery.kusto" />
2223
<EmbeddedResource Include="Queries\CustomersStreamingQuery.kusto" />
24+
<EmbeddedResource Include="Queries\CustomerSalesStreamingQuery.kusto" />
2325
</ItemGroup>
2426

2527
<ItemGroup>

sample/Atc.Kusto.Sample/Program.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,63 @@
190190
await warmupEnumerator.DisposeAsync();
191191
}
192192

193+
// Demonstrate streaming with decimal conversion (CustomerSales has decimal fields)
194+
logger.LogInformation("Streaming customer sales (testing SqlDecimal to decimal conversion)...");
195+
var salesStreamingQuery = new CustomerSalesStreamingQuery();
196+
var salesCount = 0;
197+
198+
await foreach (var sale in contosoSalesKustoProcessor.ExecuteStreamingQuery(salesStreamingQuery, CancellationToken.None))
199+
{
200+
if (salesCount < 3)
201+
{
202+
// Log first 3 to verify decimal conversion works
203+
logger.LogInformation(
204+
"\tCustomer {CustomerName}: SalesAmount={SalesAmount:C}, TotalCost={TotalCost:C}",
205+
sale.CustomerName,
206+
sale.SalesAmount, // SqlDecimal → decimal conversion via DataReaderExtensions!
207+
sale.TotalCost);
208+
}
209+
210+
salesCount++;
211+
}
212+
213+
logger.LogInformation("Streamed {SalesCount} customer sales records with decimal values", salesCount);
214+
215+
// Demonstrate buffered streaming with decimal conversion
216+
logger.LogInformation("Executing buffered streaming query for customer sales (testing SqlDecimal to decimal conversion)...");
217+
218+
var salesBufferedResult = await contosoSalesKustoProcessor.ExecuteBufferedStreamingQuery(
219+
salesStreamingQuery,
220+
CancellationToken.None);
221+
222+
if (salesBufferedResult is not null)
223+
{
224+
var bufferedSalesCount = 0;
225+
await foreach (var sale in salesBufferedResult.Rows.WithCancellation(CancellationToken.None))
226+
{
227+
if (bufferedSalesCount < 3)
228+
{
229+
// Log first 3 to verify decimal conversion works
230+
logger.LogInformation(
231+
"\tCustomer {CustomerName}: SalesAmount={SalesAmount:C}, TotalCost={TotalCost:C}",
232+
sale.CustomerName,
233+
sale.SalesAmount, // SqlDecimal → decimal conversion via DataRowExtensions!
234+
sale.TotalCost);
235+
}
236+
237+
bufferedSalesCount++;
238+
}
239+
240+
logger.LogInformation("Buffered streaming processed {BufferedSalesCount} records with decimals", bufferedSalesCount);
241+
242+
if (salesBufferedResult.Completion is not null)
243+
{
244+
logger.LogInformation(
245+
"Sales streaming completion: HasErrors={HasErrors}",
246+
salesBufferedResult.Completion.HasErrors);
247+
}
248+
}
249+
193250
// Demonstrate cancellation with server-side cancel enabled (default)
194251
logger.LogInformation("Demonstrating cancellation of a long-running streaming query (server-side cancel enabled)...");
195252
var rowCount = 0;

sample/Atc.Kusto.Sample/Queries/CustomerSalesQuery.kusto

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,4 @@ Customers
44
| summarize
55
SalesAmount = todecimal(round(sum(SalesAmount), 2)),
66
TotalCost = todecimal(round(sum(TotalCost), 2))
7-
by CustomerKey, CustomerName
8-
| take 100
7+
by CustomerKey, CustomerName
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace Atc.Kusto.Sample.Queries;
2+
3+
public record CustomerSalesStreamingQuery(
4+
long? CustomerKey = null)
5+
: KustoStreamingQuery<CustomerSales>;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
declare query_parameters (
2+
customerKey:long = long(null)
3+
);
4+
Customers
5+
| join kind=inner SalesFact on CustomerKey
6+
| where isnull(customerKey) or customerKey == CustomerKey
7+
| extend CustomerName = strcat(FirstName, ' ', LastName)
8+
| summarize
9+
SalesAmount = todecimal(round(sum(SalesAmount), 2)),
10+
TotalCost = todecimal(round(sum(TotalCost), 2))
11+
by CustomerKey, CustomerName
12+
| take 100

0 commit comments

Comments
 (0)