From e102e5db43018533aacdf7417b570097a6fb161b Mon Sep 17 00:00:00 2001 From: Matt Jacobs Date: Thu, 18 Aug 2016 13:09:11 -0700 Subject: [PATCH] Remove all ReactiveSocket modules - this should allow a build by JDK6/7 --- .../build.gradle | 19 - .../contrib/reactivesocket/EventStream.java | 103 ---- .../reactivesocket/EventStreamEnum.java | 42 -- .../EventStreamRequestHandler.java | 103 ---- .../HystrixMetricsReactiveSocketClient.java | 92 --- .../EventStreamRequestHandlerTest.java | 198 ------ .../reactivesocket/EventStreamTest.java | 583 ------------------ .../reactivesocket/HystrixStreamTest.java | 64 -- .../client/build.gradle | 18 - ...trixMetricsReactiveSocketClientRunner.java | 128 ---- .../server/build.gradle | 20 - .../HystrixMetricsReactiveSocketServer.java | 67 -- settings.gradle | 6 - 13 files changed, 1443 deletions(-) delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java delete mode 100644 hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java delete mode 100644 hystrix-examples-reactivesocket/client/build.gradle delete mode 100644 hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java delete mode 100644 hystrix-examples-reactivesocket/server/build.gradle delete mode 100644 hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle b/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle deleted file mode 100644 index ff1ccc3e9..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle +++ /dev/null @@ -1,19 +0,0 @@ -repositories { - mavenCentral() - jcenter() - maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } -} - -sourceCompatibility = JavaVersion.VERSION_1_8 -targetCompatibility = JavaVersion.VERSION_1_8 - -dependencies { - compile project(':hystrix-core') - compile project(':hystrix-serialization') - compile 'io.reactivesocket:reactivesocket:latest.release' - compile 'io.reactivesocket:reactivesocket-netty:latest.release' - compile 'io.reactivex:rxjava-reactive-streams:latest.release' - - testCompile 'junit:junit-dep:4.10' - testCompile 'org.mockito:mockito-all:1.9.5' -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java deleted file mode 100644 index 60b2c22c7..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStream.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket; - -import com.netflix.hystrix.config.HystrixConfigurationStream; -import com.netflix.hystrix.metric.HystrixRequestEventsStream; -import com.netflix.hystrix.metric.consumer.HystrixDashboardStream; -import com.netflix.hystrix.metric.sample.HystrixUtilizationStream; -import com.netflix.hystrix.serial.SerialHystrixConfiguration; -import com.netflix.hystrix.serial.SerialHystrixDashboardData; -import com.netflix.hystrix.serial.SerialHystrixRequestEvents; -import com.netflix.hystrix.serial.SerialHystrixUtilization; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; -import rx.Observable; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; - -class EventStream implements Supplier> { - private final Observable source; - private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); - - /* package-private */EventStream(Observable source) { - this.source = source - .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true)) - .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false)) - .share() - .onBackpressureDrop(); - } - - @Override - public Observable get() { - return source; - } - - public static EventStream getInstance(EventStreamEnum eventStreamEnum) { - final Observable source; - - switch (eventStreamEnum) { - case CONFIG_STREAM: - source = HystrixConfigurationStream.getInstance() - .observe() - .map(SerialHystrixConfiguration::toBytes) - .map(EventStream::toPayload); - break; - case REQUEST_EVENT_STREAM: - source = HystrixRequestEventsStream.getInstance() - .observe() - .map(SerialHystrixRequestEvents::toBytes) - .map(EventStream::toPayload); - break; - case UTILIZATION_STREAM: - source = HystrixUtilizationStream.getInstance() - .observe() - .map(SerialHystrixUtilization::toBytes) - .map(EventStream::toPayload); - break; - case GENERAL_DASHBOARD_STREAM: - source = HystrixDashboardStream.getInstance() - .observe() - .map(SerialHystrixDashboardData::toBytes) - .map(EventStream::toPayload); - break; - default: - throw new IllegalArgumentException("Unknown EventStreamEnum : " + eventStreamEnum); - } - - return new EventStream(source); - } - - public boolean isSourceCurrentlySubscribed() { - return isSourceCurrentlySubscribed.get(); - } - - public static Payload toPayload(byte[] byteArray) { - return new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(byteArray); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java deleted file mode 100644 index f407ddf6d..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamEnum.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket; - -import java.util.Arrays; - -public enum EventStreamEnum { - - CONFIG_STREAM(1), REQUEST_EVENT_STREAM(2), UTILIZATION_STREAM(3), GENERAL_DASHBOARD_STREAM(4); - - private final int typeId; - - EventStreamEnum(int typeId) { - this.typeId = typeId; - } - - public static EventStreamEnum findByTypeId(int typeId) { - return Arrays - .asList(EventStreamEnum.values()) - .stream() - .filter(t -> t.typeId == typeId) - .findAny() - .orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId)); - } - - public int getTypeId() { - return typeId; - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java deleted file mode 100644 index 98e5b69b2..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandler.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket; - -import io.reactivesocket.Payload; -import io.reactivesocket.RequestHandler; -import org.agrona.BitUtil; -import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.RxReactiveStreams; - -/** - * An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an 32-bit integer in the {@link Payload} - * data of a ReactiveSocket {@link io.reactivesocket.Frame} which corresponds to an id in {@link EventStreamEnum}. If - * the id is found it will begin to stream the events to the subscriber. - */ -public class EventStreamRequestHandler extends RequestHandler { - private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class); - - @Override - public Publisher handleRequestResponse(Payload payload) { - Observable singleResponse = Observable.defer(() -> { - try { - int typeId = payload.getData().getInt(0); - EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); - EventStream eventStream = EventStream.getInstance(eventStreamEnum); - return eventStream.get().take(1); - } catch (Throwable t) { - logger.error(t.getMessage(), t); - return Observable.error(t); - } - }); - - return RxReactiveStreams.toPublisher(singleResponse); - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Observable multiResponse = Observable.defer(() -> { - try { - int typeId = payload.getData().getInt(0); - int numRequested = payload.getData().getInt(BitUtil.SIZE_OF_INT); - EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); - EventStream eventStream = EventStream.getInstance(eventStreamEnum); - return eventStream.get().take(numRequested); - } catch (Throwable t) { - logger.error(t.getMessage(), t); - return Observable.error(t); - } - }); - - return RxReactiveStreams.toPublisher(multiResponse); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Observable infiniteResponse = Observable - .defer(() -> { - try { - int typeId = payload.getData().getInt(0); - EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId); - EventStream eventStream = EventStream.getInstance(eventStreamEnum); - return eventStream.get(); - } catch (Throwable t) { - logger.error(t.getMessage(), t); - return Observable.error(t); - } - }) - .onBackpressureDrop(); - - return RxReactiveStreams.toPublisher(infiniteResponse); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return NO_FIRE_AND_FORGET_HANDLER.apply(payload); - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return NO_REQUEST_CHANNEL_HANDLER.apply(inputs); - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return NO_METADATA_PUSH_HANDLER.apply(payload); - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java deleted file mode 100644 index 1efd73f6e..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/main/java/com/netflix/hystrix/contrib/reactivesocket/client/HystrixMetricsReactiveSocketClient.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket.client; - -import com.netflix.hystrix.contrib.reactivesocket.EventStreamEnum; -import io.netty.channel.nio.NioEventLoopGroup; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.DefaultReactiveSocket; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.netty.tcp.client.ClientTcpDuplexConnection; -import org.agrona.BitUtil; -import org.reactivestreams.Publisher; -import rx.RxReactiveStreams; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; - -public class HystrixMetricsReactiveSocketClient { - - private final ReactiveSocket reactiveSocket; - - public HystrixMetricsReactiveSocketClient(String host, int port, NioEventLoopGroup eventLoopGroup) { - ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable( - ClientTcpDuplexConnection.create(InetSocketAddress.createUnresolved(host, port), eventLoopGroup) - ).toBlocking().single(); - - this.reactiveSocket = DefaultReactiveSocket - .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), Throwable::printStackTrace); - } - - public void startAndWait() { - reactiveSocket.startAndWait(); - } - - public Publisher requestResponse(EventStreamEnum eventStreamEnum) { - return reactiveSocket.requestResponse(createPayload(eventStreamEnum)); - } - - public Publisher requestStream(EventStreamEnum eventStreamEnum, int numRequested) { - return reactiveSocket.requestStream(createPayload(eventStreamEnum, numRequested)); - } - - public Publisher requestSubscription(EventStreamEnum eventStreamEnum) { - return reactiveSocket.requestSubscription(createPayload(eventStreamEnum)); - } - - private static Payload createPayload(EventStreamEnum eventStreamEnum) { - return new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.allocate(BitUtil.SIZE_OF_INT) - .putInt(0, eventStreamEnum.getTypeId()); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - } - - private static Payload createPayload(EventStreamEnum eventStreamEnum, int numRequested) { - return new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.allocate(BitUtil.SIZE_OF_INT * 2) - .putInt(0, eventStreamEnum.getTypeId()) - .putInt(BitUtil.SIZE_OF_INT, numRequested); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java deleted file mode 100644 index a1868fab9..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamRequestHandlerTest.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket; - - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; -import org.agrona.BitUtil; -import org.junit.Assert; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import rx.schedulers.Schedulers; - -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -public class EventStreamRequestHandlerTest { - @Test(timeout = 10_000) - public void testEventStreamRequestN() throws Exception { - Payload payload = new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer - .allocate(BitUtil.SIZE_OF_INT) - .putInt(EventStreamEnum.GENERAL_DASHBOARD_STREAM.getTypeId()); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - - Schedulers - .io() - .createWorker() - .schedulePeriodically(() -> { - TestCommand testCommand = new TestCommand(); - testCommand.execute(); - }, 0, 1, TimeUnit.MILLISECONDS); - - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch latch1 = new CountDownLatch(5); - CountDownLatch latch2 = new CountDownLatch(15); - - AtomicReference subscriptionAtomicReference = new AtomicReference<>(); - - EventStreamRequestHandler handler = new EventStreamRequestHandler(); - Publisher payloadPublisher = handler.handleSubscription(payload); - - payloadPublisher - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - subscriptionAtomicReference.set(s); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - ByteBuffer data = payload.getData(); - //String s = new String(data.array()); - //System.out.println(s); - - latch1.countDown(); - latch2.countDown(); - } - - @Override - public void onError(Throwable t) { - - } - - @Override - public void onComplete() { - - } - }); - - latch.await(); - - Subscription subscription = subscriptionAtomicReference.get(); - subscription.request(5); - - latch1.await(); - - long count = latch2.getCount(); - Assert.assertTrue(count < 15); - - subscription.request(100); - - latch2.await(); - - } - - @Test(timeout = 10_000) - public void testEventStreamFireHose() throws Exception { - Payload payload = new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer - .allocate(BitUtil.SIZE_OF_INT) - .putInt(EventStreamEnum.GENERAL_DASHBOARD_STREAM.getTypeId()); - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } - }; - - Schedulers - .io() - .createWorker() - .schedulePeriodically(() -> { - TestCommand testCommand = new TestCommand(); - testCommand.execute(); - }, 0, 1, TimeUnit.MILLISECONDS); - - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch latch1 = new CountDownLatch(15); - - AtomicReference subscriptionAtomicReference = new AtomicReference<>(); - - EventStreamRequestHandler handler = new EventStreamRequestHandler(); - Publisher payloadPublisher = handler.handleSubscription(payload); - - AtomicInteger i = new AtomicInteger(0); - - payloadPublisher - .subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - subscriptionAtomicReference.set(s); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - ByteBuffer data = payload.getData(); - //String s = new String(data.array()); - //System.out.println(System.currentTimeMillis() + " : " + i.incrementAndGet()); - - latch1.countDown(); - } - - @Override - public void onError(Throwable t) { - - } - - @Override - public void onComplete() { - - } - }); - - latch.await(); - - Subscription subscription = subscriptionAtomicReference.get(); - subscription.request(Long.MAX_VALUE); - - latch1.await(); - - - } - - class TestCommand extends HystrixCommand { - protected TestCommand() { - super(HystrixCommandGroupKey.Factory.asKey("HystrixMetricsPollerTest")); - } - - @Override - protected Boolean run() throws Exception { - return true; - } - } -} \ No newline at end of file diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java deleted file mode 100644 index bd9f89b89..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/EventStreamTest.java +++ /dev/null @@ -1,583 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.contrib.reactivesocket; - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; -import io.reactivesocket.Payload; -import org.agrona.BitUtil; -import org.junit.Before; -import org.junit.Test; -import rx.Observable; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Func2; -import rx.schedulers.Schedulers; - -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.*; - -public class EventStreamTest extends HystrixStreamTest { - - EventStream stream; - - @Before - public void init() { - stream = new EventStream(Observable.interval(10, TimeUnit.MILLISECONDS) - .map(ts -> { - Payload p = new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.allocate(BitUtil.SIZE_OF_INT * 2) - .putInt(0, 1) - .putInt(BitUtil.SIZE_OF_INT, 2); - } - - @Override - public ByteBuffer getMetadata() { - return null; - } - }; - - return p; - - }) - ); - } - - @Test - public void testConfigStreamHasData() throws Exception { - final AtomicBoolean hasBytes = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - final int NUM = 2; - - EventStream.getInstance(EventStreamEnum.CONFIG_STREAM).get() - .take(NUM) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - e.printStackTrace(); - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnError : " + e); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Config OnNext w bytes : " + payload.getData().remaining()); - if (payload.getData().remaining() > 0) { - hasBytes.set(true); - } - } - }); - - for (int i = 0; i < NUM; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - } - - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(hasBytes.get()); - } - - @Test - public void testUtilizationStreamHasData() throws Exception { - final AtomicBoolean hasBytes = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - final int NUM = 10; - - EventStream.getInstance(EventStreamEnum.UTILIZATION_STREAM).get() - .take(NUM) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Utilization OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Utilization OnError : " + e); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Utilization OnNext w bytes : " + payload.getData().remaining()); - if (payload.getData().remaining() > 0) { - hasBytes.set(true); - } - } - }); - - for (int i = 0; i < NUM; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - } - - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(hasBytes.get()); - } - - @Test - public void testRequestEventStreamHasData() throws Exception { - final AtomicBoolean hasBytes = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - final int NUM = 10; - - EventStream.getInstance(EventStreamEnum.REQUEST_EVENT_STREAM).get() - .take(NUM) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Request Event OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Request Event OnError : " + e); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Request Event OnNext w bytes : " + payload.getData().remaining()); - if (payload.getData().remaining() > 0) { - hasBytes.set(true); - } - } - }); - - for (int i = 0; i < NUM; i++) { - HystrixRequestContext requestContext = HystrixRequestContext.initializeContext(); - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - requestContext.close(); - } - - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(hasBytes.get()); - } - - @Test - public void testDashboardStreamHasData() throws Exception { - final AtomicBoolean hasBytes = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - final int NUM = 10; - - EventStream.getInstance(EventStreamEnum.GENERAL_DASHBOARD_STREAM).get() - .take(NUM) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Dashboard OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Dashboard OnError : " + e); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Dashboard OnNext w bytes : " + payload.getData().remaining()); - if (payload.getData().remaining() > 0) { - hasBytes.set(true); - } - } - }); - - for (int i = 0; i < NUM; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - } - - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(hasBytes.get()); - } - - @Test - public void testSharedSourceStream() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean allEqual = new AtomicBoolean(false); - - Observable o1 = stream - .get() - .take(10) - .observeOn(Schedulers.computation()); - - Observable o2 = stream - .get() - .take(10) - .observeOn(Schedulers.computation()); - - Observable zipped = Observable.zip(o1, o2, Payload::equals); - Observable reduced = zipped.reduce(true, (b1, b2) -> b1 && b2); - - reduced.subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnError : " + e); - e.printStackTrace(); - latch.countDown(); - } - - @Override - public void onNext(Boolean b) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Reduced OnNext : " + b); - allEqual.set(b); - } - }); - - for (int i = 0; i < 10; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - } - - assertTrue(latch.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(allEqual.get()); - //we should be getting the same object from both streams. this ensures that multiple subscribers don't induce extra work - } - - @Test - public void testTwoSubscribersOneUnsubscribes() throws Exception { - CountDownLatch latch1 = new CountDownLatch(1); - CountDownLatch latch2 = new CountDownLatch(1); - AtomicInteger payloads1 = new AtomicInteger(0); - AtomicInteger payloads2 = new AtomicInteger(0); - - Subscription s1 = stream - .get() - .take(100) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch1::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); - latch1.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); - latch1.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); - payloads1.incrementAndGet(); - } - }); - - Subscription s2 = stream - .get() - .take(100) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch2::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); - latch2.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); - latch2.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining() + " : " + payloads2.get()); - payloads2.incrementAndGet(); - } - }); - //execute 1 command, then unsubscribe from first stream. then execute the rest - for (int i = 0; i < 5; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - if (i == 1) { - s1.unsubscribe(); - } - } - assertTrue(stream.isSourceCurrentlySubscribed()); //only 1/2 subscriptions has been cancelled - - assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); - System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); - assertTrue("s1 got data", payloads1.get() > 0); - assertTrue("s2 got data", payloads2.get() > 0); - assertTrue("s1 got less data than s2", payloads2.get() > payloads1.get()); - } - - @Test - public void testTwoSubscribersBothUnsubscribe() throws Exception { - CountDownLatch latch1 = new CountDownLatch(1); - CountDownLatch latch2 = new CountDownLatch(1); - AtomicInteger payloads1 = new AtomicInteger(0); - AtomicInteger payloads2 = new AtomicInteger(0); - - Subscription s1 = stream - .get() - .take(100) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch1::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); - latch1.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); - latch1.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); - payloads1.incrementAndGet(); - } - }); - - Subscription s2 = stream - .get() - .take(100) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch2::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); - latch2.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); - latch2.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining()); - payloads2.incrementAndGet(); - } - }); - //execute 1 command, then unsubscribe from both streams. then execute the rest - for (int i = 0; i < 5; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - if (i == 1) { - s1.unsubscribe(); - s2.unsubscribe(); - } - } - assertFalse(stream.isSourceCurrentlySubscribed()); //both subscriptions have been cancelled - - System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); - assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); - assertTrue("s1 got data", payloads1.get() > 0); - assertTrue("s2 got data", payloads2.get() > 0); - } - - @Test - public void testTwoSubscribersBothUnsubscribeThenNewSubscriber() throws Exception { - CountDownLatch latch1 = new CountDownLatch(1); - CountDownLatch latch2 = new CountDownLatch(1); - AtomicInteger payloads1 = new AtomicInteger(0); - AtomicInteger payloads2 = new AtomicInteger(0); - - Subscription s1 = stream - .get() - .take(100) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch1::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnCompleted"); - latch1.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnError : " + e); - latch1.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 1 OnNext : " + payload.getData().remaining()); - payloads1.incrementAndGet(); - } - }); - - Subscription s2 = stream - .get() - .take(100) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch2::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnCompleted"); - latch2.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnError : " + e); - latch2.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 2 OnNext : " + payload.getData().remaining()); - payloads2.incrementAndGet(); - } - }); - //execute 1 command, then unsubscribe from both streams. then execute the rest - for (int i = 0; i < 5; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - if (i == 1) { - s1.unsubscribe(); - s2.unsubscribe(); - } - } - - System.out.println("s1 got : " + payloads1.get() + ", s2 got : " + payloads2.get()); - assertTrue(latch1.await(10000, TimeUnit.MILLISECONDS)); - assertTrue(latch2.await(10000, TimeUnit.MILLISECONDS)); - assertTrue("s1 got data", payloads1.get() > 0); - assertTrue("s2 got data", payloads2.get() > 0); - - final int NUM_DATA_REQUESTED = 100; - CountDownLatch latch3 = new CountDownLatch(1); - AtomicInteger payloads3 = new AtomicInteger(0); - Subscription s3 = stream - .get() - .take(NUM_DATA_REQUESTED) - .observeOn(Schedulers.computation()) - .doOnUnsubscribe(latch3::countDown) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 3 OnCompleted"); - latch3.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 3 OnError : " + e); - latch3.countDown(); - } - - @Override - public void onNext(Payload payload) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : Dashboard 3 OnNext : " + payload.getData().remaining()); - payloads3.incrementAndGet(); - } - }); - - assertTrue(stream.isSourceCurrentlySubscribed()); //should be doing work when re-subscribed - - assertTrue(latch3.await(10000, TimeUnit.MILLISECONDS)); - assertEquals(NUM_DATA_REQUESTED, payloads3.get()); - } - - @Test - public void testTwoSubscribersOneSlow() throws Exception { - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean foundError = new AtomicBoolean(false); - - Observable fast = stream - .get() - .observeOn(Schedulers.newThread()); - Observable slow = stream - .get() - .observeOn(Schedulers.newThread()) - .map(n -> { - try { - System.out.println("Sleeping on thread : " + Thread.currentThread().getName()); - Thread.sleep(100); - return n; - } catch (InterruptedException ex) { - return n; - } - }); - - Observable checkZippedEqual = Observable.zip(fast, slow, (payload, payload2) -> payload == payload2); - - Subscription s1 = checkZippedEqual - .take(10000) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e); - e.printStackTrace(); - foundError.set(true); - latch.countDown(); - } - - @Override - public void onNext(Boolean b) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + b); - } - }); - - for (int i = 0; i < 50; i++) { - HystrixCommand cmd = new SyntheticBlockingCommand(); - cmd.execute(); - } - - latch.await(5000, TimeUnit.MILLISECONDS); - assertFalse(foundError.get()); - } -} diff --git a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java b/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java deleted file mode 100644 index efea48c90..000000000 --- a/hystrix-contrib/hystrix-reactivesocket-event-stream/src/test/java/com/netflix/hystrix/contrib/reactivesocket/HystrixStreamTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.hystrix.contrib.reactivesocket; - -import com.netflix.hystrix.HystrixCommand; -import com.netflix.hystrix.HystrixCommandGroupKey; -import com.netflix.hystrix.HystrixObservableCommand; -import rx.Observable; -import rx.schedulers.Schedulers; - -import java.util.Random; - -public class HystrixStreamTest { - - static final Random r = new Random(); - - protected static class SyntheticBlockingCommand extends HystrixCommand { - - public SyntheticBlockingCommand() { - super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UNITTEST"))); - } - - @Override - protected Integer run() throws Exception { - int n = r.nextInt(100); - Thread.sleep(n); - return n; - } - } - - protected static class SyntheticAsyncCommand extends HystrixObservableCommand { - - public SyntheticAsyncCommand() { - super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UNITTEST"))); - } - - @Override - protected Observable construct() { - return Observable.defer(() -> { - try { - int n = r.nextInt(100); - Thread.sleep(n); - return Observable.just(n); - } catch (InterruptedException ex) { - return Observable.error(ex); - } - }).subscribeOn(Schedulers.io()); - } - } -} diff --git a/hystrix-examples-reactivesocket/client/build.gradle b/hystrix-examples-reactivesocket/client/build.gradle deleted file mode 100644 index 169f69bc9..000000000 --- a/hystrix-examples-reactivesocket/client/build.gradle +++ /dev/null @@ -1,18 +0,0 @@ -apply plugin: 'application' - -mainClassName = 'com.netflix.hystrix.examples.reactivesocket.HystrixMetricsReactiveSocketClientRunner' -applicationDefaultJvmArgs = ["-Dio.netty.leakDetectionLevel=paranoid"] - -sourceCompatibility = JavaVersion.VERSION_1_8 -targetCompatibility = JavaVersion.VERSION_1_8 - -repositories { - maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } -} - -dependencies { - compile project(':hystrix-core') - compile project(':hystrix-examples') - compile project(':hystrix-reactivesocket-event-stream') -} - diff --git a/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java b/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java deleted file mode 100644 index 7b24a1bff..000000000 --- a/hystrix-examples-reactivesocket/client/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketClientRunner.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.examples.reactivesocket; - -import com.netflix.hystrix.HystrixCommandKey; -import com.netflix.hystrix.HystrixThreadPoolKey; -import com.netflix.hystrix.config.HystrixCommandConfiguration; -import com.netflix.hystrix.config.HystrixConfiguration; -import com.netflix.hystrix.config.HystrixThreadPoolConfiguration; -import com.netflix.hystrix.contrib.reactivesocket.EventStreamEnum; -import com.netflix.hystrix.contrib.reactivesocket.client.HystrixMetricsReactiveSocketClient; -import com.netflix.hystrix.metric.sample.HystrixCommandUtilization; -import com.netflix.hystrix.metric.sample.HystrixUtilization; -import com.netflix.hystrix.serial.SerialHystrixConfiguration; -import com.netflix.hystrix.serial.SerialHystrixMetric; -import com.netflix.hystrix.serial.SerialHystrixUtilization; -import io.netty.channel.nio.NioEventLoopGroup; -import io.reactivesocket.Payload; -import org.reactivestreams.Publisher; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.Subscriber; -import rx.Subscription; - -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class HystrixMetricsReactiveSocketClientRunner { - public static void main(String[] args) throws InterruptedException { - System.out.println("Starting HystrixMetricsReactiveSocketClient..."); - - HystrixMetricsReactiveSocketClient client = new HystrixMetricsReactiveSocketClient("127.0.0.1", 8025, new NioEventLoopGroup()); - client.startAndWait(); - - final EventStreamEnum eventStreamEnum = EventStreamEnum.REQUEST_EVENT_STREAM; - - //Publisher publisher = client.requestResponse(eventStreamEnum); - Publisher publisher = client.requestStream(eventStreamEnum, 10); - //Publisher publisher = client.requestSubscription(eventStreamEnum); - Observable o = RxReactiveStreams.toObservable(publisher); - - final CountDownLatch latch = new CountDownLatch(1); - - Subscription s = o.subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted"); - latch.countDown(); - } - - @Override - public void onError(Throwable e) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e); - e.printStackTrace(); - latch.countDown(); - } - - @Override - public void onNext(Payload payload) { - final StringBuilder bldr = new StringBuilder(); - - switch (eventStreamEnum) { - case UTILIZATION_STREAM: - HystrixUtilization u = SerialHystrixUtilization.fromByteBuffer(payload.getData()); - bldr.append("CommandUtil["); - for (Map.Entry entry: u.getCommandUtilizationMap().entrySet()) { - bldr.append(entry.getKey().name()) - .append(" -> ") - .append(entry.getValue().getConcurrentCommandCount()) - .append(", "); - } - bldr.append("]"); - break; - case CONFIG_STREAM: - HystrixConfiguration config = SerialHystrixConfiguration.fromByteBuffer(payload.getData()); - bldr.append("CommandConfig["); - for (Map.Entry entry: config.getCommandConfig().entrySet()) { - bldr.append(entry.getKey().name()) - .append(" -> ") - .append(entry.getValue().getExecutionConfig().getIsolationStrategy().name()) - .append(", "); - } - bldr.append("] ThreadPoolConfig["); - for (Map.Entry entry: config.getThreadPoolConfig().entrySet()) { - bldr.append(entry.getKey().name()) - .append(" -> ") - .append(entry.getValue().getCoreSize()) - .append(", "); - } - bldr.append("]"); - break; - case REQUEST_EVENT_STREAM: - String requestEvents = SerialHystrixMetric.fromByteBufferToString(payload.getData()); - bldr.append("RequestEvents : ").append(requestEvents); - break; - case GENERAL_DASHBOARD_STREAM: - String dashboardData = SerialHystrixMetric.fromByteBufferToString(payload.getData()); - bldr.append("Summary : ").append(dashboardData); - break; - default: - throw new RuntimeException("don't have a way to convert from " + eventStreamEnum + " to string yet"); - } - - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnNext : " + bldr.toString()); - } - }); - - if (!latch.await(10000, TimeUnit.MILLISECONDS)) { - System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " Unsubscribing - never received a terminal!"); - s.unsubscribe(); - } - System.exit(0); - } -} diff --git a/hystrix-examples-reactivesocket/server/build.gradle b/hystrix-examples-reactivesocket/server/build.gradle deleted file mode 100644 index 0025f125f..000000000 --- a/hystrix-examples-reactivesocket/server/build.gradle +++ /dev/null @@ -1,20 +0,0 @@ -apply plugin: 'application' - -mainClassName = 'com.netflix.hystrix.examples.reactivesocket.HystrixMetricsReactiveSocketServer' - -applicationDefaultJvmArgs = ["-Dio.netty.leakDetectionLevel=paranoid"] - -sourceCompatibility = JavaVersion.VERSION_1_8 -targetCompatibility = JavaVersion.VERSION_1_8 - -repositories { - maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' } -} - -dependencies { - compile project(':hystrix-core') - compile project(':hystrix-examples') - compile project(':hystrix-reactivesocket-event-stream') - compile 'io.reactivesocket:reactivesocket-netty:0.1.9' -} - diff --git a/hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java b/hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java deleted file mode 100644 index b23ee9186..000000000 --- a/hystrix-examples-reactivesocket/server/src/main/java/com/netflix/hystrix/examples/reactivesocket/HystrixMetricsReactiveSocketServer.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netflix.hystrix.examples.reactivesocket; - -import com.netflix.hystrix.contrib.reactivesocket.EventStreamRequestHandler; -import com.netflix.hystrix.examples.demo.HystrixCommandAsyncDemo; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; -import io.reactivesocket.netty.tcp.server.ReactiveSocketServerHandler; - -public class HystrixMetricsReactiveSocketServer { - - public static void main(String[] args) throws Exception { - System.out.println("Starting HystrixMetricsReactiveSocketServer..."); - - final ReactiveSocketServerHandler handler = ReactiveSocketServerHandler.create((setupPayload, rs) -> - new EventStreamRequestHandler()); - - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); - - try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(handler); - } - }); - Channel localhost = b.bind("127.0.0.1", 8025).sync().channel(); - - executeCommands(); - localhost.closeFuture().sync(); - } finally { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - } - } - - private static void executeCommands() { - new HystrixCommandAsyncDemo().startDemo(false); - } -} diff --git a/settings.gradle b/settings.gradle index d1eb451d8..263f62612 100644 --- a/settings.gradle +++ b/settings.gradle @@ -2,8 +2,6 @@ rootProject.name='hystrix' include 'hystrix-core', \ 'hystrix-examples', \ 'hystrix-examples-webapp', \ -'hystrix-examples-reactivesocket/client', \ -'hystrix-examples-reactivesocket/server', \ 'hystrix-contrib/hystrix-clj', \ 'hystrix-contrib/hystrix-request-servlet', \ 'hystrix-contrib/hystrix-servo-metrics-publisher', \ @@ -12,21 +10,17 @@ include 'hystrix-core', \ 'hystrix-contrib/hystrix-codahale-metrics-publisher', \ 'hystrix-contrib/hystrix-yammer-metrics-publisher', \ 'hystrix-contrib/hystrix-network-auditor-agent', \ -'hystrix-contrib/hystrix-reactivesocket-event-stream', \ 'hystrix-contrib/hystrix-javanica', \ 'hystrix-contrib/hystrix-junit', \ 'hystrix-dashboard', \ 'hystrix-serialization' -project(':hystrix-examples-reactivesocket/client').name = 'hystrix-examples-reactivesocket-client' -project(':hystrix-examples-reactivesocket/server').name = 'hystrix-examples-reactivesocket-server' project(':hystrix-contrib/hystrix-clj').name = 'hystrix-clj' project(':hystrix-contrib/hystrix-request-servlet').name = 'hystrix-request-servlet' project(':hystrix-contrib/hystrix-servo-metrics-publisher').name = 'hystrix-servo-metrics-publisher' project(':hystrix-contrib/hystrix-metrics-event-stream').name = 'hystrix-metrics-event-stream' project(':hystrix-contrib/hystrix-rx-netty-metrics-stream').name = 'hystrix-rx-netty-metrics-stream' project(':hystrix-contrib/hystrix-codahale-metrics-publisher').name = 'hystrix-codahale-metrics-publisher' -project(':hystrix-contrib/hystrix-reactivesocket-event-stream').name = 'hystrix-reactivesocket-event-stream' project(':hystrix-contrib/hystrix-yammer-metrics-publisher').name = 'hystrix-yammer-metrics-publisher' project(':hystrix-contrib/hystrix-network-auditor-agent').name = 'hystrix-network-auditor-agent' project(':hystrix-contrib/hystrix-javanica').name = 'hystrix-javanica'