From c17caa1d9065058a9212bc634726b0be00784030 Mon Sep 17 00:00:00 2001 From: Richard Achmatowicz Date: Mon, 4 Jan 2021 15:21:24 -0500 Subject: [PATCH] [WFDESC-37] Adjust takeService() method to use (default) timeout value specified in constructor. --- pom.xml | 2 +- .../java/org/wildfly/discovery/Discovery.java | 34 +- .../discovery/DiscoveryTimeoutTestCase.java | 317 ++++++++++++++++++ .../wildfly/discovery/FilterSpecTestCase.java | 11 +- 4 files changed, 358 insertions(+), 6 deletions(-) create mode 100644 src/test/java/org/wildfly/discovery/DiscoveryTimeoutTestCase.java diff --git a/pom.xml b/pom.xml index f8d6abf..d2d74c1 100644 --- a/pom.xml +++ b/pom.xml @@ -104,7 +104,7 @@ true - false + true false diff --git a/src/main/java/org/wildfly/discovery/Discovery.java b/src/main/java/org/wildfly/discovery/Discovery.java index a6e4898..07e9f04 100644 --- a/src/main/java/org/wildfly/discovery/Discovery.java +++ b/src/main/java/org/wildfly/discovery/Discovery.java @@ -225,6 +225,15 @@ static final class BlockingQueueServicesQueue implements ServicesQueue { this(queue, problems, request, Long.MAX_VALUE, TimeUnit.DAYS); } + /** + * Create a BlockingQueueServicesQueue which makes use of a default timeout value for all calls to takeService() + * + * @param queue the underlying BlockingQueue used to store ServiceURLs + * @param problems the underlying List used to store Throwables encountered by the DiscoveryProvider while generating ServiceURLs + * @param request a DiscoveryRequest instance used for cancellation of the Discovery call + * @param time a default timeout value for calls to takeService() + * @param timeUnit the timout TimeUnit + */ BlockingQueueServicesQueue(final LinkedBlockingQueue queue, final CopyOnWriteArrayList problems, final DiscoveryRequest request, final long time, final TimeUnit timeUnit) { this.queue = queue; this.problems = problems; @@ -275,13 +284,33 @@ public ServiceURL pollService() { } } + /** + * Take a ServiceURL from the queue, using the timeout value specified in the constructor. + * + * @return the next ServiceURL in the queue (or null if the call times out) + * @throws InterruptedException + */ public ServiceURL takeService() throws InterruptedException { - await(); + // timeout <= 0 interpreted as block indefinitely + if (this.timeout <= 0) { + await(Long.MAX_VALUE, this.timeUnit); + } else { + await(this.timeout, this.timeUnit); + } return pollService(); } + /** + * Take a ServiceURL from the queue, using the timeout value specified in te method call. + * + * @param timeout + * @param timeUnit + * @return the next ServiceURL in the queue (or null if the call times out) + * @throws InterruptedException + */ @Override public ServiceURL takeService(long timeout, TimeUnit timeUnit) throws InterruptedException { + // timeout <= interpreted as block indefinitely if (timeout <= 0) timeout = Long.MAX_VALUE; await(timeout, timeUnit); return pollService(); @@ -291,6 +320,9 @@ public boolean isFinished() { return next == null && done; } + /** + * Close the queue, cancelling the ongoing request if required. + */ public void close() { if (! isFinished()) { request.cancel(); diff --git a/src/test/java/org/wildfly/discovery/DiscoveryTimeoutTestCase.java b/src/test/java/org/wildfly/discovery/DiscoveryTimeoutTestCase.java new file mode 100644 index 0000000..12ea5d6 --- /dev/null +++ b/src/test/java/org/wildfly/discovery/DiscoveryTimeoutTestCase.java @@ -0,0 +1,317 @@ +package org.wildfly.discovery; + +import org.jboss.logging.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.wildfly.discovery.spi.DiscoveryProvider; +import org.wildfly.discovery.spi.DiscoveryRequest; +import org.wildfly.discovery.spi.DiscoveryResult; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertEquals; + +/** + * Tests timeout functionality of calls to discovery + * + * @author Richard Achmatowicz + */ +public final class DiscoveryTimeoutTestCase { + + private static final Logger logger = Logger.getLogger(DiscoveryTimeoutTestCase.class); + + private static Discovery discoveryInstance1 = null; + private static Discovery discoveryInstance2 = null; + + private static int PROVIDER_URL_GENERATION_DELAY = 5 * 1000; + + /** + * Do any general setup here. + * + * @throws Exception + */ + @BeforeClass + public static void setup() throws Exception { + + List list = new ArrayList(); + + // add some Service URLs + AttributeValuePair clusterPair = new AttributeValuePair("cluster","c"); + ServiceURL cluster = buildAttributeServiceURL(clusterPair); + list.add(cluster); + + AttributeValuePair modulePair = new AttributeValuePair("module","m"); + ServiceURL module = buildAttributeServiceURL(modulePair); + list.add(module); + + ServiceURL combo = buildAttributeServiceURL(clusterPair, modulePair); + list.add(combo); + + // create a Discovery instance using a DiscoveryProvider that delays generation of ServiceURLs + // we need one statically defined provider per test :-( + discoveryInstance1 = Discovery.create(new DelayedDiscoveryProvider(list, PROVIDER_URL_GENERATION_DELAY)); + discoveryInstance2 = Discovery.create(new DelayedDiscoveryProvider(list, PROVIDER_URL_GENERATION_DELAY)); + } + + /** + * Do any test specific setup here + */ + @Before + public void setupTest() { + } + + /** + * Test accessing the queue when discovery instance is initialised with a timeout. + * Tests the case where: + * - takeService() is called before any ServiceURLs are ready, times out and returns null + * - takeService() is called when a ServiceURL is ready, does not time out and returns a value + */ + @Test + public void testDiscoveryWithConstructorTimeout() { + + FilterSpec cluster = FilterSpec.equal("cluster","c"); + FilterSpec module = FilterSpec.equal("module","m"); + FilterSpec all = FilterSpec.all(cluster,module); + + // call discovery for single attribute + logger.info("Calling discover for filterspec " + all); + try { + // get the queue containing the discovery results + final ServicesQueue servicesQueue = discover(discoveryInstance1, all, 1000, MILLISECONDS); + + // now start the consumer of the ServiceURLs + Thread consumer = new Thread(new Runnable() { + @Override + public void run() { + try { + // take a serviceURL immediately - should not be available for 5 seconds + logger.info("Calling takeService()"); + ServiceURL serviceURL = servicesQueue.takeService(); + Assert.assertNull("ServiceURL should be null due to timeout on method!", serviceURL); + + // take a second serviceURL after 5 seconds - should be available + try { + logger.info("Sleeping to allow ServiceURLs to be generated..."); + Thread.sleep(PROVIDER_URL_GENERATION_DELAY); + } catch (InterruptedException ie) { + // noop + } + logger.info("Calling takeService()"); + serviceURL = servicesQueue.takeService(); + Assert.assertNotNull("ServiceURL should be non-null due to delay!", serviceURL); + + // now drain the queue, we hae done our test + while ((serviceURL = servicesQueue.takeService(Long.MAX_VALUE, MILLISECONDS)) != null) { + logger.info("while draining, found match: " + serviceURL.toString()); + } + } catch(InterruptedException ie) { + Assert.fail("Discovery was interrupted ..."); + } + } + }); + consumer.start(); + consumer.join(); + + } catch(InterruptedException ie) { + logger.info("consumer thread was interrupted!"); + } + } + + /** + * Test accessing the queue when discovery instance is initialised with a timeout. + * Tests the case where: + * - takeService(t, tu) is called before any ServiceURLs are ready, waits long enough and returns a serviceURL + * - takeService(t, tu) is called when a ServiceURL is ready, does not time out and returns a value + */ + @Test + public void testDiscoveryWithMethodTimeout() { + + FilterSpec cluster = FilterSpec.equal("cluster","c"); + FilterSpec module = FilterSpec.equal("module","m"); + FilterSpec all = FilterSpec.any(cluster,module); + + // call discovery for single attribute + logger.info("Calling discover for filterspec " + all); + try { + // get the queue containing the discovery results + final ServicesQueue servicesQueue = discover(discoveryInstance2, all); + + // now start the consumer of the ServiceURLs + Thread consumer = new Thread(new Runnable() { + @Override + public void run() { + try { + // take a serviceURL immediately - should not be available for 5 seconds + logger.info("Calling takeService(t, tu)"); + ServiceURL serviceURL = servicesQueue.takeService(PROVIDER_URL_GENERATION_DELAY, MILLISECONDS); + Assert.assertNotNull("ServiceURL should be non-null due to timeout on method!", serviceURL); + + // take a second serviceURL after 5 seconds - should be available + try { + logger.info("Sleeping to allow ServiceURLs to be generated..."); + Thread.sleep(PROVIDER_URL_GENERATION_DELAY); + } catch (InterruptedException ie) { + // noop + } + logger.info("Calling takeService(t, tu)"); + serviceURL = servicesQueue.takeService(PROVIDER_URL_GENERATION_DELAY, MILLISECONDS); + Assert.assertNotNull("ServiceURL should be non-null due to timeout on method!", serviceURL); + + // now drain the queue, we hae done our test + while ((serviceURL = servicesQueue.takeService(Long.MAX_VALUE, MILLISECONDS)) != null) { + logger.info("while draining, found match: " + serviceURL.toString()); + } + } catch(InterruptedException ie) { + Assert.fail("Discovery was interrupted ..."); + } + } + }); + consumer.start(); + consumer.join(); + + } catch(InterruptedException ie) { + logger.info("consumer thread was interrupted!"); + } + } + + + /** + * Do any test-specific tear down here. + */ + @After + public void tearDownTest() { + } + + /** + * Do any general tear down here. + */ + @AfterClass + public static void tearDown() { + discoveryInstance1 = null; + discoveryInstance2 = null; + } + + /** + * Returns a queue of registered ServiceURLs which match the filter spec + * @param filterSpec a condition on attributes to match + * @return + */ + private static ServicesQueue discover(Discovery discovery, FilterSpec filterSpec) { + ServiceType serviceType = new ServiceType("ejb","jboss", null, null); + return discovery.discover(serviceType, filterSpec); + } + + /** + * Returns a queue of registered ServiceURLs which match the filter spec + * @param filterSpec a condition on attributes to match + * @param time a timeout specifying how long to wait for a result to be avaiulable + * @param timeUnit the units for parameter time + * @return + */ + private static ServicesQueue discover(Discovery discovery, FilterSpec filterSpec, long time, TimeUnit timeUnit) { + ServiceType serviceType = new ServiceType("ejb","jboss", null, null); + return discovery.discover(serviceType, filterSpec, time, timeUnit); + } + + /** + * An attribute value pair + */ + private static class AttributeValuePair { + String attribute = null; + String value = null; + + public AttributeValuePair(String attribute, String value) { + this.attribute = attribute; + this.value = value; + } + + public String getAttribute() { + return attribute; + } + + public String getValue() { + return value; + } + } + + /** + * Builds ServiceURLs with constant default type and varying attributes. + * + * @param pairs one or more attribute pairs to be set in the ServiceURL + * @return a configured ServiceURL + * @throws Exception + */ + private static ServiceURL buildAttributeServiceURL(AttributeValuePair ...pairs) throws Exception { + + final ServiceURL.Builder builder = new ServiceURL.Builder(); + // set the locationURI + builder.setUri(new URI("http://myhost.com")); + builder.setAbstractType("ejb"); + builder.setAbstractTypeAuthority("jboss"); + // add an attribute + for (AttributeValuePair pair : pairs) { + builder.addAttribute(pair.getAttribute(), AttributeValue.fromString(pair.getValue())); + } + return builder.create(); + } + + /* + * A StaticDiscoveryProvider which introduces a delay when returning ServiceURLs + */ + private static final class DelayedDiscoveryProvider implements DiscoveryProvider { + + private final List services; + // delay in ms + private int delay; + + public DelayedDiscoveryProvider(List services) { + this(services, 0); + } + + public DelayedDiscoveryProvider(List services, int delay) { + this.services = services; + this.delay = delay; + } + + @Override + public DiscoveryRequest discover(ServiceType serviceType, FilterSpec filterSpec, DiscoveryResult result) { + try { + // set up a new thread to execute ServiceURL generation before returning the cancellation handle to the consumer + new Thread(new Runnable() { + @Override + public void run() { + try { + for (ServiceURL service : services) { + if (serviceType.implies(service) && (filterSpec == null || service.satisfies(filterSpec))) { + // introduce delay in populating the queue with ServiceURLs + logger.info("populating queue with matches with delay " + delay + " ms"); + try { + Thread.sleep(DelayedDiscoveryProvider.this.delay); + } catch (InterruptedException ie) { + // noop + } + logger.info("adding match to queue: " + service.toString()); + result.addMatch(service); + } + } + } finally { + logger.info("queue matches generated, calling complete()"); + result.complete(); + } + } + }).start(); + return DiscoveryRequest.NULL; + } finally { + // noop + } + } + } +} diff --git a/src/test/java/org/wildfly/discovery/FilterSpecTestCase.java b/src/test/java/org/wildfly/discovery/FilterSpecTestCase.java index 492fb43..0fba1ad 100644 --- a/src/test/java/org/wildfly/discovery/FilterSpecTestCase.java +++ b/src/test/java/org/wildfly/discovery/FilterSpecTestCase.java @@ -2,6 +2,7 @@ import static org.junit.Assert.*; +import org.jboss.logging.Logger; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -25,6 +26,8 @@ */ public final class FilterSpecTestCase { + private static final Logger logger = Logger.getLogger(FilterSpecTestCase.class); + private static DiscoveryProvider provider = null; private static Discovery discovery = null; @@ -153,11 +156,11 @@ public void testDiscoverySingleAttribute() { List results = new ArrayList(); // call discovery for single attribute - System.out.println("Calling discover for filterspec " + cluster); + logger.info("Calling discover for filterspec " + cluster); try (final ServicesQueue servicesQueue = discover(cluster)) { ServiceURL serviceURL = servicesQueue.takeService(); do { - System.out.println("ServiceURL found = " + serviceURL); + logger.info("ServiceURL found = " + serviceURL); results.add(serviceURL); serviceURL = servicesQueue.takeService(); @@ -182,11 +185,11 @@ public void testDiscoveryMultipleAttributes() { List results = new ArrayList(); // call discovery for single attribute - System.out.println("Calling discover for filterspec " + all); + logger.info("Calling discover for filterspec " + all); try (final ServicesQueue servicesQueue = discover(all)) { ServiceURL serviceURL = servicesQueue.takeService(); do { - System.out.println("ServiceURL found = " + serviceURL); + logger.info("ServiceURL found = " + serviceURL); results.add(serviceURL); serviceURL = servicesQueue.takeService();