Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WEJBHTTP-140] First round of refactorings in TXN part - focus on client side of the protocol #221

Merged
merged 10 commits into from
Sep 25, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.undertow.client.ClientRequest;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.Marshalling;
import org.wildfly.httpclient.common.HttpTargetContext;
Expand Down Expand Up @@ -87,14 +86,15 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
}
final CompletableFuture<Void> result = new CompletableFuture<>();
statusRef.set(Status.STATUS_COMMITTING);
ClientRequest cr = new ClientRequest()
.setMethod(Methods.POST)
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH +

RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.UT_COMMIT).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());
request.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH +
targetContext.getProtocolVersion() + UT_COMMIT_PATH);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
cr.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(cr, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(cr).createMarshaller();
request.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
request.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(request, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(request).createMarshaller();
marshaller.start(Marshalling.createByteOutput(output));
marshaller.writeInt(id.getFormatId());
final byte[] gtid = id.getGlobalTransactionId();
Expand Down Expand Up @@ -157,14 +157,15 @@ public void rollback() throws SecurityException, SystemException {

final CompletableFuture<Void> result = new CompletableFuture<>();
statusRef.set(Status.STATUS_COMMITTING);
ClientRequest cr = new ClientRequest()
.setMethod(Methods.POST)
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion()

RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.UT_ROLLBACK).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());
request.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion()
+ UT_ROLLBACK_PATH);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
cr.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(cr, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(cr).createMarshaller();
request.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
request.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(request, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(request).createMarshaller();
marshaller.start(Marshalling.createByteOutput(output));
marshaller.writeInt(id.getFormatId());
final byte[] gtid = id.getGlobalTransactionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.undertow.client.ClientRequest;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import org.jboss.marshalling.InputStreamByteInput;
import org.jboss.marshalling.Unmarshaller;
import org.wildfly.httpclient.common.HttpTargetContext;
Expand Down Expand Up @@ -87,13 +86,13 @@ public SubordinateTransactionControl lookupXid(Xid xid) throws XAException {
public Xid[] recover(int flag, String parentName) throws XAException {
final CompletableFuture<Xid[]> xidList = new CompletableFuture<>();

ClientRequest cr = new ClientRequest()
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion() +
XA_RECOVER_PATH + "/" + parentName)
.setMethod(Methods.GET);
cr.getRequestHeaders().put(Headers.ACCEPT, XID_LIST + "," + NEW_TRANSACTION);
cr.getRequestHeaders().put(RECOVERY_PARENT_NAME, parentName);
cr.getRequestHeaders().put(RECOVERY_FLAGS, Integer.toString(flag));
RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.XA_RECOVER).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());
request.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion() +
XA_RECOVER_PATH + "/" + parentName);
request.getRequestHeaders().put(Headers.ACCEPT, XID_LIST + "," + NEW_TRANSACTION);
request.getRequestHeaders().put(RECOVERY_PARENT_NAME, parentName);
request.getRequestHeaders().put(RECOVERY_FLAGS, Integer.toString(flag));

final AuthenticationConfiguration authenticationConfiguration = getAuthenticationConfiguration(targetContext.getUri());
final SSLContext sslContext;
Expand All @@ -105,9 +104,9 @@ public Xid[] recover(int flag, String parentName) throws XAException {
throw xaException;
}

targetContext.sendRequest(cr, sslContext, authenticationConfiguration,null, (result, response, closeable) -> {
targetContext.sendRequest(request, sslContext, authenticationConfiguration,null, (result, response, closeable) -> {
try {
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(cr).createUnmarshaller();
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(request).createUnmarshaller();
unmarshaller.start(new InputStreamByteInput(result));
int length = unmarshaller.readInt();
Xid[] ret = new Xid[length];
Expand Down Expand Up @@ -149,12 +148,12 @@ public Xid[] recover(int flag, String parentName) throws XAException {
public SimpleTransactionControl begin(int timeout) throws SystemException {
final CompletableFuture<Xid> beginXid = new CompletableFuture<>();

ClientRequest cr = new ClientRequest()
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH +
targetContext.getProtocolVersion() + UT_BEGIN_PATH)
.setMethod(Methods.POST);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION + "," + NEW_TRANSACTION);
cr.getRequestHeaders().put(TIMEOUT, timeout);
RequestBuilder builder = new RequestBuilder().setRequestType(RequestType.UT_BEGIN).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());
request.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH +
targetContext.getProtocolVersion() + UT_BEGIN_PATH);
request.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION + "," + NEW_TRANSACTION);
request.getRequestHeaders().put(TIMEOUT, timeout);

final AuthenticationConfiguration authenticationConfiguration = getAuthenticationConfiguration(targetContext.getUri());
final SSLContext sslContext;
Expand All @@ -164,9 +163,9 @@ public SimpleTransactionControl begin(int timeout) throws SystemException {
throw new SystemException(e.getMessage());
}

targetContext.sendRequest(cr, sslContext, authenticationConfiguration, null, (result, response, closeable) -> {
targetContext.sendRequest(request, sslContext, authenticationConfiguration, null, (result, response, closeable) -> {
try {
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(cr).createUnmarshaller();
Unmarshaller unmarshaller = targetContext.getHttpMarshallerFactory(request).createUnmarshaller();
unmarshaller.start(new InputStreamByteInput(result));
int formatId = unmarshaller.readInt();
int len = unmarshaller.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@

package org.wildfly.httpclient.transaction;

import static org.wildfly.httpclient.common.Protocol.VERSION_PATH;
import static org.wildfly.httpclient.transaction.RequestType.XA_BEFORE_COMPLETION;
import static org.wildfly.httpclient.transaction.RequestType.XA_COMMIT;
import static org.wildfly.httpclient.transaction.RequestType.XA_FORGET;
import static org.wildfly.httpclient.transaction.RequestType.XA_PREPARE;
import static org.wildfly.httpclient.transaction.RequestType.XA_ROLLBACK;
import static org.wildfly.httpclient.transaction.TransactionConstants.EXCEPTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.READ_ONLY;
import static org.wildfly.httpclient.transaction.TransactionConstants.TXN_CONTEXT;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_BC_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_COMMIT_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_FORGET_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_PREP_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_ROLLBACK_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID;

import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.Marshalling;
import org.wildfly.httpclient.common.HttpTargetContext;
Expand All @@ -37,17 +52,6 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import static org.wildfly.httpclient.common.Protocol.VERSION_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.EXCEPTION;
import static org.wildfly.httpclient.transaction.TransactionConstants.READ_ONLY;
import static org.wildfly.httpclient.transaction.TransactionConstants.TXN_CONTEXT;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_BC_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_COMMIT_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_FORGET_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_PREP_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XA_ROLLBACK_PATH;
import static org.wildfly.httpclient.transaction.TransactionConstants.XID;

/**
* Represents a remote subordinate transaction that is managed over HTTP protocol.
*
Expand All @@ -74,12 +78,12 @@ Xid getId() {
@Override
public void commit(boolean onePhase) throws XAException {
String operationPath = XA_COMMIT_PATH + (onePhase ? "?opc=true" : "");
processOperation(operationPath);
processOperation(XA_COMMIT, operationPath);
}

@Override
public void rollback() throws XAException {
processOperation(XA_ROLLBACK_PATH);
processOperation(XA_ROLLBACK, XA_ROLLBACK_PATH);
}

@Override
Expand All @@ -89,12 +93,12 @@ public void end(int flags) throws XAException {

@Override
public void beforeCompletion() throws XAException {
processOperation(XA_BC_PATH);
processOperation(XA_BEFORE_COMPLETION, XA_BC_PATH);
}

@Override
public int prepare() throws XAException {
boolean readOnly = processOperation(XA_PREP_PATH, (result) -> {
boolean readOnly = processOperation(XA_PREPARE, XA_PREP_PATH, (result) -> {
String header = result.getResponseHeaders().getFirst(READ_ONLY);
return header != null && Boolean.parseBoolean(header);
});
Expand All @@ -103,22 +107,23 @@ public int prepare() throws XAException {

@Override
public void forget() throws XAException {
processOperation(XA_FORGET_PATH);
processOperation(XA_FORGET, XA_FORGET_PATH);
}

private void processOperation(String operationPath) throws XAException {
processOperation(operationPath, null);
private void processOperation(RequestType requestType, String operationPath) throws XAException {
processOperation(requestType, operationPath, null);
}

private <T> T processOperation(String operationPath, Function<ClientResponse, T> resultFunction) throws XAException {
private <T> T processOperation(RequestType requestType, String operationPath, Function<ClientResponse, T> resultFunction) throws XAException {
final CompletableFuture<T> result = new CompletableFuture<>();
ClientRequest cr = new ClientRequest()
.setMethod(Methods.POST)
.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion() + operationPath);
cr.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
cr.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(cr, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(cr).createMarshaller();

RequestBuilder builder = new RequestBuilder().setRequestType(requestType).setVersion(targetContext.getProtocolVersion());
final ClientRequest request = builder.createRequest(targetContext.getUri().getPath());
request.setPath(targetContext.getUri().getPath() + TXN_CONTEXT + VERSION_PATH + targetContext.getProtocolVersion() + operationPath);
request.getRequestHeaders().put(Headers.ACCEPT, EXCEPTION.toString());
request.getRequestHeaders().put(Headers.CONTENT_TYPE, XID.toString());
targetContext.sendRequest(request, sslContext, authenticationConfiguration, output -> {
Marshaller marshaller = targetContext.getHttpMarshallerFactory(request).createMarshaller();
marshaller.start(Marshalling.createByteOutput(output));
marshaller.writeInt(id.getFormatId());
final byte[] gtid = id.getGlobalTransactionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ private void setRequestMethod(final ClientRequest request) {
}

private void setRequestPath(final ClientRequest request, final String prefix) {
throw new UnsupportedOperationException();
// NOOP
}

private void setRequestHeaders(final ClientRequest request) {
throw new UnsupportedOperationException();
// NOOP
}

}