Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/cli protocol refactor changes #10009

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 116 additions & 82 deletions cli/src/main/java/hudson/cli/PlainCLIProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,83 @@ class PlainCLIProtocol {
/** One-byte operation to send to the other side. */
private enum Op {
/** UTF-8 command name or argument. */
ARG(true),
ARG(true) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ServerSide) side).onArg(dis.readUTF());
}
},
/** UTF-8 locale identifier. */
LOCALE(true),
LOCALE(true) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ServerSide) side).onLocale(dis.readUTF());
}
},
/** UTF-8 client encoding. */
ENCODING(true),
ENCODING(true) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ServerSide) side).onEncoding(dis.readUTF());
}
},
/** Start running command. */
START(true),
START(true) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ServerSide) side).onStart();
}
},
/** Exit code, as int. */
EXIT(false),
EXIT(false) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ClientSide) side).onExit(dis.readInt());
}
},
/** Chunk of stdin, as int length followed by bytes. */
STDIN(true),
STDIN(true) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ServerSide) side).onStdin(dis.readAllBytes());
}
},
/** EOF on stdin. */
END_STDIN(true),
END_STDIN(true) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ServerSide) side).onEndStdin();
}
},
/** Chunk of stdout. */
STDOUT(false),
STDOUT(false) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ClientSide) side).onStdout(dis.readAllBytes());
}
},
/** Chunk of stderr. */
STDERR(false);
STDERR(false) {
@Override
void execute(DataInputStream dis, EitherSide side) throws IOException {
((ClientSide) side).onStderr(dis.readAllBytes());
}
};

/** True if sent from the client to the server; false if sent from the server to the client. */
final boolean clientSide;

Op(boolean clientSide) {
this.clientSide = clientSide;
}

abstract void execute(DataInputStream dis, EitherSide side) throws IOException;

void validate(boolean isClient) throws ProtocolException {
if (this.clientSide != isClient) {
throw new ProtocolException("Operation not allowed on this side: " + this);
}
}
}

interface Output extends Closeable {
Expand Down Expand Up @@ -124,48 +178,59 @@ static final class FramedReader extends Thread {
public void run() {
try {
while (true) {
LOGGER.finest("reading frame");
int framelen;
try {
framelen = dis.readInt();
} catch (EOFException x) {
side.handleClose();
break; // TODO verify that we hit EOF immediately, not partway into framelen
}
if (framelen < 0) {
throw new IOException("corrupt stream: negative frame length");
}
LOGGER.finest("read frame length " + framelen);
long start = cis.getByteCount();
try {
side.handle(new DataInputStream(new BoundedInputStream(dis, /* op byte not counted */framelen + 1)));
} catch (ProtocolException x) {
LOGGER.log(Level.WARNING, null, x);
// but read another frame
} finally {
long actuallyRead = cis.getByteCount() - start;
long unread = framelen + 1 - actuallyRead;
if (unread > 0) {
LOGGER.warning(() -> "Did not read " + unread + " bytes");
IOUtils.skipFully(dis, unread);
}
}
int framelen = readFrameLength();
validateFrameLength(framelen);
processFrame(framelen);
}
} catch (ClosedChannelException x) {
LOGGER.log(Level.FINE, null, x);
side.handleClose();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, flightRecorder.analyzeCrash(x, "broken stream"));
} catch (ReadPendingException x) {
// in case trick in CLIAction does not work
LOGGER.log(Level.FINE, null, x);
} catch (IOException | RuntimeException x) {
handleException(x);
}
}

private void validateFrameLength(int framelen) throws IOException {
if (framelen < 0) {
throw new IOException("corrupt stream: negative frame length");
}
}

private int readFrameLength() throws IOException {
try {
return dis.readInt();
} catch (EOFException x) {
side.handleClose();
} catch (RuntimeException x) {
throw x;
}
}

private void processFrame(int framelen) throws IOException {
long start = cis.getByteCount();
try {
side.handle(new DataInputStream(new BoundedInputStream(dis, framelen + 1)));
} catch (ProtocolException x) {
LOGGER.log(Level.WARNING, null, x);
side.handleClose();
} finally {
skipUnreadBytes(framelen, start);
}
}

private void skipUnreadBytes(int framelen, long start) throws IOException {
long actuallyRead = cis.getByteCount() - start;
long unread = framelen + 1 - actuallyRead;
if (unread > 0) {
LOGGER.warning(() -> "Did not read " + unread + " bytes");
IOUtils.skipFully(dis, unread);
}
}

private void handleException(Exception x) {
if (x instanceof ClosedChannelException || x instanceof ReadPendingException) {
LOGGER.log(Level.FINE, null, x);
} else {
LOGGER.log(Level.WARNING, null, flightRecorder.analyzeCrash((IOException) x, "broken stream"));
}
side.handleClose();
}

}

private static final class ProtocolException extends IOException {
Expand Down Expand Up @@ -262,29 +327,9 @@ abstract static class ServerSide extends EitherSide {

@Override
protected final boolean handle(Op op, DataInputStream dis) throws IOException {
assert op.clientSide;
switch (op) {
case ARG:
onArg(dis.readUTF());
return true;
case LOCALE:
onLocale(dis.readUTF());
return true;
case ENCODING:
onEncoding(dis.readUTF());
return true;
case START:
onStart();
return true;
case STDIN:
onStdin(dis.readAllBytes());
return true;
case END_STDIN:
onEndStdin();
return true;
default:
return false;
}
op.validate(true);
op.execute(dis, this);
return true;
}

protected abstract void onArg(String text);
Expand Down Expand Up @@ -321,20 +366,9 @@ abstract static class ClientSide extends EitherSide {

@Override
protected boolean handle(Op op, DataInputStream dis) throws IOException {
assert !op.clientSide;
switch (op) {
case EXIT:
onExit(dis.readInt());
return true;
case STDOUT:
onStdout(dis.readAllBytes());
return true;
case STDERR:
onStderr(dis.readAllBytes());
return true;
default:
return false;
}
op.validate(false);
op.execute(dis, this);
return true;
}

protected abstract void onExit(int code);
Expand Down
Loading