diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..2421e38 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "files.exclude": { + "**/.classpath": true, + "**/.project": true, + "**/.settings": true, + "**/.factorypath": true + } +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6d66652 --- /dev/null +++ b/README.md @@ -0,0 +1,131 @@ +# ThingWorx Concurrency Extension + +The aim of this extension it's to provide the Swiss Tool for Concurrency in ThingWorx. + +## Contents + +- [How It Works](#how-it-works) +- [Compatibility](#compatibility) +- [Installation](#installation) +- [Usage](#usage) + - [Mutex](#mutex) + - [Counter](#counter) + - [Concurrency Script Functions](#concurrency-script-functions) +- [Build](#build) +- [Acknowledgments](#acknowledgments) +- [Author](#author) + + +## How It Works + +It publishes standard Java concurrency features in order to be used easily on ThingWorx Server Side Javascript. Also, it may try to solve +typical concurrency problems like doing an autoincrement counter. + +## Compatibility + +ThingWorx 7.3 and above. It's set to minimum ThingWorx 6.5 and built with ThingWorx 6.5 SDK but I didn't tested with it. + +## Installation + +Import the extension (ConcurrencyExtension.zip) with ThingWorx Composer Import Extension feature. + +## Usage + +### Mutex + +We implemented a mutex ThingShape with Java ReentrantLook with fair execution enabled which allows to synchronize and block thread execution. +Blocking it's done at Thing's level, et means each Thing which implements the wupMutexTS ThingShape has it's own ReentrantLook. + +#### Mutex Usage Samples + +Just add the wupMutexTS to the Thing or ThingTemplate to whom you want to add mutex blocking features. + +In order to lock a piece of code in Javascript, and ensure that only one thread its entering on it at a time: + +```javascript +me.Lock_wupMutexTS(); +try { + // -- whatever code that needs to be mutex +} finally { + me.Unlock_wupMutexTS(); +} +``` +You can also tryLock a piece of code, in order to allow one thread and only one and discard the others. +For instance it may be interesting if you have a timer which triggers once in a while and you don't want that two +consecutive triggers are executed at the same time: + +```javascript +if (me.TryLock_wupMutexTS()===true) { + try { + // -- whatever code that needs to be mutex + } finally { + me.Unlock_wupMutexTS(); + } +} else { + // -- The lock was already got and previous code its skipped +} +``` + +You can create more than one mutex per thing, all wupMutexTS services has an optional "id" parameter, which allows to create a more quirurgic mutex. +Each different passed "id" will create its own ReentrantLook. Sample with previous code but with a specific lock for one specific timer. + +```javascript +if (me.TryLock_wupMutexTS({ id: "timer1" })===true) { + try { + // -- whatever code that needs to be mutex + } finally { + me.Unlock_wupMutexTS({ id: "timer1" }); + } +} else { + // -- The lock was already got and previous code it's skipped +} +``` + +### Counter + +A thread safe "autoincrement" ThingShape. It provides a "counter" property and the corresponding services in order to increase (one by one) it's value. + +#### Counter Usage Samples + +To increase the counter value, no worries about having any amount of threads incrementing the property value, all will get it's own and unique value: + +```javascript +var newCounterValue = me.Increase_wupCounterTS(); +``` + +You can reset or reset counter value with Set_wupCounterTS method: + +```javascript +me.Set_wupCounterTS({ value: 0 }); +``` + +The counter it's stored and update on a persistent property named: counter_wupCounterTS + +### Concurrency Script Functions + +List of script helper functions related with this concurrency extension. This services should go on a subsystem like entity, but subsystems on ThingWorx can't be created through extensions :( + +#### GetTotalActiveLocks_wupMutexTS + +Returns the total active locks, in the whole ThingWorx running system. + +#### GetTotalActiveWaiting_wupMutexTS + +Returns the total active threads which are waiting on a lock, in the whole ThingWorx running system. + +#### GetTotalThingsLocksUsage_wupMutexTS + +Returns the total number of mutex created on Things (ReentranLocks), in the whole ThingWorx running system since last start. + +## Build + +If you need to build it, it's built with ant and java 8 on a MacOS, the scripts are on the repository. Version change it's done by hand and should be automated. + +## Acknowledgments + +I've started from the [code](https://community.ptc.com/t5/ThingWorx-Developers/Concurrency-Synchronisation-ConcurrentModificationException/m-p/624921) posted by [@antondorf](https://community.ptc.com/t5/user/viewprofilepage/user-id/290654) on the ThingWorx Developer Community. + + +## Author + +[Carles Coll Madrenas](https://linkedin.com/in/carlescoll) diff --git a/bin/ConcurrencyExtension.jar b/bin/ConcurrencyExtension.jar new file mode 100644 index 0000000..739f6bc Binary files /dev/null and b/bin/ConcurrencyExtension.jar differ diff --git a/bin/classes/com/wup/wupBaseThingShape.class b/bin/classes/com/wup/wupBaseThingShape.class new file mode 100644 index 0000000..c92d3fc Binary files /dev/null and b/bin/classes/com/wup/wupBaseThingShape.class differ diff --git a/bin/classes/com/wup/wupConcurrencySC.class b/bin/classes/com/wup/wupConcurrencySC.class new file mode 100644 index 0000000..d60396e Binary files /dev/null and b/bin/classes/com/wup/wupConcurrencySC.class differ diff --git a/bin/classes/com/wup/wupCounterTS.class b/bin/classes/com/wup/wupCounterTS.class new file mode 100644 index 0000000..9225c21 Binary files /dev/null and b/bin/classes/com/wup/wupCounterTS.class differ diff --git a/bin/classes/com/wup/wupMutexTS.class b/bin/classes/com/wup/wupMutexTS.class new file mode 100644 index 0000000..e0007b4 Binary files /dev/null and b/bin/classes/com/wup/wupMutexTS.class differ diff --git a/build.xml b/build.xml new file mode 100755 index 0000000..038791f --- /dev/null +++ b/build.xml @@ -0,0 +1,116 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + +
+
+ + +
+
+ + + + + + + + + + + + + + + + + +
+ + + + + + + + + + + + + + diff --git a/compile_pack_extension.command b/compile_pack_extension.command new file mode 100755 index 0000000..00a5031 --- /dev/null +++ b/compile_pack_extension.command @@ -0,0 +1,4 @@ +echo -n -e "\033]0;Packing ConcurrencySEE\007" +cd "`dirname "$0"`" +ant +#osascript -e 'tell application "Terminal" to close (every window whose name contains "Packing ConcurrencySEE")' & \ No newline at end of file diff --git a/configfiles/metadata.xml b/configfiles/metadata.xml new file mode 100644 index 0000000..85c82d9 --- /dev/null +++ b/configfiles/metadata.xml @@ -0,0 +1,52 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/lib/logback-classic-1.0.13.jar b/lib/logback-classic-1.0.13.jar new file mode 100644 index 0000000..80bf5d1 Binary files /dev/null and b/lib/logback-classic-1.0.13.jar differ diff --git a/lib/logback-core-1.0.13.jar b/lib/logback-core-1.0.13.jar new file mode 100644 index 0000000..568ccfa Binary files /dev/null and b/lib/logback-core-1.0.13.jar differ diff --git a/lib/slf4j-api-1.7.6.jar b/lib/slf4j-api-1.7.6.jar new file mode 100644 index 0000000..816d8ee Binary files /dev/null and b/lib/slf4j-api-1.7.6.jar differ diff --git a/lib/thingworx-all-6.5.0-b460.jar b/lib/thingworx-all-6.5.0-b460.jar new file mode 100644 index 0000000..31b334e Binary files /dev/null and b/lib/thingworx-all-6.5.0-b460.jar differ diff --git a/src/com/wup/wupBaseThingShape.java b/src/com/wup/wupBaseThingShape.java new file mode 100644 index 0000000..4202a39 --- /dev/null +++ b/src/com/wup/wupBaseThingShape.java @@ -0,0 +1,40 @@ +package com.wup; + +import org.slf4j.Logger; + +import com.thingworx.things.Thing; +import com.thingworx.thingshape.ThingShape; +import com.thingworx.logging.LogUtilities; +import com.thingworx.webservices.context.ThreadLocalContext; + +public class wupBaseThingShape extends ThingShape { + + private static final long serialVersionUID = 1L; + + private static Logger _logger = LogUtilities.getInstance().getApplicationLogger(wupBaseThingShape.class); + + + protected Thing getMe() throws Exception { + final Object meObj = ThreadLocalContext.getMeContext(); + if (meObj instanceof Thing) { + return (Thing) meObj; + } else { + this.logError("getMe() Cannot cast me to Thing."); + throw new Exception("Cannot cast me to Thing"); + } + } + + protected String getMeName() throws Exception { + final Thing me = this.getMe(); + return me.getName(); + } + + protected void logError(String text) { + try { + _logger.error("[wupBaseThingShape("+this.getMeName()+")]."+text); + } catch(Exception e) { + + } + } + +} diff --git a/src/com/wup/wupConcurrencySC.java b/src/com/wup/wupConcurrencySC.java new file mode 100644 index 0000000..b85f1b5 --- /dev/null +++ b/src/com/wup/wupConcurrencySC.java @@ -0,0 +1,45 @@ +package com.wup; + +import com.thingworx.metadata.annotations.ThingworxServiceDefinition; +import com.thingworx.metadata.annotations.ThingworxServiceResult; + + +public class wupConcurrencySC { + + @ThingworxServiceDefinition( + name = "GetTotalActiveLocks_wupMutexTS", + description = "Returns the total active locks in the whole ThingWorx running system.", + category = "WUP", + isAllowOverride = false, + aspects = {"isAsync:false" } + ) + @ThingworxServiceResult(name = "result", description = "The total ammount.", baseType = "LONG", aspects = {}) + public static long GetTotalActiveLocks_wupMutexTS() { + return wupMutexTS.getTotalActiveLocks(); + } + + @ThingworxServiceDefinition( + name = "GetTotalActiveWaiting_wupMutexTS", + description = "Returns the total active threads which are waiting on a lock in the whole ThingWorx running system.", + category = "WUP", + isAllowOverride = false, + aspects = {"isAsync:false" } + ) + @ThingworxServiceResult(name = "result", description = "The total ammount.", baseType = "LONG", aspects = {}) + public static long GetTotalActiveWaiting_wupMutexTS() { + return wupMutexTS.getTotalActiveWaiting(); + } + + @ThingworxServiceDefinition( + name = "GetTotalThingsLocksUsage_wupMutexTS", + description = "Returns the total number of mutex created on Things (ReentranLocks), in the whole ThingWorx running system since last start.", + category = "WUP", + isAllowOverride = false, + aspects = {"isAsync:false" } + ) + @ThingworxServiceResult(name = "result", description = "The total ammount.", baseType = "LONG", aspects = {}) + public static long GetTotalThingsLocksUsage_wupMutexTS() { + return wupMutexTS.getTotalThingsLocksUsage(); + } + +} diff --git a/src/com/wup/wupCounterTS.java b/src/com/wup/wupCounterTS.java new file mode 100644 index 0000000..090d581 --- /dev/null +++ b/src/com/wup/wupCounterTS.java @@ -0,0 +1,108 @@ +package com.wup; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; +import com.thingworx.metadata.annotations.ThingworxServiceDefinition; +import com.thingworx.metadata.annotations.ThingworxServiceParameter; +import com.thingworx.metadata.annotations.ThingworxServiceResult; +import com.thingworx.metadata.annotations.ThingworxPropertyDefinition; +import com.thingworx.metadata.annotations.ThingworxPropertyDefinitions; +import com.thingworx.things.Thing; +import com.thingworx.types.primitives.LongPrimitive; + +@ThingworxPropertyDefinitions(properties = { + @ThingworxPropertyDefinition(name= wupCounterTS.PROPERTY_COUNTER, description="Counter atomic 'autoincrement' property. Don't update the value, at next increase will get previous value +1.", baseType="LONG", aspects={"isPersistent:true","defaultValue:0"}) +}) + +public class wupCounterTS extends wupBaseThingShape { + + private static final long serialVersionUID = 1L; + + // -- This will create zombie counters if Things are deleted, + // -- on server restart they will be freed, we may need to + // -- implement a garbage collector... But deleting things it's + // -- not a normal behaviour on ThingWorx. + private static final ConcurrentHashMap _instanceCounter = new ConcurrentHashMap<>(); + + public static final String PROPERTY_COUNTER = "counter_wupCounterTS"; + + public static int getTotalThingsCounterUsage() { + return wupCounterTS._instanceCounter.size(); + } + + private AtomicLong getInstanceCounter() throws Exception { + final Thing me = this.getMe(); + final String meName = me.getName(); + AtomicLong counter = wupCounterTS._instanceCounter.get(meName); + if (counter == null) { + synchronized(me) { + long currentValue = (long)me.getPropertyValue(wupCounterTS.PROPERTY_COUNTER).getValue(); + counter = wupCounterTS._instanceCounter.computeIfAbsent(meName, k -> new AtomicLong(currentValue)); + } + } + return counter; + } + + @ThingworxServiceDefinition( + name = "Increase_wupCounterTS", + description = "Increase an return counter value.", + category = "WUP", + isAllowOverride = false, + aspects = {"isAsync:false" } + ) + @ThingworxServiceResult(name = "result", description = "The increased value", baseType = "LONG", aspects = {}) + public long Increase_wupCounterTS() throws Exception { + final AtomicLong meCounter = this.getInstanceCounter(); + if (meCounter != null) { + long newValue; + while(true) { + long existingValue = meCounter.get(); + newValue = existingValue + 1; + if(meCounter.compareAndSet(existingValue, newValue)) { + break; + } + } + // -- Let's set current property value. + final Thing me = this.getMe(); + synchronized(me) { + long currentValue = (long)me.getPropertyValue(wupCounterTS.PROPERTY_COUNTER).getValue(); + if (newValue>currentValue) { + // -- We can have various consurrent value increases and desordered, hence we will only write if new + // -- value it's bigger than previous one + me.setPropertyValue(wupCounterTS.PROPERTY_COUNTER,new LongPrimitive(newValue)); + } + } + return newValue; + } + + throw new Exception("Increase_wupCounterTS/Cannot get instance Counter"); + } + + @ThingworxServiceDefinition( + name = "Set_wupCounterTS", + description = "Set/Reset counter value.", + category = "WUP", + isAllowOverride = false, + aspects = {"isAsync:false" } + ) + public void Set_wupCounterTS( + @ThingworxServiceParameter (name = "value", + description = "Value to set the counter, usually used to reset to 0 or alike.", + baseType = "LONG", + aspects={"isRequired:false", + "defaultValue:0" + }) final Long value) throws Exception { + final AtomicLong meCounter = this.getInstanceCounter(); + if (meCounter != null) { + // -- Let's set current property value. + final Thing me = this.getMe(); + synchronized(me) { + meCounter.set((long)value); + me.setPropertyValue(wupCounterTS.PROPERTY_COUNTER,new LongPrimitive(value)); + } + } else { + throw new Exception("Set_wupCounterTS/Cannot get instance Counter"); + } + } + +} diff --git a/src/com/wup/wupMutexTS.java b/src/com/wup/wupMutexTS.java new file mode 100644 index 0000000..c98f407 --- /dev/null +++ b/src/com/wup/wupMutexTS.java @@ -0,0 +1,208 @@ +package com.wup; + +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; +import com.thingworx.metadata.annotations.ThingworxServiceDefinition; +import com.thingworx.metadata.annotations.ThingworxServiceParameter; +import com.thingworx.metadata.annotations.ThingworxServiceResult; + +public class wupMutexTS extends wupBaseThingShape { + + private static final long serialVersionUID = 1L; + + // -- Only one Mutex instance for the whole system and thing, event if ThingsRestart + // -- we don't kill mutex as maybe there's processes executing on the background. + // -- This will create zombie mutex if Things are deleted, + // -- on server restart they will be freed, we may need to + // -- implement a garbage collector... But deleting things it's + // -- not a normal behaviour on ThingWorx. + private static final ConcurrentHashMap _instanceMtx = new ConcurrentHashMap<>(); + + private static AtomicInteger activeLocks = new AtomicInteger(0); + private static AtomicInteger activeWaiting = new AtomicInteger(0); + + private static void incrementLocks() { + while(true) { + int existingValue = wupMutexTS.activeLocks.get(); + int newValue = existingValue + 1; + if(wupMutexTS.activeLocks.compareAndSet(existingValue, newValue)) { + return; + } + } + } + + private static void decrementLocks() { + while(true) { + int existingValue = wupMutexTS.activeLocks.get(); + int newValue = existingValue - 1; + if(wupMutexTS.activeLocks.compareAndSet(existingValue, newValue)) { + return; + } + } + } + private static void incrementWaiting() { + while(true) { + int existingValue = wupMutexTS.activeWaiting.get(); + int newValue = existingValue + 1; + if(wupMutexTS.activeWaiting.compareAndSet(existingValue, newValue)) { + return; + } + } + } + + private static void decrementWaiting() { + while(true) { + int existingValue = wupMutexTS.activeWaiting.get(); + int newValue = existingValue - 1; + if(wupMutexTS.activeWaiting.compareAndSet(existingValue, newValue)) { + return; + } + } + } + + public static int getTotalActiveLocks() { + return wupMutexTS.activeLocks.get(); + } + public static int getTotalActiveWaiting() { + return wupMutexTS.activeWaiting.get(); + } + + public static int getTotalThingsLocksUsage() { + return wupMutexTS._instanceMtx.size(); + } + + + private ReentrantLock getInstanceMutex(String id/*optional*/) throws Exception { + String mutexId = this.getMeName(); + if (id!=null) { + if (!id.equals("")) mutexId = mutexId+"/"+id; + } + ReentrantLock meMtx = wupMutexTS._instanceMtx.get(mutexId); + if (meMtx == null) { + meMtx = wupMutexTS._instanceMtx.computeIfAbsent(mutexId, k -> new ReentrantLock(true)); + } + return meMtx; + } + + + @ThingworxServiceDefinition( + name = "Lock_wupMutexTS", + description = "Get a exclusive Lock for this thing. Recomended usage:\n "+ + " me.Lock_wupMutexTS(); \n"+ + " try {\n"+ + " // -- whatever code that needs to be mutex \n"+ + " } finally { \n"+ + " me.Unlock_wupMutexTS(); \n"+ + "}", + category = "WUP", + isAllowOverride = false, + aspects = {"isAsync:false" } + ) + @ThingworxServiceResult(name = "result", description = "", baseType = "NOTHING", aspects = {}) + public void Lock_wupMutexTS( + @ThingworxServiceParameter(name = "id", + description = "Optional Mutex Name in order to have more than one Mutex for the same thing, for instance to be more quirurgic on the blocking condition.", + baseType = "STRING", + aspects={"isRequired:false"} + ) final String id + ) throws Exception { + final ReentrantLock meMtx = this.getInstanceMutex(id); + if (meMtx != null) { + wupMutexTS.incrementWaiting(); + meMtx.lock(); + // -- we must ensure that the lock it's returned, otherwise we must unlock here. + try { + wupMutexTS.decrementWaiting(); + wupMutexTS.incrementLocks(); + } catch(Exception e) { + meMtx.unlock(); + throw new Exception("Lock_wupMutexTS/Failed to to additional steps, waiting counter maybe corrupted."); + } + } else { + throw new Exception("Lock_wupMutexTS/Cannot get instance Mutex"); + } + } + + @ThingworxServiceDefinition(name = "TryLock_wupMutexTS", description = "Get a exclusive Lock for this thing with or without a timout.", category = "WUP", isAllowOverride = false, aspects = { + "isAsync:false" }) + @ThingworxServiceResult(name = "result", description = "Returns true if the lock was acquired, false otherwise. If -1, does a tryLock without a timeout", baseType = "BOOLEAN", aspects = {}) + public Boolean TryLock_wupMutexTS( + @ThingworxServiceParameter(name = "id", + description = "Optional Mutex Name in order to have more than one Mutex for the same thing, for instance to be more quirurgic on the blocking condition.", + baseType = "STRING", + aspects={"isRequired:false"} + ) final String id , + @ThingworxServiceParameter(name = "timeOut", + description = "Timeout in milliseconds. Default = -1 if you don't want a timeout -> only one it's allowed others are discarded.", + baseType = "LONG", + aspects={"isRequired:false", + "defaultValue:-1" + }) final Long timeOut + ) throws Exception { + final ReentrantLock meMtx = this.getInstanceMutex(id); + if (meMtx != null) { + final Boolean result; + Boolean incremented = false; + if (((long)timeOut)<0) { + result = meMtx.tryLock(); + } else { + incremented = true; + wupMutexTS.incrementWaiting(); + result = meMtx.tryLock((long) timeOut, TimeUnit.MILLISECONDS); + } + + if (result==true) { + // -- we must ensure that the lock it's returned, otherwise we must unlock here. + try{ + if (incremented==true) wupMutexTS.decrementWaiting(); + wupMutexTS.incrementLocks(); + } catch(Exception e) { + meMtx.unlock(); + throw new Exception("TryLock_wupMutexTS/Failed to do additional steps, waiting counter maybe corrupted."); + } + } + return result; + } else { + throw new Exception("TryLock_wupMutexTS/Cannot get instance Mutex"); + } + } + + @ThingworxServiceDefinition(name = "Unlock_wupMutexTS", description = "Freeds the current lock for the thing.", category = "WUP", isAllowOverride = false, aspects = { + "isAsync:false" }) + @ThingworxServiceResult(name = "result", description = "", baseType = "NOTHING", aspects = {}) + public void Unlock_wupMutexTS( + @ThingworxServiceParameter(name = "id", + description = "Optional Mutex Name in order to have more than one Mutex for the same thing, for instance to be more quirurgic on the blocking condition.", + baseType = "STRING", + aspects={"isRequired:false"} + ) final String id + ) throws Exception { + final ReentrantLock meMtx = this.getInstanceMutex(id); + if (meMtx != null) { + meMtx.unlock(); + wupMutexTS.decrementLocks(); + } else { + throw new Exception("Unlock_wupMutexTS/Cannot get instance Mutex"); + } + } + + @ThingworxServiceDefinition(name = "IsLocked_wupMutexTS", description = "Check if current lock it's acquiered.", category = "WUP", isAllowOverride = false, aspects = { + "isAsync:false" }) + @ThingworxServiceResult(name = "result", description = "", baseType = "BOOLEAN", aspects = {}) + public Boolean IsLocked_wupMutexTS( + @ThingworxServiceParameter(name = "id", + description = "Optional Mutex Name in order to have more than one Mutex for the same thing, for instance to be more quirurgic on the blocking condition.", + baseType = "STRING", + aspects={"isRequired:false"} + ) final String id + ) throws Exception { + final ReentrantLock meMtx = this.getInstanceMutex(id); + if (meMtx != null) { + return meMtx.isLocked(); + } + return false; + } + +} diff --git a/zip/ConcurrencyExtension.zip b/zip/ConcurrencyExtension.zip new file mode 100644 index 0000000..a82556b Binary files /dev/null and b/zip/ConcurrencyExtension.zip differ