diff --git a/dev/com.ibm.ws.transport.http/src/com/ibm/ws/http/dispatcher/internal/channel/HttpDispatcherLink.java b/dev/com.ibm.ws.transport.http/src/com/ibm/ws/http/dispatcher/internal/channel/HttpDispatcherLink.java index 29cda3a69fe3..68a4efd9c616 100644 --- a/dev/com.ibm.ws.transport.http/src/com/ibm/ws/http/dispatcher/internal/channel/HttpDispatcherLink.java +++ b/dev/com.ibm.ws.transport.http/src/com/ibm/ws/http/dispatcher/internal/channel/HttpDispatcherLink.java @@ -43,6 +43,7 @@ import com.ibm.ws.http.internal.VirtualHostMap.RequestHelper; import com.ibm.ws.transport.access.TransportConnectionAccess; import com.ibm.ws.transport.access.TransportConstants; +import com.ibm.wsspi.bytebuffer.WsByteBuffer; import com.ibm.wsspi.channelfw.ConnectionLink; import com.ibm.wsspi.channelfw.VirtualConnection; import com.ibm.wsspi.channelfw.base.InboundApplicationLink; @@ -167,12 +168,12 @@ public void init(VirtualConnection inVC, HttpDispatcherChannel channel) { public void close(VirtualConnection conn, Exception e) { if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "Close called , vc ->" + this.vc + " hc: " + this.hashCode()); + Tr.debug(tc, "close ENTER, vc ->" + this.vc + " hc: " + this.hashCode()); } if (this.vc == null) { if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "Connection must be already closed since vc is null"); + Tr.debug(tc, "close, Connection must be already closed since vc is null"); } // closeCompleted check is for the close, destroy, close order scenario. @@ -180,7 +181,7 @@ public void close(VirtualConnection conn, Exception e) { if (this.decrementNeeded.compareAndSet(true, false) & !closeCompleted.get()) { // ^ set back to false in case close is called more than once after destroy is called (highly unlikely) if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "decrementNeeded is true: decrement active connection"); + Tr.debug(tc, "close, decrementNeeded is true: decrement active connection"); } this.myChannel.decrementActiveConns(); } @@ -193,24 +194,43 @@ public void close(VirtualConnection conn, Exception e) { // so we will have to use close API from SRTConnectionContext31 and call closeStreams. String closeNonUpgraded = (String) (this.vc.getStateMap().get(TransportConstants.CLOSE_NON_UPGRADED_STREAMS)); if (closeNonUpgraded != null && closeNonUpgraded.equalsIgnoreCase("true")) { - if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "close streams from HttpDispatcherLink.close"); + Tr.debug(tc, "close, CLOSE_NON_UPGRADED_STREAMS"); + } + + // Save the remain upgrading and unread data into a VC's stateMap which will be consumed in the UpgradeInputByteBufferUtil.initialRead + if (this.isc.isReadDataAvailable()) { + WsByteBuffer currentBuffer = this.isc.getReadBuffer(); + WsByteBuffer newBuffer = HttpDispatcher.getBufferManager().allocate(currentBuffer.remaining()); + newBuffer.put(currentBuffer); + newBuffer.flip(); + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { + Tr.debug(tc, "close, saved unread data [" + newBuffer.remaining() + "] from isc buffer [" + currentBuffer + "] to vc statemap [" + newBuffer + "]"); + } + + currentBuffer = null; + vc.getStateMap().put(TransportConstants.NOT_UPGRADED_UNREAD_DATA, newBuffer); } Exception errorinClosing = this.closeStreams(); if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "Error closing in streams" + errorinClosing); + Tr.debug(tc, "close, Error closing in streams" + errorinClosing); } vc.getStateMap().put(TransportConstants.CLOSE_NON_UPGRADED_STREAMS, "CLOSED_NON_UPGRADED_STREAMS"); + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { + Tr.debug(tc, "close EXIT"); + } + return; } String upgradedListener = (String) (this.vc.getStateMap().get(TransportConstants.UPGRADED_LISTENER)); if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "upgradedListener ->" + upgradedListener); + Tr.debug(tc, "close, upgradedListener ->" + upgradedListener); } if (upgradedListener != null && upgradedListener.equalsIgnoreCase("true")) { boolean closeCalledFromWebConnection = false; @@ -237,7 +257,7 @@ public void close(VirtualConnection conn, Exception e) { // but we don't want to manipulate existing logic so a separate constant in the state map will be used for that below if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "Connection Not to be closed here because Servlet Upgrade."); + Tr.debug(tc, "close EXIT, Connection Not to be closed here because Servlet Upgrade."); } return; } @@ -253,11 +273,11 @@ public void close(VirtualConnection conn, Exception e) { // want to call close outside of the sync to avoid deadlocks. WebConnCanClose = false; if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "Upgraded Web Connection closing Dispatcher Link"); + Tr.debug(tc, "close, Upgraded Web Connection closing Dispatcher Link"); } } else { if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "Upgraded Web Connection already called close; returning"); + Tr.debug(tc, "close EXIT, Upgraded Web Connection already called close; returning"); } return; } @@ -274,12 +294,16 @@ public void close(VirtualConnection conn, Exception e) { super.close(conn, e); } finally { if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { - Tr.debug(tc, "decrement active connection count"); + Tr.debug(tc, "close, decrement active connection count"); } this.myChannel.decrementActiveConns(); } closeCompleted.compareAndSet(false, true); } + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { + Tr.debug(tc, "close EXIT"); + } } /* diff --git a/dev/com.ibm.ws.transport.http/src/com/ibm/ws/transport/access/TransportConstants.java b/dev/com.ibm.ws.transport.http/src/com/ibm/ws/transport/access/TransportConstants.java index 5168244ed554..ba808cfccfdb 100644 --- a/dev/com.ibm.ws.transport.http/src/com/ibm/ws/transport/access/TransportConstants.java +++ b/dev/com.ibm.ws.transport.http/src/com/ibm/ws/transport/access/TransportConstants.java @@ -31,4 +31,6 @@ public class TransportConstants { public static final String UPGRADED_LISTENER = "UpgradedListener"; public static final String CLOSE_UPGRADED_WEBCONNECTION = "CloseUpgradedWebConnection"; + //Initial upgrade request data may be read together with the headers before the upgrade. + public static final String NOT_UPGRADED_UNREAD_DATA = "NotUpgradedUnreadData"; } diff --git a/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/upgrade/UpgradeReadCallback.java b/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/upgrade/UpgradeReadCallback.java index 68bbbceb6e15..2fd7dcc6a483 100644 --- a/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/upgrade/UpgradeReadCallback.java +++ b/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/upgrade/UpgradeReadCallback.java @@ -6,9 +6,6 @@ * http://www.eclipse.org/legal/epl-2.0/ * * SPDX-License-Identifier: EPL-2.0 - * - * Contributors: - * IBM Corporation - initial API and implementation *******************************************************************************/ package com.ibm.ws.webcontainer31.upgrade; @@ -48,6 +45,10 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th _upgradeStream = uIBBU; _contextManager = tcm; _srtUpgradeStream = srtUpgradeStream; + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { + Tr.debug(tc, "UpgradeReadCallback constructor, ReadListener [" + _rl + "], this " + this); + } } /* (non-Javadoc) @@ -56,6 +57,9 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th @Override @FFDCIgnore(IOException.class) public void complete(VirtualConnection vc, TCPReadRequestContext rsc) { + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { + Tr.debug(tc, "complete ENTER , ReadListener [" + _rl + "], this " + this); + } if(vc == null){ return; @@ -106,6 +110,10 @@ public void complete(VirtualConnection vc, TCPReadRequestContext rsc) { _srtUpgradeStream.notify(); } } + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) { + Tr.debug(tc, "complete EXIT "); + } } } diff --git a/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/util/UpgradeInputByteBufferUtil.java b/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/util/UpgradeInputByteBufferUtil.java index 42789b1e962d..aaafb381694c 100644 --- a/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/util/UpgradeInputByteBufferUtil.java +++ b/dev/com.ibm.ws.webcontainer.servlet.3.1/src/com/ibm/ws/webcontainer31/util/UpgradeInputByteBufferUtil.java @@ -1,14 +1,11 @@ /******************************************************************************* - * Copyright (c) 2014 IBM Corporation and others. + * Copyright (c) 2014, 2024 IBM Corporation and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License 2.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-2.0/ * * SPDX-License-Identifier: EPL-2.0 - * - * Contributors: - * IBM Corporation - initial API and implementation *******************************************************************************/ package com.ibm.ws.webcontainer31.util; @@ -164,51 +161,75 @@ private boolean syncRead(int amountToRead) throws IOException{ */ private boolean immediateRead(int amountToRead){ if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ - Tr.debug(tc, "immediateRead, Executing a read"); + Tr.debug(tc, "immediateRead ENTER , buffer size [" + amountToRead + "]"); } if(amountToRead > 1){ - //Allocate a new temp buffer, then set the position to 0 and limit to the amount we want to read - //Copy in the current this.buffer as it should only have one byte in it - WsByteBuffer tempBuffer = allocateBuffer(amountToRead); - tempBuffer.position(0); - tempBuffer.limit(amountToRead); - tempBuffer.put(_buffer); - tempBuffer.position(1); - _buffer.release(); - _buffer = tempBuffer; - tempBuffer = null; - - _tcpContext.getReadInterface().setBuffer(_buffer); - - long bytesRead = 0; - - try{ - bytesRead = _tcpContext.getReadInterface().read(0, WCCustomProperties31.UPGRADE_READ_TIMEOUT); - } catch (IOException readException){ - //If we encounter an exception here we need to return the 1 byte that we already have. - //Returned true immediately and the next read will catch the exception and propagate it properly + /* + * On a slow system/startup, data comes from the wire is faster than the channel can read/parse. + * If so, data needs to be read from the prepared buffer (set during the first initialRead()) instead of the wire. + * This happens mainly on the very first POST. + * this method is called down from the application readListener onDataAvailable() read. + */ + if (_upConn.getVirtualConnection().getStateMap().get(TransportConstants.NOT_UPGRADED_UNREAD_DATA) != null) { if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ - Tr.debug(tc, "immediateRead, The read encountered an exception. " + readException); - Tr.debug(tc, "immediateRead, Return with our one byte"); + Tr.debug(tc, "immediateRead, read from saved buffer [" + _buffer + "] , amount [" + _buffer.remaining() +"]"); + } + + _upConn.getVirtualConnection().getStateMap().remove(TransportConstants.NOT_UPGRADED_UNREAD_DATA); + } + // Read from the interface + else { + //Allocate a new temp buffer, then set the position to 0 and limit to the amount we want to read + //Copy in the current this.buffer as it should only have one byte in it + WsByteBuffer tempBuffer = allocateBuffer(amountToRead); + tempBuffer.position(0); + tempBuffer.limit(amountToRead); + tempBuffer.put(_buffer); + tempBuffer.position(1); + _buffer.release(); + _buffer = tempBuffer; + tempBuffer = null; + + _tcpContext.getReadInterface().setBuffer(_buffer); + + long bytesRead = 0; + + try{ + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "immediateRead, read from interface"); + } + bytesRead = _tcpContext.getReadInterface().read(0, WCCustomProperties31.UPGRADE_READ_TIMEOUT); + } catch (IOException readException){ + //If we encounter an exception here we need to return the 1 byte that we already have. + //Returned true immediately and the next read will catch the exception and propagate it properly + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "immediateRead, The read encountered an exception. " + readException); + Tr.debug(tc, "immediateRead, Return with our one byte"); + } + + configurePostReadBuffer(); + return true; } - + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "immediateRead, Complete, " + bytesRead); + } + //Get the buffer from the TCP Channel after we have told them to read. + _buffer = _tcpContext.getReadInterface().getBuffer(); + + //We don't need to check for null first as we know we will always get the buffer we just set configurePostReadBuffer(); - return true; } - - if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ - Tr.debug(tc, "immediateRead, Complete, " + bytesRead); - } - //Get the buffer from the TCP Channel after we have told them to read. - _buffer = _tcpContext.getReadInterface().getBuffer(); - - //We don't need to check for null first as we know we will always get the buffer we just set - configurePostReadBuffer(); + // record the new amount of data read from the channel _totalBytesRead += _buffer.remaining(); } + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "immediateRead EXIT , returns to application this buffer [" + _buffer + "], totalBytesRead [" + _totalBytesRead + "]"); + } + //We will return true here in all circumstances because we always have 1 byte read from the isReady call or the initial read of the connection return true; } @@ -296,7 +317,7 @@ private void setAndAllocateBuffer(int sizeToAllocate) { configurePreReadBuffer(sizeToAllocate); if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ - Tr.debug(tc, "setAndAllocateBuffer, Setting the buffer : " + _buffer ); + Tr.debug(tc, "setAndAllocateBuffer, buffer [" + _buffer + "] , size [" + sizeToAllocate + "]"); } _tcpContext.getReadInterface().setBuffer(_buffer); @@ -390,7 +411,7 @@ public void setupReadListener(ReadListener readListenerl, SRTUpgradeInputStream3 initialRead(); if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ - Tr.debug(tc, "setupReadListener, ReadListener set : " + _rl); + Tr.debug(tc, "setupReadListener, UpgradeReadCallback [" + _tcpChannelCallback + "] , ReadListener [" + _rl + "]"); } } @@ -469,19 +490,63 @@ public boolean isReady(){ * The provided callback will be called when the read is completed and that callback will invoke the ReadListener logic. */ public void initialRead(){ + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "initialRead ENTER , callback [" + _tcpChannelCallback + "], readListener [" + _rl + "]"); + } + _isInitialRead = true; if(_buffer != null){ _buffer.release(); _buffer = null; + } + + /* + * If there is any unread data during the very first initialRead, those data are stored in the buffer, not at the wire, so callback complete() needs to be called + * to trigger the customer onDataAvailable to read those data from this prep buffer. + * The prep buffer has already been prepared during the HttpDispatcherLink.close() and saved in the VC.stateMap + */ + WsByteBuffer data = null; + if ((data = (WsByteBuffer) _upConn.getVirtualConnection().getStateMap().get(TransportConstants.NOT_UPGRADED_UNREAD_DATA)) != null) { + int remaining = data.remaining(); + setAndAllocateBuffer(remaining); + _buffer.put(data); + //don't flip here! It will be flipped during the UpgradeReadCallback.complete + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "initialRead, saved unread data [" + remaining + "] from statemap [" + data + "] into _buffer [" + _buffer + "]"); + Tr.debug(tc, "initialRead, invoke callback complete [" + _tcpChannelCallback + "]"); + } + + data.release(); + data = null; + + //standin dummy for immediateRead() to check and then remove (so that we don't need to save off the ByteBuffer again just for a cleanup) + _upConn.getVirtualConnection().getStateMap().put(TransportConstants.NOT_UPGRADED_UNREAD_DATA, "initialRead"); + + //complete() invokes app's onDataAvailable() to read the prepared _buffer. + //That read eventually call back into this.immediateRead(int). + //The callback complete here (and eventually the first immediateRead) happens in the same thread. + //Subsequently, the getReadInterface().read (below) will go async. + _tcpChannelCallback.complete(_upConn.getVirtualConnection(), _tcpContext.getReadInterface()); + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "initialRead, callback complete. All initial data read."); + } } + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "initialRead, async read data from interface"); + } setAndAllocateBuffer(1); - configurePreReadBuffer(1); - + //This if the first read of the ReadListener, which means force the read to go async //We won't get an actual response from this read as it will always come back on another thread - _tcpContext.getReadInterface().setBuffer(_buffer); _tcpContext.getReadInterface().read(1, _tcpChannelCallback, true, WCCustomProperties31.UPGRADE_READ_TIMEOUT); + + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "initialRead EXIT"); + } } /** @@ -489,6 +554,9 @@ public void initialRead(){ * and post configure the buffer. Without this method we would lose the first byte we are reading */ public void configurePostInitialReadBuffer(){ + if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){ + Tr.debug(tc, "configurePostInitialReadBuffer"); + } _isInitialRead = false; _isFirstRead = false; _buffer = _tcpContext.getReadInterface().getBuffer(); diff --git a/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/publish/servers/servlet61_ReadListenerAfterHttpUpgradeHandler/bootstrap.properties b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/publish/servers/servlet61_ReadListenerAfterHttpUpgradeHandler/bootstrap.properties new file mode 100644 index 000000000000..8f703c9cf501 --- /dev/null +++ b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/publish/servers/servlet61_ReadListenerAfterHttpUpgradeHandler/bootstrap.properties @@ -0,0 +1,15 @@ +############################################################################### +# Copyright (c) 2024 IBM Corporation and others. +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License 2.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-2.0/ +# +# SPDX-License-Identifier: EPL-2.0 +############################################################################### +bootstrap.include=../testports.properties +osgi.console=7777 +com.ibm.ws.logging.trace.specification=*=info:io.openliberty.webcontainer*=all:com.ibm.ws.webcontainer*=all:com.ibm.wsspi.webcontainer*=all:HTTPChannel=all:GenericBNF=all:HTTPDispatcher=all +com.ibm.ws.logging.max.file.size=20 +com.ibm.ws.logging.max.files=10 +com.ibm.ws.logging.trace.format=BASIC diff --git a/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/publish/servers/servlet61_ReadListenerAfterHttpUpgradeHandler/server.xml b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/publish/servers/servlet61_ReadListenerAfterHttpUpgradeHandler/server.xml new file mode 100644 index 000000000000..284bd38f6d50 --- /dev/null +++ b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/publish/servers/servlet61_ReadListenerAfterHttpUpgradeHandler/server.xml @@ -0,0 +1,17 @@ + + + + + + servlet-6.1 + + + diff --git a/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestHttpUpgradeHandler.java b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestHttpUpgradeHandler.java new file mode 100644 index 000000000000..d76b888ac368 --- /dev/null +++ b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestHttpUpgradeHandler.java @@ -0,0 +1,41 @@ +/******************************************************************************* + * Copyright (c) 2024 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License 2.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ +package readlistener; + +import jakarta.servlet.ServletInputStream; +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.http.HttpUpgradeHandler; +import jakarta.servlet.http.WebConnection; + +public class TestHttpUpgradeHandler implements HttpUpgradeHandler { + private static final String CLASS_NAME = TestHttpUpgradeHandler.class.getName(); + + public void init(WebConnection wc) { + LOG("init, wc [" + wc + "]"); + try { + ServletInputStream input = wc.getInputStream(); + ServletOutputStream output = wc.getOutputStream(); + TestReadListener readListener = new TestReadListener(input, output); + LOG("init, setReadLisener [" + readListener + "]"); + input.setReadListener(readListener); + } catch (Exception ex) { + LOG("init, exception [" + ex + "]"); + throw new RuntimeException(ex); + } + } + + public void destroy() { + LOG("destroy"); + } + + private static void LOG(String s) { + System.out.println(CLASS_NAME + " " + s); + } +} \ No newline at end of file diff --git a/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestReadListener.java b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestReadListener.java new file mode 100644 index 000000000000..997f66a44cb5 --- /dev/null +++ b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestReadListener.java @@ -0,0 +1,111 @@ +/******************************************************************************* + * Copyright (c) 2024 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License 2.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ +package readlistener; + +/* + * For test performance, do not reply/echo back on every received data. + * Instead, save all data into the StringBuffer until END string is received; + * then reply PASS or FAIL depending on whether if all data has received (tracking via the dataList) + */ +import jakarta.servlet.ReadListener; +import jakarta.servlet.ServletInputStream; +import jakarta.servlet.ServletOutputStream; + +public class TestReadListener implements ReadListener { + private static final String CLASS_NAME = TestReadListener.class.getName(); + private ServletInputStream input = null; + private ServletOutputStream output = null; + private final String READ_SEPARATOR = " || "; // separate between the read ...help to see how many onDataAvailable has triggered + private static StringBuilder totalData = new StringBuilder(); // tracking total read - ONE instance for this whole test + private ArrayList dataList = new ArrayList(); //This dataList track all data that should be received by the application + private final int sizeList = 30; //If modify, also change in the client sizeList otherwise test will fail. + + TestReadListener(ServletInputStream in, ServletOutputStream out) { + LOG("constructor, "); + this.input = in; + this.output = out; + + //client sending data in chunk, begin with "BEGIN" then ,Data_0,... up to ,Data_30 with comma as delimiter for parsing + //dataList tracks the received Data_ and remove from the list. Its size should be 0 when END string is received; otherwise data has missed. + for(int i = 0; i <= sizeList ; i++) { + dataList.add("Data_" + i); + } + LOG("initial dataList size [" + dataList.size() + "]"); + } + + public void onDataAvailable() { + LOG("onDataAvailable ENTER"); + + try { + StringBuilder sub = new StringBuilder(); + int len = -1; + byte[] b = new byte[1024]; + LOG("onDataAvailable, reading ..."); + while (this.input.isReady() && (len = this.input.read(b)) != -1) { + String data = new String(b, 0, len); + sub.append(data); + } + totalData.append(sub.toString() + READ_SEPARATOR); // || is added to original data for readability + + LOG("onDataAvailable, this read data [" + sub.toString() + "]"); + LOG("onDataAvailable EXIT. Total read data [" + totalData.toString() + "]"); + + //Remove received data from the main dataList + String[] tmp = sub.toString().split(","); + for (String s : tmp) { + dataList.remove(s); + } + + LOG("onDataAvailable, after remove, dataList size [" + dataList.size() + "]"); + if (totalData.toString().contains("END")) { + if (dataList.size() == 0) { + LOG("onDataAvailable, END string found. All data received."); + output.println("Received [" + totalData.toString() + "] . PASS"); + } + else { + LOG("onDataAvailable, END string found BUT not all data has received. Missing the following "); + for(String s : dataList) { + LOG(s); + } + output.println("NOT all data has received [" + totalData.toString() + "] . FAIL"); + } + output.flush(); + } + } catch (Exception ex) { + LOG("onDataAvailable EXIT with exception [" + ex + "]"); + throw new IllegalStateException(ex); + } + } + + public void onAllDataRead() { + LOG("onAllDataRead ENTER"); + + try { + LOG("onAllDataRead, close output stream"); + + this.output.close(); + + LOG("onAllDataRead EXIT"); + } catch (Exception ex) { + LOG("onAllDataRead throws IllegalStateException for exception [" + ex + "]"); + throw new IllegalStateException(ex); + } + } + + public void onError(Throwable t) { + LOG("onError, encounted error"); + + t.printStackTrace(); + } + + private static void LOG(String s) { + System.out.println(CLASS_NAME + " " + s); + } +} \ No newline at end of file diff --git a/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestServlet.java b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestServlet.java new file mode 100644 index 000000000000..5cda6c9fd3ff --- /dev/null +++ b/dev/io.openliberty.webcontainer.servlet.6.1.internal_fat/test-applications/ReadListenerAfterUpgrade.war/src/readlistener/TestServlet.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright (c) 2024 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License 2.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ +package readlistener; + +import jakarta.servlet.ServletException; +import jakarta.servlet.annotation.WebServlet; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import java.io.IOException; + +@WebServlet({"/TestServlet"}) +public class TestServlet extends HttpServlet { + private static final String CLASS_NAME = TestServlet.class.getName(); + + public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + + if (request.getHeader("Upgrade") != null) { + LOG("doPost , prepare respone with 100 Switching Protocol"); + response.setStatus(101); + response.setHeader("Upgrade", "YES"); + response.setHeader("Connection", "Upgrade"); + + LOG(" setting up server side HttpUpgradeHandler"); + request.upgrade(TestHttpUpgradeHandler.class); + } else { + LOG("doPost, not an upgrade request. Done"); + response.getWriter().println("No upgrade"); + response.getWriter().println("End of Test"); + } + } + + private static void LOG(String s) { + System.out.println(CLASS_NAME + " " + s); + } +} \ No newline at end of file