Skip to content

Commit

Permalink
fixing warnings v3
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Jan 9, 2024
1 parent fe7abdc commit ead5930
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (!(o instanceof CancelFlightInfoResult)) {
return false;
}
CancelFlightInfoResult that = (CancelFlightInfoResult) o;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Iterable<byte[]> getAllByte(String key) {

@Override
public void insert(String key, String value) {
metadata.put(key, value.getBytes());
metadata.put(key, value.getBytes(StandardCharsets.UTF_8));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.arrow.flight;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -46,7 +48,7 @@ public String get(String key) {
}

if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
return new String((byte[]) Iterables.get(values, 0));
return new String((byte[]) Iterables.get(values, 0), UTF_8);
}

return (String) Iterables.get(values, 0);
Expand All @@ -63,13 +65,13 @@ public byte[] getByte(String key) {
return (byte[]) Iterables.get(values, 0);
}

return ((String) Iterables.get(values, 0)).getBytes();
return ((String) Iterables.get(values, 0)).getBytes(UTF_8);
}

@Override
public Iterable<String> getAll(String key) {
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
return this.keysAndValues.get(key).stream().map(o -> new String((byte[]) o)).collect(Collectors.toList());
return this.keysAndValues.get(key).stream().map(o -> new String((byte[]) o, UTF_8)).collect(Collectors.toList());
}
return (Collection<String>) (Collection<?>) this.keysAndValues.get(key);
}
Expand All @@ -79,7 +81,7 @@ public Iterable<byte[]> getAllByte(String key) {
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
return (Collection<byte[]>) (Collection<?>) this.keysAndValues.get(key);
}
return this.keysAndValues.get(key).stream().map(o -> ((String) o).getBytes()).collect(Collectors.toList());
return this.keysAndValues.get(key).stream().map(o -> ((String) o).getBytes(UTF_8)).collect(Collectors.toList());
}

@Override
Expand All @@ -105,6 +107,7 @@ public boolean containsKey(String key) {
return this.keysAndValues.containsKey(key);
}

@Override
public String toString() {
return this.keysAndValues.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public CallStatus status() {
}

@Override
public String toString() {
public String getMessage() {
String s = getClass().getName();
return String.format("%s: %s: %s", s, status.code(), status.description());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -134,6 +135,7 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws
}

/** Shutdown the server, waits for up to 6 seconds for successful shutdown before returning. */
@Override
public void close() throws InterruptedException {
shutdown();
final boolean terminated = awaitTermination(3000, TimeUnit.MILLISECONDS);
Expand All @@ -146,7 +148,7 @@ public void close() throws InterruptedException {
server.shutdownNow();

int count = 0;
while (!server.isTerminated() & count < 30) {
while (!server.isTerminated() && count < 30) {
count++;
logger.debug("Waiting for termination");
Thread.sleep(100);
Expand Down Expand Up @@ -216,22 +218,23 @@ public FlightServer build() {
try {
try {
// Linux
builder.channelType(
(Class<? extends ServerChannel>) Class
.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel"));
final EventLoopGroup elg = (EventLoopGroup) Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
.newInstance();
builder.channelType(Class
.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel")
.asSubclass(ServerChannel.class));
final EventLoopGroup elg = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup")
.asSubclass(EventLoopGroup.class).getConstructor().newInstance();
builder.bossEventLoopGroup(elg).workerEventLoopGroup(elg);
} catch (ClassNotFoundException e) {
// BSD
builder.channelType(
(Class<? extends ServerChannel>) Class
.forName("io.netty.channel.kqueue.KQueueServerDomainSocketChannel"));
final EventLoopGroup elg = (EventLoopGroup) Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
.newInstance();
Class.forName("io.netty.channel.kqueue.KQueueServerDomainSocketChannel")
.asSubclass(ServerChannel.class));
final EventLoopGroup elg = Class.forName("io.netty.channel.kqueue.KQueueEventLoopGroup")
.asSubclass(EventLoopGroup.class).getConstructor().newInstance();
builder.bossEventLoopGroup(elg).workerEventLoopGroup(elg);
}
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
throw new UnsupportedOperationException(
"Could not find suitable Netty native transport implementation for domain socket address.");
}
Expand Down Expand Up @@ -342,7 +345,8 @@ private void closeInputStreamIfNotNull(InputStream stream) {
if (stream != null) {
try {
stream.close();
} catch (IOException ignored) {
} catch (IOException expected) {
// stream closes gracefully, doesn't expect an exception.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

Expand Down Expand Up @@ -142,7 +143,7 @@ public void listActions(Flight.Empty request, StreamObserver<Flight.ActionType>
}

private static class GetListener extends OutboundStreamListenerImpl implements ServerStreamListener {
private ServerCallStreamObserver<ArrowMessage> responseObserver;
private final ServerCallStreamObserver<ArrowMessage> serverCallResponseObserver;
private final Consumer<Throwable> errorHandler;
private Runnable onCancelHandler = null;
private Runnable onReadyHandler = null;
Expand All @@ -152,10 +153,10 @@ public GetListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Cons
super(null, responseObserver);
this.errorHandler = errorHandler;
this.completed = false;
this.responseObserver = responseObserver;
this.responseObserver.setOnCancelHandler(this::onCancel);
this.responseObserver.setOnReadyHandler(this::onReady);
this.responseObserver.disableAutoInboundFlowControl();
this.serverCallResponseObserver = responseObserver;
this.serverCallResponseObserver.setOnCancelHandler(this::onCancel);
this.serverCallResponseObserver.setOnReadyHandler(this::onReady);
this.serverCallResponseObserver.disableAutoInboundFlowControl();
}

private void onCancel() {
Expand Down Expand Up @@ -183,7 +184,7 @@ public void setOnReadyHandler(Runnable handler) {

@Override
public boolean isCancelled() {
return responseObserver.isCancelled();
return serverCallResponseObserver.isCancelled();
}

@Override
Expand Down Expand Up @@ -228,7 +229,7 @@ public StreamObserver<ArrowMessage> doPutCustom(final StreamObserver<Flight.PutR
// When the ackStream is completed, the FlightStream will be closed with it
ackStream.setAutoCloseable(fs);
final StreamObserver<ArrowMessage> observer = fs.asObserver();
executors.submit(() -> {
Future<?> unused = executors.submit(() -> {
try {
producer.acceptPut(makeContext(responseObserver), fs, ackStream).run();
} catch (Throwable ex) {
Expand Down Expand Up @@ -277,7 +278,8 @@ public void pollFlightInfo(Flight.FlightDescriptor request, StreamObserver<Fligh
* Broadcast the given exception to all registered middleware.
*/
private void handleExceptionWithMiddleware(Throwable t) {
final Map<Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
final Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter
.SERVER_MIDDLEWARE_KEY.get();
if (middleware == null || middleware.isEmpty()) {
logger.error("Uncaught exception in Flight method body", t);
return;
Expand Down Expand Up @@ -377,7 +379,7 @@ public StreamObserver<ArrowMessage> doExchangeCustom(StreamObserver<ArrowMessage
responseObserver.request(1);
final StreamObserver<ArrowMessage> observer = fs.asObserver();
try {
executors.submit(() -> {
Future<?> unused = executors.submit(() -> {
try {
producer.doExchange(makeContext(responseObserver), fs, listener);
} catch (Exception ex) {
Expand Down Expand Up @@ -416,7 +418,7 @@ public boolean isCancelled() {
}

@Override
public <T extends FlightServerMiddleware> T getMiddleware(Key<T> key) {
public <T extends FlightServerMiddleware> T getMiddleware(FlightServerMiddleware.Key<T> key) {
final Map<Key<?>, FlightServerMiddleware> middleware = ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
if (middleware == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.arrow.flight.FlightConstants;

import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Metadata.BinaryMarshaller;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;

/**
Expand All @@ -32,7 +32,7 @@ public final class AuthConstants {
public static final String HANDSHAKE_DESCRIPTOR_NAME = MethodDescriptor
.generateFullMethodName(FlightConstants.SERVICE, "Handshake");
public static final String TOKEN_NAME = "Auth-Token-bin";
public static final Key<byte[]> TOKEN_KEY = Key.of(TOKEN_NAME, new BinaryMarshaller<byte[]>() {
public static final Metadata.Key<byte[]> TOKEN_KEY = Metadata.Key.of(TOKEN_NAME, new BinaryMarshaller<byte[]>() {

@Override
public byte[] toBytes(byte[] value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public AuthResult authenticate(CallHeaders incomingHeaders) {
* Validate the bearer token.
* @param bearerToken The bearer token to validate.
* @return A successful AuthResult if validation succeeded.
* @throws Exception If the token validation fails.
*/
protected abstract AuthResult validateBearer(String bearerToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class AddWritableBuffer {
tmpBufChainOut = tmpBufChainOut2;

} catch (Exception ex) {
new RuntimeException("Failed to initialize AddWritableBuffer, falling back to slow path", ex).printStackTrace();
throw new RuntimeException("Failed to initialize AddWritableBuffer, falling back to slow path", ex);
}

bufConstruct = tmpConstruct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
*/
public class ClientInterceptorAdapter implements ClientInterceptor {

private final List<Factory> factories;
private final List<FlightClientMiddleware.Factory> factories;

public ClientInterceptorAdapter(List<Factory> factories) {
public ClientInterceptorAdapter(List<FlightClientMiddleware.Factory> factories) {
this.factories = factories;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class GetReadableBuffer {
tmpField = f;
tmpClazz = clazz;
} catch (Exception e) {
new RuntimeException("Failed to initialize GetReadableBuffer, falling back to slow path", e).printStackTrace();
throw new RuntimeException("Failed to initialize GetReadableBuffer, falling back to slow path", e);
}
READABLE_BUFFER = tmpField;
BUFFER_INPUT_STREAM = tmpClazz;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.arrow.flight.grpc;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -42,39 +43,39 @@ public MetadataAdapter(Metadata metadata) {

@Override
public String get(String key) {
return this.metadata.get(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
return this.metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
}

@Override
public byte[] getByte(String key) {
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
return this.metadata.get(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
return this.metadata.get(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
}
return get(key).getBytes();
return get(key).getBytes(StandardCharsets.UTF_8);
}

@Override
public Iterable<String> getAll(String key) {
return this.metadata.getAll(Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
return this.metadata.getAll(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
}

@Override
public Iterable<byte[]> getAllByte(String key) {
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
return this.metadata.getAll(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
return this.metadata.getAll(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
}
return StreamSupport.stream(getAll(key).spliterator(), false)
.map(String::getBytes).collect(Collectors.toList());
}

@Override
public void insert(String key, String value) {
this.metadata.put(Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
this.metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
}

@Override
public void insert(String key, byte[] value) {
this.metadata.put(Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), value);
this.metadata.put(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER), value);
}

@Override
Expand All @@ -85,13 +86,14 @@ public Set<String> keys() {
@Override
public boolean containsKey(String key) {
if (key.endsWith("-bin")) {
final Key<?> grpcKey = Key.of(key, Metadata.BINARY_BYTE_MARSHALLER);
final Metadata.Key<?> grpcKey = Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER);
return this.metadata.containsKey(grpcKey);
}
final Key<?> grpcKey = Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
final Metadata.Key<?> grpcKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
return this.metadata.containsKey(grpcKey);
}

@Override
public String toString() {
return this.metadata.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static class KeyFactory<T extends FlightServerMiddleware> {
private final FlightServerMiddleware.Key<T> key;
private final FlightServerMiddleware.Factory<T> factory;

public KeyFactory(Key<T> key, Factory<T> factory) {
public KeyFactory(FlightServerMiddleware.Key<T> key, FlightServerMiddleware.Factory<T> factory) {
this.key = key;
this.factory = factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.arrow.flight.grpc;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
Expand Down Expand Up @@ -171,7 +172,7 @@ private static ErrorFlightMetadata parseTrailers(Metadata trailers) {
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
metadata.insert(key, trailers.get(keyOfBinary(key)));
} else {
metadata.insert(key, Objects.requireNonNull(trailers.get(keyOfAscii(key))).getBytes());
metadata.insert(key, Objects.requireNonNull(trailers.get(keyOfAscii(key))).getBytes(StandardCharsets.UTF_8));
}
}
return metadata;
Expand Down

0 comments on commit ead5930

Please sign in to comment.