Skip to content

Commit

Permalink
test update
Browse files Browse the repository at this point in the history
  • Loading branch information
Unknow authored and Unknow committed Oct 6, 2024
1 parent 12f6560 commit 2afad09
Show file tree
Hide file tree
Showing 18 changed files with 554 additions and 436 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import unknow.server.util.io.Buffers;
import unknow.server.util.io.BuffersInputStream;

Expand All @@ -21,15 +18,14 @@
*
* @author unknow
*/
public class NIOConnection {
private static final Logger logger = LoggerFactory.getLogger(NIOConnection.class);
public abstract class NIOConnectionAbstract {

private static final InetSocketAddress DISCONECTED = InetSocketAddress.createUnresolved("", 0);

/** the data waiting to be wrote */
private final Buffers pendingWrite = new Buffers();
protected final Buffers pendingWrite = new Buffers();
/** the data waiting to be handled */
private final Buffers pendingRead = new Buffers();
protected final Buffers pendingRead = new Buffers();

/** Stream of pending data */
protected final BuffersInputStream in = new BuffersInputStream(pendingRead);
Expand All @@ -41,6 +37,8 @@ public class NIOConnection {
protected final SelectionKey key;
protected final SocketChannel channel;

protected final NIOConnectionHandler handler;

protected final InetSocketAddress local;
protected final InetSocketAddress remote;

Expand All @@ -52,9 +50,10 @@ public class NIOConnection {
*
* @param key the selectionKey
*/
public NIOConnection(SelectionKey key) {
public NIOConnectionAbstract(SelectionKey key, NIOConnectionHandler handler) {
this.key = key;
this.channel = (SocketChannel) key.channel();
this.handler = handler;
this.out = new Out(this);
lastRead = lastWrite = System.currentTimeMillis();
InetSocketAddress a;
Expand All @@ -72,39 +71,7 @@ public NIOConnection(SelectionKey key) {
remote = a;
}

/**
* called after the connection is initialized
*
* @throws InterruptedException on interrupt
*/
protected void onInit() throws InterruptedException { // for override
}

/**
* called after some data has been read
*
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
protected void onRead() throws InterruptedException, IOException { // for override
}

/**
* called after data has been written
*
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
protected void onWrite() throws InterruptedException, IOException { // for override
}

/**
* called when the connection is free
*
* @throws IOException on io exception
*/
protected void onFree() throws IOException { // for override
}
protected abstract void onInit() throws InterruptedException;

/**
* read data from the channel and try to handles it
Expand All @@ -114,30 +81,7 @@ protected void onFree() throws IOException { // for override
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
protected void readFrom(ByteBuffer buf) throws InterruptedException, IOException {
int l;
lastRead = System.currentTimeMillis();
while (true) {
l = channel.read(buf);
if (l == -1) {
in.close();
return;
}
if (l == 0)
return;
buf.flip();

if (logger.isTraceEnabled()) {
buf.mark();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
logger.trace("read {}", new String(bytes));
buf.reset();
}
pendingRead.write(buf);
onRead();
}
}
protected abstract void readFrom(ByteBuffer buf) throws InterruptedException, IOException;

/**
* write pending data to the channel
Expand All @@ -147,28 +91,7 @@ protected void readFrom(ByteBuffer buf) throws InterruptedException, IOException
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
protected void writeInto(ByteBuffer buf) throws InterruptedException, IOException {
lastWrite = System.currentTimeMillis();
while (pendingWrite.read(buf, false)) {
buf.flip();

if (logger.isTraceEnabled()) {
buf.mark();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
logger.trace("writ {}", new String(bytes));
buf.reset();
}

channel.write(buf);
if (buf.hasRemaining()) {
pendingWrite.prepend(buf);
break;
}
}
toggleKeyOps();
onWrite();
}
protected abstract void writeInto(ByteBuffer buf) throws InterruptedException, IOException;

public void toggleKeyOps() {
key.interestOps(pendingWrite.isEmpty() ? SelectionKey.OP_READ : SelectionKey.OP_READ | SelectionKey.OP_WRITE);
Expand Down Expand Up @@ -270,7 +193,7 @@ public final void free() throws IOException {
out.close();
pendingWrite.clear();
pendingRead.clear();
onFree();
handler.onFree();
}

@Override
Expand All @@ -280,9 +203,9 @@ public String toString() {

/** output stream for this connection */
public static final class Out extends OutputStream {
private NIOConnection h;
private NIOConnectionAbstract h;

private Out(NIOConnection h) {
private Out(NIOConnectionAbstract h) {
this.h = h;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package unknow.server.nio;

import java.io.IOException;

import javax.net.ssl.SSLEngine;

import unknow.server.util.io.Buffers;

public interface NIOConnectionHandler {

/**
* called after the connection is initialized
*
* @param sslEngine the sslEngine for ssl connection null for other
* @throws InterruptedException on interrupt
*/
void onInit(NIOConnectionAbstract co, SSLEngine sslEngine) throws InterruptedException;

/**
* called when the handshake process finish
* @throws InterruptedException on interrupt
*/
void onHandshakeDone(SSLEngine sslEngine) throws InterruptedException;

/**
* called after some data has been read
*
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
void onRead(Buffers b) throws InterruptedException, IOException;

/**
* called after data has been written
*
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
void onWrite() throws InterruptedException, IOException;

/**
* check if the connection is closed and should be stoped
*
* @param now System.currentMillis()
* @param stop if true the server is in stop phase
* @return true is the collection is closed
*/
boolean closed(long now, boolean stop);

/**
* called when the connection is free
*
* @throws IOException on io exception
*/
void onFree() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
*
*/
package unknow.server.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* used to handle raw data
*
* @author unknow
*/
public class NIOConnectionPlain extends NIOConnectionAbstract {
private static final Logger logger = LoggerFactory.getLogger(NIOConnectionPlain.class);

/**
* create new connection
*
* @param key the selectionKey
*/
public NIOConnectionPlain(SelectionKey key, NIOConnectionHandler handler) {
super(key, handler);
}

/**
* called after the connection is initialized
*
* @throws InterruptedException on interrupt
*/
@Override
protected final void onInit() throws InterruptedException { // for override
handler.onInit(this, null);
}

/**
* read data from the channel and try to handles it
*
* @param buf output buffer
*
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
@Override
protected final void readFrom(ByteBuffer buf) throws InterruptedException, IOException {
int l;
lastRead = System.currentTimeMillis();
while (true) {
l = channel.read(buf);
if (l == -1) {
in.close();
return;
}
if (l == 0)
return;
buf.flip();

if (logger.isTraceEnabled()) {
buf.mark();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
logger.trace("read {}", new String(bytes));
buf.reset();
}
pendingRead.write(buf);
handler.onRead(pendingRead);
}
}

/**
* write pending data to the channel
*
* @param buf local cache
*
* @throws InterruptedException on interrupt
* @throws IOException on io exception
*/
@Override
protected final void writeInto(ByteBuffer buf) throws InterruptedException, IOException {
lastWrite = System.currentTimeMillis();
while (pendingWrite.read(buf, false)) {
buf.flip();

if (logger.isTraceEnabled()) {
buf.mark();
byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
logger.trace("writ {}", new String(bytes));
buf.reset();
}

channel.write(buf);
if (buf.hasRemaining()) {
pendingWrite.prepend(buf);
break;
}
}
toggleKeyOps();
handler.onWrite();
}
}
Loading

0 comments on commit 2afad09

Please sign in to comment.