diff --git a/csharp/Apache.Arrow.sln b/csharp/Apache.Arrow.sln
index 7e7f7c6331e88..0e569de1d6c8f 100644
--- a/csharp/Apache.Arrow.sln
+++ b/csharp/Apache.Arrow.sln
@@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql.Tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", "{2ADE087A-B424-4895-8CC5-10170D10BA62}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.IntegrationTest", "test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj", "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -81,6 +83,10 @@ Global
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 = Release|Any CPU
+ {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs b/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs
index f76f08224541f..7a8a6fd677c68 100644
--- a/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs
+++ b/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs
@@ -64,7 +64,7 @@ protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
- _flightDataStream.Dispose();
+ _flightDataStream?.Dispose();
_disposed = true;
}
}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
new file mode 100644
index 0000000000000..34030621b4bde
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
@@ -0,0 +1,18 @@
+
+
+
+
+ Exe
+ net8.0
+ Apache.Arrow.Flight.IntegrationTest
+
+
+
+
+
+
+
+
+
+
+
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
new file mode 100644
index 0000000000000..d9e0ff5230611
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+public class FlightClientCommand
+{
+ private readonly int _port;
+ private readonly string _scenario;
+ private readonly FileInfo _jsonFileInfo;
+
+ public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo)
+ {
+ _port = port;
+ _scenario = scenario;
+ _jsonFileInfo = jsonFileInfo;
+ }
+
+ public async Task Execute()
+ {
+ if (!string.IsNullOrEmpty(_scenario))
+ {
+ // No named scenarios are currently implemented
+ throw new Exception($"Scenario '{_scenario}' is not supported.");
+ }
+
+ if (!(_jsonFileInfo?.Exists ?? false))
+ {
+ throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'");
+ }
+
+ var scenario = new JsonTestScenario(_port, _jsonFileInfo);
+ await scenario.RunClient().ConfigureAwait(false);
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
new file mode 100644
index 0000000000000..c3a7694485b69
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Net;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.TestWeb;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.AspNetCore.Hosting.Server;
+using Microsoft.AspNetCore.Hosting.Server.Features;
+using Microsoft.AspNetCore.Server.Kestrel.Core;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+public class FlightServerCommand
+{
+ private readonly string _scenario;
+
+ public FlightServerCommand(string scenario)
+ {
+ _scenario = scenario;
+ }
+
+ public async Task Execute()
+ {
+ if (!string.IsNullOrEmpty(_scenario))
+ {
+ // No named scenarios are currently implemented
+ throw new Exception($"Scenario '{_scenario}' is not supported.");
+ }
+
+ var host = Host.CreateDefaultBuilder()
+ .ConfigureWebHostDefaults(webBuilder =>
+ {
+ webBuilder
+ .ConfigureKestrel(options =>
+ {
+ options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2);
+ })
+ .UseStartup();
+ })
+ .Build();
+
+ await host.StartAsync().ConfigureAwait(false);
+
+ var addresses = host.Services.GetService().Features.Get().Addresses;
+ foreach (var address in addresses)
+ {
+ Console.WriteLine($"Server listening on {address}");
+ }
+
+ await host.WaitForShutdownAsync().ConfigureAwait(false);
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
new file mode 100644
index 0000000000000..44b1075e7abf2
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Grpc.Net.Client.Balancer;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+///
+/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight.
+/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme.
+///
+public class GrpcTcpResolverFactory : ResolverFactory
+{
+ public override string Name => "grpc+tcp";
+
+ public override Resolver Create(ResolverOptions options)
+ {
+ return new StaticResolverFactory(
+ uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) })
+ .Create(options);
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
new file mode 100644
index 0000000000000..f4f3ac28bfa1b
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Linq;
+using System.Threading.Tasks;
+using Apache.Arrow.Flight.Client;
+using Apache.Arrow.IntegrationTest;
+using Apache.Arrow.Tests;
+using Apache.Arrow.Types;
+using Google.Protobuf;
+using Grpc.Net.Client;
+using Grpc.Core;
+using Grpc.Net.Client.Balancer;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+///
+/// A test scenario defined using a JSON data file
+///
+internal class JsonTestScenario
+{
+ private readonly int _serverPort;
+ private readonly FileInfo _jsonFile;
+ private readonly ServiceProvider _serviceProvider;
+
+ public JsonTestScenario(int serverPort, FileInfo jsonFile)
+ {
+ _serverPort = serverPort;
+ _jsonFile = jsonFile;
+
+ var services = new ServiceCollection();
+ services.AddSingleton(new GrpcTcpResolverFactory());
+ _serviceProvider = services.BuildServiceProvider();
+ }
+
+ public async Task RunClient()
+ {
+ var address = $"grpc+tcp://localhost:{_serverPort}";
+ using var channel = GrpcChannel.ForAddress(
+ address,
+ new GrpcChannelOptions
+ {
+ ServiceProvider = _serviceProvider,
+ Credentials = ChannelCredentials.Insecure
+ });
+ var client = new FlightClient(channel);
+
+ var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName);
+
+ var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false);
+ var schema = jsonFile.GetSchemaAndDictionaries(out Func dictionaries);
+ var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray();
+
+ // 1. Put the data to the server.
+ await UploadBatches(client, descriptor, batches).ConfigureAwait(false);
+
+ // 2. Get the ticket for the data.
+ var info = await client.GetInfo(descriptor).ConfigureAwait(false);
+ if (info.Endpoints.Count == 0)
+ {
+ throw new Exception("No endpoints received");
+ }
+
+ // 3. Stream data from the server, comparing individual batches.
+ foreach (var endpoint in info.Endpoints)
+ {
+ var locations = endpoint.Locations.ToArray();
+ if (locations.Length == 0)
+ {
+ // Can read with existing client
+ await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false);
+ }
+ else
+ {
+ foreach (var location in locations)
+ {
+ using var readChannel = GrpcChannel.ForAddress(
+ location.Uri,
+ new GrpcChannelOptions
+ {
+ ServiceProvider = _serviceProvider,
+ Credentials = ChannelCredentials.Insecure
+ });
+ var readClient = new FlightClient(readChannel);
+ await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false);
+ }
+ }
+ }
+ }
+
+ private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches)
+ {
+ using var putCall = client.StartPut(descriptor);
+ using var writer = putCall.RequestStream;
+
+ try
+ {
+ var counter = 0;
+ foreach (var batch in batches)
+ {
+ var metadata = $"{counter}";
+
+ await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false);
+
+ // Verify server has acknowledged the write request
+ await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
+ var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8();
+
+ if (responseString != metadata)
+ {
+ throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'");
+ }
+
+ counter++;
+ }
+ }
+ finally
+ {
+ await writer.CompleteAsync().ConfigureAwait(false);
+ }
+
+ // Drain the response stream to ensure the server has stored the data
+ var hasMore = await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
+ if (hasMore)
+ {
+ throw new Exception("Expected to have reached the end of the response stream");
+ }
+ }
+
+ private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches)
+ {
+ using var readStream = client.GetStream(ticket);
+ var counter = 0;
+ foreach (var originalBatch in batches)
+ {
+ if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
+ {
+ throw new Exception($"Expected {batches.Length} batches but received {counter}");
+ }
+
+ var batch = readStream.ResponseStream.Current;
+ ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false);
+
+ counter++;
+ }
+
+ if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
+ {
+ throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches");
+ }
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs
new file mode 100644
index 0000000000000..24d39de28a731
--- /dev/null
+++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.CommandLine;
+using System.IO;
+using System.Threading.Tasks;
+
+namespace Apache.Arrow.Flight.IntegrationTest;
+
+public static class Program
+{
+ public static async Task Main(string[] args)
+ {
+ var portOption = new Option(
+ new[] { "--port", "-p" },
+ description: "Port the Flight server is listening on");
+ var scenarioOption = new Option(
+ new[] { "--scenario", "-s" },
+ "The name of the scenario to run");
+ var pathOption = new Option(
+ new[] { "--path", "-j" },
+ "Path to a JSON file of test data");
+
+ var rootCommand = new RootCommand(
+ "Integration test application for Apache.Arrow .NET Flight.");
+
+ var clientCommand = new Command("client", "Run the Flight client")
+ {
+ portOption,
+ scenarioOption,
+ pathOption,
+ };
+ rootCommand.AddCommand(clientCommand);
+
+ clientCommand.SetHandler(async (port, scenario, jsonFile) =>
+ {
+ var command = new FlightClientCommand(port, scenario, jsonFile);
+ await command.Execute().ConfigureAwait(false);
+ }, portOption, scenarioOption, pathOption);
+
+ var serverCommand = new Command("server", "Run the Flight server")
+ {
+ scenarioOption,
+ };
+ rootCommand.AddCommand(serverCommand);
+
+ serverCommand.SetHandler(async scenario =>
+ {
+ var command = new FlightServerCommand(scenario);
+ await command.Execute().ConfigureAwait(false);
+ }, scenarioOption);
+
+ return await rootCommand.InvokeAsync(args).ConfigureAwait(false);
+ }
+}
diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
index 97c1af2f06cb8..d1cfe9e445808 100644
--- a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
+++ b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs
@@ -13,15 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Console;
namespace Apache.Arrow.Flight.TestWeb
{
@@ -35,6 +33,11 @@ public void ConfigureServices(IServiceCollection services)
.AddFlightServer();
services.AddSingleton(new FlightStore());
+
+ // The integration tests rely on the port being written to the first line of stdout,
+ // so send all logging to stderr.
+ services.Configure(
+ o => o.LogToStandardErrorThreshold = LogLevel.Debug);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs b/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
index 4a72b73274f1e..46c5460912d8c 100644
--- a/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
+++ b/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
@@ -67,14 +67,16 @@ public override async Task DoPut(FlightServerRecordBatchStreamReader requestStre
if(!_flightStore.Flights.TryGetValue(flightDescriptor, out var flightHolder))
{
- flightHolder = new FlightHolder(flightDescriptor, await requestStream.Schema, $"http://{context.Host}");
+ flightHolder = new FlightHolder(flightDescriptor, await requestStream.Schema, $"grpc+tcp://{context.Host}");
_flightStore.Flights.Add(flightDescriptor, flightHolder);
}
while (await requestStream.MoveNext())
{
- flightHolder.AddBatch(new RecordBatchWithMetadata(requestStream.Current, requestStream.ApplicationMetadata.FirstOrDefault()));
- await responseStream.WriteAsync(FlightPutResult.Empty);
+ var applicationMetadata = requestStream.ApplicationMetadata.FirstOrDefault();
+ flightHolder.AddBatch(new RecordBatchWithMetadata(requestStream.Current, applicationMetadata));
+ await responseStream.WriteAsync(
+ applicationMetadata == null ? FlightPutResult.Empty : new FlightPutResult(applicationMetadata));
}
}
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 9f86d172ddbcf..bc862963405f2 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -25,7 +25,7 @@
import numpy as np
from .util import frombytes, tobytes, random_bytes, random_utf8
-from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY
+from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY, SKIP_FLIGHT
def metadata_key_values(pairs):
@@ -1890,7 +1890,10 @@ def _temp_path():
return
file_objs = [
- generate_primitive_case([], name='primitive_no_batches'),
+ generate_primitive_case([], name='primitive_no_batches')
+ # TODO(https://github.com/apache/arrow/issues/44363)
+ .skip_format(SKIP_FLIGHT, 'C#'),
+
generate_primitive_case([17, 20], name='primitive'),
generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
@@ -1952,16 +1955,22 @@ def _temp_path():
generate_dictionary_case()
# TODO(https://github.com/apache/arrow-nanoarrow/issues/622)
- .skip_tester('nanoarrow'),
+ .skip_tester('nanoarrow')
+ # TODO(https://github.com/apache/arrow/issues/38045)
+ .skip_format(SKIP_FLIGHT, 'C#'),
generate_dictionary_unsigned_case()
.skip_tester('nanoarrow')
- .skip_tester('Java'), # TODO(ARROW-9377)
+ .skip_tester('Java') # TODO(ARROW-9377)
+ # TODO(https://github.com/apache/arrow/issues/38045)
+ .skip_format(SKIP_FLIGHT, 'C#'),
generate_nested_dictionary_case()
# TODO(https://github.com/apache/arrow-nanoarrow/issues/622)
.skip_tester('nanoarrow')
- .skip_tester('Java'), # TODO(ARROW-7779)
+ .skip_tester('Java') # TODO(ARROW-7779)
+ # TODO(https://github.com/apache/arrow/issues/38045)
+ .skip_format(SKIP_FLIGHT, 'C#'),
generate_run_end_encoded_case()
.skip_tester('C#')
@@ -1988,7 +1997,9 @@ def _temp_path():
.skip_tester('nanoarrow')
# TODO: ensure the extension is registered in the C++ entrypoint
.skip_format(SKIP_C_SCHEMA, 'C++')
- .skip_format(SKIP_C_ARRAY, 'C++'),
+ .skip_format(SKIP_C_ARRAY, 'C++')
+ # TODO(https://github.com/apache/arrow/issues/38045)
+ .skip_format(SKIP_FLIGHT, 'C#'),
]
generated_paths = []
diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py
index e276738846371..378b17d75fdce 100644
--- a/dev/archery/archery/integration/runner.py
+++ b/dev/archery/archery/integration/runner.py
@@ -631,10 +631,13 @@ def append_tester(implementation, tester):
flight_scenarios = [
Scenario(
"auth:basic_proto",
- description="Authenticate using the BasicAuth protobuf."),
+ description="Authenticate using the BasicAuth protobuf.",
+ skip_testers={"C#"},
+ ),
Scenario(
"middleware",
description="Ensure headers are propagated via middleware.",
+ skip_testers={"C#"},
),
Scenario(
"ordered",
@@ -689,12 +692,12 @@ def append_tester(implementation, tester):
Scenario(
"flight_sql",
description="Ensure Flight SQL protocol is working as expected.",
- skip_testers={"Rust"}
+ skip_testers={"Rust", "C#"}
),
Scenario(
"flight_sql:extension",
description="Ensure Flight SQL extensions work as expected.",
- skip_testers={"Rust"}
+ skip_testers={"Rust", "C#"}
),
Scenario(
"flight_sql:ingestion",
diff --git a/dev/archery/archery/integration/tester_csharp.py b/dev/archery/archery/integration/tester_csharp.py
index 02ced0701deaf..50b3499fbf285 100644
--- a/dev/archery/archery/integration/tester_csharp.py
+++ b/dev/archery/archery/integration/tester_csharp.py
@@ -17,6 +17,7 @@
from contextlib import contextmanager
import os
+import subprocess
from . import cdata
from .tester import Tester, CDataExporter, CDataImporter
@@ -25,12 +26,20 @@
_ARTIFACTS_PATH = os.path.join(ARROW_ROOT_DEFAULT, "csharp/artifacts")
+_BUILD_SUBDIR = "Debug/net8.0"
_EXE_PATH = os.path.join(_ARTIFACTS_PATH,
"Apache.Arrow.IntegrationTest",
- "Debug/net8.0/Apache.Arrow.IntegrationTest",
+ _BUILD_SUBDIR,
+ "Apache.Arrow.IntegrationTest",
)
+_FLIGHT_EXE_PATH = os.path.join(_ARTIFACTS_PATH,
+ "Apache.Arrow.Flight.IntegrationTest",
+ _BUILD_SUBDIR,
+ "Apache.Arrow.Flight.IntegrationTest",
+ )
+
_clr_loaded = False
@@ -44,10 +53,10 @@ def _load_clr():
import clr
clr.AddReference(
f"{_ARTIFACTS_PATH}/Apache.Arrow.IntegrationTest/"
- f"Debug/net8.0/Apache.Arrow.IntegrationTest.dll")
+ f"{_BUILD_SUBDIR}/Apache.Arrow.IntegrationTest.dll")
clr.AddReference(
f"{_ARTIFACTS_PATH}/Apache.Arrow.Tests/"
- f"Debug/net8.0/Apache.Arrow.Tests.dll")
+ f"{_BUILD_SUBDIR}/Apache.Arrow.Tests.dll")
from Apache.Arrow.IntegrationTest import CDataInterface
CDataInterface.Initialize()
@@ -146,6 +155,8 @@ def run_gc(self):
class CSharpTester(Tester):
PRODUCER = True
CONSUMER = True
+ FLIGHT_SERVER = True
+ FLIGHT_CLIENT = True
C_DATA_SCHEMA_EXPORTER = True
C_DATA_SCHEMA_IMPORTER = True
C_DATA_ARRAY_EXPORTER = True
@@ -192,3 +203,43 @@ def make_c_data_exporter(self):
def make_c_data_importer(self):
return CSharpCDataImporter(self.debug, self.args)
+
+ def flight_request(self, port, json_path=None, scenario_name=None):
+ cmd = [_FLIGHT_EXE_PATH, 'client', '--port', f'{port}']
+ if json_path:
+ cmd.extend(['--path', json_path])
+ elif scenario_name:
+ cmd.extend(['--scenario', scenario_name])
+ else:
+ raise TypeError("Must provide one of json_path or scenario_name")
+
+ if self.debug:
+ log(' '.join(cmd))
+ run_cmd(cmd)
+
+ @contextmanager
+ def flight_server(self, scenario_name=None):
+ cmd = [_FLIGHT_EXE_PATH, 'server']
+ if scenario_name:
+ cmd.extend(['--scenario', scenario_name])
+ if self.debug:
+ log(' '.join(cmd))
+ server = subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ try:
+ output = server.stdout.readline().decode()
+ if not output.startswith("Server listening on "):
+ server.kill()
+ out, err = server.communicate()
+ raise RuntimeError(
+ '.NET Flight server did not start properly, '
+ 'stdout: \n{}\n\nstderr:\n{}\n'.format(
+ output + out.decode(), err.decode()
+ )
+ )
+ port = int(output.split(':')[-1])
+ yield port
+ finally:
+ server.kill()
+ server.wait(5)
diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt
index e149c179813a0..dda1d36dc1aeb 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -116,6 +116,7 @@ csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj
csharp/src/Apache.Arrow.Compression/Apache.Arrow.Compression.csproj
csharp/src/Apache.Arrow.Flight.Sql/Apache.Arrow.Flight.Sql.csproj
csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj
+csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj
csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj
csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj
csharp/test/Apache.Arrow.Flight.TestWeb/Apache.Arrow.Flight.TestWeb.csproj