diff --git a/src/main/java/com/anywayclear/controller/AlarmController.java b/src/main/java/com/anywayclear/controller/AlarmController.java index b8113dc..07bd30d 100644 --- a/src/main/java/com/anywayclear/controller/AlarmController.java +++ b/src/main/java/com/anywayclear/controller/AlarmController.java @@ -23,7 +23,7 @@ public class AlarmController { @GetMapping(produces = "text/event-stream") public ResponseEntity createEmitter(@AuthenticationPrincipal OAuth2User oAuth2User, - @RequestHeader(value = "Last-Event_ID", required = false) String lastEventId) { + @RequestHeader(value = "Last-Event_ID", required = false, defaultValue = "") String lastEventId) { return new ResponseEntity<>(alarmService.createEmitter(oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK); } diff --git a/src/main/java/com/anywayclear/controller/PointController.java b/src/main/java/com/anywayclear/controller/PointController.java index ab68993..a151394 100644 --- a/src/main/java/com/anywayclear/controller/PointController.java +++ b/src/main/java/com/anywayclear/controller/PointController.java @@ -1,10 +1,13 @@ package com.anywayclear.controller; +import com.anywayclear.dto.request.PointUpdateRequest; import com.anywayclear.dto.response.PointResponse; import com.anywayclear.service.PointService; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import javax.validation.Valid; + @RestController @RequestMapping("/api/points") public class PointController { @@ -14,8 +17,13 @@ public PointController(PointService pointService) { this.pointService = pointService; } - @GetMapping - public ResponseEntity getPoint(@RequestParam(name = "userId") String userId) { + @GetMapping("/{userId}") + public ResponseEntity getPoint(@PathVariable String userId) { return ResponseEntity.ok(pointService.getPoint(userId)); } + + @PatchMapping("/{userId}") + public ResponseEntity updatePoint(@PathVariable String userId, @Valid @RequestBody PointUpdateRequest request) { + return ResponseEntity.ok(pointService.updatePoint(userId, request)); + } } diff --git a/src/main/java/com/anywayclear/dto/request/PointUpdateRequest.java b/src/main/java/com/anywayclear/dto/request/PointUpdateRequest.java new file mode 100644 index 0000000..2313b7d --- /dev/null +++ b/src/main/java/com/anywayclear/dto/request/PointUpdateRequest.java @@ -0,0 +1,14 @@ +package com.anywayclear.dto.request; + +import lombok.*; + +import javax.validation.constraints.NotNull; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class PointUpdateRequest { + @NotNull + private int balance; +} diff --git a/src/main/java/com/anywayclear/dto/response/PointResponse.java b/src/main/java/com/anywayclear/dto/response/PointResponse.java index 6d93593..75763f8 100644 --- a/src/main/java/com/anywayclear/dto/response/PointResponse.java +++ b/src/main/java/com/anywayclear/dto/response/PointResponse.java @@ -13,20 +13,17 @@ public class PointResponse { private int balance; private LocalDateTime updatedAt; - private Member member; @Builder - public PointResponse(int balance, LocalDateTime updatedAt, Member member) { + public PointResponse(int balance, LocalDateTime updatedAt) { this.balance = balance; this.updatedAt = updatedAt; - this.member = member; } public static PointResponse toResponse(Point point) { return PointResponse.builder() .balance(point.getBalance()) .updatedAt(point.getUpdatedAt()) - .member(point.getMember()) .build(); } } diff --git a/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java b/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java index 600556d..38eda5e 100644 --- a/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java +++ b/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java @@ -3,6 +3,7 @@ import org.springframework.stereotype.Repository; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Optional; @@ -13,23 +14,34 @@ public class SSEInMemoryRepository{ private final Map sseEmitterMap = new ConcurrentHashMap<>(); - public void put(String key, SseEmitter sseEmitter) { + // 유실된 데이터 파악을 위한 캐시 저장 { 'userId_now' : alarm() } + private final Map eventCache = new ConcurrentHashMap<>(); + + public SseEmitter save(String key, SseEmitter sseEmitter) { sseEmitterMap.put(key, sseEmitter); + return sseEmitter; + } + + public void saveEventCache(String eventCacheId, Object event) { + eventCache.put(eventCacheId, event); } public Optional get(String key) { return Optional.ofNullable(sseEmitterMap.get(key)); } -// public Map getListByKeyPrefix(String keyPrefix){ -//// return sseEmitterMap.keySet().stream() -//// .filter(key -> key.startsWith(keyPrefix)) -//// .map(sseEmitterMap::get) -//// .collect(Collectors.toList()); -// return sseEmitterMap.entrySet().stream() -// .filter(entry -> entry.getKey().startsWith(keyPrefix)) -// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); -// } + public Map findAllEventCacheByUserId(String userId) { + return eventCache.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(userId)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map findAllEmitterByUserId(String userId){ + return sseEmitterMap.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(userId)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + // public List getKeyListByKeyPrefix(String keyPrefix){ // return sseEmitterMap.keySet().stream() @@ -37,17 +49,7 @@ public Optional get(String key) { // .collect(Collectors.toList()); // } -// public Map getListByKeySuffix(String keySuffic){ -//// return sseEmitterMap.keySet().stream() -//// .filter(key -> key.startsWith(keyPrefix)) -//// .map(sseEmitterMap::get) -//// .collect(Collectors.toList()); -// return sseEmitterMap.entrySet().stream() -// .filter(entry -> entry.getKey().endsWith(keySuffic)) -// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); -// } - - public void remove(String key) { + public void delete(String key) { sseEmitterMap.remove(key); } @@ -56,4 +58,21 @@ public void remove(String key) { // if (key.startsWith(keyPrefix)) sseEmitterMap.remove(key); // }); // } + + public void deleteAllEmitterByUserId(String userId) { + sseEmitterMap.forEach((key, emitter) -> { + if (key.startsWith(userId)) { + sseEmitterMap.remove(key); + } + }); + } + + public void deleteAllEventCacheByUserId(String userId) { + eventCache.forEach((key, emitter) -> { + if (key.startsWith(userId)) { + eventCache.remove(key); + } + }); + } + } diff --git a/src/main/java/com/anywayclear/service/AlarmService.java b/src/main/java/com/anywayclear/service/AlarmService.java index 6539808..e2dd025 100644 --- a/src/main/java/com/anywayclear/service/AlarmService.java +++ b/src/main/java/com/anywayclear/service/AlarmService.java @@ -51,33 +51,46 @@ public SseEmitter createEmitter(OAuth2User oAuth2User, String lastEventId, Local // SSE 연결 // emitter에 키에 토픽, 유저 정보를 담기 // 알람 내용에 토픽 정보를 넣어 해당 토픽으로 에미터를 검색해 sse 데이터 보내기 - System.out.println("AlarmService.createEmitter 진입"); - SseEmitter emitter = new SseEmitter(Long.parseLong("100000")); - System.out.println("emitter 생성 : " + emitter); - System.out.println("userId = " + userId); - sseRepository.put(userId, emitter); - System.out.println("emitter 저장"); + String key = userId + "_" + now; + SseEmitter emitter; + + if (sseRepository.get(userId).isPresent()) { + sseRepository.delete(userId); + emitter = sseRepository.save(key, new SseEmitter(100000L * 4500L)); + } else { + emitter = sseRepository.save(key, new SseEmitter(100000L * 4500L)); + } + + + // 오류 발생 시 emitter 삭제 emitter.onCompletion(() -> { System.out.println("onCompletion callback"); - sseRepository.remove(userId); // 만료되면 리스트에서 삭제 + + sseRepository.delete(userId); // 만료되면 리스트에서 삭제 + }); emitter.onTimeout(() -> { System.out.println("onTimeout callback"); // 만료시 Repository에서 삭제 - emitter.complete(); + sseRepository.delete(userId); + }); + emitter.onError((e) -> { + System.out.println("onError callback"); + sseRepository.delete(userId); }); + + // 503 에러 방지 - 더미 이벤트 전송 sendToClient(emitter, userId, "Connected", "subscribe"); System.out.println("sse 알림 발송"); - if (lastEventId!=null) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊겼거나 미수신된 데이터를 다 찾아서 보내준다. -// Map events = sseRepository.getListByKeySuffix(userId); -// events.entrySet().stream() -// .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0) -// .forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue())); - SseEmitter event = sseRepository.get(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER)); - sendToClient(emitter, userId, "Alarm", event); + if (!lastEventId.isEmpty()) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊겼거나 미수신된 데이터를 다 찾아서 보내준다. + Map eventCaches = sseRepository.findAllEventCacheByUserId(userId); + eventCaches.entrySet().stream() + .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0) + .forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue())); + } return emitter; @@ -103,21 +116,18 @@ public void pushAlarm(String type, String topicName) { // 알람 전송 // 받은 알람 SSE 전송 // 해당 토픽의 SseEmitter 모두 가져옴 receiverKeyList.forEach( - key -> { - SseEmitter emitter = sseRepository.get(key).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER)); - sendToClient(emitter, key, "Alarm", alarm); + receiverKey -> { + Map sseEmitters = sseRepository.findAllEmitterByUserId(receiverKey); + sseEmitters.forEach( + (key, emitter) -> { + sseRepository.saveEventCache(key, alarm); + sendToClient(emitter, key, "Alarm", alarm); + } + ); } ); -// Map sseEmitters = sseRepository.getListByKeyPrefix(topicName); -// sseEmitters.forEach( -// (key, emitter) -> { -// // 데이터 전송 -// sendToClient(emitter, key, "Alarm", alarm); -// } -// ); - // 레디스 저장 -> 키(발송인 정보) : 값(알람 객체) String key = "member:" + alarm.getSender() + ":alarm:" + alarm.getId(); // key 설정 -> member:memberId:alarm:alarmId; redisAlarmTemplate.opsForValue().set(key, alarm); // 레디스에 저장 @@ -180,15 +190,19 @@ public void sendToClient(SseEmitter emitter, String key, String name, Object dat emitter.send(SseEmitter.event() .id(key) .name(name) - .data(data)); + .data(data) + .reconnectTime(0)); log.info("data 전달 성공"); emitter.send("더미 데이터"); log.info("더미데이터 전달 성공"); + emitter.complete(); + sseRepository.delete(key); } catch (IOException e) { - sseRepository.remove(key); + sseRepository.delete(key); + emitter.completeWithError(e); log.info("예외 발생"); } - emitter.complete(); + }); } } diff --git a/src/main/java/com/anywayclear/service/PointService.java b/src/main/java/com/anywayclear/service/PointService.java index dd3b6c6..0ac1da0 100644 --- a/src/main/java/com/anywayclear/service/PointService.java +++ b/src/main/java/com/anywayclear/service/PointService.java @@ -1,11 +1,15 @@ package com.anywayclear.service; +import com.anywayclear.dto.request.PointUpdateRequest; import com.anywayclear.dto.response.PointResponse; import com.anywayclear.entity.Member; import com.anywayclear.entity.Point; +import com.anywayclear.exception.CustomException; +import com.anywayclear.exception.ExceptionCode; import com.anywayclear.repository.MemberRepository; import com.anywayclear.repository.PointRepository; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; @Service public class PointService { @@ -17,12 +21,21 @@ public PointService(PointRepository pointRepository, MemberRepository memberRepo this.memberRepository = memberRepository; } + @Transactional(readOnly = true) public PointResponse getPoint(String userId) { // 닉네임으로 검색해야 하기 때문에 멤버 리포지토리에서 멤버 객체를 불러옴 - Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new RuntimeException("해당 userId의 유저가 없습니다.")); + Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER)); // 불러온 멤버 객체에서 포인트 객체 가져오기 Point point = member.getPoint(); return PointResponse.toResponse(point); } + @Transactional + public PointResponse updatePoint(String userId, PointUpdateRequest request) { + Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER)); + // 불러온 멤버 객체에서 포인트 객체 가져오기 + Point point = member.getPoint(); + point.setBalance(request.getBalance()); + return PointResponse.toResponse(pointRepository.save(point)); + } }