Skip to content

Commit

Permalink
create actor builder, add cluster join actor
Browse files Browse the repository at this point in the history
  • Loading branch information
BrandonArp committed Jan 31, 2017
1 parent adc3c18 commit 4bb2285
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 211 deletions.
1 change: 1 addition & 0 deletions config/config.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jvmMetricsCollectionInterval="PT.5S"
maxConnectionTimeout="PT2M"
minConnectionTimeout="PT1M"
clusterHostSuffix=".cluster"
clusterJoinActor.type="com.arpnetworking.akka.NonJoiningClusterJoiner"
rebalanceConfiguration {
maxParallel=100
threshold=500
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/arpnetworking/akka/ActorBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright 2016 InscopeMetrics, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arpnetworking.akka;

import akka.actor.Props;
import com.arpnetworking.commons.builder.OvalBuilder;

import java.util.function.Function;

/**
* Builder for actors.
*
* @param <B> The type of the builder
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
*/
public abstract class ActorBuilder<B extends ActorBuilder<B>> extends OvalBuilder<Props> {
/**
* Protected constructor.
*
* @param createProps method to create a {@link Props} from the {@link ActorBuilder}
*/
protected ActorBuilder(final Function<B, Props> createProps) {
super(createProps);
}

/**
* Called by setters to always return appropriate subclass of
* {@link ActorBuilder}, even from setters of base class.
*
* @return instance with correct {@link ActorBuilder} class type.
*/
protected abstract B self();
}
90 changes: 90 additions & 0 deletions src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright 2016 Inscope Metrics, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arpnetworking.akka;

import akka.actor.Props;
import akka.actor.UntypedActor;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;

/**
* Actor that does not attempt to join a cluster.
*
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
*/
public final class NonJoiningClusterJoiner extends UntypedActor {
/**
* Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor.
*
* @return a new {@link Props}
*/
public static Props props() {
return Props.create(NonJoiningClusterJoiner.class);
}

/**
* Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor from a
* {@link Builder}.
*
* @param builder Builder to create the Props from
* @return a new {@link Props}
*/
private static Props props(final Builder builder) {
return props();
}

/**
* Public constructor.
*/
public NonJoiningClusterJoiner() {
LOGGER.info()
.setMessage("NonJoiningClusterJoiner starting up")
.log();
}


/**
* {@inheritDoc}
*/
@Override
public void onReceive(final Object message) throws Exception {
unhandled(message);
}

private static final Logger LOGGER = LoggerFactory.getLogger(NonJoiningClusterJoiner.class);

/**
* Implementation of the {@link com.arpnetworking.commons.builder.Builder} pattern for a {@link NonJoiningClusterJoiner}.
*
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
*/
public static class Builder extends ActorBuilder<Builder> {
/**
* Public constructor.
*/
public Builder() {
super(NonJoiningClusterJoiner::props);
}

/**
* {@inheritDoc}
*/
@Override
public Builder self() {
return this;
}
}
}
57 changes: 25 additions & 32 deletions src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,54 +164,39 @@ private ActorSystem provideActorSystem(@Named("akka-config") final Config akkaCo
@Named("cluster-emitter")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideClusterEmitter(final Injector injector, final ActorSystem system) {
return launchEmitter(injector, system, _configuration.getClusterPipelineConfiguration(), "cluster-emitter-configurator");
}

@Provides
@Singleton
@Named("host-emitter")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
return launchEmitter(injector, system, _configuration.getHostPipelineConfiguration(), "host-emitter-configurator");
}

private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) {
final ActorRef emitterConfigurationProxy = system.actorOf(
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
"cluster-emitter-configurator");
name);
final ActorConfigurator<EmitterConfiguration> configurator =
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
final File configurationFile = _configuration.getClusterPipelineConfiguration();
final Builder<? extends JsonNodeSource> sourceBuilder;
if (configurationFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
if (pipelineFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
sourceBuilder = new HoconFileSource.Builder()
.setObjectMapper(objectMapper)
.setFile(configurationFile);
.setFile(pipelineFile);
} else {
sourceBuilder = new JsonNodeFileSource.Builder()
.setObjectMapper(objectMapper)
.setFile(configurationFile);
.setFile(pipelineFile);
}

final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
.setObjectMapper(objectMapper)
.addSourceBuilder(sourceBuilder)
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getClusterPipelineConfiguration()).build())
.addListener(configurator)
.build();

configuration.launch();

return emitterConfigurationProxy;
}

@Provides
@Singleton
@Named("host-emitter")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
final ActorRef emitterConfigurationProxy = system.actorOf(
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
"host-emitter-configurator");
final ActorConfigurator<EmitterConfiguration> configurator =
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
.setObjectMapper(objectMapper)
.addSourceBuilder(
new JsonNodeFileSource.Builder()
.setObjectMapper(objectMapper)
.setFile(_configuration.getHostPipelineConfiguration()))
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getHostPipelineConfiguration()).build())
.addTrigger(new FileTrigger.Builder().setFile(pipelineFile).build())
.addListener(configurator)
.build();

Expand Down Expand Up @@ -259,6 +244,14 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys
return system.actorOf(GuiceActorCreator.props(injector, AggClientServer.class), "tcp-server");
}

@Provides
@Singleton
@Named("cluster-joiner")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideClusterJoiner(final ActorSystem system, final ClusterAggregatorConfiguration config) {
return system.actorOf(config.getClusterJoinActor(), "cluster-joiner");
}

@Provides
@Singleton
@Named("aggregator-lifecycle")
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/arpnetworking/clusteraggregator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ private void launchActors(final Injector injector) {
.log();
injector.getInstance(Key.get(ActorRef.class, Names.named("jvm-metrics-collector")));

LOGGER.info()
.setMessage("Launching cluster joiner")
.log();
injector.getInstance(Key.get(ActorRef.class, Names.named("cluster-joiner")));

LOGGER.info()
.setMessage("Launching http server")
.log();
Expand Down
Loading

0 comments on commit 4bb2285

Please sign in to comment.