Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ComputeNodePersistService #34305

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public final class InstanceMetaDataFactory {
* Create instance meta data.
*
* @param instanceId instance ID
* @param instanceType instance type
* @param instanceType instance type
* @param computeNodeData compute node data
* @return created instance meta data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,129 +55,125 @@ public final class ComputeNodePersistService {
* @param computeNodeInstance compute node instance
*/
public void registerOnline(final ComputeNodeInstance computeNodeInstance) {
String instanceId = computeNodeInstance.getMetaData().getId();
persistOnline(computeNodeInstance);
updateState(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getState().getCurrentState());
persistLabels(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getLabels());
}

private void persistOnline(final ComputeNodeInstance computeNodeInstance) {
ComputeNodeData computeNodeData = new ComputeNodeData(
computeNodeInstance.getMetaData().getDatabaseName(), computeNodeInstance.getMetaData().getAttributes(), computeNodeInstance.getMetaData().getVersion());
repository.persistEphemeral(ComputeNodePath.getOnlinePath(instanceId, computeNodeInstance.getMetaData().getType()),
repository.persistEphemeral(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType()),
YamlEngine.marshal(new YamlComputeNodeDataSwapper().swapToYamlConfiguration(computeNodeData)));
repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), computeNodeInstance.getState().getCurrentState().name());
persistInstanceLabels(instanceId, computeNodeInstance.getLabels());
}

/**
* Persist instance labels.
*
* @param instanceId instance ID
* @param labels instance labels
*/
public void persistInstanceLabels(final String instanceId, final Collection<String> labels) {
repository.persistEphemeral(ComputeNodePath.getLabelsPath(instanceId), YamlEngine.marshal(labels));
}

/**
* Persist instance worker ID.
*
* @param instanceId instance ID
* @param workerId worker ID
*/
public void persistInstanceWorkerId(final String instanceId, final int workerId) {
repository.persistEphemeral(ComputeNodePath.getWorkerIdPath(instanceId), String.valueOf(workerId));
}

/**
* Load compute node state.
*
* @param instanceId instance ID
* @return state
*/
public String loadComputeNodeState(final String instanceId) {
return repository.query(ComputeNodePath.getStatePath(instanceId));
}

/**
* Load instance worker ID.
* Compute node offline.
*
* @param instanceId instance ID
* @return worker ID
* @param computeNodeInstance compute node instance
*/
public Optional<Integer> loadInstanceWorkerId(final String instanceId) {
try {
String workerId = repository.query(ComputeNodePath.getWorkerIdPath(instanceId));
return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Integer.valueOf(workerId));
} catch (final NumberFormatException ex) {
log.error("Invalid worker id for instance: {}", instanceId);
}
return Optional.empty();
public void offline(final ComputeNodeInstance computeNodeInstance) {
repository.delete(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType()));
}

/**
* Load all compute node instances.
*
* @return loaded compute node instances
* @return loaded instances
*/
public Collection<ComputeNodeInstance> loadAllComputeNodeInstances() {
return Arrays.stream(InstanceType.values()).flatMap(each -> loadComputeNodeInstances(each).stream()).collect(Collectors.toList());
public Collection<ComputeNodeInstance> loadAllInstances() {
return Arrays.stream(InstanceType.values()).flatMap(each -> loadInstances(each).stream()).collect(Collectors.toList());
}

private Collection<ComputeNodeInstance> loadComputeNodeInstances(final InstanceType instanceType) {
private Collection<ComputeNodeInstance> loadInstances(final InstanceType instanceType) {
Collection<ComputeNodeInstance> result = new LinkedList<>();
for (String each : repository.getChildrenKeys(ComputeNodePath.getOnlinePath(instanceType))) {
String value = repository.query(ComputeNodePath.getOnlinePath(each, instanceType));
if (Strings.isNullOrEmpty(value)) {
continue;
if (!Strings.isNullOrEmpty(value)) {
result.add(loadInstance(
InstanceMetaDataFactory.create(each, instanceType, new YamlComputeNodeDataSwapper().swapToObject(YamlEngine.unmarshal(value, YamlComputeNodeData.class)))));
}
result.add(loadComputeNodeInstance(
InstanceMetaDataFactory.create(each, instanceType, new YamlComputeNodeDataSwapper().swapToObject(YamlEngine.unmarshal(value, YamlComputeNodeData.class)))));
}
return result;
}

/**
* Load compute node instance by instance meta data.
* Load compute node instance.
*
* @param instanceMetaData instance meta data
* @return compute node instance
* @return loaded instance
*/
public ComputeNodeInstance loadComputeNodeInstance(final InstanceMetaData instanceMetaData) {
public ComputeNodeInstance loadInstance(final InstanceMetaData instanceMetaData) {
ComputeNodeInstance result = new ComputeNodeInstance(instanceMetaData);
result.getLabels().addAll(loadInstanceLabels(instanceMetaData.getId()));
InstanceState.get(loadComputeNodeState(instanceMetaData.getId())).ifPresent(result::switchState);
loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
InstanceState.get(loadState(instanceMetaData.getId())).ifPresent(result::switchState);
result.getLabels().addAll(loadLabels(instanceMetaData.getId()));
loadWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
return result;
}

private String loadState(final String instanceId) {
return repository.query(ComputeNodePath.getStatePath(instanceId));
}

@SuppressWarnings("unchecked")
private Collection<String> loadInstanceLabels(final String instanceId) {
private Collection<String> loadLabels(final String instanceId) {
String yamlContent = repository.query(ComputeNodePath.getLabelsPath(instanceId));
return Strings.isNullOrEmpty(yamlContent) ? Collections.emptyList() : YamlEngine.unmarshal(yamlContent, Collection.class);
}

/**
* Get assigned worker IDs.
* Update state.
*
* @return assigned worker IDs
* @param instanceId instance ID
* @param instanceState instance state
*/
public Collection<Integer> getAssignedWorkerIds() {
Collection<String> instanceIds = repository.getChildrenKeys(ComputeNodePath.getWorkerIdRootPath());
return instanceIds.stream().map(each -> repository.query(ComputeNodePath.getWorkerIdPath(each))).filter(Objects::nonNull).map(Integer::parseInt).collect(Collectors.toSet());
public void updateState(final String instanceId, final InstanceState instanceState) {
repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), instanceState.name());
}

/**
* Update compute node state.
* Persist labels.
*
* @param instanceId instance ID
* @param instanceState instance state
* @param labels instance labels
*/
public void updateComputeNodeState(final String instanceId, final InstanceState instanceState) {
repository.persistEphemeral(ComputeNodePath.getStatePath(instanceId), instanceState.name());
public void persistLabels(final String instanceId, final Collection<String> labels) {
repository.persistEphemeral(ComputeNodePath.getLabelsPath(instanceId), YamlEngine.marshal(labels));
}

/**
* Compute node offline.
* Persist worker ID.
*
* @param computeNodeInstance compute node instance
* @param instanceId instance ID
* @param workerId worker ID
*/
public void offline(final ComputeNodeInstance computeNodeInstance) {
repository.delete(ComputeNodePath.getOnlinePath(computeNodeInstance.getMetaData().getId(), computeNodeInstance.getMetaData().getType()));
public void persistWorkerId(final String instanceId, final int workerId) {
repository.persistEphemeral(ComputeNodePath.getWorkerIdPath(instanceId), String.valueOf(workerId));
}

/**
* Load worker ID.
*
* @param instanceId instance ID
* @return worker ID
*/
public Optional<Integer> loadWorkerId(final String instanceId) {
try {
String workerId = repository.query(ComputeNodePath.getWorkerIdPath(instanceId));
return Strings.isNullOrEmpty(workerId) ? Optional.empty() : Optional.of(Integer.valueOf(workerId));
} catch (final NumberFormatException ex) {
log.error("Invalid worker id for instance: {}", instanceId);
return Optional.empty();
}
}

/**
* Get assigned worker IDs.
*
* @return assigned worker IDs
*/
public Collection<Integer> getAssignedWorkerIds() {
Collection<String> instanceIds = repository.getChildrenKeys(ComputeNodePath.getWorkerIdRootPath());
return instanceIds.stream().map(each -> repository.query(ComputeNodePath.getWorkerIdPath(each))).filter(Objects::nonNull).map(Integer::parseInt).collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,66 +68,71 @@ void assertRegisterOnline() {
}

@Test
void assertPersistInstanceLabels() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
computeNodePersistService.persistInstanceLabels(instanceId, Collections.singletonList("test"));
verify(repository).persistEphemeral("/nodes/compute_nodes/labels/foo_instance_id", YamlEngine.marshal(Collections.singletonList("test")));
void assertOffline() {
computeNodePersistService.offline(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307)));
verify(repository).delete("/nodes/compute_nodes/online/proxy/foo_instance_id");
}

@Test
void assertPersistInstanceWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
computeNodePersistService.persistInstanceWorkerId(instanceId, 100);
verify(repository).persistEphemeral("/nodes/compute_nodes/worker_id/foo_instance_id", String.valueOf(100));
void assertLoadAllInstances() {
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
YamlComputeNodeData yamlComputeNodeData0 = new YamlComputeNodeData();
yamlComputeNodeData0.setAttribute("127.0.0.1");
yamlComputeNodeData0.setVersion("foo_version");
when(repository.query("/nodes/compute_nodes/online/jdbc/foo_instance_3307")).thenReturn(YamlEngine.marshal(yamlComputeNodeData0));
List<ComputeNodeInstance> actual = new ArrayList<>(computeNodePersistService.loadAllInstances());
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getMetaData().getId(), is("foo_instance_3307"));
assertThat(actual.get(0).getMetaData().getIp(), is("127.0.0.1"));
}

@Test
void assertLoadComputeNodeState() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/status/foo_instance_id")).thenReturn("OK");
assertThat(computeNodePersistService.loadComputeNodeState(instanceId), is("OK"));
void assertLoadInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
ComputeNodeInstance actual = computeNodePersistService.loadInstance(instanceMetaData);
assertThat(actual.getMetaData(), is(instanceMetaData));
}

@Test
void assertLoadInstanceWorkerId() {
void assertUpdateState() {
computeNodePersistService.updateState("foo_instance_id", InstanceState.OK);
verify(repository).persistEphemeral("/nodes/compute_nodes/status/foo_instance_id", InstanceState.OK.name());
}

@Test
void assertPersistLabels() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("1");
assertThat(computeNodePersistService.loadInstanceWorkerId(instanceId), is(Optional.of(1)));
computeNodePersistService.persistLabels(instanceId, Collections.singletonList("test"));
verify(repository).persistEphemeral("/nodes/compute_nodes/labels/foo_instance_id", YamlEngine.marshal(Collections.singletonList("test")));
}

@Test
void assertLoadWithEmptyInstanceWorkerId() {
void assertPersistWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("");
assertFalse(computeNodePersistService.loadInstanceWorkerId(instanceId).isPresent());
computeNodePersistService.persistWorkerId(instanceId, 100);
verify(repository).persistEphemeral("/nodes/compute_nodes/worker_id/foo_instance_id", String.valueOf(100));
}

@Test
void assertLoadInstanceWorkerIdWithInvalidFormat() {
void assertLoadWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("a");
assertFalse(computeNodePersistService.loadInstanceWorkerId(instanceId).isPresent());
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("1");
assertThat(computeNodePersistService.loadWorkerId(instanceId), is(Optional.of(1)));
}

@Test
void assertLoadAllComputeNodeInstances() {
when(repository.getChildrenKeys("/nodes/compute_nodes/online/jdbc")).thenReturn(Collections.singletonList("foo_instance_3307"));
when(repository.getChildrenKeys("/nodes/compute_nodes/online/proxy")).thenReturn(Collections.singletonList("foo_instance_3308"));
YamlComputeNodeData yamlComputeNodeData0 = new YamlComputeNodeData();
yamlComputeNodeData0.setAttribute("127.0.0.1");
yamlComputeNodeData0.setVersion("foo_version");
when(repository.query("/nodes/compute_nodes/online/jdbc/foo_instance_3307")).thenReturn(YamlEngine.marshal(yamlComputeNodeData0));
List<ComputeNodeInstance> actual = new ArrayList<>(computeNodePersistService.loadAllComputeNodeInstances());
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getMetaData().getId(), is("foo_instance_3307"));
assertThat(actual.get(0).getMetaData().getIp(), is("127.0.0.1"));
void assertLoadWithEmptyWorkerId() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("");
assertFalse(computeNodePersistService.loadWorkerId(instanceId).isPresent());
}

@Test
void assertLoadComputeNodeInstance() {
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
ComputeNodeInstance actual = computeNodePersistService.loadComputeNodeInstance(instanceMetaData);
assertThat(actual.getMetaData(), is(instanceMetaData));
void assertLoadWorkerIdWithInvalidFormat() {
String instanceId = new ProxyInstanceMetaData("foo_instance_id", 3307).getId();
when(repository.query("/nodes/compute_nodes/worker_id/foo_instance_id")).thenReturn("a");
assertFalse(computeNodePersistService.loadWorkerId(instanceId).isPresent());
}

@Test
Expand All @@ -137,16 +142,4 @@ void assertGetUsedWorkerIds() {
when(repository.query("/nodes/compute_nodes/worker_id/2")).thenReturn("2");
assertThat(computeNodePersistService.getAssignedWorkerIds(), is(Collections.singleton(2)));
}

@Test
void assertUpdateComputeNodeState() {
computeNodePersistService.updateComputeNodeState("foo_instance_id", InstanceState.OK);
verify(repository).persistEphemeral("/nodes/compute_nodes/status/foo_instance_id", InstanceState.OK.name());
}

@Test
void assertOffline() {
computeNodePersistService.offline(new ComputeNodeInstance(new ProxyInstanceMetaData("foo_instance_id", 3307)));
verify(repository).delete("/nodes/compute_nodes/online/proxy/foo_instance_id");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private void registerOnline(final ComputeNodeInstanceContext computeNodeInstance
final ContextManager contextManager, final PersistRepository repository) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().getAllClusterInstances()
.addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllComputeNodeInstances());
.addAll(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadAllInstances());
new DataChangedEventListenerRegistry(contextManager, getDatabaseNames(param, contextManager.getPersistServiceFacade().getMetaDataPersistService())).register();
DeliverEventSubscriberRegistry deliverEventSubscriberRegistry = new DeliverEventSubscriberRegistry(contextManager.getComputeNodeInstanceContext().getEventBusContext());
deliverEventSubscriberRegistry.register(createDeliverEventSubscribers(repository));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void handle(final ContextManager contextManager, final DataChangedEvent e
InstanceMetaData instanceMetaData = InstanceMetaDataFactory.create(matcher.group(2), InstanceType.valueOf(matcher.group(1).toUpperCase()), computeNodeData);
if (Type.ADDED == event.getType()) {
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry()
.add(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadComputeNodeInstance(instanceMetaData));
.add(contextManager.getPersistServiceFacade().getComputeNodePersistService().loadInstance(instanceMetaData));
} else if (Type.DELETED == event.getType()) {
contextManager.getComputeNodeInstanceContext().getClusterInstanceRegistry().delete(new ComputeNodeInstance(instanceMetaData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ClusterWorkerIdGenerator(final ClusterPersistRepository repository, final

@Override
public int generate(final Properties props) {
int result = computeNodePersistService.loadInstanceWorkerId(instanceId).orElseGet(this::generateNewWorkerId);
int result = computeNodePersistService.loadWorkerId(instanceId).orElseGet(this::generateNewWorkerId);
logWarning(result, props);
return result;
}
Expand All @@ -67,7 +67,7 @@ private int generateNewWorkerId() {
generatedWorkId = generateAvailableWorkerId();
} while (!generatedWorkId.isPresent());
int result = generatedWorkId.get();
computeNodePersistService.persistInstanceWorkerId(instanceId, result);
computeNodePersistService.persistWorkerId(instanceId, result);
return result;
}

Expand Down
Loading
Loading