-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-44361: [C#][Integration] Include .NET in Flight integration tests #44377
Merged
CurtHagenlocher
merged 8 commits into
apache:main
from
adamreeve:csharp-flight-integration
Oct 15, 2024
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
a4b25ba
Add .NET client for Flight integration tests
adamreeve 6f62ec4
Exclude new csproj file from RAT
adamreeve 5874aef
Add .NET Flight server for integration tests
adamreeve 0a977ce
Echo application metada back in TestFlightServer DoPut
adamreeve 7ec6ec3
Use grpc+tcp URI scheme
adamreeve c765f1a
Tidy ups
adamreeve f2e34a2
Don't rely on default logging to get the port
adamreeve ed02950
Fix race condition
adamreeve File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
18 changes: 18 additions & 0 deletions
18
csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
<?xml version="1.0" encoding="utf-8"?> | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net8.0</TargetFramework> | ||
<RootNamespace>Apache.Arrow.Flight.IntegrationTest</RootNamespace> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" /> | ||
<PackageReference Include="System.Text.Json" Version="8.0.5" /> | ||
<ProjectReference Include="..\..\src\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" /> | ||
<ProjectReference Include="..\Apache.Arrow.Flight.TestWeb\Apache.Arrow.Flight.TestWeb.csproj" /> | ||
<ProjectReference Include="..\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
51 changes: 51 additions & 0 deletions
51
csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.IO; | ||
using System.Threading.Tasks; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
public class FlightClientCommand | ||
{ | ||
private readonly int _port; | ||
private readonly string _scenario; | ||
private readonly FileInfo _jsonFileInfo; | ||
|
||
public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo) | ||
{ | ||
_port = port; | ||
_scenario = scenario; | ||
_jsonFileInfo = jsonFileInfo; | ||
} | ||
|
||
public async Task Execute() | ||
{ | ||
if (!string.IsNullOrEmpty(_scenario)) | ||
{ | ||
// No named scenarios are currently implemented | ||
throw new Exception($"Scenario '{_scenario}' is not supported."); | ||
} | ||
|
||
if (!(_jsonFileInfo?.Exists ?? false)) | ||
{ | ||
throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'"); | ||
} | ||
|
||
var scenario = new JsonTestScenario(_port, _jsonFileInfo); | ||
await scenario.RunClient().ConfigureAwait(false); | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.Net; | ||
using System.Threading.Tasks; | ||
using Apache.Arrow.Flight.TestWeb; | ||
using Microsoft.AspNetCore.Hosting; | ||
using Microsoft.AspNetCore.Hosting.Server; | ||
using Microsoft.AspNetCore.Hosting.Server.Features; | ||
using Microsoft.AspNetCore.Server.Kestrel.Core; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Hosting; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
public class FlightServerCommand | ||
{ | ||
private readonly string _scenario; | ||
|
||
public FlightServerCommand(string scenario) | ||
{ | ||
_scenario = scenario; | ||
} | ||
|
||
public async Task Execute() | ||
{ | ||
if (!string.IsNullOrEmpty(_scenario)) | ||
{ | ||
// No named scenarios are currently implemented | ||
throw new Exception($"Scenario '{_scenario}' is not supported."); | ||
} | ||
|
||
var host = Host.CreateDefaultBuilder() | ||
.ConfigureWebHostDefaults(webBuilder => | ||
{ | ||
webBuilder | ||
.ConfigureKestrel(options => | ||
{ | ||
options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2); | ||
}) | ||
.UseStartup<Startup>(); | ||
}) | ||
.Build(); | ||
|
||
await host.StartAsync().ConfigureAwait(false); | ||
|
||
var addresses = host.Services.GetService<IServer>().Features.Get<IServerAddressesFeature>().Addresses; | ||
foreach (var address in addresses) | ||
{ | ||
Console.WriteLine($"Server listening on {address}"); | ||
} | ||
|
||
await host.WaitForShutdownAsync().ConfigureAwait(false); | ||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using Grpc.Net.Client.Balancer; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
/// <summary> | ||
/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight. | ||
/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme. | ||
/// </summary> | ||
public class GrpcTcpResolverFactory : ResolverFactory | ||
{ | ||
public override string Name => "grpc+tcp"; | ||
|
||
public override Resolver Create(ResolverOptions options) | ||
{ | ||
return new StaticResolverFactory( | ||
uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) }) | ||
.Create(options); | ||
} | ||
} |
167 changes: 167 additions & 0 deletions
167
csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one or more | ||
// contributor license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright ownership. | ||
// The ASF licenses this file to You under the Apache License, Version 2.0 | ||
// (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
using System; | ||
using System.IO; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using Apache.Arrow.Flight.Client; | ||
using Apache.Arrow.IntegrationTest; | ||
using Apache.Arrow.Tests; | ||
using Apache.Arrow.Types; | ||
using Google.Protobuf; | ||
using Grpc.Net.Client; | ||
using Grpc.Core; | ||
using Grpc.Net.Client.Balancer; | ||
using Microsoft.Extensions.DependencyInjection; | ||
|
||
namespace Apache.Arrow.Flight.IntegrationTest; | ||
|
||
/// <summary> | ||
/// A test scenario defined using a JSON data file | ||
/// </summary> | ||
internal class JsonTestScenario | ||
{ | ||
private readonly int _serverPort; | ||
private readonly FileInfo _jsonFile; | ||
private readonly ServiceProvider _serviceProvider; | ||
|
||
public JsonTestScenario(int serverPort, FileInfo jsonFile) | ||
{ | ||
_serverPort = serverPort; | ||
_jsonFile = jsonFile; | ||
|
||
var services = new ServiceCollection(); | ||
services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory()); | ||
_serviceProvider = services.BuildServiceProvider(); | ||
} | ||
|
||
public async Task RunClient() | ||
{ | ||
var address = $"grpc+tcp://localhost:{_serverPort}"; | ||
using var channel = GrpcChannel.ForAddress( | ||
address, | ||
new GrpcChannelOptions | ||
{ | ||
ServiceProvider = _serviceProvider, | ||
Credentials = ChannelCredentials.Insecure | ||
}); | ||
var client = new FlightClient(channel); | ||
|
||
var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName); | ||
|
||
var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false); | ||
var schema = jsonFile.GetSchemaAndDictionaries(out Func<DictionaryType, IArrowArray> dictionaries); | ||
var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray(); | ||
|
||
// 1. Put the data to the server. | ||
await UploadBatches(client, descriptor, batches).ConfigureAwait(false); | ||
|
||
// 2. Get the ticket for the data. | ||
var info = await client.GetInfo(descriptor).ConfigureAwait(false); | ||
if (info.Endpoints.Count == 0) | ||
{ | ||
throw new Exception("No endpoints received"); | ||
} | ||
|
||
// 3. Stream data from the server, comparing individual batches. | ||
foreach (var endpoint in info.Endpoints) | ||
{ | ||
var locations = endpoint.Locations.ToArray(); | ||
if (locations.Length == 0) | ||
{ | ||
// Can read with existing client | ||
await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false); | ||
} | ||
else | ||
{ | ||
foreach (var location in locations) | ||
{ | ||
using var readChannel = GrpcChannel.ForAddress( | ||
location.Uri, | ||
new GrpcChannelOptions | ||
{ | ||
ServiceProvider = _serviceProvider, | ||
Credentials = ChannelCredentials.Insecure | ||
}); | ||
var readClient = new FlightClient(readChannel); | ||
await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches) | ||
{ | ||
using var putCall = client.StartPut(descriptor); | ||
using var writer = putCall.RequestStream; | ||
|
||
try | ||
{ | ||
var counter = 0; | ||
foreach (var batch in batches) | ||
{ | ||
var metadata = $"{counter}"; | ||
|
||
await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false); | ||
|
||
// Verify server has acknowledged the write request | ||
await putCall.ResponseStream.MoveNext().ConfigureAwait(false); | ||
var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8(); | ||
|
||
if (responseString != metadata) | ||
{ | ||
throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'"); | ||
} | ||
|
||
counter++; | ||
} | ||
} | ||
finally | ||
{ | ||
await writer.CompleteAsync().ConfigureAwait(false); | ||
} | ||
|
||
// Drain the response stream to ensure the server has stored the data | ||
var hasMore = await putCall.ResponseStream.MoveNext().ConfigureAwait(false); | ||
if (hasMore) | ||
{ | ||
throw new Exception("Expected to have reached the end of the response stream"); | ||
} | ||
} | ||
|
||
private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches) | ||
{ | ||
using var readStream = client.GetStream(ticket); | ||
var counter = 0; | ||
foreach (var originalBatch in batches) | ||
{ | ||
if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false)) | ||
{ | ||
throw new Exception($"Expected {batches.Length} batches but received {counter}"); | ||
} | ||
|
||
var batch = readStream.ResponseStream.Current; | ||
ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false); | ||
|
||
counter++; | ||
} | ||
|
||
if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false)) | ||
{ | ||
throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches"); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't required for any of the included tests, but before disabling the
primitive_no_batches
test, it would crash here with a NullReferenceException due to the writer being disposed before the stream was created.With this fix, writing doesn't crash, but the data isn't found when trying to retrieve it.