Skip to content

Commit

Permalink
feat(subscription): add support for deletedAt (#1300)
Browse files Browse the repository at this point in the history
* handled in `entityDeleted` notification trigger
* handled in `attributeDeleted` notification trigger
  • Loading branch information
bobeal authored Jan 6, 2025
1 parent 9fdb2cc commit 5289ca9
Show file tree
Hide file tree
Showing 38 changed files with 790 additions and 826 deletions.
3 changes: 1 addition & 2 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either&lt;APIException, Unit&gt;</ID>
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityEventService.kt$EntityEventService$private fun publishAttributeChangeEvent( sub: String?, tenantName: String, entityId: URI, entityTypesAndPayload: Pair&lt;List&lt;ExpandedTerm&gt;, String&gt;, attributeOperationResult: SucceededAttributeOperationResult )</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping("/{entityId}", produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getByURI( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @AllowedParameters( implemented = [ QP.OPTIONS, QP.TYPE, QP.ATTRS, QP.GEOMETRY_PROPERTY, QP.LANG, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping(produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getEntities( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ QP.OPTIONS, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityEventService.kt$EntityEventService$private fun publishAttributeChangeEvent( updatedDetails: UpdatedDetails, sub: String?, tenantName: String, entityId: URI, entityTypesAndPayload: Pair&lt;List&lt;ExpandedTerm&gt;, String&gt;, serializedAttribute: Pair&lt;ExpandedTerm, String&gt;, overwrite: Boolean )</ID>
<ID>LongMethod:LinkedEntityServiceTests.kt$LinkedEntityServiceTests$@Test fun `it should inline entities up to the asked 2nd level`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
Expand All @@ -28,7 +28,6 @@
<ID>LongParameterList:EntityAttributeService.kt$EntityAttributeService$( entityId: URI, attributeName: ExpandedTerm, attributeMetadata: AttributeMetadata, createdAt: ZonedDateTime, attributePayload: ExpandedAttributeInstance, sub: Sub? )</ID>
<ID>LongParameterList:EntityAttributeService.kt$EntityAttributeService$( entityUri: URI, ngsiLdAttributes: List&lt;NgsiLdAttribute&gt;, expandedAttributes: ExpandedAttributes, createdAt: ZonedDateTime, observedAt: ZonedDateTime?, sub: Sub? )</ID>
<ID>LongParameterList:EntityAttributeService.kt$EntityAttributeService$( entityUri: URI, ngsiLdAttributes: List&lt;NgsiLdAttribute&gt;, expandedAttributes: ExpandedAttributes, disallowOverwrite: Boolean, createdAt: ZonedDateTime, sub: Sub? )</ID>
<ID>LongParameterList:EntityEventService.kt$EntityEventService$( updatedDetails: UpdatedDetails, sub: String?, tenantName: String, entityId: URI, entityTypesAndPayload: Pair&lt;List&lt;ExpandedTerm&gt;, String&gt;, serializedAttribute: Pair&lt;ExpandedTerm, String&gt;, overwrite: Boolean )</ID>
<ID>LongParameterList:TemporalEntityHandler.kt$TemporalEntityHandler$( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @PathVariable attrId: String, @PathVariable instanceId: URI, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters(notImplemented = [QP.LOCAL, QP.VIA]) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; )</ID>
<ID>LongParameterList:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$( entityId: URI, attributeName: ExpandedTerm, datasetId: URI?, attributePayload: ExpandedAttributeInstance, ngsiLdAttributeInstance: NgsiLdAttributeInstance, defaultCreatedAt: ZonedDateTime )</ID>
<ID>NestedBlockDepth:V0_29__JsonLd_migration.kt$V0_29__JsonLd_migration$override fun migrate(context: Context)</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import arrow.core.left
import arrow.core.raise.either
import com.egm.stellio.search.authorization.service.AuthorizationService
import com.egm.stellio.search.authorization.service.EntityAccessRightsService
import com.egm.stellio.search.entity.model.FailedAttributeOperationResult
import com.egm.stellio.search.entity.model.NotUpdatedDetails
import com.egm.stellio.search.entity.model.UpdateAttributeResult
import com.egm.stellio.search.entity.model.UpdateOperationResult
import com.egm.stellio.search.entity.model.updateResultFromDetailedResult
import com.egm.stellio.search.entity.model.OperationStatus
import com.egm.stellio.search.entity.model.SucceededAttributeOperationResult
import com.egm.stellio.search.entity.model.UpdateResult
import com.egm.stellio.search.entity.util.composeEntitiesQueryFromGet
import com.egm.stellio.shared.config.ApplicationProperties
import com.egm.stellio.shared.model.AccessDeniedException
Expand Down Expand Up @@ -257,24 +258,24 @@ class EntityAccessControlHandler(
AccessRight.forAttributeName(ngsiLdRel.name).getOrNull()!!
).fold(
ifLeft = { apiException ->
UpdateAttributeResult(
FailedAttributeOperationResult(
ngsiLdRel.name,
ngsiLdRelInstance.datasetId,
UpdateOperationResult.FAILED,
OperationStatus.FAILED,
apiException.message
)
},
ifRight = {
UpdateAttributeResult(
SucceededAttributeOperationResult(
ngsiLdRel.name,
ngsiLdRelInstance.datasetId,
UpdateOperationResult.APPENDED,
null
OperationStatus.APPENDED,
emptyMap()
)
}
)
}
val appendResult = updateResultFromDetailedResult(results)
val appendResult = UpdateResult(results)

if (invalidAttributes.isEmpty() && unauthorizedInstances.isEmpty())
ResponseEntity.status(HttpStatus.NO_CONTENT).build<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package com.egm.stellio.search.entity.listener
import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import com.egm.stellio.search.entity.model.OperationStatus
import com.egm.stellio.search.entity.model.SucceededAttributeOperationResult
import com.egm.stellio.search.entity.service.EntityEventService
import com.egm.stellio.search.entity.service.EntityService
import com.egm.stellio.shared.model.APIException
Expand Down Expand Up @@ -120,9 +122,14 @@ class ObservationEventListener(
entityEventService.publishAttributeChangeEvents(
observationEvent.sub,
observationEvent.entityId,
expandedAttribute.toExpandedAttributes(),
it,
false
listOf(
SucceededAttributeOperationResult(
observationEvent.attributeName,
observationEvent.datasetId,
OperationStatus.UPDATED,
expandedAttribute.toExpandedAttributes()
)
)
)
}
}
Expand All @@ -143,7 +150,7 @@ class ObservationEventListener(
entityService.appendAttributes(
observationEvent.entityId,
expandedAttribute.toExpandedAttributes(),
!observationEvent.overwrite,
false,
observationEvent.sub
).map {
if (it.notUpdated.isNotEmpty()) {
Expand All @@ -157,9 +164,14 @@ class ObservationEventListener(
entityEventService.publishAttributeChangeEvents(
observationEvent.sub,
observationEvent.entityId,
expandedAttribute.toExpandedAttributes(),
it,
observationEvent.overwrite
listOf(
SucceededAttributeOperationResult(
observationEvent.attributeName,
observationEvent.datasetId,
OperationStatus.APPENDED,
expandedAttribute.toExpandedAttributes()
)
)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_OBJECT
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_VALUE
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_VALUE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_DATASET_ID_PROPERTY
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_GEOPROPERTY_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_GEOPROPERTY_VALUE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_GEOPROPERTY_VALUES
Expand All @@ -27,6 +28,7 @@ import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_RELATIONSHIP_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_VOCABPROPERTY_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_VOCABPROPERTY_VALUE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_VOCABPROPERTY_VALUES
import com.egm.stellio.shared.util.JsonLdUtils.buildNonReifiedPropertyValue
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import io.r2dbc.postgresql.codec.Json
import org.springframework.data.annotation.Id
Expand Down Expand Up @@ -107,7 +109,7 @@ data class Attribute(
VocabProperty -> NGSILD_VOCABPROPERTY_VALUES
}

fun toNullCompactedRepresentation(): Map<String, Any> =
fun toNullCompactedRepresentation(datasetId: URI? = null): Map<String, Any> =
when (this) {
Property, GeoProperty, JsonProperty, VocabProperty ->
mapOf(
Expand All @@ -124,6 +126,12 @@ data class Attribute(
JSONLD_TYPE_TERM to this.name,
JSONLD_LANGUAGEMAP_TERM to mapOf(NGSILD_NONE_TERM to NGSILD_NULL)
)
}.let { nullAttrRepresentation ->
if (datasetId != null)
nullAttrRepresentation.plus(
NGSILD_DATASET_ID_PROPERTY to buildNonReifiedPropertyValue(datasetId.toString())
)
else nullAttrRepresentation
}

fun toNullValue(): String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ data class Entity(
val scopes: List<String>? = null,
val createdAt: ZonedDateTime,
val modifiedAt: ZonedDateTime? = null,
val deletedAt: ZonedDateTime? = null,
val payload: Json,
val specificAccessPolicy: SpecificAccessPolicy? = null
) {
Expand All @@ -47,17 +48,19 @@ data class Entity(
return resultEntity
}

companion object {

fun toExpandedDeletedEntity(
entityId: URI,
deletedAt: ZonedDateTime
): ExpandedEntity =
ExpandedEntity(
members = mapOf(
JSONLD_ID to entityId,
NGSILD_DELETED_AT_PROPERTY to buildNonReifiedTemporalValue(deletedAt)
)
)
}
fun toExpandedDeletedEntity(
deletedAt: ZonedDateTime
): ExpandedEntity =
ExpandedEntity(
members = mapOf(
JSONLD_ID to entityId,
JSONLD_TYPE to types,
NGSILD_CREATED_AT_PROPERTY to buildNonReifiedTemporalValue(createdAt),
NGSILD_DELETED_AT_PROPERTY to buildNonReifiedTemporalValue(deletedAt),
).run {
if (modifiedAt != null)
this.plus(NGSILD_MODIFIED_AT_PROPERTY to buildNonReifiedTemporalValue(modifiedAt))
else this
}
)
}
Original file line number Diff line number Diff line change
@@ -1,80 +1,88 @@
package com.egm.stellio.search.entity.model

import com.egm.stellio.shared.model.ExpandedAttributeInstance
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonValue
import java.net.URI

/**
* UpdateResult datatype as defined in 5.2.18
*/
data class UpdateResult(
val updated: List<UpdatedDetails>,
val updated: List<String>,
val notUpdated: List<NotUpdatedDetails>
) {

@JsonIgnore
fun isSuccessful(): Boolean =
notUpdated.isEmpty() &&
updated.all { it.updateOperationResult.isSuccessResult() }
notUpdated.isEmpty()

@JsonIgnore
fun mergeWith(other: UpdateResult): UpdateResult =
UpdateResult(
updated = this.updated.plus(other.updated),
notUpdated = this.notUpdated.plus(other.notUpdated)
)
companion object {

@JsonIgnore
fun hasSuccessfulUpdate(): Boolean =
this.updated.isNotEmpty()
operator fun invoke(operationsResults: List<AttributeOperationResult>): UpdateResult =
operationsResults.map {
when (it) {
is SucceededAttributeOperationResult -> it.attributeName
is FailedAttributeOperationResult -> NotUpdatedDetails(it.attributeName, it.errorMessage)
}
}.let {
UpdateResult(
it.filterIsInstance<String>(),
it.filterIsInstance<NotUpdatedDetails>()
)
}
}
}

val EMPTY_UPDATE_RESULT: UpdateResult = UpdateResult(emptyList(), emptyList())

/**
* NotUpdatedDetails as defined in 5.2.19
*/
data class NotUpdatedDetails(
val attributeName: String,
val reason: String
)

data class UpdatedDetails(
@JsonValue
val attributeName: String,
@JsonIgnore
val datasetId: URI?,
@JsonIgnore
val updateOperationResult: UpdateOperationResult
/**
* Internal structure used to convey the result of an operation (update, delete...)
*/
sealed class AttributeOperationResult(
open val attributeName: String,
open val datasetId: URI? = null,
open val operationStatus: OperationStatus
)

data class UpdateAttributeResult(
val attributeName: String,
val datasetId: URI? = null,
val updateOperationResult: UpdateOperationResult,
val errorMessage: String? = null
) {
fun isSuccessfullyUpdated() =
this.updateOperationResult in listOf(
UpdateOperationResult.APPENDED,
UpdateOperationResult.REPLACED,
UpdateOperationResult.UPDATED,
UpdateOperationResult.DELETED,
UpdateOperationResult.IGNORED
)
}
data class SucceededAttributeOperationResult(
override val attributeName: String,
override val datasetId: URI? = null,
override val operationStatus: OperationStatus,
val newExpandedValue: ExpandedAttributeInstance,
) : AttributeOperationResult(attributeName, datasetId, operationStatus)

data class FailedAttributeOperationResult(
override val attributeName: String,
override val datasetId: URI? = null,
override val operationStatus: OperationStatus,
val errorMessage: String
) : AttributeOperationResult(attributeName, datasetId, operationStatus)

enum class UpdateOperationResult {
enum class OperationStatus {
APPENDED,
REPLACED,
UPDATED,
DELETED,
IGNORED,
FAILED;

fun isSuccessResult(): Boolean = listOf(APPENDED, REPLACED, UPDATED, DELETED).contains(this)
}
fun isSuccessResult(): Boolean = getSuccessStatuses().contains(this)

fun updateResultFromDetailedResult(updateStatuses: List<UpdateAttributeResult>): UpdateResult {
val updated = updateStatuses.filter { it.isSuccessfullyUpdated() }
.map { UpdatedDetails(it.attributeName, it.datasetId, it.updateOperationResult) }
companion object {
fun getSuccessStatuses(): List<OperationStatus> = listOf(APPENDED, REPLACED, UPDATED, DELETED, IGNORED)
}
}

val notUpdated = updateStatuses.filter { !it.isSuccessfullyUpdated() }
.map { NotUpdatedDetails(it.attributeName, it.errorMessage!!) }
fun List<AttributeOperationResult>.hasSuccessfulResult(): Boolean =
this.any { it is SucceededAttributeOperationResult }

return UpdateResult(updated, notUpdated)
}
fun List<AttributeOperationResult>.getSucceededOperations(): List<SucceededAttributeOperationResult> =
this.filterIsInstance<SucceededAttributeOperationResult>()
Loading

0 comments on commit 5289ca9

Please sign in to comment.