Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement the unix domain socket #1688

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions client/src/main/java/org/asynchttpclient/AsyncHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.asynchttpclient.netty.request.NettyRequest;

import javax.net.ssl.SSLSession;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;


Expand Down Expand Up @@ -132,7 +132,7 @@ default void onHostnameResolutionAttempt(String name) {
* @param name the name to be resolved
* @param addresses the resolved addresses
*/
default void onHostnameResolutionSuccess(String name, List<InetSocketAddress> addresses) {
default void onHostnameResolutionSuccess(String name, List<? extends SocketAddress> addresses) {
}

/**
Expand All @@ -153,7 +153,7 @@ default void onHostnameResolutionFailure(String name, Throwable cause) {
*
* @param remoteAddress the address we try to connect to
*/
default void onTcpConnectAttempt(InetSocketAddress remoteAddress) {
default void onTcpConnectAttempt(SocketAddress remoteAddress) {
}

/**
Expand All @@ -162,7 +162,7 @@ default void onTcpConnectAttempt(InetSocketAddress remoteAddress) {
* @param remoteAddress the address we try to connect to
* @param connection the connection
*/
default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) {
default void onTcpConnectSuccess(SocketAddress remoteAddress, Channel connection) {
}

/**
Expand All @@ -173,7 +173,7 @@ default void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connec
* @param remoteAddress the address we try to connect to
* @param cause the cause of the failure
*/
default void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) {
default void onTcpConnectFailure(SocketAddress remoteAddress, Throwable cause) {
}

// ////////////// TLS ///////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ public interface AsyncHttpClientConfig {

boolean isUseNativeTransport();

String getUnixSocket();

default boolean isUseUnixDomain(){
String unixSocket = getUnixSocket();
return unixSocket !=null && !unixSocket.isEmpty();
}

Consumer<Channel> getHttpAdditionalChannelInitializer();

Consumer<Channel> getWsAdditionalChannelInitializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final Map<ChannelOption<Object>, Object> channelOptions;
private final EventLoopGroup eventLoopGroup;
private final boolean useNativeTransport;
private final String unixSocket;
private final ByteBufAllocator allocator;
private final boolean tcpNoDelay;
private final boolean soReuseAddress;
Expand Down Expand Up @@ -209,6 +210,7 @@ private DefaultAsyncHttpClientConfig(// http
Map<ChannelOption<Object>, Object> channelOptions,
EventLoopGroup eventLoopGroup,
boolean useNativeTransport,
String unixSocket,
ByteBufAllocator allocator,
Timer nettyTimer,
ThreadFactory threadFactory,
Expand Down Expand Up @@ -295,6 +297,7 @@ private DefaultAsyncHttpClientConfig(// http
this.channelOptions = channelOptions;
this.eventLoopGroup = eventLoopGroup;
this.useNativeTransport = useNativeTransport;
this.unixSocket = unixSocket;
this.allocator = allocator;
this.nettyTimer = nettyTimer;
this.threadFactory = threadFactory;
Expand Down Expand Up @@ -621,6 +624,11 @@ public boolean isUseNativeTransport() {
return useNativeTransport;
}

@Override
public String getUnixSocket() {
return unixSocket;
}

@Override
public ByteBufAllocator getAllocator() {
return allocator;
Expand Down Expand Up @@ -738,6 +746,7 @@ public static class Builder {
private int httpClientCodecInitialBufferSize = defaultHttpClientCodecInitialBufferSize();
private int chunkedFileChunkSize = defaultChunkedFileChunkSize();
private boolean useNativeTransport = defaultUseNativeTransport();
private String unixSocket = defaultUnixSocket();
private ByteBufAllocator allocator;
private Map<ChannelOption<Object>, Object> channelOptions = new HashMap<>();
private EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -821,6 +830,7 @@ public Builder(AsyncHttpClientConfig config) {
channelOptions.putAll(config.getChannelOptions());
eventLoopGroup = config.getEventLoopGroup();
useNativeTransport = config.isUseNativeTransport();
unixSocket = config.getUnixSocket();
allocator = config.getAllocator();
nettyTimer = config.getNettyTimer();
threadFactory = config.getThreadFactory();
Expand Down Expand Up @@ -1189,6 +1199,12 @@ public Builder setUseNativeTransport(boolean useNativeTransport) {
return this;
}

public Builder setUnixSocket(String unixSocket) {
setUseNativeTransport(true);
this.unixSocket = unixSocket;
return this;
}

public Builder setAllocator(ByteBufAllocator allocator) {
this.allocator = allocator;
return this;
Expand Down Expand Up @@ -1301,6 +1317,7 @@ public DefaultAsyncHttpClientConfig build() {
channelOptions.isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(channelOptions),
eventLoopGroup,
useNativeTransport,
unixSocket,
allocator,
nettyTimer,
threadFactory,
Expand Down
24 changes: 17 additions & 7 deletions client/src/main/java/org/asynchttpclient/DefaultRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.asynchttpclient;

import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.resolver.NameResolver;
Expand All @@ -25,6 +26,7 @@
import java.io.File;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand All @@ -39,8 +41,8 @@ public class DefaultRequest implements Request {
public final ProxyServer proxyServer;
private final String method;
private final Uri uri;
private final InetAddress address;
private final InetAddress localAddress;
private final SocketAddress address;
private final SocketAddress localAddress;
private final HttpHeaders headers;
private final List<Cookie> cookies;
private final byte[] byteData;
Expand All @@ -61,13 +63,14 @@ public class DefaultRequest implements Request {
private final Charset charset;
private final ChannelPoolPartitioning channelPoolPartitioning;
private final NameResolver<InetAddress> nameResolver;
private final NameResolver<DomainSocketAddress> domainNameResolver;
// lazily loaded
private List<Param> queryParams;

public DefaultRequest(String method,
Uri uri,
InetAddress address,
InetAddress localAddress,
SocketAddress address,
SocketAddress localAddress,
HttpHeaders headers,
List<Cookie> cookies,
byte[] byteData,
Expand All @@ -88,7 +91,8 @@ public DefaultRequest(String method,
long rangeOffset,
Charset charset,
ChannelPoolPartitioning channelPoolPartitioning,
NameResolver<InetAddress> nameResolver) {
NameResolver<InetAddress> nameResolver,
NameResolver<DomainSocketAddress> domainNameResolver) {
this.method = method;
this.uri = uri;
this.address = address;
Expand All @@ -114,6 +118,7 @@ public DefaultRequest(String method,
this.charset = charset;
this.channelPoolPartitioning = channelPoolPartitioning;
this.nameResolver = nameResolver;
this.domainNameResolver = domainNameResolver;
}

@Override
Expand All @@ -132,12 +137,12 @@ public Uri getUri() {
}

@Override
public InetAddress getAddress() {
public SocketAddress getAddress() {
return address;
}

@Override
public InetAddress getLocalAddress() {
public SocketAddress getLocalAddress() {
return localAddress;
}

Expand Down Expand Up @@ -246,6 +251,11 @@ public NameResolver<InetAddress> getNameResolver() {
return nameResolver;
}

@Override
public NameResolver<DomainSocketAddress> getDomainNameResolver() {
return domainNameResolver;
}

@Override
public List<Param> getQueryParams() {
if (queryParams == null)
Expand Down
13 changes: 10 additions & 3 deletions client/src/main/java/org/asynchttpclient/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.asynchttpclient;

import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.resolver.NameResolver;
Expand All @@ -28,6 +29,7 @@
import java.io.File;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
Expand Down Expand Up @@ -62,14 +64,14 @@ public interface Request {
String getUrl();

/**
* @return the InetAddress to be used to bypass uri's hostname resolution
* @return the SocketAddress to be used to bypass uri's hostname or unix domain path resolution
*/
InetAddress getAddress();
SocketAddress getAddress();

/**
* @return the local address to bind from
*/
InetAddress getLocalAddress();
SocketAddress getLocalAddress();

/**
* @return the HTTP headers
Expand Down Expand Up @@ -181,6 +183,11 @@ public interface Request {
*/
NameResolver<InetAddress> getNameResolver();

/**
* @return the NameResolver to be used to resolve hostnams's IP
*/
NameResolver<DomainSocketAddress> getDomainNameResolver();

/**
* @return a new request builder using this request as a prototype
*/
Expand Down
33 changes: 30 additions & 3 deletions client/src/main/java/org/asynchttpclient/RequestBuilderBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.asynchttpclient;

import io.netty.buffer.ByteBuf;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie;
Expand All @@ -27,6 +28,7 @@
import org.asynchttpclient.request.body.generator.BodyGenerator;
import org.asynchttpclient.request.body.generator.ReactiveStreamsBodyGenerator;
import org.asynchttpclient.request.body.multipart.Part;
import org.asynchttpclient.resolver.DefaultDomainNameResolver;
import org.asynchttpclient.uri.Uri;
import org.asynchttpclient.util.UriEncoder;
import org.reactivestreams.Publisher;
Expand All @@ -36,6 +38,8 @@
import java.io.File;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.*;
Expand All @@ -56,6 +60,7 @@ public abstract class RequestBuilderBase<T extends RequestBuilderBase<T>> {
private final static Logger LOGGER = LoggerFactory.getLogger(RequestBuilderBase.class);
private static final Uri DEFAULT_REQUEST_URL = Uri.create("http://localhost");
public static NameResolver<InetAddress> DEFAULT_NAME_RESOLVER = new DefaultNameResolver(ImmediateEventExecutor.INSTANCE);
public static NameResolver<DomainSocketAddress> DEFAULT_DOMAIN_NAME_RESOLVER = new DefaultDomainNameResolver(ImmediateEventExecutor.INSTANCE);
// builder only fields
protected UriEncoder uriEncoder;
protected List<Param> queryParams;
Expand All @@ -64,8 +69,8 @@ public abstract class RequestBuilderBase<T extends RequestBuilderBase<T>> {
// request fields
protected String method;
protected Uri uri;
protected InetAddress address;
protected InetAddress localAddress;
protected SocketAddress address;
protected SocketAddress localAddress;
protected HttpHeaders headers;
protected ArrayList<Cookie> cookies;
protected byte[] byteData;
Expand All @@ -87,6 +92,7 @@ public abstract class RequestBuilderBase<T extends RequestBuilderBase<T>> {
protected Charset charset;
protected ChannelPoolPartitioning channelPoolPartitioning = ChannelPoolPartitioning.PerHostChannelPoolPartitioning.INSTANCE;
protected NameResolver<InetAddress> nameResolver = DEFAULT_NAME_RESOLVER;
protected NameResolver<DomainSocketAddress> domainNameResolver = DEFAULT_DOMAIN_NAME_RESOLVER;

protected RequestBuilderBase(String method, boolean disableUrlEncoding) {
this(method, disableUrlEncoding, true);
Expand Down Expand Up @@ -136,6 +142,7 @@ protected RequestBuilderBase(Request prototype, boolean disableUrlEncoding, bool
this.charset = prototype.getCharset();
this.channelPoolPartitioning = prototype.getChannelPoolPartitioning();
this.nameResolver = prototype.getNameResolver();
this.domainNameResolver = prototype.getDomainNameResolver();
}

@SuppressWarnings("unchecked")
Expand All @@ -144,6 +151,9 @@ private T asDerivedType() {
}

public T setUrl(String url) {
if (!url.contains("://")){
url = "http://127.0.0.1:80" + url;
silence-code marked this conversation as resolved.
Show resolved Hide resolved
}
return setUri(Uri.create(url));
}

Expand All @@ -153,11 +163,21 @@ public T setUri(Uri uri) {
}

public T setAddress(InetAddress address) {
this.address = new InetSocketAddress(address,0);
return asDerivedType();
}

public T setAddress(SocketAddress address) {
this.address = address;
return asDerivedType();
}

public T setLocalAddress(InetAddress address) {
this.localAddress = new InetSocketAddress(address,0);
return asDerivedType();
}

public T setLocalAddress(SocketAddress address) {
this.localAddress = address;
return asDerivedType();
}
Expand Down Expand Up @@ -534,6 +554,11 @@ public T setNameResolver(NameResolver<InetAddress> nameResolver) {
return asDerivedType();
}

public T setDomainNameResolver(NameResolver<DomainSocketAddress> nameResolver) {
this.domainNameResolver = nameResolver;
return asDerivedType();
}

public T setSignatureCalculator(SignatureCalculator signatureCalculator) {
this.signatureCalculator = signatureCalculator;
return asDerivedType();
Expand Down Expand Up @@ -580,6 +605,7 @@ private RequestBuilderBase<?> executeSignatureCalculator() {
rb.charset = this.charset;
rb.channelPoolPartitioning = this.channelPoolPartitioning;
rb.nameResolver = this.nameResolver;
rb.domainNameResolver = this.domainNameResolver;
Request unsignedRequest = rb.build();
signatureCalculator.calculateAndAddSignature(unsignedRequest, rb);
return rb;
Expand Down Expand Up @@ -642,6 +668,7 @@ public Request build() {
rb.rangeOffset,
rb.charset,
rb.channelPoolPartitioning,
rb.nameResolver);
rb.nameResolver,
rb.domainNameResolver);
}
}
Loading