Skip to content

Commit

Permalink
Fix the issue with single node ClientManager
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Jan 26, 2025
1 parent 1c6fcc2 commit 6b8a6e9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor
throw CallStatus.NOT_FOUND.withDescription("FlightInfo not found").toRuntimeException();
}
Location location = flightClientManager.getFlightClientLocation(streamTicket.getNodeId());
if (location == null) {
throw CallStatus.UNAVAILABLE.withDescription("Internal error while determining location information from ticket.")
.toRuntimeException();
}
FlightEndpoint endpoint = new FlightEndpoint(new Ticket(descriptor.getCommand()), location);
FlightInfo.Builder infoBuilder = FlightInfo.builder(
streamProducerHolder.getRoot().getSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,20 @@ public void setUp() throws Exception {
clientManager = new FlightClientManager(allocator, clusterService, sslContextProvider, elg, threadPool, client);
ClusterChangedEvent event = new ClusterChangedEvent("test", state, ClusterState.EMPTY_STATE);
clientManager.clusterChanged(event);
<<<<<<< HEAD
assertBusy(() -> {
assertEquals("Flight client isn't built in time limit", 2, clientManager.getClients().size());
assertNotNull("local_node should exist", clientManager.getFlightClient("local_node"));
assertNotNull("remote_node should exist", clientManager.getFlightClient("remote_node"));
}, 2, TimeUnit.SECONDS);
=======
clientManager.updateFlightClients();
assertBusy(
() -> { assertFalse("Flight client isn't built in time limit", clientManager.getClients().isEmpty()); },
2,
TimeUnit.SECONDS
);
>>>>>>> 8cc555604ef (Fix the issue with single node ClientManager)
}

private void mockFlightInfoResponse(DiscoveryNodes nodes, int sleepDuration) {
Expand Down

0 comments on commit 6b8a6e9

Please sign in to comment.