Skip to content

Commit

Permalink
drafts initial version of standard router API
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
Oleh Dokuka authored and OlegDokuka committed Mar 14, 2021
1 parent eab6754 commit 136c82e
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 0 deletions.
43 changes: 43 additions & 0 deletions rsocket-router/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-Present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id 'java-library'
id 'maven-publish'
id 'com.jfrog.artifactory'
id 'com.jfrog.bintray'
}

dependencies {
api project(':rsocket-core')

implementation 'org.slf4j:slf4j-api'

testImplementation project(':rsocket-test')
testImplementation 'org.junit.jupiter:junit-jupiter-api'
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testImplementation 'org.mockito:mockito-core'
testImplementation 'org.assertj:assertj-core'
testImplementation 'io.projectreactor:reactor-test'

// TODO: Remove after JUnit5 migration
testCompileOnly 'junit:junit'
testImplementation 'org.hamcrest:hamcrest-library'
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
testRuntimeOnly 'ch.qos.logback:logback-classic'
}

description = 'Transparent Load Balancer for RSocket'
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2015-Present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.router;

import io.netty.buffer.ByteBuf;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.CompositeMetadata;
import io.rsocket.metadata.CompositeMetadata.Entry;
import io.rsocket.metadata.CompositeMetadata.WellKnownMimeTypeEntry;
import io.rsocket.metadata.RoutingMetadata;
import io.rsocket.metadata.WellKnownMimeType;
import reactor.util.annotation.Nullable;

public class CompositeMetadataRouteCodec implements RouteCodec {

@Override
@Nullable
public Route decode(ByteBuf metadataByteBuf, FrameType requestType) {
final CompositeMetadata compositeMetadata = new CompositeMetadata(metadataByteBuf, false);

String route = null;
String mimeType = null;

for (Entry compositeMetadatum : compositeMetadata) {
if (compositeMetadatum instanceof WellKnownMimeTypeEntry) {
final WellKnownMimeTypeEntry wellKnownMimeTypeEntry =
(WellKnownMimeTypeEntry) compositeMetadatum;
final WellKnownMimeType type = wellKnownMimeTypeEntry.getType();

if (type == WellKnownMimeType.MESSAGE_RSOCKET_ROUTING) {
final RoutingMetadata routingMetadata =
new RoutingMetadata(compositeMetadatum.getContent());
for (String routeEntry : routingMetadata) {
route = routeEntry;
break;
}
} else if (type == WellKnownMimeType.MESSAGE_RSOCKET_MIMETYPE) {
// FIXME: once codecs are available
// mimeType = compositeMetadatum
}
}
}

if (route != null) {
return new Route(requestType, route, mimeType);
}

return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-Present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.router;

import io.rsocket.Payload;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.util.annotation.Nullable;

public interface HandlerFunction {

Route route();

@SuppressWarnings("rawtypes")
default Publisher handle(Payload payload) {
return handle(payload, null);
}

@SuppressWarnings("rawtypes")
Publisher handle(Payload firstPayload, @Nullable Flux<Payload> payloads);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2015-Present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.router;

import java.util.HashMap;
import java.util.Map;

class ImmutableRoutingRSocket extends RoutingRSocket {

private final Map<Route, HandlerFunction> mapping;

ImmutableRoutingRSocket(Map<Route, HandlerFunction> mapping, RouteCodec routeCodec) {
super(routeCodec);
this.mapping = mapping;
}

@Override
protected HandlerFunction handlerFor(Route route) {
return mapping.get(route);
}

static final class ImmutableRouterBuilder
implements RoutingRSocket.Builder<ImmutableRouterBuilder> {

final HashMap<Route, HandlerFunction> mapping = new HashMap<>();
final RouteCodec routeCodec;

ImmutableRouterBuilder(RouteCodec routeCodec) {
this.routeCodec = routeCodec;
}

@Override
public ImmutableRouterBuilder addHandler(HandlerFunction handler) {
this.mapping.put(handler.route(), handler);

return this;
}

@Override
public RoutingRSocket build() {
return new ImmutableRoutingRSocket(this.mapping, routeCodec);
}
}
}
78 changes: 78 additions & 0 deletions rsocket-router/src/main/java/io/rsocket/router/Route.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2015-Present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.router;

import io.rsocket.frame.FrameType;
import reactor.util.annotation.Nullable;

public final class Route {

final String route;
final String mimeType;
final FrameType requestType;

public Route(FrameType requestType, String route) {
this(requestType, route, null);
}

public Route(FrameType requestType, String route, @Nullable String mimeType) {
this.route = route;
this.mimeType = mimeType;
this.requestType = requestType;
}

public String route() {
return this.route;
}

@Nullable
public String mimeType() {
return this.mimeType;
}

public FrameType requestType() {
return requestType;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

Route route1 = (Route) o;

if (!route.equals(route1.route)) {
return false;
}
if (mimeType != null ? !mimeType.equals(route1.mimeType) : route1.mimeType != null) {
return false;
}
return requestType == route1.requestType;
}

@Override
public int hashCode() {
int result = route.hashCode();
result = 31 * result + (mimeType != null ? mimeType.hashCode() : 0);
result = 31 * result + requestType.hashCode();
return result;
}
}
27 changes: 27 additions & 0 deletions rsocket-router/src/main/java/io/rsocket/router/RouteCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2015-Present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.router;

import io.netty.buffer.ByteBuf;
import io.rsocket.frame.FrameType;
import reactor.util.annotation.Nullable;

public interface RouteCodec {

@Nullable
Route decode(ByteBuf metadataByteBuf, FrameType requestType);
}
Loading

0 comments on commit 136c82e

Please sign in to comment.