Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed May 24, 2023
1 parent fb1f6eb commit b4dcc41
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public abstract class AbstractSailConnection implements SailConnection {
private final AbstractSail sailBase;

private volatile boolean txnActive;
private static final VarHandle TXN_ACTIVE;

private volatile boolean txnPrepared;

Expand All @@ -91,16 +92,6 @@ public abstract class AbstractSailConnection implements SailConnection {
private boolean isOpen = true;
private static final VarHandle IS_OPEN;

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

/**
* Lock used to prevent concurrent calls to update methods like addStatement, clear, commit, etc. within a
* transaction.
Expand Down Expand Up @@ -138,17 +129,20 @@ public abstract class AbstractSailConnection implements SailConnection {

private IsolationLevel transactionIsolationLevel;

// used to decide if we need to call flush()
// Used to decide if we need to call flush(). Use the VarHandles below in relase/acquire mode instead.
private volatile boolean statementsAdded;
private volatile boolean statementsRemoved;

private static final VarHandle STATEMENTS_ADDED;
private static final VarHandle STATEMENTS_REMOVED;

/*--------------*
* Constructors *
*--------------*/

public AbstractSailConnection(AbstractSail sailBase) {
this.sailBase = sailBase;
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
if (debugEnabled) {
activeIterationsDebug = new ConcurrentHashMap<>();
} else {
Expand Down Expand Up @@ -177,7 +171,7 @@ protected void verifyIsOpen() throws SailException {
* @throws SailException if no transaction is active.
*/
protected void verifyIsActive() throws SailException {
if (!isActive()) {
if (!((boolean) TXN_ACTIVE.getAcquire(this))) {
throw new SailException("No active transaction");
}
}
Expand Down Expand Up @@ -215,7 +209,7 @@ public void begin(IsolationLevel isolationLevel) throws SailException {
}

startTransactionInternal();
txnActive = true;
TXN_ACTIVE.setRelease(this, true);
} finally {
updateLock.unlock();
}
Expand Down Expand Up @@ -268,15 +262,15 @@ public final void close() throws SailException {
try {
forceCloseActiveOperations();

if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
logger.warn("Rolling back transaction due to connection close",
debugEnabled ? new Throwable() : null);
try {
// Use internal method to avoid deadlock: the public
// rollback method will try to obtain a connection lock
rollbackInternal();
} finally {
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
txnPrepared = false;
}
}
Expand Down Expand Up @@ -417,7 +411,7 @@ public final long size(Resource... contexts) throws SailException {
}

protected final boolean transactionActive() {
return txnActive;
return (boolean) TXN_ACTIVE.getAcquire(this);
}

/**
Expand Down Expand Up @@ -459,7 +453,7 @@ public final void prepare() throws SailException {

updateLock.lock();
try {
if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
prepareInternal();
txnPrepared = true;
}
Expand Down Expand Up @@ -489,12 +483,12 @@ public final void commit() throws SailException {

updateLock.lock();
try {
if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
if (!txnPrepared) {
prepareInternal();
}
commitInternal();
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
txnPrepared = false;
}
} finally {
Expand Down Expand Up @@ -525,11 +519,11 @@ public final void rollback() throws SailException {

updateLock.lock();
try {
if (txnActive) {
if ((boolean) TXN_ACTIVE.getAcquire(this)) {
try {
rollbackInternal();
} finally {
txnActive = false;
TXN_ACTIVE.setRelease(this, false);
txnPrepared = false;
}
} else {
Expand All @@ -553,7 +547,8 @@ public final void addStatement(Resource subj, IRI pred, Value obj, Resource... c
flushPendingUpdates();
}
addStatement(null, subj, pred, obj, contexts);
statementsAdded = true;

STATEMENTS_ADDED.setRelease(this, true);
}

@Override
Expand All @@ -562,7 +557,7 @@ public final void removeStatements(Resource subj, IRI pred, Value obj, Resource.
flushPendingUpdates();
}
removeStatement(null, subj, pred, obj, contexts);
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
}

@Override
Expand Down Expand Up @@ -604,7 +599,7 @@ public void addStatement(UpdateContext op, Resource subj, IRI pred, Value obj, R
startUpdate(op);
}
}
statementsAdded = true;
STATEMENTS_ADDED.setRelease(this, true);
}

/**
Expand Down Expand Up @@ -632,7 +627,7 @@ public void removeStatement(UpdateContext op, Resource subj, IRI pred, Value obj
startUpdate(op);
}
}
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
}

@Override
Expand Down Expand Up @@ -703,7 +698,7 @@ public final void clear(Resource... contexts) throws SailException {
try {
verifyIsActive();
clearInternal(contexts);
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
} finally {
updateLock.unlock();
}
Expand Down Expand Up @@ -836,19 +831,20 @@ public final void clearNamespaces() throws SailException {

@Override
public boolean pendingRemovals() {
return statementsRemoved;
return (boolean) STATEMENTS_REMOVED.getAcquire(this);

}

protected boolean pendingAdds() {
return statementsAdded;
return (boolean) STATEMENTS_ADDED.getAcquire(this);
}

protected void setStatementsAdded() {
statementsAdded = true;
STATEMENTS_ADDED.setRelease(this, true);
}

protected void setStatementsRemoved() {
statementsRemoved = true;
STATEMENTS_REMOVED.setRelease(this, true);
}

@Deprecated(forRemoval = true)
Expand Down Expand Up @@ -992,9 +988,10 @@ private void forceCloseActiveOperations() throws SailException {
* @throws SailException
*/
private void flushPendingUpdates() throws SailException {
if ((statementsAdded || statementsRemoved) && isActive()) {
if ((pendingAdds() || pendingRemovals()) && isActive()) {
if (isActive()) {
synchronized (this) {
// we are inside a synchornized block so there isn't much point using the VarHandle
if ((statementsAdded || statementsRemoved) && isActive()) {
flush();
statementsAdded = false;
Expand Down Expand Up @@ -1128,4 +1125,44 @@ public synchronized void release() {
}
}
}

static {
try {
IS_OPEN = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "isOpen", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

static {
try {
TXN_ACTIVE = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "txnActive", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

static {
try {
STATEMENTS_REMOVED = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "statementsRemoved", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

static {
try {
STATEMENTS_ADDED = MethodHandles.lookup()
.in(AbstractSailConnection.class)
.findVarHandle(AbstractSailConnection.class, "statementsAdded", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class FileIO {

public static final int INF_QUAD_MARKER = 5;

public static final int URI_MARKER = 6;
public static final int IRI_MARKER = 6;

public static final int BNODE_MARKER = 7;

Expand Down Expand Up @@ -270,7 +270,7 @@ private void readStatement(boolean hasContext, boolean isExplicit, DataInputStre

private void writeValue(Value value, DataOutputStream dataOut) throws IOException {
if (value.isIRI()) {
dataOut.writeByte(URI_MARKER);
dataOut.writeByte(IRI_MARKER);
writeString(((IRI) value).stringValue(), dataOut);
} else if (value.isBNode()) {
dataOut.writeByte(BNODE_MARKER);
Expand Down Expand Up @@ -301,9 +301,9 @@ private void writeValue(Value value, DataOutputStream dataOut) throws IOExceptio
private Value readValue(DataInputStream dataIn) throws IOException, ClassCastException {
int valueTypeMarker = dataIn.readByte();

if (valueTypeMarker == URI_MARKER) {
String uriString = readString(dataIn);
return vf.createIRI(uriString);
if (valueTypeMarker == IRI_MARKER) {
String iriString = readString(dataIn);
return vf.createIRI(iriString);
} else if (valueTypeMarker == BNODE_MARKER) {
String bnodeID = readString(dataIn);
return vf.createBNode(bnodeID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private int minStatementCount(Value subj, Value pred, Value obj, Value context)
}

if (pred != null) {
MemIRI memPred = valueFactory.getMemURI((IRI) pred);
MemIRI memPred = valueFactory.getMemIRI((IRI) pred);
if (memPred != null) {
minListSizes = Math.min(minListSizes, memPred.getPredicateStatementCount());
if (minListSizes == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private CloseableIteration<MemStatement, SailException> createStatementIterator(
return EMPTY_ITERATION;
}

MemIRI memPred = valueFactory.getMemURI(pred);
MemIRI memPred = valueFactory.getMemIRI(pred);
if (pred != null && memPred == null) {
// non-existent predicate
return EMPTY_ITERATION;
Expand Down Expand Up @@ -345,7 +345,7 @@ private CloseableIteration<MemTriple, SailException> createTripleIterator(Resour
return EMPTY_TRIPLE_ITERATION;
}

MemIRI memPred = valueFactory.getMemURI(pred);
MemIRI memPred = valueFactory.getMemIRI(pred);
if (pred != null && memPred == null) {
// non-existent predicate
return EMPTY_TRIPLE_ITERATION;
Expand Down Expand Up @@ -592,7 +592,7 @@ public String toString() {
} else {
sb.append("inferred ");
}
if (txnLock) {
if ((boolean) TXN_LOCK.getAcquire(this)) {
sb.append("snapshot ").append(nextSnapshot);
} else {
sb.append(super.toString());
Expand Down Expand Up @@ -635,7 +635,7 @@ public synchronized void prepare() throws SailException {

@Override
public synchronized void flush() throws SailException {
if (txnLock) {
if ((boolean) TXN_LOCK.getAcquire(this)) {
invalidateCache();
currentSnapshot = Math.max(currentSnapshot, nextSnapshot);
if (requireCleanup) {
Expand All @@ -653,8 +653,8 @@ public void close() {
reservedSnapshot.release();
}
} finally {
boolean toCloseTxnLock = txnLock;
txnLock = false;
boolean toCloseTxnLock = (boolean) TXN_LOCK.getAcquire(this);
TXN_LOCK.setRelease(this, false);
if (toCloseTxnLock) {
txnLockManager.unlock();
}
Expand Down Expand Up @@ -792,7 +792,7 @@ private void innerDeprecate(Statement statement, int nextSnapshot) {
}

private void acquireExclusiveTransactionLock() throws SailException {
if (!txnLock) {
if (!(boolean) TXN_LOCK.getAcquire(this)) {
synchronized (this) {
if (!txnLock) {
txnLockManager.lock();
Expand All @@ -812,7 +812,7 @@ private MemStatement addStatement(Resource subj, IRI pred, Value obj, Resource c

// Get or create MemValues for the operands
MemResource memSubj = valueFactory.getOrCreateMemResource(subj);
MemIRI memPred = valueFactory.getOrCreateMemURI(pred);
MemIRI memPred = valueFactory.getOrCreateMemIRI(pred);
MemValue memObj = valueFactory.getOrCreateMemValue(obj);
MemResource memContext = context == null ? null : valueFactory.getOrCreateMemResource(context);

Expand Down Expand Up @@ -1118,7 +1118,7 @@ public ReservedSnapshot reserve(int snapshot, Object reservedBy) {
}
}

LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, (k) -> new LongAdder());
LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, k -> new LongAdder());
longAdder.increment();

return new ReservedSnapshot(snapshot, reservedBy, debug, longAdder, activeSnapshots,
Expand Down Expand Up @@ -1162,7 +1162,7 @@ public ReservedSnapshot(int snapshot, Object reservedBy, boolean debug,
this.frequency = frequency;
this.highestEverReservedSnapshot = highestEverReservedSnapshot;
cleanable = cleaner.register(reservedBy, () -> {
int tempSnapshot = ((int) SNAPSHOT.getVolatile(this));
int tempSnapshot = (int) SNAPSHOT.getVolatile(this);
if (tempSnapshot != SNAPSHOT_RELEASED) {
String message = "Releasing MemorySailStore snapshot {} which was reserved and never released (possibly unclosed MemorySailDataset or MemorySailSink).";
if (stackTraceForDebugging != null) {
Expand Down Expand Up @@ -1201,4 +1201,17 @@ public void release() {

}
}

private static final VarHandle TXN_LOCK;

static {
try {
TXN_LOCK = MethodHandles.lookup()
.in(MemorySailSink.class)
.findVarHandle(MemorySailSink.class, "txnLock", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}

}
Loading

0 comments on commit b4dcc41

Please sign in to comment.