Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update transport #30404

Open
wants to merge 5 commits into
base: integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.ibm.ws.http.internal.VirtualHostMap.RequestHelper;
import com.ibm.ws.transport.access.TransportConnectionAccess;
import com.ibm.ws.transport.access.TransportConstants;
import com.ibm.wsspi.bytebuffer.WsByteBuffer;
import com.ibm.wsspi.channelfw.ConnectionLink;
import com.ibm.wsspi.channelfw.VirtualConnection;
import com.ibm.wsspi.channelfw.base.InboundApplicationLink;
Expand Down Expand Up @@ -167,20 +168,20 @@ public void init(VirtualConnection inVC, HttpDispatcherChannel channel) {
public void close(VirtualConnection conn, Exception e) {

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Close called , vc ->" + this.vc + " hc: " + this.hashCode());
Tr.debug(tc, "close ENTER, vc ->" + this.vc + " hc: " + this.hashCode());
}

if (this.vc == null) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Connection must be already closed since vc is null");
Tr.debug(tc, "close, Connection must be already closed since vc is null");
}

// closeCompleted check is for the close, destroy, close order scenario.
// Without this check, this second close (after the destroy) would decrement the connection again and produce a quiesce error.
if (this.decrementNeeded.compareAndSet(true, false) & !closeCompleted.get()) {
// ^ set back to false in case close is called more than once after destroy is called (highly unlikely)
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrementNeeded is true: decrement active connection");
Tr.debug(tc, "close, decrementNeeded is true: decrement active connection");
}
this.myChannel.decrementActiveConns();
}
Expand All @@ -193,24 +194,43 @@ public void close(VirtualConnection conn, Exception e) {
// so we will have to use close API from SRTConnectionContext31 and call closeStreams.
String closeNonUpgraded = (String) (this.vc.getStateMap().get(TransportConstants.CLOSE_NON_UPGRADED_STREAMS));
if (closeNonUpgraded != null && closeNonUpgraded.equalsIgnoreCase("true")) {

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close streams from HttpDispatcherLink.close");
Tr.debug(tc, "close, CLOSE_NON_UPGRADED_STREAMS");
}

// Save the remain upgrading and unread data into a VC's stateMap which will be consumed in the UpgradeInputByteBufferUtil.initialRead
if (this.isc.isReadDataAvailable()) {
WsByteBuffer currentBuffer = this.isc.getReadBuffer();
WsByteBuffer newBuffer = HttpDispatcher.getBufferManager().allocate(currentBuffer.remaining());
newBuffer.put(currentBuffer);
newBuffer.flip();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close, saved unread data [" + newBuffer.remaining() + "] from isc buffer [" + currentBuffer + "] to vc statemap [" + newBuffer + "]");
}

currentBuffer = null;
vc.getStateMap().put(TransportConstants.NOT_UPGRADED_UNREAD_DATA, newBuffer);
}

Exception errorinClosing = this.closeStreams();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Error closing in streams" + errorinClosing);
Tr.debug(tc, "close, Error closing in streams" + errorinClosing);
}

vc.getStateMap().put(TransportConstants.CLOSE_NON_UPGRADED_STREAMS, "CLOSED_NON_UPGRADED_STREAMS");

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close EXIT");
}

return;
}

String upgradedListener = (String) (this.vc.getStateMap().get(TransportConstants.UPGRADED_LISTENER));
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "upgradedListener ->" + upgradedListener);
Tr.debug(tc, "close, upgradedListener ->" + upgradedListener);
}
if (upgradedListener != null && upgradedListener.equalsIgnoreCase("true")) {
boolean closeCalledFromWebConnection = false;
Expand All @@ -237,7 +257,7 @@ public void close(VirtualConnection conn, Exception e) {
// but we don't want to manipulate existing logic so a separate constant in the state map will be used for that below

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Connection Not to be closed here because Servlet Upgrade.");
Tr.debug(tc, "close EXIT, Connection Not to be closed here because Servlet Upgrade.");
}
return;
}
Expand All @@ -253,11 +273,11 @@ public void close(VirtualConnection conn, Exception e) {
// want to call close outside of the sync to avoid deadlocks.
WebConnCanClose = false;
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Upgraded Web Connection closing Dispatcher Link");
Tr.debug(tc, "close, Upgraded Web Connection closing Dispatcher Link");
}
} else {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "Upgraded Web Connection already called close; returning");
Tr.debug(tc, "close EXIT, Upgraded Web Connection already called close; returning");
}
return;
}
Expand All @@ -274,12 +294,16 @@ public void close(VirtualConnection conn, Exception e) {
super.close(conn, e);
} finally {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "decrement active connection count");
Tr.debug(tc, "close, decrement active connection count");
}
this.myChannel.decrementActiveConns();
}
closeCompleted.compareAndSet(false, true);
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "close EXIT");
}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public class TransportConstants {
public static final String UPGRADED_LISTENER = "UpgradedListener";
public static final String CLOSE_UPGRADED_WEBCONNECTION = "CloseUpgradedWebConnection";

//Initial upgrade request data may be read together with the headers before the upgrade.
public static final String NOT_UPGRADED_UNREAD_DATA = "NotUpgradedUnreadData";
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
* http://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package com.ibm.ws.webcontainer31.upgrade;

Expand Down Expand Up @@ -48,6 +45,10 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th
_upgradeStream = uIBBU;
_contextManager = tcm;
_srtUpgradeStream = srtUpgradeStream;

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "UpgradeReadCallback constructor, ReadListener [" + _rl + "], this " + this);
}
}

/* (non-Javadoc)
Expand All @@ -56,6 +57,9 @@ public UpgradeReadCallback(ReadListener rl, UpgradeInputByteBufferUtil uIBBU, Th
@Override
@FFDCIgnore(IOException.class)
public void complete(VirtualConnection vc, TCPReadRequestContext rsc) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "complete ENTER , ReadListener [" + _rl + "], this " + this);
}

if(vc == null){
return;
Expand Down Expand Up @@ -106,6 +110,10 @@ public void complete(VirtualConnection vc, TCPReadRequestContext rsc) {
_srtUpgradeStream.notify();
}
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
Tr.debug(tc, "complete EXIT ");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corporation and others.
* Copyright (c) 2014, 2024 IBM Corporation and others.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License 2.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* IBM Corporation - initial API and implementation
*******************************************************************************/
package com.ibm.ws.webcontainer31.util;

Expand Down Expand Up @@ -164,51 +161,75 @@ private boolean syncRead(int amountToRead) throws IOException{
*/
private boolean immediateRead(int amountToRead){
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead, Executing a read");
Tr.debug(tc, "immediateRead ENTER , buffer size [" + amountToRead + "]");
}

if(amountToRead > 1){
//Allocate a new temp buffer, then set the position to 0 and limit to the amount we want to read
//Copy in the current this.buffer as it should only have one byte in it
WsByteBuffer tempBuffer = allocateBuffer(amountToRead);
tempBuffer.position(0);
tempBuffer.limit(amountToRead);
tempBuffer.put(_buffer);
tempBuffer.position(1);
_buffer.release();
_buffer = tempBuffer;
tempBuffer = null;

_tcpContext.getReadInterface().setBuffer(_buffer);

long bytesRead = 0;

try{
bytesRead = _tcpContext.getReadInterface().read(0, WCCustomProperties31.UPGRADE_READ_TIMEOUT);
} catch (IOException readException){
//If we encounter an exception here we need to return the 1 byte that we already have.
//Returned true immediately and the next read will catch the exception and propagate it properly
/*
* On a slow system/startup, data comes from the wire is faster than the channel can read/parse.
* If so, data needs to be read from the prepared buffer (set during the first initialRead()) instead of the wire.
* This happens mainly on the very first POST.
* this method is called down from the application readListener onDataAvailable() read.
*/
if (_upConn.getVirtualConnection().getStateMap().get(TransportConstants.NOT_UPGRADED_UNREAD_DATA) != null) {
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead, The read encountered an exception. " + readException);
Tr.debug(tc, "immediateRead, Return with our one byte");
Tr.debug(tc, "immediateRead, read from saved buffer [" + _buffer + "] , amount [" + _buffer.remaining() +"]");
}

_upConn.getVirtualConnection().getStateMap().remove(TransportConstants.NOT_UPGRADED_UNREAD_DATA);
}
// Read from the interface
else {
//Allocate a new temp buffer, then set the position to 0 and limit to the amount we want to read
//Copy in the current this.buffer as it should only have one byte in it
WsByteBuffer tempBuffer = allocateBuffer(amountToRead);
tempBuffer.position(0);
tempBuffer.limit(amountToRead);
tempBuffer.put(_buffer);
tempBuffer.position(1);
_buffer.release();
_buffer = tempBuffer;
tempBuffer = null;

_tcpContext.getReadInterface().setBuffer(_buffer);

long bytesRead = 0;

try{
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead, read from interface");
}
bytesRead = _tcpContext.getReadInterface().read(0, WCCustomProperties31.UPGRADE_READ_TIMEOUT);
} catch (IOException readException){
//If we encounter an exception here we need to return the 1 byte that we already have.
//Returned true immediately and the next read will catch the exception and propagate it properly
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead, The read encountered an exception. " + readException);
Tr.debug(tc, "immediateRead, Return with our one byte");
}

configurePostReadBuffer();
return true;
}


if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead, Complete, " + bytesRead);
}
//Get the buffer from the TCP Channel after we have told them to read.
_buffer = _tcpContext.getReadInterface().getBuffer();

//We don't need to check for null first as we know we will always get the buffer we just set
configurePostReadBuffer();
return true;
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead, Complete, " + bytesRead);
}
//Get the buffer from the TCP Channel after we have told them to read.
_buffer = _tcpContext.getReadInterface().getBuffer();

//We don't need to check for null first as we know we will always get the buffer we just set
configurePostReadBuffer();

// record the new amount of data read from the channel
_totalBytesRead += _buffer.remaining();
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "immediateRead EXIT , returns to application this buffer [" + _buffer + "], totalBytesRead [" + _totalBytesRead + "]");
}

//We will return true here in all circumstances because we always have 1 byte read from the isReady call or the initial read of the connection
return true;
}
Expand Down Expand Up @@ -296,7 +317,7 @@ private void setAndAllocateBuffer(int sizeToAllocate) {
configurePreReadBuffer(sizeToAllocate);

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "setAndAllocateBuffer, Setting the buffer : " + _buffer );
Tr.debug(tc, "setAndAllocateBuffer, buffer [" + _buffer + "] , size [" + sizeToAllocate + "]");
}

_tcpContext.getReadInterface().setBuffer(_buffer);
Expand Down Expand Up @@ -390,7 +411,7 @@ public void setupReadListener(ReadListener readListenerl, SRTUpgradeInputStream3
initialRead();

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "setupReadListener, ReadListener set : " + _rl);
Tr.debug(tc, "setupReadListener, UpgradeReadCallback [" + _tcpChannelCallback + "] , ReadListener [" + _rl + "]");
}

}
Expand Down Expand Up @@ -469,26 +490,73 @@ public boolean isReady(){
* The provided callback will be called when the read is completed and that callback will invoke the ReadListener logic.
*/
public void initialRead(){
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "initialRead ENTER , callback [" + _tcpChannelCallback + "], readListener [" + _rl + "]");
}

_isInitialRead = true;
if(_buffer != null){
_buffer.release();
_buffer = null;
}

/*
* If there is any unread data during the very first initialRead, those data are stored in the buffer, not at the wire, so callback complete() needs to be called
* to trigger the customer onDataAvailable to read those data from this prep buffer.
* The prep buffer has already been prepared during the HttpDispatcherLink.close() and saved in the VC.stateMap
*/
WsByteBuffer data = null;
if ((data = (WsByteBuffer) _upConn.getVirtualConnection().getStateMap().get(TransportConstants.NOT_UPGRADED_UNREAD_DATA)) != null) {
int remaining = data.remaining();
setAndAllocateBuffer(remaining);
_buffer.put(data);
//don't flip here! It will be flipped during the UpgradeReadCallback.complete

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "initialRead, saved unread data [" + remaining + "] from statemap [" + data + "] into _buffer [" + _buffer + "]");
Tr.debug(tc, "initialRead, invoke callback complete [" + _tcpChannelCallback + "]");
}

data.release();
data = null;

//standin dummy for immediateRead() to check and then remove (so that we don't need to save off the ByteBuffer again just for a cleanup)
_upConn.getVirtualConnection().getStateMap().put(TransportConstants.NOT_UPGRADED_UNREAD_DATA, "initialRead");

//complete() invokes app's onDataAvailable() to read the prepared _buffer.
//That read eventually call back into this.immediateRead(int).
//The callback complete here (and eventually the first immediateRead) happens in the same thread.
//Subsequently, the getReadInterface().read (below) will go async.
_tcpChannelCallback.complete(_upConn.getVirtualConnection(), _tcpContext.getReadInterface());

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "initialRead, callback complete. All initial data read.");
}
}

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "initialRead, async read data from interface");
}

setAndAllocateBuffer(1);
configurePreReadBuffer(1);


//This if the first read of the ReadListener, which means force the read to go async
//We won't get an actual response from this read as it will always come back on another thread
_tcpContext.getReadInterface().setBuffer(_buffer);
_tcpContext.getReadInterface().read(1, _tcpChannelCallback, true, WCCustomProperties31.UPGRADE_READ_TIMEOUT);

if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "initialRead EXIT");
}
}

/**
* Called after the initial read is completed. This will set the first read flag to false, get the buffer from the TCP Channel,
* and post configure the buffer. Without this method we would lose the first byte we are reading
*/
public void configurePostInitialReadBuffer(){
if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()){
Tr.debug(tc, "configurePostInitialReadBuffer");
}
_isInitialRead = false;
_isFirstRead = false;
_buffer = _tcpContext.getReadInterface().getBuffer();
Expand Down
Loading