Skip to content

Commit

Permalink
Http-Kafka AsyncAPI (#1072)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics authored Jun 7, 2024
1 parent 6f96b91 commit f7b1423
Show file tree
Hide file tree
Showing 27 changed files with 1,866 additions and 92 deletions.
13 changes: 13 additions & 0 deletions runtime/binding-asyncapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-http-kafka</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-tls</artifactId>
Expand Down Expand Up @@ -157,6 +163,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-http-kafka</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public void attach(
NamespaceConfig v = entry.getValue();
List<BindingConfig> bindings = v.bindings.stream()
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") ||
b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka"))
b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") ||
b.type.equals("http-kafka"))
.collect(toList());
extractResolveId(k, bindings);
extractNamespace(k, bindings);
Expand All @@ -224,7 +225,8 @@ private void attachProxyBinding(
Object2ObjectHashMap::new));

namespaceGenerator.init(binding);
final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get);
final List<String> labels = configs.stream().map(c -> c.apiLabel).collect(toList());
final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get, labels);
composite.readURL = binding.readURL;
attach.accept(composite);
updateNamespace(configs, composite, new ArrayList<>(asyncapis.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,26 @@ public JsonObject adaptToJson(
JsonObjectBuilder object = Json.createObjectBuilder();

object.add(API_ID_NAME, asyncapiCondition.apiId);
object.add(OPERATION_ID_NAME, asyncapiCondition.operationId);

if (asyncapiCondition.operationId != null)
{
object.add(OPERATION_ID_NAME, asyncapiCondition.operationId);
}
return object.build();
}

@Override
public ConditionConfig adaptFromJson(
JsonObject object)
{
String apiId = object.getString(API_ID_NAME);
String operationId = object.getString(OPERATION_ID_NAME);
String apiId = object.containsKey(API_ID_NAME)
? object.getString(API_ID_NAME)
: null;

String operationId = object.containsKey(OPERATION_ID_NAME)
? object.getString(OPERATION_ID_NAME)
: null;

return new AsyncapiConditionConfig(apiId, operationId);
}
}
Loading

0 comments on commit f7b1423

Please sign in to comment.