Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

creates an abstract RefreshableServiceProviderPlace for config reloading #1035

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/main/java/emissary/place/RefreshableServiceProviderPlace.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package emissary.place;

import emissary.config.ConfigUtil;
import emissary.config.Configurator;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
* ServiceProviderPlace that supports on-demand refresh of its configuration
*/
public abstract class RefreshableServiceProviderPlace extends ServiceProviderPlace {

private static final Logger logger = LoggerFactory.getLogger(RefreshableServiceProviderPlace.class);

private final Object allocatorLock = new Object();
private final AtomicBoolean invalidated = new AtomicBoolean(false);

public RefreshableServiceProviderPlace() throws IOException {}

public RefreshableServiceProviderPlace(final String thePlaceLocation) throws IOException {
super(thePlaceLocation);
}

protected RefreshableServiceProviderPlace(final String configFile, @Nullable final String theDir, final String thePlaceLocation)
throws IOException {
super(configFile, theDir, thePlaceLocation);
}

protected RefreshableServiceProviderPlace(final InputStream configStream, @Nullable final String theDir, final String thePlaceLocation)
throws IOException {
super(configStream, theDir, thePlaceLocation);
}

protected RefreshableServiceProviderPlace(final InputStream configStream) throws IOException {
super(configStream);
}

protected RefreshableServiceProviderPlace(final String configFile, final String placeLocation) throws IOException {
super(configFile, placeLocation);
}

protected RefreshableServiceProviderPlace(final InputStream configStream, final String placeLocation) throws IOException {
super(configStream, placeLocation);
}

/**
* Get the invalid flag of the place. An invalidated place may indicate that the place has changes, such as new
* configuration, and may trigger a follow-on process to reconfigure, reinitialize, or re-create the place.
*
* @return true if the place has been invalidated, false otherwise
*/
public final boolean isInvalidated() {
return this.invalidated.get();
}

/**
* Invalidate a place that need to be refreshed.
*/
public final void invalidate() {
logger.info("Place being marked as invalidated");
setInvalidated(true);
}

/**
* Set the invalid flag of the place
*
* @param invalid true if place is invalid, false otherwise
*/
private void setInvalidated(final boolean invalid) {
this.invalidated.set(invalid);
}

/**
* Reinitialize the place by reloading the configurator and reconfiguring the place. Must call {@link #invalidate()}
* before attempting to refresh the place.
*/
public final void refresh() {
logger.trace("Waiting for lock in refresh()");
synchronized (allocatorLock) {
logger.debug("Attempting to refresh place using config locations");
try {
if (isInvalidated()) {
this.configG = reloadConfigurator(this.configLocs);
reconfigurePlace();
setInvalidated(false);
logger.info("Place refresh performed successfully");
} else {
logger.warn("Cannot refresh place configuration without first calling invalidate; no reconfiguration performed");
}
} catch (IOException e) {
logger.error("Failed to refresh configurator");
}
}
}

/**
* Reinitialize the place by reloading the configurator and reconfiguring the place. Must call {@link #invalidate()}
* before attempting to refresh the place.
*
* @param configStream the config data as an {@link InputStream}
*/
public final void refresh(final InputStream configStream) {
logger.trace("Waiting for lock in refresh(configStream)");
synchronized (allocatorLock) {
logger.debug("Attempting to refresh place using configStream");
try {
if (isInvalidated()) {
this.configG = reloadConfigurator(configStream);
reconfigurePlace();
setInvalidated(false);
logger.info("Place refresh performed successfully using configStream");
} else {
logger.warn(
"Cannot refresh place configuration with configStream without first calling invalidate; no reconfiguration performed");
}
} catch (IOException e) {
logger.error("Failed to refresh configurator using configStream");
}
}
}

protected abstract void reconfigurePlace() throws IOException;

/**
* Reload the {@link Configurator}
*
* @param configLocations the list of configuration files to load
* @throws IOException if there is an issue loading the config
*/
private static Configurator reloadConfigurator(@Nullable final List<String> configLocations) throws IOException {
logger.debug("Reloading configurator using locations {}", configLocations);
if (CollectionUtils.isNotEmpty(configLocations)) {
return ConfigUtil.getConfigInfo(configLocations);
}
throw new IOException("No config locations specified");
}

/**
* Reload the {@link Configurator}
*
* @param configStream the stream of configuration data
* @throws IOException if there is an issue loading the config
*/
private static Configurator reloadConfigurator(@Nullable final InputStream configStream) throws IOException {
logger.debug("Reloading configurator using configStream");
if (configStream != null) {
return ConfigUtil.getConfigInfo(configStream);
}
throw new IOException("Null config stream supplied");
}

}
8 changes: 5 additions & 3 deletions src/main/java/emissary/place/ServiceProviderPlace.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public abstract class ServiceProviderPlace implements IServiceProviderPlace,
@Nullable
protected Configurator configG;

protected final List<String> configLocs = new ArrayList<>();

/**
* A <i><b>local</b></i> reference to the directory that this place resides in. Every JVM that contains 'places' must
* have a local directory
Expand Down Expand Up @@ -110,6 +112,8 @@ public abstract class ServiceProviderPlace implements IServiceProviderPlace,
@Nullable
protected String serviceDescription;

protected String placeLocation;

/**
* Static context logger
*/
Expand Down Expand Up @@ -253,12 +257,11 @@ protected Configurator loadConfigurator(@Nullable String placeLocation) throws I
if (placeLocation == null) {
placeLocation = this.getClass().getSimpleName();
}

this.placeLocation = placeLocation;
// Extract config data stream name from place location
// and try finding config info with and without the
// package name of this class (in that order)
String myPackage = this.getClass().getPackage().getName();
List<String> configLocs = new ArrayList<>();
// Dont use KeyManipulator for this, only works when hostname/fqdn has dots
int pos = placeLocation.lastIndexOf("/");
String serviceClass = (pos > -1 ? placeLocation.substring(pos + 1) : placeLocation);
Expand Down Expand Up @@ -925,7 +928,6 @@ protected void deregisterFromDirectory(List<String> keys) {
}
}


/**
* Remove a service proxy from the running place. Proxy strings not found registered will be ignored Will remove all
* keys that match the supplied proxy
Expand Down
116 changes: 116 additions & 0 deletions src/test/java/emissary/place/RefreshableServiceProviderPlaceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package emissary.place;

import emissary.core.IBaseDataObject;
import emissary.core.Namespace;
import emissary.directory.DirectoryEntry;
import emissary.test.core.junit5.UnitTest;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.Nullable;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

class RefreshableServiceProviderPlaceTest extends UnitTest {

private static final byte[] cfgData = ("SERVICE_KEY = \"UNKNOWN.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest$6050\"\n" +
"KEY_1 = 200").getBytes();
private static final byte[] cfgDataReload = ("SERVICE_KEY = \"*.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest$5060\"\n" +
"KEY_1 = 300").getBytes();

@Nullable
private RefreshablePlaceTest place = null;

@Override
@BeforeEach
public void setUp() throws Exception {
InputStream config = new ByteArrayInputStream(cfgData);
place = new RefreshablePlaceTest(config, null, "http://localhost:8001/RefreshablePlaceTest");
}

@Override
@AfterEach
public void tearDown() throws Exception {
super.tearDown();
assertNotNull(place);
place.shutDown();
place = null;
}

@Test
void testReconfigure() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding another test (or block within this test) that calls place.refresh() without first calling invalidate()

assertNotNull(place, "Place created and configured");
assertEquals("RefreshablePlaceTest", place.getPlaceName(), "Configured place name");
assertEquals("UNKNOWN", place.getPrimaryProxy(), "Primary proxy");
assertEquals("UNKNOWN.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest", place.getKey(), "Key generation");
DirectoryEntry de = place.getDirectoryEntry();
assertNotNull(de, "Directory entry");
assertEquals(60, de.getCost(), "Cost in directory entry");
assertEquals(50, de.getQuality(), "Quality in directory entry");
assertEquals("Description not available", de.getDescription(), "Description in directory entry");
assertNotNull(place.configG);
assertEquals(200, place.configG.findIntEntry("KEY_1", 0));
assertDoesNotThrow(() -> Namespace.lookup("http://localhost:8001/RefreshablePlaceTest"));

place.invalidate();
place.refresh(new ByteArrayInputStream(cfgDataReload));
assertNotNull(place, "Place created and configured");
assertEquals("RefreshablePlaceTest", place.getPlaceName(), "Configured place name");
// assertEquals("*", placeTest.getPrimaryProxy(), "Primary proxy");
assertEquals("UNKNOWN", place.getPrimaryProxy(), "Primary proxy");
// assertEquals("*.TEST_PLACE.ID.http://localhost:8001/PlaceTest", placeTest.getKey(), "Key generation");
assertEquals("UNKNOWN.TEST_PLACE.ID.http://localhost:8001/RefreshablePlaceTest", place.getKey(), "Key generation");
de = place.getDirectoryEntry();
assertNotNull(de, "Directory entry");
// assertEquals(50, de.getCost(), "Cost in directory entry");
assertEquals(60, de.getCost(), "Cost in directory entry");
// assertEquals(40, de.getQuality(), "Quality in directory entry");
assertEquals(50, de.getQuality(), "Quality in directory entry");
assertEquals("Description not available", de.getDescription(), "Description in directory entry");
assertNotNull(place.configG);
assertEquals(300, place.configG.findIntEntry("KEY_1", 0));
assertDoesNotThrow(() -> Namespace.lookup("http://localhost:8001/RefreshablePlaceTest"));
}

@Test
void testInvalidate() {
assertNotNull(place, "Place created and configured");
assertFalse(place.isInvalidated());
assertNotNull(place.configG);
assertEquals(200, place.configG.findIntEntry("KEY_1", 0));

place.refresh(new ByteArrayInputStream(cfgDataReload));
assertFalse(place.isInvalidated());
assertEquals(200, place.configG.findIntEntry("KEY_1", 0));

place.invalidate();
assertTrue(place.isInvalidated());
place.refresh(new ByteArrayInputStream(cfgDataReload));
assertFalse(place.isInvalidated());
assertEquals(300, place.configG.findIntEntry("KEY_1", 0));
}

private static final class RefreshablePlaceTest extends RefreshableServiceProviderPlace {

public RefreshablePlaceTest(InputStream config, @Nullable String dir, @Nullable String loc) throws IOException {
super(config, dir, loc);
}

@Override
public void process(IBaseDataObject d) {
assertNotNull(d);
}

@Override
protected void reconfigurePlace() {}
}
}
Loading