Skip to content

Commit

Permalink
Refactor ComputeNodePersistService
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jan 10, 2025
1 parent 1c5251c commit ac5f28a
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class InstanceMetaDataFactory {
* Create instance meta data.
*
* @param instanceId instance ID
* @param instanceType instance type
* @param instanceType instance type
* @param computeNodeData compute node data
* @return created instance meta data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,129 +55,125 @@ public final class ComputeNodePersistService {
* @param computeNodeInstance compute node instance
*/
public void registerOnline(final ComputeNodeInstance computeNodeInstance) {
String instanceId = computeNodeInstance.getMetaData().getId();
persistOnline(computeNodeInstance);
updateState(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getState().getCurrentState());
persistLabels(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getLabels());
}

private void persistOnline(final ComputeNodeInstance computeNodeInstance) {
ComputeNodeData computeNodeData = new ComputeNodeData(
computeNodeInstance.getMetaData().getDatabaseName(), computeNodeInstance.getMetaData().getAttributes(), computeNodeInstance.getMetaData().getVersion());
repository.persistEphemeral(ComputeNodePath.getOnlinePath(instanceId, computeNodeInstance.getMetaData().getType()),
repository.persistEphemeral(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType()),
YamlEngine.marshal(new YamlComputeNodeDataSwapper().swapToYamlConfiguration(computeNodeData)));
repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), computeNodeInstance.getState().getCurrentState().name());
persistInstanceLabels(instanceId, computeNodeInstance.getLabels());
}

/**
* Persist instance labels.
*
* @param instanceId instance ID
* @param labels instance labels
*/
public void persistInstanceLabels(final String instanceId, final Collection<String> labels) {
repository.persistEphemeral(ComputeNodePath.getLabelsPath(instanceId), YamlEngine.marshal(labels));
}

/**
* Persist instance worker ID.
*
* @param instanceId instance ID
* @param workerId worker ID
*/
public void persistInstanceWorkerId(final String instanceId, final int workerId) {
repository.persistEphemeral(ComputeNodePath.getWorkerIdPath(instanceId), String.valueOf(workerId));
}

/**
* Load compute node state.
*
* @param instanceId instance ID
* @return state
*/
public String loadComputeNodeState(final String instanceId) {
return repository.query(ComputeNodePath.getStatePath(instanceId));
}

/**
* Load instance worker ID.
* Compute node offline.
*
* @param instanceId instance ID
* @return worker ID
* @param computeNodeInstance compute node instance
*/
public Optional<Integer> loadInstanceWorkerId(final String instanceId) {
try {
String workerId = repository.query(ComputeNodePath.getWorkerIdPath(instanceId));
return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Integer.valueOf(workerId));
} catch (final NumberFormatException ex) {
log.error("Invalid worker id for instance: {}", instanceId);
}
return Optional.empty();
public void offline(final ComputeNodeInstance computeNodeInstance) {
repository.delete(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType()));
}

/**
* Load all compute node instances.
*
* @return loaded compute node instances
* @return loaded instances
*/
public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
return Arrays.stream(InstanceType.values()).flatMap(each -> loadComputeNodeInstances(each).stream()).collect(Collectors.toList());
public Collection<ComputeNodeInstance> loadAllInstances() {
return Arrays.stream(InstanceType.values()).flatMap(each -> loadInstances(each).stream()).collect(Collectors.toList());
}

private Collection<ComputeNodeInstance> loadComputeNodeInstances(final InstanceType instanceType) {
private Collection<ComputeNodeInstance> loadInstances(final InstanceType instanceType) {
Collection<ComputeNodeInstance> result = new LinkedList<>();
for (String each : repository.getChildrenKeys(ComputeNodePath.getOnlinePath(instanceType))) {
String value = repository.query(ComputeNodePath.getOnlinePath(each, instanceType));
if (Strings.isNullOrEmpty(value)) {
continue;
if (!Strings.isNullOrEmpty(value)) {
result.add(loadInstance(
InstanceMetaDataFactory.create(each, instanceType, new YamlComputeNodeDataSwapper().swapToObject(YamlEngine.unmarshal(value, YamlComputeNodeData.class)))));
}
result.add(loadComputeNodeInstance(
InstanceMetaDataFactory.create(each, instanceType, new YamlComputeNodeDataSwapper().swapToObject(YamlEngine.unmarshal(value, YamlComputeNodeData.class)))));
}
return result;
}

/**
* Load compute node instance by instance meta data.
* Load compute node instance.
*
* @param instanceMetaData instance meta data
* @return compute node instance
* @return loaded instance
*/
public ComputeNodeInstance loadComputeNodeInstance(final InstanceMetaData instanceMetaData) {
public ComputeNodeInstance loadInstance(final InstanceMetaData instanceMetaData) {
ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
result.getLabels().addAll(loadInstanceLabels(instanceMetaData.getId()));
InstanceState.get(loadComputeNodeState(instanceMetaData.getId())).ifPresent(result::switchState);
loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
InstanceState.get(loadState(instanceMetaData.getId())).ifPresent(result::switchState);
result.getLabels().addAll(loadLabels(instanceMetaData.getId()));
loadWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
return result;
}

private String loadState(final String instanceId) {
return repository.query(ComputeNodePath.getStatePath(instanceId));
}

@SuppressWarnings("unchecked")
private Collection<String> loadInstanceLabels(final String instanceId) {
private Collection<String> loadLabels(final String instanceId) {
String yamlContent = repository.query(ComputeNodePath.getLabelsPath(instanceId));
return Strings.isNullOrEmpty(yamlContent) ? Collections.emptyList() : YamlEngine.unmarshal(yamlContent, Collection.class);
}

/**
* Get assigned worker IDs.
* Update state.
*
* @return assigned worker IDs
* @param instanceId instance ID
* @param instanceState instance state
*/
public Collection<Integer> getAssignedWorkerIds() {
Collection<String> instanceIds = repository.getChildrenKeys(ComputeNodePath.getWorkerIdRootPath());
return instanceIds.stream().map(each -> repository.query(ComputeNodePath.getWorkerIdPath(each))).filter(Objects::nonNull).map(Integer::parseInt).collect(Collectors.toSet());
public void updateState(final String instanceId, final InstanceState instanceState) {
repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), instanceState.name());
}

/**
* Update compute node state.
* Persist labels.
*
* @param instanceId instance ID
* @param instanceState instance state
* @param labels instance labels
*/
public void updateComputeNodeState(final String instanceId, final InstanceState instanceState) {
repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), instanceState.name());
public void persistLabels(final String instanceId, final Collection<String> labels) {
repository.persistEphemeral(ComputeNodePath.getLabelsPath(instanceId), YamlEngine.marshal(labels));
}

/**
* Compute node offline.
* Persist worker ID.
*
* @param computeNodeInstance compute node instance
* @param instanceId instance ID
* @param workerId worker ID
*/
public void offline(final ComputeNodeInstance computeNodeInstance) {
repository.delete(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType()));
public void persistWorkerId(final String instanceId, final int workerId) {
repository.persistEphemeral(ComputeNodePath.getWorkerIdPath(instanceId), String.valueOf(workerId));
}

/**
* Load worker ID.
*
* @param instanceId instance ID
* @return worker ID
*/
public Optional<Integer> loadWorkerId(final String instanceId) {
try {
String workerId = repository.query(ComputeNodePath.getWorkerIdPath(instanceId));
return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Integer.valueOf(workerId));
} catch (final NumberFormatException ex) {
log.error("Invalid worker id for instance: {}", instanceId);
return Optional.empty();
}
}

/**
* Get assigned worker IDs.
*
* @return assigned worker IDs
*/
public Collection<Integer> getAssignedWorkerIds() {
Collection<String> instanceIds = repository.getChildrenKeys(ComputeNodePath.getWorkerIdRootPath());
return instanceIds.stream().map(each -> repository.query(ComputeNodePath.getWorkerIdPath(each))).filter(Objects::nonNull).map(Integer::parseInt).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,66 +68,71 @@ void assertRegisterOnline() {
}

@Test
void assertPersistInstanceLabels() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
computeNodePersistService.persistInstanceLabels(instanceId, Collections.singletonList("test"));
verify(repository).persistEphemeral("/nodes/compute_nodes/labels/foo_instance_id", YamlEngine.marshal(Collections.singletonList("test")));
void assertOffline() {
computeNodePersistService.offline(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307)));
verify(repository).delete("/nodes/compute_nodes/online/proxy/foo_instance_id");
}

@Test
void assertPersistInstanceWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
computeNodePersistService.persistInstanceWorkerId(instanceId, 100);
verify(repository).persistEphemeral("/nodes/compute_nodes/worker_id/foo_instance_id", String.valueOf(100));
void assertLoadAllInstances() {
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
YamlComputeNodeData yamlComputeNodeData0 = new YamlComputeNodeData();
yamlComputeNodeData0.setAttribute("127.0.0.1");
yamlComputeNodeData0.setVersion("foo_version");
when(repository.query("/nodes/compute_nodes/online/jdbc/foo_instance_3307")).thenReturn(YamlEngine.marshal(yamlComputeNodeData0));
List<ComputeNodeInstance> actual = new ArrayList<>(computeNodePersistService.loadAllInstances());
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getMetaData().getId(), is("foo_instance_3307"));
assertThat(actual.get(0).getMetaData().getIp(), is("127.0.0.1"));
}

@Test
void assertLoadComputeNodeState() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/status/foo_instance_id")).thenReturn("OK");
assertThat(computeNodePersistService.loadComputeNodeState(instanceId), is("OK"));
void assertLoadInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
ComputeNodeInstance actual = computeNodePersistService.loadInstance(instanceMetaData);
assertThat(actual.getMetaData(), is(instanceMetaData));
}

@Test
void assertLoadInstanceWorkerId() {
void assertUpdateState() {
computeNodePersistService.updateState("foo_instance_id", InstanceState.OK);
verify(repository).persistEphemeral("/nodes/compute_nodes/status/foo_instance_id", InstanceState.OK.name());
}

@Test
void assertPersistLabels() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("1");
assertThat(computeNodePersistService.loadInstanceWorkerId(instanceId), is(Optional.of(1)));
computeNodePersistService.persistLabels(instanceId, Collections.singletonList("test"));
verify(repository).persistEphemeral("/nodes/compute_nodes/labels/foo_instance_id", YamlEngine.marshal(Collections.singletonList("test")));
}

@Test
void assertLoadWithEmptyInstanceWorkerId() {
void assertPersistWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("");
assertFalse(computeNodePersistService.loadInstanceWorkerId(instanceId).isPresent());
computeNodePersistService.persistWorkerId(instanceId, 100);
verify(repository).persistEphemeral("/nodes/compute_nodes/worker_id/foo_instance_id", String.valueOf(100));
}

@Test
void assertLoadInstanceWorkerIdWithInvalidFormat() {
void assertLoadWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("a");
assertFalse(computeNodePersistService.loadInstanceWorkerId(instanceId).isPresent());
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("1");
assertThat(computeNodePersistService.loadWorkerId(instanceId), is(Optional.of(1)));
}

@Test
void assertLoadAllComputeNodeInstances() {
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
YamlComputeNodeData yamlComputeNodeData0 = new YamlComputeNodeData();
yamlComputeNodeData0.setAttribute("127.0.0.1");
yamlComputeNodeData0.setVersion("foo_version");
when(repository.query("/nodes/compute_nodes/online/jdbc/foo_instance_3307")).thenReturn(YamlEngine.marshal(yamlComputeNodeData0));
List<ComputeNodeInstance> actual = new ArrayList<>(computeNodePersistService.loadAllComputeNodeInstances());
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getMetaData().getId(), is("foo_instance_3307"));
assertThat(actual.get(0).getMetaData().getIp(), is("127.0.0.1"));
void assertLoadWithEmptyWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("");
assertFalse(computeNodePersistService.loadWorkerId(instanceId).isPresent());
}

@Test
void assertLoadComputeNodeInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
ComputeNodeInstance actual = computeNodePersistService.loadComputeNodeInstance(instanceMetaData);
assertThat(actual.getMetaData(), is(instanceMetaData));
void assertLoadWorkerIdWithInvalidFormat() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("a");
assertFalse(computeNodePersistService.loadWorkerId(instanceId).isPresent());
}

@Test
Expand All @@ -137,16 +142,4 @@ void assertGetUsedWorkerIds() {
when(repository.query("/nodes/compute_nodes/worker_id/2")).thenReturn("2");
assertThat(computeNodePersistService.getAssignedWorkerIds(), is(Collections.singleton(2)));
}

@Test
void assertUpdateComputeNodeState() {
computeNodePersistService.updateComputeNodeState("foo_instance_id", InstanceState.OK);
verify(repository).persistEphemeral("/nodes/compute_nodes/status/foo_instance_id", InstanceState.OK.name());
}

@Test
void assertOffline() {
computeNodePersistService.offline(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307)));
verify(repository).delete("/nodes/compute_nodes/online/proxy/foo_instance_id");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private void registerOnline(final ComputeNodeInstanceContext computeNodeInstance
final ContextManager contextManager, final PersistRepository repository) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances()
.addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
.addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllInstances());
new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
DeliverEventSubscriberRegistry deliverEventSubscriberRegistry = new DeliverEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
deliverEventSubscriberRegistry.register(createDeliverEventSubscribers(repository));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void handle(final ContextManager contextManager, final DataChangedEvent e
InstanceMetaData instanceMetaData = InstanceMetaDataFactory.create(matcher.group(2), InstanceType.valueOf(matcher.group(1).toUpperCase()), computeNodeData);
if (Type.ADDED == event.getType()) {
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry()
.add(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(instanceMetaData));
.add(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadInstance(instanceMetaData));
} else if (Type.DELETED == event.getType()) {
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().delete(new ComputeNodeInstance(instanceMetaData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, final

@Override
public int generate(final Properties props) {
int result = computeNodePersistService.loadInstanceWorkerId(instanceId).orElseGet(this::generateNewWorkerId);
int result = computeNodePersistService.loadWorkerId(instanceId).orElseGet(this::generateNewWorkerId);
logWarning(result, props);
return result;
}
Expand All @@ -67,7 +67,7 @@ private int generateNewWorkerId() {
generatedWorkId = generateAvailableWorkerId();
} while (!generatedWorkId.isPresent());
int result = generatedWorkId.get();
computeNodePersistService.persistInstanceWorkerId(instanceId, result);
computeNodePersistService.persistWorkerId(instanceId, result);
return result;
}

Expand Down
Loading

0 comments on commit ac5f28a

Please sign in to comment.