Skip to content

Commit

Permalink
Unix domain sockets use when server. #72
Browse files Browse the repository at this point in the history
  • Loading branch information
sshanks-kx committed Nov 7, 2023
1 parent 19a79eb commit b51199c
Showing 1 changed file with 60 additions and 10 deletions.
70 changes: 60 additions & 10 deletions javakdb/src/main/java/com/kx/c.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDate;
import java.time.LocalTime;
Expand Down Expand Up @@ -136,6 +140,8 @@ public static void setEncoding(String encoding) throws UnsupportedEncodingExcept
* greater than 2000 bytes and connection is not localhost)
*/
boolean zip;

private static final String ACCESS="access";
/**
* Sets whether or not to consider compression on outgoing messages (given uncompressed serialized data also has a length
* greater than 2000 bytes and connection is not localhost)
Expand All @@ -145,6 +151,9 @@ public static void setEncoding(String encoding) throws UnsupportedEncodingExcept
public void zip(boolean b){
zip=b;
}
private static boolean isLoopback(InetAddress addr){
return addr.isAnyLocalAddress()||addr.isLoopbackAddress();
}
/**
* Prepare socket for kdb+ ipc comms
* @param x socket to setup
Expand All @@ -153,11 +162,10 @@ public void zip(boolean b){
void io(Socket x) throws IOException{
s=x;
s.setTcpNoDelay(true);
InetAddress addr=s.getInetAddress();
isLoopback=addr.isAnyLocalAddress()||addr.isLoopbackAddress();
s.setKeepAlive(true);
isLoopback=isLoopback(s.getInetAddress());
inStream=new DataInputStream(s.getInputStream());
outStream=s.getOutputStream();
s.setKeepAlive(true);
}

/**
Expand Down Expand Up @@ -195,16 +203,58 @@ public interface IAuthenticate{
*/
public boolean authenticate(String s);
}

/**
* Initializes a new {@link c} instance by acting as a server, blocking
* till a client connects and authenticates using the KDB+ protocol. This object
* should be used for a single client connection. A new instance should be created
* for each new client connection.
* @param s {@link ServerSocket} to accept connections on using kdb+ IPC protocol.
* @param s {@link ServerSocketChannel} to accept connections on using kdb+ IPC protocol.
* @param a {@link IAuthenticate} instance to authenticate incoming connections.
* Accepts all incoming connections if {@code null}.
* @throws IOException if access is denied or an I/O error occurs.
*
*/
public c(ServerSocketChannel s,IAuthenticate a) throws IOException{
this();
channel=s.accept();
SocketAddress addr=channel.getRemoteAddress();
if(addr instanceof InetSocketAddress){
isLoopback=isLoopback(((InetSocketAddress)addr).getAddress());
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}else
isLoopback=true;
ByteBuffer buf1 = ByteBuffer.allocate(99);
int bytesRead=channel.read(buf1);
buf1.flip();
if(bytesRead==-1||(a!=null&&!a.authenticate(new String(buf1.array(),0,bytesRead>1?bytesRead-2:0)))){
close();
throw new IOException(ACCESS);
}
ipcVersion=bytesRead>1?buf1.get(bytesRead-2):0;
buf1 = ByteBuffer.allocate(1);
buf1.put((byte)(ipcVersion<'\3'?ipcVersion:'\3'));
buf1.flip();
channel.write(buf1);
}
/**
* Initializes a new {@link c} instance by acting as a server, and blocks while waiting
* for a new client connection. A new instance should be created for each new client connection.
* @param s {@link ServerSocketChannel} to accept connections on using kdb+ IPC protocol.
* @throws IOException an I/O error occurs.
*/
public c(ServerSocketChannel s) throws IOException{
this(s,null);
}
/**
* Initializes a new {@link c} instance by acting as a server, blocking
* till a client connects and authenticates using the KDB+ protocol. This object
* should be used for a single client connection. A new instance should be created
* for each new client connection.
* @param s {@link ServerSocket} to accept connections on using kdb+ IPC protocol.
* @param a {@link IAuthenticate} instance to authenticate incoming connections.
* Accepts all incoming connections if {@code null}.
* @throws IOException if access is denied or an I/O error occurs.
*
*/
public c(ServerSocket s,IAuthenticate a) throws IOException{
Expand All @@ -213,7 +263,7 @@ public c(ServerSocket s,IAuthenticate a) throws IOException{
int bytesRead=inStream.read(rBuff);
if(a!=null&&!a.authenticate(new String(rBuff,0,bytesRead>1?bytesRead-2:0))){
close();
throw new IOException("access");
throw new IOException(ACCESS);
}
ipcVersion=bytesRead>1?rBuff[bytesRead-2]:0;
rBuff[0]=(byte)(ipcVersion<'\3'?ipcVersion:'\3');
Expand Down Expand Up @@ -257,7 +307,7 @@ public c(String file,String usernamepassword) throws KException,IOException,Unsu
write(wBuff);
ByteBuffer buf1 = ByteBuffer.allocate(1);
if(1!=channel.read(buf1)){
throw new KException("access");
throw new KException(ACCESS);
}
buf1.flip();
ipcVersion=buf1.get();
Expand Down Expand Up @@ -302,7 +352,7 @@ public c(String host,int port,String usernamepassword,boolean useTLS) throws KEx
outStream.write(wBuff);
if(1!=inStream.read(wBuff,0,1)){
close();
throw new KException("access");
throw new KException(ACCESS);
}
ipcVersion=Math.min(wBuff[0],3);
}
Expand Down Expand Up @@ -1564,7 +1614,7 @@ public Object[] readMsg() throws KException,IOException,UnsupportedEncodingExcep
inStream.readFully(rBuff); // read the msg header
}else{
ByteBuffer buf=ByteBuffer.allocate(8);
while(0!=buf.remaining())channel.read(buf);
while(0!=buf.remaining())if(-1==channel.read(buf))throw new java.io.EOFException("end of stream");
rBuff=buf.array();
}
isLittleEndian=rBuff[0]==1; // endianness of the msg
Expand All @@ -1577,7 +1627,7 @@ public Object[] readMsg() throws KException,IOException,UnsupportedEncodingExcep
}else{
ByteBuffer buf=ByteBuffer.allocate(ri());
buf.put(rBuff,0,rBuff.length);
while(0!=buf.remaining())channel.read(buf);
while(0!=buf.remaining())if(-1==channel.read(buf))throw new java.io.EOFException("end of stream");
rBuff=buf.array();
}
return new Object[]{rBuff[1],deserialize(rBuff)};
Expand Down

0 comments on commit b51199c

Please sign in to comment.