Skip to content

Commit

Permalink
apacheGH-40690: [C#][FlightRPC] Add do_exchange csharp implementation (
Browse files Browse the repository at this point in the history
…apache#40691)

### Rationale for this change

This is a draft implementation of DoExchange. A simple usage demo is in FlightTests.cs and TestFlightServer.cs.
I've tried to share the implementation with DoGet/DoPut as much as possible.

### What changes are included in this PR?

- FlightServer.cs and related FlightServerImplementation.cs
- FlightClient.cs with (new) FlightRecordBatchExchangeCall.cs wrapper call.

### Are these changes tested?

Yes, tests are added in FlightTest.cs and TestFlightServer.cs
I've tested locally with the C++ implementation.

### Are there any user-facing changes?

No and the DoExchange documentation is already present

* GitHub Issue: apache#40690

Lead-authored-by: Marco Malagoli <[email protected]>
Co-authored-by: qmmk <[email protected]>
Signed-off-by: Curt Hagenlocher <[email protected]>
  • Loading branch information
qmmk authored Mar 23, 2024
1 parent fa36cde commit 6aa6f85
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 2 deletions.
16 changes: 16 additions & 0 deletions csharp/src/Apache.Arrow.Flight/Client/FlightClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ public AsyncDuplexStreamingCall<FlightHandshakeRequest, FlightHandshakeResponse>
return call;
}

public FlightRecordBatchExchangeCall DoExchange(FlightDescriptor flightDescriptor, Metadata headers = null)
{
var channel = _client.DoExchange(headers);
var requestStream = new FlightClientRecordBatchStreamWriter(channel.RequestStream, flightDescriptor);
var responseStream = new FlightClientRecordBatchStreamReader(channel.ResponseStream);
var call = new FlightRecordBatchExchangeCall(
requestStream,
responseStream,
channel.ResponseHeadersAsync,
channel.GetStatus,
channel.GetTrailers,
channel.Dispose);

return call;
}

public AsyncServerStreamingCall<FlightResult> DoAction(FlightAction action, Metadata headers = null)
{
var stream = _client.DoAction(action.ToProtocol(), headers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using Grpc.Core;
using System.Threading.Tasks;

namespace Apache.Arrow.Flight.Client
{
public class FlightRecordBatchExchangeCall : IDisposable
{
private readonly Func<Status> _getStatusFunc;
private readonly Func<Metadata> _getTrailersFunc;
private readonly Action _disposeAction;

internal FlightRecordBatchExchangeCall(
FlightClientRecordBatchStreamWriter requestStream,
FlightClientRecordBatchStreamReader responseStream,
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
RequestStream = requestStream;
ResponseStream = responseStream;
ResponseHeadersAsync = responseHeadersAsync;
_getStatusFunc = getStatusFunc;
_getTrailersFunc = getTrailersFunc;
_disposeAction = disposeAction;
}

/// <summary>
/// Async stream to read streaming responses.
/// </summary>
public FlightClientRecordBatchStreamReader ResponseStream { get; }

/// <summary>
/// Async stream to send streaming requests.
/// </summary>
public FlightClientRecordBatchStreamWriter RequestStream { get; }

/// <summary>
/// Asynchronous access to response headers.
/// </summary>
public Task<Metadata> ResponseHeadersAsync { get; }

/// <summary>
/// Provides means to cleanup after the call. If the call has already finished normally
/// (response stream has been fully read), doesn't do anything. Otherwise, requests
/// cancellation of the call which should terminate all pending async operations
/// associated with the call. As a result, all resources being used by the call should
/// be released eventually.
/// </summary>
/// <remarks>
/// Normally, there is no need for you to dispose the call unless you want to utilize
/// the "Cancel" semantics of invoking Dispose.
/// </remarks>
public void Dispose()
{
_disposeAction();
}

/// <summary>
/// Gets the call status if the call has already finished. Throws InvalidOperationException otherwise.
/// </summary>
/// <returns></returns>
public Status GetStatus()
{
return _getStatusFunc();
}

/// <summary>
/// Gets the call trailing metadata if the call has already finished. Throws InvalidOperationException otherwise.
/// </summary>
/// <returns></returns>
public Metadata GetTrailers()
{
return _getTrailersFunc();
}
}
}
5 changes: 5 additions & 0 deletions csharp/src/Apache.Arrow.Flight/Server/FlightServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,10 @@ public virtual Task Handshake(IAsyncStreamReader<FlightHandshakeRequest> request
{
throw new NotImplementedException();
}

public virtual Task DoExchange(FlightServerRecordBatchStreamReader requestStream, FlightServerRecordBatchStreamWriter responseStream, ServerCallContext context)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ public override async Task<SchemaResult> GetSchema(Protocol.FlightDescriptor req

public override Task DoExchange(IAsyncStreamReader<Protocol.FlightData> requestStream, IServerStreamWriter<Protocol.FlightData> responseStream, ServerCallContext context)
{
//Exchange is not yet implemented
throw new NotImplementedException();
var readStream = new FlightServerRecordBatchStreamReader(requestStream);
var writeStream = new FlightServerRecordBatchStreamWriter(responseStream);
return _flightServer.DoExchange(readStream, writeStream, context);
}

public override Task Handshake(IAsyncStreamReader<HandshakeRequest> requestStream, IServerStreamWriter<HandshakeResponse> responseStream, ServerCallContext context)
Expand Down
8 changes: 8 additions & 0 deletions csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,13 @@ public override async Task ListFlights(FlightCriteria request, IAsyncStreamWrite
await responseStream.WriteAsync(flightInfo);
}
}

public override async Task DoExchange(FlightServerRecordBatchStreamReader requestStream, FlightServerRecordBatchStreamWriter responseStream, ServerCallContext context)
{
while(await requestStream.MoveNext().ConfigureAwait(false))
{
await responseStream.WriteAsync(requestStream.Current, requestStream.ApplicationMetadata.FirstOrDefault()).ConfigureAwait(false);
}
}
}
}
57 changes: 57 additions & 0 deletions csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,63 @@ public async Task TestHandshake()
Assert.Equal("Done", results.First().Payload.ToStringUtf8());
}

[Fact]
public async Task TestSingleExchange()
{
var flightDescriptor = FlightDescriptor.CreatePathDescriptor("single_exchange");
var duplexStreamingCall = _flightClient.DoExchange(flightDescriptor);
var expectedBatch = CreateTestBatch(0, 100);

await duplexStreamingCall.RequestStream.WriteAsync(expectedBatch).ConfigureAwait(false);
await duplexStreamingCall.RequestStream.CompleteAsync().ConfigureAwait(false);

var results = await duplexStreamingCall.ResponseStream.ToListAsync().ConfigureAwait(false);

Assert.Single(results);
ArrowReaderVerifier.CompareBatches(expectedBatch, results.FirstOrDefault());
}

[Fact]
public async Task TestMultipleExchange()
{
var flightDescriptor = FlightDescriptor.CreatePathDescriptor("multiple_exchange");
var duplexStreamingCall = _flightClient.DoExchange(flightDescriptor);
var expectedBatch1 = CreateTestBatch(0, 100);
var expectedBatch2 = CreateTestBatch(100, 100);

await duplexStreamingCall.RequestStream.WriteAsync(expectedBatch1).ConfigureAwait(false);
await duplexStreamingCall.RequestStream.WriteAsync(expectedBatch2).ConfigureAwait(false);
await duplexStreamingCall.RequestStream.CompleteAsync().ConfigureAwait(false);

var results = await duplexStreamingCall.ResponseStream.ToListAsync().ConfigureAwait(false);

ArrowReaderVerifier.CompareBatches(expectedBatch1, results[0]);
ArrowReaderVerifier.CompareBatches(expectedBatch2, results[1]);
}

[Fact]
public async Task TestExchangeWithMetadata()
{
var flightDescriptor = FlightDescriptor.CreatePathDescriptor("metadata_exchange");
var duplexStreamingCall = _flightClient.DoExchange(flightDescriptor);
var expectedBatch = CreateTestBatch(0, 100);
var expectedMetadata = ByteString.CopyFromUtf8("test metadata");

await duplexStreamingCall.RequestStream.WriteAsync(expectedBatch, expectedMetadata).ConfigureAwait(false);
await duplexStreamingCall.RequestStream.CompleteAsync().ConfigureAwait(false);

List<ByteString> actualMetadata = new List<ByteString>();
List<RecordBatch> actualBatch = new List<RecordBatch>();
while (await duplexStreamingCall.ResponseStream.MoveNext(default))
{
actualBatch.Add(duplexStreamingCall.ResponseStream.Current);
actualMetadata.AddRange(duplexStreamingCall.ResponseStream.ApplicationMetadata);
}

ArrowReaderVerifier.CompareBatches(expectedBatch, actualBatch.FirstOrDefault());
Assert.Equal(expectedMetadata, actualMetadata.FirstOrDefault());
}

[Fact]
public async Task TestHandshakeWithSpecificMessage()
{
Expand Down

0 comments on commit 6aa6f85

Please sign in to comment.