Skip to content

Commit

Permalink
apache#3042 added pipeline update
Browse files Browse the repository at this point in the history
  • Loading branch information
IsaakKrut committed Aug 23, 2024
1 parent a594748 commit cfceff7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ public void updateAdapter(AdapterDescription ad)
}
}

public void updateDataStream(SpDataStream dataStream) throws AdapterException {
var correspondingAdapter = adapterMasterManagement.getAdapter(dataStream.getCorrespondingAdapterId());
dataStreamResourceManager.update(dataStream);

correspondingAdapter.setDataStream(dataStream);
updateAdapter(correspondingAdapter);
}

public List<PipelineUpdateInfo> checkPipelineMigrations(AdapterDescription adapterDescription) {
var affectedPipelines = PipelineManager
.getPipelinesContainingElements(adapterDescription.getCorrespondingDataStreamElementId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

package org.apache.streampipes.rest.impl.pe;

import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.connect.management.management.AdapterUpdateManagement;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.message.Message;
import org.apache.streampipes.model.message.NotificationType;
import org.apache.streampipes.model.monitoring.SpLogMessage;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.rest.impl.connect.AbstractAdapterResource;
import org.apache.streampipes.rest.security.AuthConstants;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
Expand All @@ -43,7 +49,17 @@

@RestController
@RequestMapping("/api/v2/streams")
public class DataStreamResource extends AbstractAuthGuardedRestResource {
public class DataStreamResource extends AbstractAdapterResource<AdapterMasterManagement> {

public DataStreamResource() {
super(() -> new AdapterMasterManagement(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
new SpResourceManager().manageAdapters(),
new SpResourceManager().manageDataStreams(),
AdapterMetricsManager.INSTANCE.getAdapterMetrics()
));
}

@GetMapping(path = "/available", produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_READ_PIPELINE_ELEMENT_PRIVILEGE)
Expand Down Expand Up @@ -98,8 +114,12 @@ public ResponseEntity<?> addDataStream(@RequestBody SpDataStream dataStream) {
public ResponseEntity<?> updateDataStream(@RequestBody SpDataStream dataStream) {
try {
getDataStreamResourceManager().update(dataStream);
var updateManager = new AdapterUpdateManagement(managementService);

updateManager.updateDataStream(dataStream);

return ok();
} catch (IllegalArgumentException e) {
} catch (AdapterException e) {
return badRequest(e.getMessage());
}
}
Expand Down

0 comments on commit cfceff7

Please sign in to comment.