Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(基础模块): MqttClient设备会话支持可恢复. #538

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import lombok.Setter;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport;
Expand Down Expand Up @@ -70,8 +72,9 @@ public long connectTime() {

@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
if(websocket==null){
return Reactors.ALWAYS_FALSE;
//未建立websocket链接,不支持此类消息.
if(websocket == null){
return Mono.error(new DeviceOperationException(ErrorCode.UNSUPPORTED_MESSAGE));
}
if (encodedMessage instanceof WebSocketMessage) {
return websocket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ private static class RouteKey {
}

private MqttClientSession createDeviceSession(DeviceOperator device, MqttClient client) {
return new MqttClientSession(device.getDeviceId(), device, client, monitor);
MqttClientSession session = new MqttClientSession(device.getDeviceId(), device, client, monitor);
session.setGatewayId(getId());
return session;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.jetlinks.community.network.mqtt.gateway.device.session;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import reactor.core.publisher.Mono;

Expand All @@ -20,29 +23,45 @@
* @author zhouhao
* @since 1.0
*/
public class MqttClientSession implements DeviceSession {
public class MqttClientSession implements PersistentSession {
@Getter
private final String id;

@Getter
private final DeviceOperator operator;

private MqttClient clientTemp;

@Getter
@Setter
private MqttClient client;
private Mono<MqttClient> client;

private final long connectTime = System.currentTimeMillis();
@Setter(AccessLevel.PROTECTED)
private long connectTime = System.currentTimeMillis();

@Setter(AccessLevel.PROTECTED)
private long lastPingTime = System.currentTimeMillis();

private long keepAliveTimeout = -1;

private final DeviceGatewayMonitor monitor;

@Getter
@Setter
private String gatewayId;

public MqttClientSession(String id,
DeviceOperator operator,
MqttClient client,
DeviceGatewayMonitor monitor) {
this(id, operator, Mono.just(client), monitor);
this.clientTemp = client;
}

public MqttClientSession(String id,
DeviceOperator operator,
Mono<MqttClient> client,
DeviceGatewayMonitor monitor) {
this.id = id;
this.operator = operator;
this.client = client;
Expand All @@ -67,13 +86,19 @@ public long connectTime() {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
if (encodedMessage instanceof MqttMessage) {
monitor.sentMessage();
if (monitor != null) {
monitor.sentMessage();
}
return client
.publish(((MqttMessage) encodedMessage))
.thenReturn(true)
;
.flatMap(client -> {
this.clientTemp = client;
return client
.publish(((MqttMessage) encodedMessage))
.thenReturn(true);
});
}
return Mono.error(new UnsupportedOperationException("unsupported message type:" + encodedMessage.getClass()));
return Mono.error(new DeviceOperationException
.NoStackTrace(ErrorCode.UNSUPPORTED_MESSAGE, "error.unsupported_mqtt_message_type"));
}

@Override
Expand All @@ -93,10 +118,19 @@ public void ping() {

@Override
public boolean isAlive() {
return client.isAlive() &&
return (clientTemp == null || clientTemp.isAlive()) &&
(keepAliveTimeout <= 0 || System.currentTimeMillis() - lastPingTime < keepAliveTimeout);
}

@Override
public Mono<Boolean> isAliveAsync() {
return client
.map(client -> {
this.clientTemp = client;
return isAlive();
});
}

@Override
public void onClose(Runnable call) {

Expand All @@ -113,4 +147,9 @@ public String toString() {
"id=" + id + ",device=" + getDeviceId() +
'}';
}

@Override
public String getProvider() {
return MqttClientSessionPersistentProvider.PROVIDER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package org.jetlinks.community.network.mqtt.gateway.device.session;

import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionProvider;
import org.jetlinks.core.server.session.DeviceSessionProviders;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;

import static org.jetlinks.community.codec.Serializers.getDefault;

@Component
public class MqttClientSessionPersistentProvider implements DeviceSessionProvider {
public static final String PROVIDER = "mqtt-client";

private final NetworkManager networkManager;

public MqttClientSessionPersistentProvider(NetworkManager networkManager) {
this.networkManager = networkManager;
DeviceSessionProviders.register(this);
}

@Override
public String getId() {
return PROVIDER;
}

@Override
public Mono<PersistentSession> deserialize(byte[] sessionData, DeviceRegistry registry) {

return Mono
.fromCallable(() -> {
ByteArrayInputStream stream = new ByteArrayInputStream(sessionData);
SessionData data = new SessionData();
try (ObjectInput input = getDefault().createInput(stream)) {
data.readExternal(input);
}
return data;
})
.flatMap(data -> data.toSession(registry, networkManager));
}

@Override
public Mono<byte[]> serialize(PersistentSession session, DeviceRegistry registry) {
if (!session.isWrapFrom(MqttClientSession.class)) {
return Mono.empty();
}
return SessionData
.of(session.unwrap(MqttClientSession.class))
.flatMap(data -> Mono
.fromCallable(() -> {
ByteArrayOutputStream stream = new ByteArrayOutputStream(128);
try (ObjectOutput output = getDefault().createOutput(stream)) {
data.writeExternal(output);
}
return stream.toByteArray();
}));
}

static class SessionData {
private String deviceId;
private String networkId;
private String gatewayId;
private long lastPingTime;
private long connectTime;
private long keepAliveTimeout;

public SessionData() {
}

public static Mono<SessionData> of(MqttClientSession session) {
SessionData data = new SessionData();
data.deviceId = session.getDeviceId();
data.gatewayId = session.getGatewayId();
data.lastPingTime = session.lastPingTime();
data.connectTime = session.connectTime();
data.keepAliveTimeout = session.getKeepAliveTimeout().toMillis();
return session
.getClient()
.doOnNext(client -> data.networkId = client.getId())
.thenReturn(data);
}

public Mono<PersistentSession> toSession(DeviceRegistry registry,
NetworkManager manager) {
return registry
.getDevice(deviceId)
.map(device -> {
MqttClientSession session = new MqttClientSession(
deviceId,
device,
manager.getNetwork(DefaultNetworkType.MQTT_CLIENT, networkId),
GatewayMonitors.getDeviceGatewayMonitor(gatewayId));

session.setKeepAliveTimeout(Duration.ofMillis(keepAliveTimeout));
session.setLastPingTime(lastPingTime);
session.setConnectTime(connectTime);
return session;
});
}

@SneakyThrows
public void writeExternal(ObjectOutput out) {
out.writeUTF(deviceId);
out.writeUTF(networkId);
SerializeUtils.writeNullableUTF(gatewayId, out);
out.writeLong(lastPingTime);
out.writeLong(connectTime);
out.writeLong(keepAliveTimeout);
}

@SneakyThrows
public void readExternal(ObjectInput in) {
deviceId = in.readUTF();
networkId = in.readUTF();
gatewayId = SerializeUtils.readNullableUTF(in);
lastPingTime = in.readLong();
connectTime = in.readLong();
keepAliveTimeout = in.readLong();
}
}
}
Loading