Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(querier): adding unstable Querier. #166

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tasks {
"ZPub",
"ZPubThr",
"ZPut",
"ZQuerier",
"ZQueryable",
"ZScout",
"ZSub",
Expand Down
162 changes: 162 additions & 0 deletions examples/src/main/java/io/zenoh/ZQuerier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh;

import io.zenoh.bytes.ZBytes;
import io.zenoh.exceptions.ZError;
import io.zenoh.query.*;
import picocli.CommandLine;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;

import static io.zenoh.ConfigKt.loadConfig;

@CommandLine.Command(
name = "ZQuerier",
mixinStandardHelpOptions = true,
description = "Zenoh Querier example"
)
public class ZQuerier implements Callable<Integer> {

@Override
public Integer call() throws Exception {
Zenoh.initLogFromEnvOr("error");

Config config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode);
Selector selector = Selector.tryFrom(this.selectorOpt);

QueryTarget queryTarget = target != null ? QueryTarget.valueOf(target.toUpperCase()) : QueryTarget.BEST_MATCHING;
Duration queryTimeout = Duration.ofMillis(timeout);

Session session = Zenoh.open(config);
QuerierOptions options = new QuerierOptions();
options.setTarget(queryTarget);
options.setTimeout(queryTimeout);
Querier querier = session.declareQuerier(selector.getKeyExpr(), options);

performQueries(querier, selector);
return 0;
}

/**
* Performs queries in an infinite loop, printing responses.
*/
private void performQueries(Querier querier, Selector selector) throws ZError, InterruptedException {
for (int idx = 0; idx < Integer.MAX_VALUE; idx++) {
Thread.sleep(1000);

String queryPayload = String.format("[%04d] %s", idx, payload != null ? payload : "");
System.out.println("Querying '" + selector + "' with payload: '" + queryPayload + "'...");

Querier.GetOptions options = new Querier.GetOptions();
options.setPayload(ZBytes.from(queryPayload));
options.setParameters(selector.getParameters());

querier.get(this::handleReply, options);
}
}

/**
* Handles replies received from the query.
*/
private void handleReply(Reply reply) {
if (reply instanceof Reply.Success) {
Reply.Success successReply = (Reply.Success) reply;
System.out.println(">> Received ('" + successReply.getSample().getKeyExpr() +
"': '" + successReply.getSample().getPayload() + "')");
} else if (reply instanceof Reply.Error) {
Reply.Error errorReply = (Reply.Error) reply;
System.out.println(">> Received (ERROR: '" + errorReply.getError() + "')");
}
}

/**
* ----- Example arguments and private fields -----
*/

private final Boolean emptyArgs;

ZQuerier(Boolean emptyArgs) {
this.emptyArgs = emptyArgs;
}

@CommandLine.Option(
names = {"-s", "--selector"},
description = "The selection of resources to query [default: demo/example/**].",
defaultValue = "demo/example/**"
)
private String selectorOpt;

@CommandLine.Option(
names = {"-p", "--payload"},
description = "An optional payload to put in the query."
)
private String payload;

@CommandLine.Option(
names = {"-t", "--target"},
description = "The target queryables of the query. Default: BEST_MATCHING. " +
"[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]"
)
private String target;

@CommandLine.Option(
names = {"-o", "--timeout"},
description = "The query timeout in milliseconds [default: 10000].",
defaultValue = "10000"
)
private long timeout;

@CommandLine.Option(
names = {"-c", "--config"},
description = "A configuration file."
)
private String configFile;

@CommandLine.Option(
names = {"-m", "--mode"},
description = "The session mode. Default: peer. Possible values: [peer, client, router].",
defaultValue = "peer"
)
private String mode;

@CommandLine.Option(
names = {"-e", "--connect"},
description = "Endpoints to connect to.",
split = ","
)
private List<String> connect;

@CommandLine.Option(
names = {"-l", "--listen"},
description = "Endpoints to listen on.",
split = ","
)
private List<String> listen;

@CommandLine.Option(
names = {"--no-multicast-scouting"},
description = "Disable the multicast-based scouting mechanism.",
defaultValue = "false"
)
private boolean noMulticastScouting;

public static void main(String[] args) {
int exitCode = new CommandLine(new ZQuerier(args.length == 0)).execute(args);
System.exit(exitCode);
}
}
47 changes: 47 additions & 0 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.zenoh

import io.zenoh.annotations.Unstable
import io.zenoh.bytes.IntoZBytes
import io.zenoh.config.ZenohId
import io.zenoh.exceptions.ZError
Expand Down Expand Up @@ -321,6 +322,42 @@ class Session private constructor(private val config: Config) : AutoCloseable {
return resolveQueryableWithCallback(keyExpr, callback, options)
}


/**
* Declare a [Querier].
*
* A querier allows to send queries to a queryable.
*
* Queriers are automatically undeclared when dropped.
*
* Example:
* ```java
* try (Session session = Zenoh.open(config)) {
* QuerierOptions options = new QuerierOptions();
* options.setTarget(QueryTarget.BEST_MATCHING);
* Querier querier = session.declareQuerier(selector.getKeyExpr(), options);
* //...
* Querier.GetOptions options = new Querier.GetOptions();
* options.setPayload(ZBytes.from("Example payload"));
* querier.get(reply -> {...}, options);
* }
* ```
*
* @param keyExpr The [KeyExpr] for the querier.
* @param options Optional [QuerierOptions] to configure the querier.
* @return A [Querier] that will be undeclared on drop.
* @throws ZError
*/
@Unstable
@JvmOverloads
@Throws(ZError::class)
fun declareQuerier(
keyExpr: KeyExpr,
options: QuerierOptions = QuerierOptions()
): Querier {
return resolveQuerier(keyExpr, options)
}

/**
* Declare a [KeyExpr].
*
Expand Down Expand Up @@ -560,6 +597,16 @@ class Session private constructor(private val config: Config) : AutoCloseable {
} ?: throw (sessionClosedException)
}

@OptIn(Unstable::class)
private fun resolveQuerier(
keyExpr: KeyExpr,
options: QuerierOptions
): Querier {
return jniSession?.run {
declareQuerier(keyExpr, options)
} ?: throw sessionClosedException
}

@Throws(ZError::class)
internal fun <R> resolveGetWithHandler(
selector: IntoSelector,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.annotations

@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This feature is unstable and may change in future releases."
)
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION)
annotation class Unstable
135 changes: 135 additions & 0 deletions zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

package io.zenoh.jni

import io.zenoh.annotations.Unstable
import io.zenoh.bytes.Encoding
import io.zenoh.bytes.IntoZBytes
import io.zenoh.bytes.into
import io.zenoh.config.ZenohId
import io.zenoh.exceptions.ZError
import io.zenoh.handlers.Callback
import io.zenoh.handlers.Handler
import io.zenoh.jni.callbacks.JNIGetCallback
import io.zenoh.jni.callbacks.JNIOnCloseCallback
import io.zenoh.keyexpr.KeyExpr
import io.zenoh.qos.CongestionControl
import io.zenoh.qos.Priority
import io.zenoh.qos.QoS
import io.zenoh.query.Parameters
import io.zenoh.query.Querier
import io.zenoh.query.Reply
import io.zenoh.sample.Sample
import io.zenoh.sample.SampleKind
import org.apache.commons.net.ntp.TimeStamp

internal class JNIQuerier(val ptr: Long) {

@OptIn(Unstable::class)
@Throws(ZError::class)
fun performGetWithCallback(keyExpr: KeyExpr, callback: Callback<Reply>, options: Querier.GetOptions) {
performGet(keyExpr, options.parameters, callback, fun() {}, Unit, options.attachment, options.payload, options.encoding)
}

@OptIn(Unstable::class)
@Throws(ZError::class)
fun <R> performGetWithHandler(keyExpr: KeyExpr, handler: Handler<Reply, R>, options: Querier.GetOptions): R {
return performGet(keyExpr, options.parameters, handler::handle, handler::onClose, handler.receiver(), options.attachment, options.payload, options.encoding)
}

@Throws(ZError::class)
private fun <R> performGet(
keyExpr: KeyExpr,
parameters: Parameters?,
callback: Callback<Reply>,
onClose: () -> Unit,
receiver: R,
attachment: IntoZBytes?,
payload: IntoZBytes?,
encoding: Encoding?
): R {
val getCallback = JNIGetCallback {
replierId: ByteArray?,
success: Boolean,
keyExpr2: String?,
payload2: ByteArray,
encodingId: Int,
encodingSchema: String?,
kind: Int,
timestampNTP64: Long,
timestampIsValid: Boolean,
attachmentBytes: ByteArray?,
express: Boolean,
priority: Int,
congestionControl: Int,
->
val reply: Reply
if (success) {
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null
val sample = Sample(
KeyExpr(keyExpr2!!, null),
payload2.into(),
Encoding(encodingId, schema = encodingSchema),
SampleKind.fromInt(kind),
timestamp,
QoS(CongestionControl.fromInt(congestionControl), Priority.fromInt(priority), express),
attachmentBytes?.into()
)
reply = Reply.Success(replierId?.let { ZenohId(it) }, sample)
} else {
reply = Reply.Error(
replierId?.let { ZenohId(it) },
payload2.into(),
Encoding(encodingId, schema = encodingSchema)
)
}
callback.run(reply)
}

getViaJNI(this.ptr,
keyExpr.jniKeyExpr?.ptr ?: 0,
keyExpr.keyExpr,
parameters?.toString(),
getCallback,
onClose,
attachment?.into()?.bytes,
payload?.into()?.bytes,
encoding?.id ?: Encoding.defaultEncoding().id,
encoding?.schema
)
return receiver
}

fun close() {
freePtrViaJNI(ptr)
}

@Throws(ZError::class)
private external fun getViaJNI(
querierPtr: Long,
keyExprPtr: Long,
keyExprString: String,
parameters: String?,
callback: JNIGetCallback,
onClose: JNIOnCloseCallback,
attachmentBytes: ByteArray?,
payload: ByteArray?,
encodingId: Int,
encodingSchema: String?,
)

private external fun freePtrViaJNI(ptr: Long)

}
Loading
Loading