-
Notifications
You must be signed in to change notification settings - Fork 40
Shared Task Chains
Shared Chains are an instance of a TaskChain, that's bound to another TaskChain, where the execution of each chain is controlled as elements of a backing chain. For example (in a more psuedocode format):
newSharedChain("test")
(async) -> print("Hello")
(delay) -> 10 seconds
(async) -> print("World")
newSharedChain("test")
(async) -> print("Foo")
(delay) -> 10 seconds
(async) -> print("Bar")
If these were not shared chains, the output would of been:
Hello
Foo
wait 10
World
Bar
But instead, it will be:
Hello
wait 10
World
Foo
wait 10
Bar
That's because a shared chain is equivalent to:
newChain()
(asyncCallback:done)
newChain()
(async) -> print("Hello")
(delay) -> 10 seconds
(async) -> print("World")
(async) -> done()
(execute)
(asyncCallback:done)
newChain()
(async) -> print("Foo")
(delay) -> 10 seconds
(async) -> print("Bar")
(async) -> done()
(execute)
(execute)
Say you have some code you want to run asynchronous, such as saving data to a database.
If you dispatch a task to write data to a database, and at the same time you dispatch a task to read that same data, you are not guaranteed that the updated value is what you are going to be returned! You could solve this by writing your own synchronization, but now you'll have locks held for long periods of time or many other messy solutions.
TaskChain simplifies all this. If you can simply give your shared actions a name to represent them, you can ensure one finishes before the other starts.
For example, with my code that saves players inventory to a database, I have this:
public void save() {
final String title = inventory.getTitle();
Empire.newSharedChain(chainName)
.syncFirst(inventory::getStorageContents)
.asyncLast( (storageContents) -> {
String json = Serialization.serializeInventory(title, storageContents.length, storageContents);
if (json == null) {
logError("Package failed to serialize..." + packageId + Arrays.toString(storageContents));
return;
}
try {
if (json.equals(serialized)) {
return;
}
Util.logDebug("Saving package " + packageId);
json = Compression.compress(json);
if (json != null) {
EmpireDb.executeUpdate("UPDATE packages SET data = ?, last_accessed = UNIX_TIMESTAMP() WHERE id = ?", json, packageId);
} else {
logError("Package failed to compress..." + packageId);
}
} catch (Exception ex) {
Util.printException("Exception in package save" + packageId, ex);
}
}).execute();
}
I define chainName as
this.chainName = "PACKAGE_" + packageId;
When a package is opened, it is locked to that server so that another server can not open it and manipulate it. If a player was to close a package (and triggered an async save, and an async unlock), then immediately try to re-open it, we need to ensure that the open occurs AFTER the previous save and unlock has finished!
So we have the following:
public synchronized void open(Player player, Consumer<OpenedPackage> cb) {
Empire.newSharedChain(chainName).async( () -> {
if (openedPackage == null) {
openedPackage = OpenedPackage.tryOpen(player, this);
}
cb.accept(openedPackage);
}).execute();
}
void unlockPackage() {
Empire.newSharedChain(chainName).async( () -> {
try {
EmpireDb.executeUpdate("UPDATE packages SET open_on = 0 WHERE id = ?", packageId);
} catch (SQLException e) {
Util.printException(e);
}
}).execute();
}
By using Shared Chains, we are able to absolutely guarantee no package will be unlocked until all pending async operations on it has finished. We also guarantee that the unlock can never occur before the final save operation too.