Skip to content

Commit

Permalink
fix(m360-api): codec rework
Browse files Browse the repository at this point in the history
  • Loading branch information
vincejv committed Aug 30, 2023
1 parent 60c91e4 commit a4c0597
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
*/
public class M360CodecProvider implements CodecProvider {

@SuppressWarnings("rawtypes")
private static final List<Class> discriminatorClasses;
private static final List<Class<?>> discriminatorClasses;

private static final List<String> ignoredFields;

Expand All @@ -60,13 +59,13 @@ public <T> Codec<T> get(Class<T> clazz, CodecRegistry registry) {
}

private <T> Codec<T> buildDiscriminatorCodec(Class<T> clazz, CodecRegistry registry) {
var dscmntrMdlBldr = ClassModel.builder(clazz)
var discriminatorModelBuilder = ClassModel.builder(clazz)
.enableDiscriminator(true);
if (clazz == BroadcastRequest.class) {
stripNonProperties(dscmntrMdlBldr);
stripNonProperties(discriminatorModelBuilder);
}
return PojoCodecProvider.builder()
.register(dscmntrMdlBldr.build())
.register(discriminatorModelBuilder.build())
.build().get(clazz, registry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.abavilla.fpi.telco.ext.enums.Telco;
import com.vincejv.m360.dto.ApiRequest;
import io.quarkus.mongodb.panache.common.MongoEntity;
import io.quarkus.runtime.annotations.RegisterForReflection;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
Expand All @@ -33,6 +34,7 @@

@Data
@EqualsAndHashCode(callSuper = true)
@RegisterForReflection
@NoArgsConstructor
@BsonDiscriminator
@MongoEntity(collection="m360_log")
Expand All @@ -49,6 +51,6 @@ public class MsgReq extends AbsMongoItem {
private String messageId;
private List<StateEncap> apiStatus;
private List<String> message;
@BsonProperty("request")
@BsonProperty(value = "request", useDiscriminator = true)
private ApiRequest broadcastRequest;
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public interface MsgReqMapper extends IDtoToEntityMapper<MsgReqDto, MsgReq> {
MsgReq mapToEntity(MsgReqDto dto);

MsgReq mapFromResponse(BroadcastResponseDto broadcastResponseDto);

@AfterMapping
default void afterMappingFromResponse(BroadcastResponseDto broadcastResponseDto, @MappingTarget MsgReq msgReq) {
msgReq.setTelco(Telco.fromId(broadcastResponseDto.getTelcoId()));
Expand Down
59 changes: 29 additions & 30 deletions core/src/main/java/com/abavilla/fpi/sms/service/sms/MsgAckSvc.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,36 +59,35 @@ public Uni<Void> acknowledge(String msgId, String ackStsCde, String ackTimestamp
var byMsgId = msgReqRepo.findByMsgId(msgId);

/* run in background, immediately return response to webhook */
executor.execute(() ->
byMsgId.map(msgReqOpt -> msgReqOpt.orElseThrow(() ->
new ApiSvcEx("Message Id for acknowledgement not found: " + msgId)))
.onFailure(OptimisticLockEx.class).retry().indefinitely()
.onFailure(ApiSvcEx.class)
.retry().withBackOff(Duration.ofSeconds(3)).withJitter(0.2)
.atMost(5) // Retry for item not found and nothing else
.chain(msgReq -> {
var stateItem = new StateEncap(apiStatus, ackTime);
if (ObjectUtils.isEmpty(msgReq.getApiStatus())) {
msgReq.setApiStatus(List.of(stateItem));
} else {
msgReq.getApiStatus().add(stateItem);
}
msgReq.setLastAcknowledgement(ackTime);
msgReq.setDateUpdated(LocalDateTime.now(ZoneOffset.UTC));
return msgReqRepo.persistOrUpdate(msgReq);
})
.onFailure().call(ex -> {
Log.error("Error leak ack", ex);
var leak = new LeakAck();
leak.setDateCreated(LocalDateTime.now(ZoneOffset.UTC));
leak.setDateUpdated(LocalDateTime.now(ZoneOffset.UTC));
leak.setMsgId(msgId);
leak.setApiStatus(apiStatus);
leak.setTimestamp(ackTime);
return repo.persist(leak);
})
.await().indefinitely()
);
byMsgId.map(msgReqOpt -> msgReqOpt.orElseThrow(() ->
new ApiSvcEx("Message Id for acknowledgement not found: " + msgId)))
.onFailure(OptimisticLockEx.class).retry().indefinitely()
.onFailure(ApiSvcEx.class)
.retry().withBackOff(Duration.ofSeconds(3)).withJitter(0.2)
.atMost(5) // Retry for item not found and nothing else
.chain(msgReq -> {
var stateItem = new StateEncap(apiStatus, ackTime);
if (ObjectUtils.isEmpty(msgReq.getApiStatus())) {
msgReq.setApiStatus(List.of(stateItem));
} else {
msgReq.getApiStatus().add(stateItem);
}
msgReq.setLastAcknowledgement(ackTime);
msgReq.setDateUpdated(LocalDateTime.now(ZoneOffset.UTC));
return msgReqRepo.update(msgReq);
})
.onFailure().call(ex -> {
Log.error("Error leak ack", ex);
var leak = new LeakAck();
leak.setDateCreated(LocalDateTime.now(ZoneOffset.UTC));
leak.setDateUpdated(LocalDateTime.now(ZoneOffset.UTC));
leak.setMsgId(msgId);
leak.setApiStatus(apiStatus);
leak.setTimestamp(ackTime);
return repo.persist(leak);
})
.subscribe().with(ignored -> {
});

return Uni.createFrom().voidItem();
}
Expand Down

0 comments on commit a4c0597

Please sign in to comment.