Skip to content

Commit

Permalink
HeatPump reactive overhaul, *almost* passes all tests (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Sep 6, 2023
1 parent c352712 commit 637c71f
Show file tree
Hide file tree
Showing 15 changed files with 647 additions and 601 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ public void connect(UnitDirector.Feed feed) {
// Zones and zone controller have no business knowing about HVAC mode; inject it
var modeFlux = feed.hvacDeviceFlux
.doOnNext(s -> {
if (s.getValue().requested.mode == null) {
if (s.getValue().command.mode == null) {
logger.debug("null hvacMode (normal on startup): {}", s);
}
})
.filter(s -> s.getValue().requested.mode != null)
.map(s -> new Signal<HvacMode, Void>(s.timestamp, s.getValue().requested.mode, null, s.status, s.error));
.filter(s -> s.getValue().command.mode != null)
.map(s -> new Signal<HvacMode, Void>(s.timestamp, s.getValue().command.mode, null, s.status, s.error));
zoneRenderer.subscribeMode(modeFlux);

// Zone ("thermostat" in its terminology) status feed is the only one supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ private Point convert(Signal<HvacDeviceStatus, Void> signal) {

if (status != null) {

b.tag("kind", status.kind.toString());
b.addField("demand", status.requested.demand);
b.addField("fanSpeed", status.requested.fanSpeed);
b.addField("demand", status.command.demand);
b.addField("fanSpeed", status.command.fanSpeed);
b.addField("uptimeMillis", Optional.ofNullable(status.uptime).map(Duration::toMillis).orElse(0L));
Optional.ofNullable(status.requested.mode).ifPresent(m -> b.tag("mode", m.toString()));
Optional.ofNullable(status.command.mode).ifPresent(m -> b.tag("mode", m.toString()));
}

if (signal.error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import net.sf.dz3r.signal.hvac.HvacDeviceStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.time.Clock;
Expand All @@ -25,7 +25,8 @@ public abstract class AbstractHvacDevice implements HvacDevice {

private final String name;

private Flux<Signal<HvacDeviceStatus, Void>> statusFlux;
private final Sinks.Many<Signal<HvacDeviceStatus, Void>> statusSink;
private final Flux<Signal<HvacDeviceStatus, Void>> statusFlux;

/**
* The moment this device turned on, {@code null} if currently off.
Expand All @@ -41,6 +42,9 @@ protected AbstractHvacDevice(String name) {
protected AbstractHvacDevice(Clock clock, String name) {
this.clock = clock;
this.name = name;

statusSink = Sinks.many().multicast().onBackpressureBuffer();
statusFlux = statusSink.asFlux();
}

@Override
Expand All @@ -55,48 +59,13 @@ protected void check(Switch<?> s, String purpose) {
}

@Override
public final synchronized Flux<Signal<HvacDeviceStatus, Void>> getFlux() {

// VT: NOTE: This whole synchronized thing must be eliminated. + bucket list.

ThreadContext.push("getFlux#" + Integer.toHexString(hashCode()));

try {
logger.debug("getFlux(): name={} waiting...", getAddress());

while (statusFlux == null) {
try {
wait();
logger.debug("getFlux(): name={} flux={}", getAddress(), statusFlux);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException("This shouldn't have happened", ex);
}
}

logger.debug("getFlux(): name={} DONE", getAddress());

// VT: NOTE: Be careful when refactoring this, need correct sharing option here
return statusFlux;

} finally {
ThreadContext.pop();
}
public final Flux<Signal<HvacDeviceStatus, Void>> getFlux() {
return statusFlux;
}

protected final synchronized Flux<Signal<HvacDeviceStatus, Void>> setFlux(Flux<Signal<HvacDeviceStatus, Void>> source) {

// VT: NOTE: This whole synchronized thing must be eliminated. + bucket list.

ThreadContext.push("getFlux#" + Integer.toHexString(hashCode()));
try {
statusFlux = source;
notifyAll();
logger.debug("setFlux(): name={} notified", getAddress());
return source;
} finally {
ThreadContext.pop();
}
protected final void broadcast(Signal<HvacDeviceStatus, Void> signal) {
logger.debug("{}: broadcast: {}", getAddress(), signal);
statusSink.tryEmitNext(signal);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.NonNull;
Expand Down Expand Up @@ -43,8 +43,8 @@ public abstract class AbstractSwitch<A extends Comparable<A>> implements Switch<
*/
private final Clock clock;

private Sinks.Many<Signal<State, String>> stateSink;
private Flux<Signal<State, String>> stateFlux;
private FluxSink<Signal<State, String>> stateSink;
private Boolean lastKnownState;

/**
Expand Down Expand Up @@ -79,6 +79,9 @@ protected AbstractSwitch(@NonNull A address, @Nullable Scheduler scheduler, @Nul
this.minDelay = minDelay;
this.clock = clock == null ? Clock.systemUTC() : clock;

stateSink = Sinks.many().multicast().onBackpressureBuffer();
stateFlux = stateSink.asFlux();

logger.info("{}: created AbstractSwitch({}) with minDelay={}", Integer.toHexString(hashCode()), getAddress(), minDelay);
}

Expand All @@ -89,27 +92,9 @@ public final A getAddress() {

@Override
public final synchronized Flux<Signal<State, String>> getFlux() {

if (stateFlux != null) {
return stateFlux;
}

logger.debug("{}: creating stateFlux:{}", Integer.toHexString(hashCode()), address);

stateFlux = Flux
.create(this::connect)
.doOnSubscribe(s -> logger.debug("stateFlux:{} subscribed", address))
.publishOn(Schedulers.boundedElastic())
.publish()
.autoConnect();

return stateFlux;
}

private void connect(FluxSink<Signal<State, String>> sink) {
this.stateSink = sink;
}

@Override
public final Mono<Boolean> setState(boolean state) {

Expand Down Expand Up @@ -171,17 +156,7 @@ private Boolean limitRate(boolean state) {
}

private void reportState(Signal<State, String> signal) {

if (stateSink == null) {

// Unless something subscribes, this will be flooding the log - enable for troubleshooting
// logger.warn("stateSink:{} is still null, skipping: {}", address, signal); // NOSONAR

getFlux();
return;
}

stateSink.next(signal);
stateSink.tryEmitNext(signal);
}

@Override
Expand Down
Loading

0 comments on commit 637c71f

Please sign in to comment.