diff --git a/core/src/main/java/org/lflang/AttributeUtils.java b/core/src/main/java/org/lflang/AttributeUtils.java index 147a4b6d0d..50b8009e88 100644 --- a/core/src/main/java/org/lflang/AttributeUtils.java +++ b/core/src/main/java/org/lflang/AttributeUtils.java @@ -234,6 +234,15 @@ public static boolean isSparse(EObject node) { return findAttributeByName(node, "sparse") != null; } + /** + * Return true if the specified node is an Instantiation and has an {@code @transient} attribute. + * + * @param node An AST node. + */ + public static boolean isTransient(Instantiation node) { + return findAttributeByName(node, "transient") != null; + } + /** Return true if the reactor is marked to be a federate. */ public static boolean isFederate(Reactor reactor) { return findAttributeByName(reactor, "_fed_config") != null; diff --git a/core/src/main/java/org/lflang/federated/extensions/CExtension.java b/core/src/main/java/org/lflang/federated/extensions/CExtension.java index 29039010b4..643f8bca44 100644 --- a/core/src/main/java/org/lflang/federated/extensions/CExtension.java +++ b/core/src/main/java/org/lflang/federated/extensions/CExtension.java @@ -713,6 +713,9 @@ private String generateCodeToInitializeFederate(FederateInstance federate, RtiCo // Set global variable identifying the federate. code.pr("_lf_my_fed_id = " + federate.id + ";"); + // Set indicator variable that specify whether the federate is transient. + code.pr("_fed.is_transient = " + federate.isTransient + ";"); + // We keep separate record for incoming and outgoing p2p connections to allow incoming traffic // to be processed in a separate // thread without requiring a mutex lock. diff --git a/core/src/main/java/org/lflang/federated/generator/FederateInstance.java b/core/src/main/java/org/lflang/federated/generator/FederateInstance.java index 15a94c430e..67481841d5 100644 --- a/core/src/main/java/org/lflang/federated/generator/FederateInstance.java +++ b/core/src/main/java/org/lflang/federated/generator/FederateInstance.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.lflang.AttributeUtils; import org.lflang.MessageReporter; import org.lflang.TimeValue; import org.lflang.ast.ASTUtils; @@ -95,6 +96,7 @@ public FederateInstance( this.bankWidth = bankWidth; this.messageReporter = messageReporter; this.targetConfig = targetConfig; + this.isTransient = AttributeUtils.isTransient(instantiation); // If the instantiation is in a bank, then we have to append // the bank index to the name. @@ -156,6 +158,9 @@ public Instantiation getInstantiation() { /** The integer ID of this federate. */ public int id; + /** Type of the federate: transient if true, and peristent if false . */ + public boolean isTransient = false; + /** * The name of this federate instance. This will be the instantiation name, possibly appended with * "__n", where n is the bank position of this instance if the instantiation is of a bank of diff --git a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java index ad8743a7c3..046459644c 100644 --- a/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java +++ b/core/src/main/java/org/lflang/federated/launcher/FedLauncherGenerator.java @@ -331,6 +331,15 @@ private String getDistHeader() { private String getRtiCommand(List federates, boolean isRemote) { List commands = new ArrayList<>(); + + // Identify the transient federates number + int transientFederatesNumber = 0; + for (FederateInstance federate : federates) { + if (federate.isTransient) { + transientFederatesNumber++; + } + } + if (isRemote) { commands.add("RTI -i '${FEDERATION_ID}' \\"); } else { @@ -345,6 +354,7 @@ private String getRtiCommand(List federates, boolean isRemote) commands.addAll( List.of( " -n " + federates.size() + " \\", + " -nt " + transientFederatesNumber + " \\", " -c " + targetConfig.getOrDefault(ClockSyncModeProperty.INSTANCE).toString() + " \\")); diff --git a/core/src/main/java/org/lflang/generator/c/CCompiler.java b/core/src/main/java/org/lflang/generator/c/CCompiler.java index 8673856af8..c85afdcd79 100644 --- a/core/src/main/java/org/lflang/generator/c/CCompiler.java +++ b/core/src/main/java/org/lflang/generator/c/CCompiler.java @@ -221,11 +221,13 @@ private static List cmakeOptions(TargetConfig targetConfig, FileConfig f String maybeQuote = ""; // Windows seems to require extra level of quoting. String srcPath = fileConfig.srcPath.toString(); // Windows requires escaping the backslashes. String rootPath = fileConfig.srcPkgPath.toString(); + String binPath = fileConfig.binPath.toString(); if (separator.equals("\\")) { separator = "\\\\\\\\"; maybeQuote = "\\\""; srcPath = srcPath.replaceAll("\\\\", "\\\\\\\\"); rootPath = rootPath.replaceAll("\\\\", "\\\\\\\\"); + binPath = binPath.replaceAll("\\\\", "\\\\\\\\"); } arguments.addAll( List.of( @@ -244,6 +246,8 @@ private static List cmakeOptions(TargetConfig targetConfig, FileConfig f // Do not convert to Unix path arguments.add("-DLF_SOURCE_DIRECTORY=\"" + maybeQuote + srcPath + maybeQuote + "\""); arguments.add("-DLF_PACKAGE_DIRECTORY=\"" + maybeQuote + rootPath + maybeQuote + "\""); + } else { + arguments.add("-DLF_FEDERATES_BIN_DIRECTORY=\"" + maybeQuote + binPath + maybeQuote + "\""); } arguments.add(FileUtil.toUnixString(fileConfig.getSrcGenPath())); diff --git a/core/src/main/java/org/lflang/validation/AttributeSpec.java b/core/src/main/java/org/lflang/validation/AttributeSpec.java index a685e98ca7..cb13cca247 100644 --- a/core/src/main/java/org/lflang/validation/AttributeSpec.java +++ b/core/src/main/java/org/lflang/validation/AttributeSpec.java @@ -205,6 +205,8 @@ enum AttrParamType { new AttributeSpec(List.of(new AttrParamSpec(VALUE_ATTR, AttrParamType.STRING, false)))); // @sparse ATTRIBUTE_SPECS_BY_NAME.put("sparse", new AttributeSpec(null)); + // @transient + ATTRIBUTE_SPECS_BY_NAME.put("transient", new AttributeSpec(null)); // @icon("value") ATTRIBUTE_SPECS_BY_NAME.put( "icon", diff --git a/core/src/main/java/org/lflang/validation/LFValidator.java b/core/src/main/java/org/lflang/validation/LFValidator.java index 7dfa0e557d..7512010048 100644 --- a/core/src/main/java/org/lflang/validation/LFValidator.java +++ b/core/src/main/java/org/lflang/validation/LFValidator.java @@ -534,6 +534,25 @@ public void checkInstantiation(Instantiation instantiation) { error("Variable-width banks are not supported.", Literals.INSTANTIATION__WIDTH_SPEC); } } + + // If the Instantiation has annotation '@transient', then make sure that the + // container is a federated reactor, and that the coordination is centralized + // FederateInstance + if (AttributeUtils.isTransient(instantiation)) { + Reactor container = (Reactor) instantiation.eContainer(); + if (!container.isFederated()) { + error( + "Only federates can be transients: " + instantiation.getReactorClass().getName(), + Literals.INSTANTIATION__REACTOR_CLASS); + } + // if (this.target != Target.C) { + // error( + // "Transient federates are only supported in the C target.", + // Literals.TARGET_DECL__NAME); + // } + // FIXME: Currently, transients are only supported in centralized coordination + // Either add the check, or add the support in decentralized coordination. + } } @Check(CheckType.FAST) diff --git a/core/src/main/resources/lib/c/reactor-c b/core/src/main/resources/lib/c/reactor-c index b19fff3339..55158ea984 160000 --- a/core/src/main/resources/lib/c/reactor-c +++ b/core/src/main/resources/lib/c/reactor-c @@ -1 +1 @@ -Subproject commit b19fff33391b19a01fa45c37d425d76f9f6ea21b +Subproject commit 55158ea984c2b5a69e800242da1557336f97e868 diff --git a/test/C/src/federated/transient/TransientDownstream1WithTimer.lf b/test/C/src/federated/transient/TransientDownstream1WithTimer.lf new file mode 100644 index 0000000000..d56bfc3267 --- /dev/null +++ b/test/C/src/federated/transient/TransientDownstream1WithTimer.lf @@ -0,0 +1,156 @@ +/** + * This lf program tests if a transient federate corretly leaves then joins the federation. It also + * tests if the transient's downstream executes as expected, that is it received correct TAGs, + * regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port in, + * - the downstream of the transient federate has only one transient as upstream. + */ +target C { + timeout: 2 s +} + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(offset: time = 0, period: time = 0, fed_instance_name: char* = "instance") { + timer t(offset, period) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/federate__%s -i %s", + lf_get_federates_bin_directory(), + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + mid_launch_cmd, + lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, upstream of the transient. It reacts to its timer by sending increments to + * out output port. + */ +reactor Up(period: time = 500 ms) { + output out: int + timer t(0, period) + state count: int = 0 + + reaction(t) -> out {= + lf_set(out, self->count); + self->count++; + =} +} + +/** + * Transient federate that forwards whatever it receives from Up to down. It reacts twice to in + * input ports, then stops. It will execute twice during the lifetime of the federation. The second + * launch is done by TransientExec at logical time 1 s. Each time Middle joins, it notifies Down. + */ +reactor Middle { + input in: int + output out: int + output join: int + state count: int = 0 + + // Middle notifies its downstream that he joined, but make sure first that the effective start + // tag is correct + reaction(startup) -> join {= + if(lf_get_effective_start_time() < lf_get_start_time()) { + lf_print_error_and_exit("Fatal error: the transient's effective start time is less than the federation start time"); + } + + lf_set(join, 0); + =} + + // Pass the input value to the output port and stop spontaneously after two reactions to in + reaction(in) -> out {= + self->count++; + lf_set(out, in->value); + + if (self->count == 2) { + lf_stop(); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(in) {= + self->count_in_mid_reactions++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(shutdown) {= + // Check that the TAG have been successfully issued to Down + if (self->count_timer != 5) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react %d times.", + self->count_timer, + 5); + } + + // Check that Middle have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that Middle have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(offset = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up() + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in +} diff --git a/test/C/src/federated/transient/TransientDownstream2UpWithTimer.lf b/test/C/src/federated/transient/TransientDownstream2UpWithTimer.lf new file mode 100644 index 0000000000..95a8a258d8 --- /dev/null +++ b/test/C/src/federated/transient/TransientDownstream2UpWithTimer.lf @@ -0,0 +1,125 @@ +/** + * This lf program tests if a transient federate corretly leaves then joins the federation. It also + * tests if the transient's downstream executes as expected, that is it received correct TAGs, + * regardless of the transient being absent or present. In this test: + * - the transient federate spontaneously leaves the federation after 2 reactions to input port in, + * - the downstream of the transient federate has both, persistent and transient upstreams. + */ +target C { + timeout: 2 s +} + +import Up from "TransientDownstream1WithTimer.lf" +import Middle from "TransientDownstream1WithTimer.lf" + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(offset: time = 0, period: time = 0, fed_instance_name: char* = "instance") { + timer t(offset, period) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/federate__%s -i %s", + lf_get_federates_bin_directory(), + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + mid_launch_cmd, + lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in_mid: int + input in_up: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_mid_reactions: int = 0 + state count_in_up_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(in_mid) {= + self->count_in_mid_reactions++; + =} + + reaction(in_up) {= + self->count_in_up_reactions++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(shutdown) {= + // Check that the TAG have been successfully issued to Down + if (self->count_timer != 5) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react %d times.", + self->count_timer, + 5); + } + if (self->count_in_up_reactions != 7) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react %d times.", + self->count_in_up_reactions, + 7); + } + + // Check that Middle have joined 2 times + if (self->count_join != 2) { + lf_print_error_and_exit("Transient federate did not join twice, but %d times!", self->count_join); + } + + // Check that Middle have reacted correctly + if (self->count_in_mid_reactions < 4) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 4, but had: %d.", + self->count_in_mid_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(offset = 1 s, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up1 = new Up() + up2 = new Up(period = 300 msec) + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up1.out -> mid.in + mid.join -> down.join + mid.out -> down.in_mid + up2.out -> down.in_up +} diff --git a/test/C/src/federated/transient/TransientJoin3Times.lf b/test/C/src/federated/transient/TransientJoin3Times.lf new file mode 100644 index 0000000000..c897f8498b --- /dev/null +++ b/test/C/src/federated/transient/TransientJoin3Times.lf @@ -0,0 +1,111 @@ +/** + * This lf program tests if a transient federate corretly leaves then joins the federation. It also + * tests if the transient's downstream executes as expected, that is it received correct TAGs, + * regardless of the transient being absent or present. In this test, the downstream of the the + * transient federate has only a transient as upstream federate. + */ +target C { + timeout: 2 s +} + +import Up from "TransientDownstream1WithTimer.lf" +import Middle from "TransientDownstream1WithTimer.lf" + +preamble {= + #include + #include +=} + +/** Persistent federate that is responsible for lauching the transient federate */ +reactor TransientExec(offset: time = 0, period: time = 0, fed_instance_name: char* = "instance") { + timer t(offset, period) + + reaction(t) {= + // Construct the command to launch the transient federate + char mid_launch_cmd[512]; + sprintf(mid_launch_cmd, + "%s/federate__%s -i %s", + lf_get_federates_bin_directory(), + self->fed_instance_name, + lf_get_federation_id() + ); + + lf_print("Launching federate federate__%s at physical time " PRINTF_TIME ".", + mid_launch_cmd, + lf_time_physical()); + + int status = system(mid_launch_cmd); + + // Exit if error + if (status == 0) { + lf_print("Successfully launched federate__%s.", self->fed_instance_name); + } else { + lf_print_error_and_exit("Unable to launch federate__%s. Abort!", self->fed_instance_name); + } + =} +} + +/** + * Persistent federate, which is downstream of the transient. It has to keep reacting to its + * internal timer and also to inputs from the tansient, if any. + */ +reactor Down { + timer t(0, 500 ms) + + input in: int + input join: int + + state count_timer: int = 0 + state count_join: int = 0 + state count_in_reactions: int = 0 + + reaction(t) {= + self->count_timer++; + =} + + reaction(in) {= + self->count_in_reactions++; + =} + + reaction(join) {= + self->count_join++; + =} + + reaction(shutdown) {= + // Check that the TAG have been successfully issued to Down + if (self->count_timer != 5) { + lf_print_error_and_exit("Federate's timer reacted %d times, while it had to react %d times.", + self->count_timer, + 5); + } + + // Check that Middle have joined 2 times + if (self->count_join != 3) { + lf_print_error_and_exit("Transient federate did not join 3 times, but %d times!", self->count_join); + } + + // Check that Middle have reacted correctly + if (self->count_in_reactions < 6) { + lf_print_error_and_exit("Transient federate Mid did not execute and pass values from up corretly! Expected >= 6, but had: %d.", + self->count_in_reactions); + } + =} +} + +federated reactor { + // Persistent federate that is responsible for lauching the transient once, after 1s + midExec = new TransientExec(offset = 700 msec, period = 700 ms, fed_instance_name="mid") + + // Persistent downstream and upstream federates of the transient + up = new Up(period = 200 msec) + down = new Down() + + // Transient federate + @transient + mid = new Middle() + + // Connections + up.out -> mid.in + mid.join -> down.join + mid.out -> down.in +}