-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(querier): adding unstable Querier.
- Loading branch information
Showing
11 changed files
with
872 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ tasks { | |
"ZPub", | ||
"ZPubThr", | ||
"ZPut", | ||
"ZQuerier", | ||
"ZQueryable", | ||
"ZScout", | ||
"ZSub", | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
// | ||
// Copyright (c) 2023 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
package io.zenoh; | ||
|
||
import io.zenoh.bytes.ZBytes; | ||
import io.zenoh.exceptions.ZError; | ||
import io.zenoh.query.*; | ||
import picocli.CommandLine; | ||
|
||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.concurrent.Callable; | ||
|
||
import static io.zenoh.ConfigKt.loadConfig; | ||
|
||
@CommandLine.Command( | ||
name = "ZQuerier", | ||
mixinStandardHelpOptions = true, | ||
description = "Zenoh Querier example" | ||
) | ||
public class ZQuerier implements Callable<Integer> { | ||
|
||
@Override | ||
public Integer call() throws Exception { | ||
Zenoh.initLogFromEnvOr("error"); | ||
|
||
Config config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode); | ||
Selector selector = Selector.tryFrom(this.selectorOpt); | ||
|
||
QueryTarget queryTarget = target != null ? QueryTarget.valueOf(target.toUpperCase()) : QueryTarget.BEST_MATCHING; | ||
Duration queryTimeout = Duration.ofMillis(timeout); | ||
|
||
Session session = Zenoh.open(config); | ||
QuerierOptions options = new QuerierOptions(); | ||
options.setTarget(queryTarget); | ||
options.setTimeout(queryTimeout); | ||
Querier querier = session.declareQuerier(selector.getKeyExpr(), options); | ||
|
||
performQueries(querier, selector); | ||
return 0; | ||
} | ||
|
||
/** | ||
* Performs queries in an infinite loop, printing responses. | ||
*/ | ||
private void performQueries(Querier querier, Selector selector) throws ZError, InterruptedException { | ||
for (int idx = 0; idx < Integer.MAX_VALUE; idx++) { | ||
Thread.sleep(1000); | ||
|
||
String queryPayload = String.format("[%04d] %s", idx, payload != null ? payload : ""); | ||
System.out.println("Querying '" + selector + "' with payload: '" + queryPayload + "'..."); | ||
|
||
Querier.GetOptions options = new Querier.GetOptions(); | ||
options.setPayload(ZBytes.from(queryPayload)); | ||
options.setParameters(selector.getParameters()); | ||
|
||
querier.get(this::handleReply, options); | ||
} | ||
} | ||
|
||
/** | ||
* Handles replies received from the query. | ||
*/ | ||
private void handleReply(Reply reply) { | ||
if (reply instanceof Reply.Success) { | ||
Reply.Success successReply = (Reply.Success) reply; | ||
System.out.println(">> Received ('" + successReply.getSample().getKeyExpr() + | ||
"': '" + successReply.getSample().getPayload() + "')"); | ||
} else if (reply instanceof Reply.Error) { | ||
Reply.Error errorReply = (Reply.Error) reply; | ||
System.out.println(">> Received (ERROR: '" + errorReply.getError() + "')"); | ||
} | ||
} | ||
|
||
/** | ||
* ----- Example arguments and private fields ----- | ||
*/ | ||
|
||
private final Boolean emptyArgs; | ||
|
||
ZQuerier(Boolean emptyArgs) { | ||
this.emptyArgs = emptyArgs; | ||
} | ||
|
||
@CommandLine.Option( | ||
names = {"-s", "--selector"}, | ||
description = "The selection of resources to query [default: demo/example/**].", | ||
defaultValue = "demo/example/**" | ||
) | ||
private String selectorOpt; | ||
|
||
@CommandLine.Option( | ||
names = {"-p", "--payload"}, | ||
description = "An optional payload to put in the query." | ||
) | ||
private String payload; | ||
|
||
@CommandLine.Option( | ||
names = {"-t", "--target"}, | ||
description = "The target queryables of the query. Default: BEST_MATCHING. " + | ||
"[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]" | ||
) | ||
private String target; | ||
|
||
@CommandLine.Option( | ||
names = {"-o", "--timeout"}, | ||
description = "The query timeout in milliseconds [default: 10000].", | ||
defaultValue = "10000" | ||
) | ||
private long timeout; | ||
|
||
@CommandLine.Option( | ||
names = {"-c", "--config"}, | ||
description = "A configuration file." | ||
) | ||
private String configFile; | ||
|
||
@CommandLine.Option( | ||
names = {"-m", "--mode"}, | ||
description = "The session mode. Default: peer. Possible values: [peer, client, router].", | ||
defaultValue = "peer" | ||
) | ||
private String mode; | ||
|
||
@CommandLine.Option( | ||
names = {"-e", "--connect"}, | ||
description = "Endpoints to connect to.", | ||
split = "," | ||
) | ||
private List<String> connect; | ||
|
||
@CommandLine.Option( | ||
names = {"-l", "--listen"}, | ||
description = "Endpoints to listen on.", | ||
split = "," | ||
) | ||
private List<String> listen; | ||
|
||
@CommandLine.Option( | ||
names = {"--no-multicast-scouting"}, | ||
description = "Disable the multicast-based scouting mechanism.", | ||
defaultValue = "false" | ||
) | ||
private boolean noMulticastScouting; | ||
|
||
public static void main(String[] args) { | ||
int exitCode = new CommandLine(new ZQuerier(args.length == 0)).execute(args); | ||
System.exit(exitCode); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
23 changes: 23 additions & 0 deletions
23
zenoh-java/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
// | ||
// Copyright (c) 2023 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
package io.zenoh.annotations | ||
|
||
@RequiresOptIn( | ||
level = RequiresOptIn.Level.WARNING, | ||
message = "This feature is unstable and may change in future releases." | ||
) | ||
@Retention(AnnotationRetention.BINARY) | ||
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION) | ||
annotation class Unstable |
135 changes: 135 additions & 0 deletions
135
zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
// | ||
// Copyright (c) 2023 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
package io.zenoh.jni | ||
|
||
import io.zenoh.annotations.Unstable | ||
import io.zenoh.bytes.Encoding | ||
import io.zenoh.bytes.IntoZBytes | ||
import io.zenoh.bytes.into | ||
import io.zenoh.config.ZenohId | ||
import io.zenoh.exceptions.ZError | ||
import io.zenoh.handlers.Callback | ||
import io.zenoh.handlers.Handler | ||
import io.zenoh.jni.callbacks.JNIGetCallback | ||
import io.zenoh.jni.callbacks.JNIOnCloseCallback | ||
import io.zenoh.keyexpr.KeyExpr | ||
import io.zenoh.qos.CongestionControl | ||
import io.zenoh.qos.Priority | ||
import io.zenoh.qos.QoS | ||
import io.zenoh.query.Parameters | ||
import io.zenoh.query.Querier | ||
import io.zenoh.query.Reply | ||
import io.zenoh.sample.Sample | ||
import io.zenoh.sample.SampleKind | ||
import org.apache.commons.net.ntp.TimeStamp | ||
|
||
internal class JNIQuerier(val ptr: Long) { | ||
|
||
@OptIn(Unstable::class) | ||
@Throws(ZError::class) | ||
fun performGetWithCallback(keyExpr: KeyExpr, callback: Callback<Reply>, options: Querier.GetOptions) { | ||
performGet(keyExpr, options.parameters, callback, fun() {}, Unit, options.attachment, options.payload, options.encoding) | ||
} | ||
|
||
@OptIn(Unstable::class) | ||
@Throws(ZError::class) | ||
fun <R> performGetWithHandler(keyExpr: KeyExpr, handler: Handler<Reply, R>, options: Querier.GetOptions): R { | ||
return performGet(keyExpr, options.parameters, handler::handle, handler::onClose, handler.receiver(), options.attachment, options.payload, options.encoding) | ||
} | ||
|
||
@Throws(ZError::class) | ||
private fun <R> performGet( | ||
keyExpr: KeyExpr, | ||
parameters: Parameters?, | ||
callback: Callback<Reply>, | ||
onClose: () -> Unit, | ||
receiver: R, | ||
attachment: IntoZBytes?, | ||
payload: IntoZBytes?, | ||
encoding: Encoding? | ||
): R { | ||
val getCallback = JNIGetCallback { | ||
replierId: ByteArray?, | ||
success: Boolean, | ||
keyExpr2: String?, | ||
payload2: ByteArray, | ||
encodingId: Int, | ||
encodingSchema: String?, | ||
kind: Int, | ||
timestampNTP64: Long, | ||
timestampIsValid: Boolean, | ||
attachmentBytes: ByteArray?, | ||
express: Boolean, | ||
priority: Int, | ||
congestionControl: Int, | ||
-> | ||
val reply: Reply | ||
if (success) { | ||
val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null | ||
val sample = Sample( | ||
KeyExpr(keyExpr2!!, null), | ||
payload2.into(), | ||
Encoding(encodingId, schema = encodingSchema), | ||
SampleKind.fromInt(kind), | ||
timestamp, | ||
QoS(CongestionControl.fromInt(congestionControl), Priority.fromInt(priority), express), | ||
attachmentBytes?.into() | ||
) | ||
reply = Reply.Success(replierId?.let { ZenohId(it) }, sample) | ||
} else { | ||
reply = Reply.Error( | ||
replierId?.let { ZenohId(it) }, | ||
payload2.into(), | ||
Encoding(encodingId, schema = encodingSchema) | ||
) | ||
} | ||
callback.run(reply) | ||
} | ||
|
||
getViaJNI(this.ptr, | ||
keyExpr.jniKeyExpr?.ptr ?: 0, | ||
keyExpr.keyExpr, | ||
parameters?.toString(), | ||
getCallback, | ||
onClose, | ||
attachment?.into()?.bytes, | ||
payload?.into()?.bytes, | ||
encoding?.id ?: Encoding.defaultEncoding().id, | ||
encoding?.schema | ||
) | ||
return receiver | ||
} | ||
|
||
fun close() { | ||
freePtrViaJNI(ptr) | ||
} | ||
|
||
@Throws(ZError::class) | ||
private external fun getViaJNI( | ||
querierPtr: Long, | ||
keyExprPtr: Long, | ||
keyExprString: String, | ||
parameters: String?, | ||
callback: JNIGetCallback, | ||
onClose: JNIOnCloseCallback, | ||
attachmentBytes: ByteArray?, | ||
payload: ByteArray?, | ||
encodingId: Int, | ||
encodingSchema: String?, | ||
) | ||
|
||
private external fun freePtrViaJNI(ptr: Long) | ||
|
||
} |
Oops, something went wrong.