From be0de2ac5bbfca3f469aa2e5a67c3ae5a045ed00 Mon Sep 17 00:00:00 2001 From: flowerdonk <122508480+flowerdonk@users.noreply.github.com> Date: Mon, 14 Aug 2023 00:50:43 +0900 Subject: [PATCH] Feat/sse (#98) --- .../anywayclear/config/SecurityConfig.java | 2 + .../controller/AlarmController.java | 35 +---- .../anywayclear/controller/DibController.java | 24 ++-- .../controller/SubscribeController.java | 23 ++- src/main/java/com/anywayclear/entity/Dib.java | 12 ++ .../com/anywayclear/entity/Subscribe.java | 12 ++ .../anywayclear/exception/ExceptionCode.java | 3 + .../exception/GlobalExceptionHandler.java | 10 ++ .../anywayclear/repository/DibRepository.java | 5 +- .../repository/SSEInMemoryRepository.java | 23 ++- .../anywayclear/repository/SSERepository.java | 2 +- .../repository/SubscribeRepository.java | 2 + .../com/anywayclear/service/AlarmService.java | 134 +++++++++--------- .../com/anywayclear/service/DibService.java | 44 ++++-- .../service/RedisSubscribeService.java | 18 ++- .../anywayclear/service/SubscribeService.java | 39 +++-- 16 files changed, 239 insertions(+), 149 deletions(-) diff --git a/src/main/java/com/anywayclear/config/SecurityConfig.java b/src/main/java/com/anywayclear/config/SecurityConfig.java index a63e4a9..45278d4 100644 --- a/src/main/java/com/anywayclear/config/SecurityConfig.java +++ b/src/main/java/com/anywayclear/config/SecurityConfig.java @@ -61,6 +61,8 @@ public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws .authorizeHttpRequests(authorize -> authorize .antMatchers(HttpMethod.OPTIONS).permitAll() // OPTIONS 메서드는 모두 허용 // .antMatchers("/api/members/**").authenticated() // jwt없이 요청한건지 재확인 가능 + .antMatchers("/api/subscribes/**").authenticated() // jwt없이 요청한건지 재확인 가능 + .antMatchers("/api/dibs/**").authenticated() // jwt없이 요청한건지 재확인 가능 .anyRequest().permitAll() ); diff --git a/src/main/java/com/anywayclear/controller/AlarmController.java b/src/main/java/com/anywayclear/controller/AlarmController.java index 25cc09b..ca9b4f0 100644 --- a/src/main/java/com/anywayclear/controller/AlarmController.java +++ b/src/main/java/com/anywayclear/controller/AlarmController.java @@ -2,22 +2,10 @@ import com.anywayclear.dto.response.AlarmResponseList; import com.anywayclear.service.AlarmService; - import lombok.RequiredArgsConstructor; -import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Sort; -import org.springframework.data.web.PageableDefault; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; - -import org.springframework.security.core.annotation.AuthenticationPrincipal; -import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.web.bind.annotation.*; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import javax.servlet.http.HttpServletResponse; -import java.net.URI; -import java.time.LocalDateTime; @RestController @RequiredArgsConstructor @@ -27,29 +15,10 @@ public class AlarmController { // 알림 서비스 private final AlarmService alarmService; - @PutMapping("/topic/{topicName}") - public ResponseEntity createTopic(@PathVariable String topicName) { - final String topic = alarmService.createTopic(topicName); - return ResponseEntity.created(URI.create("api/alarms/" + topic)).build(); - } - - @GetMapping(value = "/topic/{topicName}/subscribe", produces = "text/event-stream") - public ResponseEntity subscribeTopic(@PathVariable String topicName, @AuthenticationPrincipal OAuth2User oAuth2User, - @RequestHeader(value = "Last-Event_ID", required = false) String lastEventId, HttpServletResponse response) { - System.out.println("OAuth UserName : " + oAuth2User); - return new ResponseEntity<>(alarmService.subscribeTopic(topicName, "username", lastEventId, LocalDateTime.now()), HttpStatus.OK); - } - @PostMapping("/topic/{topicName}") @ResponseStatus(HttpStatus.CREATED) - public void pushAlarm(@PathVariable String topicName, @RequestParam(name = "sender") String sender, @RequestParam(name = "context") String context) { - alarmService.pushAlarm(topicName, sender, context); - } - - @DeleteMapping("/topic/{topicName}") - @ResponseStatus(HttpStatus.OK) - public void deleteTopic(@PathVariable String topicName) { - alarmService.deleteTopic(topicName); + public void pushAlarm(@PathVariable String topicName, @RequestParam(name = "context") String context) { + alarmService.pushAlarm(topicName, context); } @GetMapping("/{memberId}/subs") diff --git a/src/main/java/com/anywayclear/controller/DibController.java b/src/main/java/com/anywayclear/controller/DibController.java index 1b3ec3e..c5d8fa0 100644 --- a/src/main/java/com/anywayclear/controller/DibController.java +++ b/src/main/java/com/anywayclear/controller/DibController.java @@ -1,22 +1,20 @@ package com.anywayclear.controller; -import com.anywayclear.dto.request.DibCreateRequest; import com.anywayclear.dto.response.DibResponse; -import com.anywayclear.dto.response.DibResponseList; import com.anywayclear.dto.response.IsDibResponse; import com.anywayclear.dto.response.MultiResponse; import com.anywayclear.entity.Dib; import com.anywayclear.service.DibService; import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Sort; -import org.springframework.data.web.PageableDefault; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import javax.validation.Valid; -import java.net.URI; +import javax.servlet.http.HttpServletResponse; +import java.time.LocalDateTime; @RestController @RequestMapping("/api/dibs") @@ -27,10 +25,10 @@ public DibController(DibService dibService) { this.dibService = dibService; } - @PostMapping - public ResponseEntity createDib(@Valid @RequestBody DibCreateRequest request) { - final Long id = dibService.createDib(request); - return ResponseEntity.created(URI.create("api/dibs/" + id)).build(); + @GetMapping(value = "/{produceId}/dib", produces = "text/event-stream") + public ResponseEntity createDib(@PathVariable Long produceId, @AuthenticationPrincipal OAuth2User oAuth2User, + @RequestHeader(value = "Last-Event_ID", required = false) String lastEventId, HttpServletResponse response) { + return new ResponseEntity<>(dibService.createDib(produceId, oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK); } @GetMapping("/{userId}") @@ -44,4 +42,10 @@ public ResponseEntity getIsDib(@AuthenticationPrincipal OAuth2Use return ResponseEntity.ok(dibService.getIsDib(userId, produceId)); } + @DeleteMapping("/{produce-id}") + @ResponseStatus(HttpStatus.OK) + public void deleteSubscribe(@PathVariable("produce-id") Long produceId, @AuthenticationPrincipal OAuth2User oAuth2User) { + String consumerId = (String) oAuth2User.getAttributes().get("userId"); + dibService.deleteDib(produceId, consumerId); + } } diff --git a/src/main/java/com/anywayclear/controller/SubscribeController.java b/src/main/java/com/anywayclear/controller/SubscribeController.java index 5a289be..195e3f8 100644 --- a/src/main/java/com/anywayclear/controller/SubscribeController.java +++ b/src/main/java/com/anywayclear/controller/SubscribeController.java @@ -1,16 +1,17 @@ package com.anywayclear.controller; -import com.anywayclear.dto.request.SubscribeCreateRequest; import com.anywayclear.dto.response.IsSubResponse; import com.anywayclear.dto.response.SubscribeResponseList; import com.anywayclear.service.SubscribeService; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import javax.validation.Valid; -import java.net.URI; +import javax.servlet.http.HttpServletResponse; +import java.time.LocalDateTime; @RestController @RequestMapping("/api/subscribes") @@ -21,10 +22,11 @@ public SubscribeController(SubscribeService subscribeService) { this.subscribeService = subscribeService; } - @PostMapping - public ResponseEntity createSubscribe(@Valid @RequestBody SubscribeCreateRequest request) { - final Long id = subscribeService.createSubscribe(request); - return ResponseEntity.created(URI.create("api/subscribes/" + id)).build(); + @GetMapping(value = "/{userId}/subscribe", produces = "text/event-stream") + public ResponseEntity createSubscribe(@PathVariable String userId, @AuthenticationPrincipal OAuth2User oAuth2User, + @RequestHeader(value = "Last-Event_ID", required = false) String lastEventId, HttpServletResponse response) { + System.out.println("로그인 유저 = " + oAuth2User); + return new ResponseEntity<>(subscribeService.createSubscribe(userId, oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK); } @GetMapping @@ -37,4 +39,11 @@ public ResponseEntity getIsSub(@AuthenticationPrincipal OAuth2Use String consumerId = (String) oAuth2User.getAttributes().get("userId"); return ResponseEntity.ok(subscribeService.getIsSub(consumerId,sellerId)); } + + @DeleteMapping("/{seller-id}") + @ResponseStatus(HttpStatus.OK) + public void deleteSubscribe(@PathVariable("seller-id") String sellerId, @AuthenticationPrincipal OAuth2User oAuth2User) { + String consumerId = (String) oAuth2User.getAttributes().get("userId"); + subscribeService.deleteSubscribe(sellerId, consumerId); + } } diff --git a/src/main/java/com/anywayclear/entity/Dib.java b/src/main/java/com/anywayclear/entity/Dib.java index 026f36e..ea1aa8f 100644 --- a/src/main/java/com/anywayclear/entity/Dib.java +++ b/src/main/java/com/anywayclear/entity/Dib.java @@ -12,6 +12,18 @@ @Getter @Setter @NoArgsConstructor +@Table( + name = "dib", + uniqueConstraints = { + @UniqueConstraint( + name = "UniqueConsumerAndProduce", + columnNames = { + "consumer_id", + "produce_id" + } + ) + } +) public class Dib { @Id diff --git a/src/main/java/com/anywayclear/entity/Subscribe.java b/src/main/java/com/anywayclear/entity/Subscribe.java index 7c72b54..423885d 100644 --- a/src/main/java/com/anywayclear/entity/Subscribe.java +++ b/src/main/java/com/anywayclear/entity/Subscribe.java @@ -12,6 +12,18 @@ @Getter @Setter @NoArgsConstructor +@Table( + name = "subscribe", + uniqueConstraints = { + @UniqueConstraint( + name = "UniqueConsumerAndSeller", + columnNames = { + "consumer_id", + "seller_id" + } + ) + } +) public class Subscribe { @Id diff --git a/src/main/java/com/anywayclear/exception/ExceptionCode.java b/src/main/java/com/anywayclear/exception/ExceptionCode.java index e659e32..76d08ba 100644 --- a/src/main/java/com/anywayclear/exception/ExceptionCode.java +++ b/src/main/java/com/anywayclear/exception/ExceptionCode.java @@ -17,9 +17,12 @@ public enum ExceptionCode { INVALID_PRODUCE(BAD_REQUEST, "잘못된 농산물 정보 입니다", 400), INVALID_PRODUCE_ID(BAD_REQUEST, "잘못된 농산물 ID 입니다", 400), INVALID_AUCTION_ID(BAD_REQUEST, "잘못된 경매 ID 입니다", 400), + INVALID_DIB_ID(BAD_REQUEST, "잘못된 찜 ID 입니다", 400), + INVALID_SUBSCRIBE_ID(BAD_REQUEST, "잘못된 구독 ID 입니다", 400), INVALID_PRICE(BAD_REQUEST, "현재 입찰가보다 낮게 입찰할 수 없습니다", 400), EXPIRED_AUCTION_TIME(BAD_REQUEST,"경매 시간이 종료되었습니다.",400), INVALID_AUCTION_STATUS(BAD_REQUEST, "경매 가능한 상태가 아닙니다", 400), + INVALID_INSERT_REQUEST(BAD_REQUEST, "중복된 Insert 요청입니다.", 400), // 401 UNAUTHORIZED : 인증되지 않은 사용자 INVALID_TOKEN(UNAUTHORIZED, "잘못된 토큰입니다", 401), diff --git a/src/main/java/com/anywayclear/exception/GlobalExceptionHandler.java b/src/main/java/com/anywayclear/exception/GlobalExceptionHandler.java index a4227de..2ba809d 100644 --- a/src/main/java/com/anywayclear/exception/GlobalExceptionHandler.java +++ b/src/main/java/com/anywayclear/exception/GlobalExceptionHandler.java @@ -6,6 +6,10 @@ import org.springframework.web.bind.annotation.RestControllerAdvice; import org.springframework.web.servlet.NoHandlerFoundException; +import javax.validation.ConstraintViolationException; + +import java.sql.SQLIntegrityConstraintViolationException; + import static com.anywayclear.exception.ExceptionCode.*; import static org.springframework.http.HttpStatus.*; import static org.springframework.web.client.HttpClientErrorException.Forbidden; @@ -56,4 +60,10 @@ private ErrorResponse handleResourceException() { // private ErrorResponse handleException(Exception e) { // return new ErrorResponse(INTERNAL_SERVER_ERROR,e.getMessage()); // } + + @ExceptionHandler(SQLIntegrityConstraintViolationException.class) + @ResponseStatus(BAD_REQUEST) + private ErrorResponse handleConstraintViolationException() { + return ErrorResponse.of(INVALID_INSERT_REQUEST); + } } diff --git a/src/main/java/com/anywayclear/repository/DibRepository.java b/src/main/java/com/anywayclear/repository/DibRepository.java index 1cd8c50..d16bba0 100644 --- a/src/main/java/com/anywayclear/repository/DibRepository.java +++ b/src/main/java/com/anywayclear/repository/DibRepository.java @@ -7,6 +7,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import javax.transaction.Transactional; import java.util.List; import java.util.Optional; @@ -14,9 +15,11 @@ public interface DibRepository extends JpaRepository { Page findAllByConsumer(Member member, Pageable pageable); // 찜 중인 농산물 찾기 (페이지) - List findAllByConsumer(Optional member); // 찜 중인 농산물 찾기 + List findAllByConsumer(Member member); // 찜 중인 농산물 찾기 List findAllByProduce(Produce produce); // 해당 농산물을 찜한 소비자 찾기 Optional findByConsumerAndProduce(Member member, Produce produce); + + void deleteByConsumerAndProduce(Member member, Produce produce); } diff --git a/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java b/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java index d6eb323..cd1c589 100644 --- a/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java +++ b/src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java @@ -21,11 +21,14 @@ public Optional get(String key) { return Optional.ofNullable(sseEmitterMap.get(key)); } - public List getListByKeyPrefix(String keyPrefix){ - return sseEmitterMap.keySet().stream() - .filter(key -> key.startsWith(keyPrefix)) - .map(sseEmitterMap::get) - .collect(Collectors.toList()); + public Map getListByKeyPrefix(String keyPrefix){ +// return sseEmitterMap.keySet().stream() +// .filter(key -> key.startsWith(keyPrefix)) +// .map(sseEmitterMap::get) +// .collect(Collectors.toList()); + return sseEmitterMap.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(keyPrefix)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } public List getKeyListByKeyPrefix(String keyPrefix){ @@ -34,6 +37,16 @@ public List getKeyListByKeyPrefix(String keyPrefix){ .collect(Collectors.toList()); } + public Map getListByKeySuffix(String keySuffic){ +// return sseEmitterMap.keySet().stream() +// .filter(key -> key.startsWith(keyPrefix)) +// .map(sseEmitterMap::get) +// .collect(Collectors.toList()); + return sseEmitterMap.entrySet().stream() + .filter(entry -> entry.getKey().endsWith(keySuffic)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + public void remove(String key) { sseEmitterMap.remove(key); } diff --git a/src/main/java/com/anywayclear/repository/SSERepository.java b/src/main/java/com/anywayclear/repository/SSERepository.java index 085f2ad..b310084 100644 --- a/src/main/java/com/anywayclear/repository/SSERepository.java +++ b/src/main/java/com/anywayclear/repository/SSERepository.java @@ -14,7 +14,7 @@ public interface SSERepository { Optional get(String key); - List getListByKeyPrefix(String keyPrefix); + Map getListByKeyPrefix(String keyPrefix); List getKeyListByKeyPrefix(String keyPrefix); diff --git a/src/main/java/com/anywayclear/repository/SubscribeRepository.java b/src/main/java/com/anywayclear/repository/SubscribeRepository.java index f8966e2..69cc242 100644 --- a/src/main/java/com/anywayclear/repository/SubscribeRepository.java +++ b/src/main/java/com/anywayclear/repository/SubscribeRepository.java @@ -4,6 +4,7 @@ import com.anywayclear.entity.Subscribe; import org.springframework.data.jpa.repository.JpaRepository; +import javax.transaction.Transactional; import java.util.List; import java.util.Optional; @@ -14,4 +15,5 @@ public interface SubscribeRepository extends JpaRepository { Optional findByConsumerAndSeller(Member consumer, Member seller); + void deleteByConsumerAndSeller(Member consumer, Member seller); } diff --git a/src/main/java/com/anywayclear/service/AlarmService.java b/src/main/java/com/anywayclear/service/AlarmService.java index fbe89b9..ace2140 100644 --- a/src/main/java/com/anywayclear/service/AlarmService.java +++ b/src/main/java/com/anywayclear/service/AlarmService.java @@ -1,48 +1,43 @@ package com.anywayclear.service; -import com.anywayclear.dto.response.*; +import com.anywayclear.dto.response.AlarmResponseList; import com.anywayclear.entity.Alarm; import com.anywayclear.entity.Dib; import com.anywayclear.entity.Member; +import com.anywayclear.entity.Subscribe; +import com.anywayclear.exception.CustomException; +import com.anywayclear.exception.ExceptionCode; import com.anywayclear.repository.DibRepository; import com.anywayclear.repository.MemberRepository; import com.anywayclear.repository.SSEInMemoryRepository; import com.anywayclear.repository.SubscribeRepository; import lombok.RequiredArgsConstructor; -import org.springframework.data.domain.Pageable; +import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.listener.ChannelTopic; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; -import javax.annotation.PostConstruct; import java.io.IOException; import java.time.LocalDateTime; -import java.util.*; - -import java.util.concurrent.ConcurrentHashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service @RequiredArgsConstructor +@Slf4j public class AlarmService { - // topic에 메시지 발행을 기다리는 Listener - private final RedisMessageListenerContainer redisMessageListener; - // 리포지토리 대신 템플릿 사용 private final RedisTemplate redisAlarmTemplate; - // 발행 서비스 - private final RedisPublishService redisPublishService; - - // 구독 서비스 - private final RedisSubscribeService redisSubscribeService; - // 구독 목록 불러오기 위한 서비스 - private final SubscribeService subscribeService; + private final SubscribeRepository subscribeRepository; // 찜 목록 불러오기 위한 리포지토리 private final DibRepository dibRepository; @@ -50,37 +45,23 @@ public class AlarmService { // 유저Id로 유저 객체를 찾기 위한 리포지토리 private final MemberRepository memberRepository; - // topic 이름으로 topic 정보를 가져와 메시지를 발송할 수 있도록 Map에 저장 - private Map channels; - - private SSEInMemoryRepository sseRepository; - - @PostConstruct - public void init() { // topic 정보를 담을 Map을 초기화 - channels = new ConcurrentHashMap<>(); // topicName : ChannelTopic obj - } - - public String createTopic(String topicName) { // 신규 Topic을 생성하고 Listener 등록 및 Topic Map에 저장 - ChannelTopic topic = new ChannelTopic(topicName); // 토픽 생성 - redisMessageListener.addMessageListener(redisSubscribeService, topic); // 리스너 등록 - channels.put(topicName, topic); - return topicName; - } - - public SseEmitter subscribeTopic(String topicName, String userId, String lastEventId, LocalDateTime now) { - // Pub/Sub Topic 찾아서 리스너 연결 -// ChannelTopic topic = channels.get(topicName); -// redisMessageListener.addMessageListener(redisSubscribeService, topic); + private final SSEInMemoryRepository sseRepository; + public SseEmitter createEmitter(String topicName, String userId, String lastEventId, LocalDateTime now) { // SSE 연결 // emitter에 키에 토픽, 유저 정보를 담기 // 알람 내용에 토픽 정보를 넣어 해당 토픽으로 에미터를 검색해 sse 데이터 보내기 + System.out.println("AlarmService.createEmitter 진입"); SseEmitter emitter = new SseEmitter(Long.parseLong("100000")); - String key = userId + ":" + topicName + ":" +now.toString(); + System.out.println("emitter 생성 : " + emitter); + String key = topicName + "_" + now.toString() + "_" + userId; + System.out.println("key = " + key); + sseRepository.put(key, emitter); + System.out.println("emitter 저장"); emitter.onCompletion(() -> { System.out.println("onCompletion callback"); - sseRepository.remove(key); + sseRepository.remove(key); // 만료되면 리스트에서 삭제 }); emitter.onTimeout(() -> { System.out.println("onTimeout callback"); @@ -88,47 +69,53 @@ public SseEmitter subscribeTopic(String topicName, String userId, String lastEve emitter.complete(); }); - sseRepository.put(key, emitter); - try { - emitter.send(SseEmitter.event() - .name("CONNCECTED") // 클라이언트에서 식별하는 이벤트 이름 - .id(key) - .data("subscribe")); - } catch (IOException exception) { - sseRepository.remove(key); - System.out.println("SSE Exception: " + exception.getMessage()); + sendToClient(emitter, key, "Connected", "subscribe"); + System.out.println("sse 알림 발송"); + + if (lastEventId!=null) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다. + Map events = sseRepository.getListByKeySuffix(userId); + events.entrySet().stream() + .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0) + .forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue())); } return emitter; } - public void pushAlarm(String topicName, String sender, String context) { // 알람 전송 - ChannelTopic topic = channels.get(topicName); // 토픽 객체 불러오기 - Alarm alarm = Alarm.builder().sender(sender).context(context).build(); // 알람 객체 생성 - redisPublishService.publish(topic, alarm); // 알람 전송 + public void pushAlarm(String topicName, String context) { // 알람 전송 + Alarm alarm = Alarm.builder().sender(topicName).context(context).build(); // 알람 객체 생성 + // 받은 알람 SSE 전송 + // 해당 토픽의 SseEmitter 모두 가져옴 + Map sseEmitters = sseRepository.getListByKeyPrefix(topicName); + sseEmitters.forEach( + (key, emitter) -> { + // 데이터 전송 + sendToClient(emitter, key, "Alarm", alarm); + } + ); - } + // 레디스 저장 -> 키(발송인 정보) : 값(알람 객체) + String key = "member:" + alarm.getSender() + ":alarm:" + alarm.getId(); // key 설정 -> member:memberId:alarm:alarmId; + redisAlarmTemplate.opsForValue().set(key, alarm); // 레디스에 저장 + redisAlarmTemplate.expire(key, 1, TimeUnit.MINUTES); // TTL 설정 ***** 테스트용 - public void deleteTopic(String topicName) { // 토픽 제거 - ChannelTopic topic = channels.get(topicName); // 토픽 객체 불러오기 - redisMessageListener.removeMessageListener(redisSubscribeService, topic); // 토픽 제거 - channels.remove(topicName); // HashMap 제거 } + @Transactional(readOnly = true) public AlarmResponseList getSubscribeAlarmList(String memberId) { // 해당 유저의 알림 리스트 불러오기 // 패턴 매칭 사용 -> member:memberId:alarm:* // [1] 유저의 구독 목록 불러오기 - SubscribeResponseList subscribeResponseList = subscribeService.getSubscribeList(memberId); - Set keys = new HashSet<>(); // Set을 사용하여 중복된 값 제거 - for (SubscribeResponse response : subscribeResponseList.getSubscribeResponseList()) { - String seller = response.getUserId(); // Response 객체에서 sender 필드 값을 추출 + Member member = memberRepository.findByUserId(memberId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER)); + List subscribeList = subscribeRepository.findAllByConsumer(member); + Set keys = new HashSet<>(); + for (Subscribe subscribe : subscribeList) { + String seller = subscribe.getSeller().getUserId(); // Response 객체에서 sender 필드 값을 추출 keys.add(seller); // keys 집합에 sender 값 추가 } - // [2] 반복문으로 해당 판매자 userId가 키로 포함된 알림 내역 불러오기 - Set subKeys = new HashSet<>(); + Set subKeys = new HashSet<>(); // Set을 사용하여 중복된 값 제거 for (String key : keys) { subKeys.addAll(redisAlarmTemplate.keys("member:" + key + ":alarm:*")); } @@ -137,12 +124,13 @@ public AlarmResponseList getSubscribeAlarmList(String memberId) { // 해당 유 List alarmList = subKeys.stream().map(k -> redisAlarmTemplate.opsForValue().get(k)).collect(Collectors.toList()); // 해당 키의 알람 리스트 저장 return new AlarmResponseList(alarmList); } - + + @Transactional(readOnly = true) public AlarmResponseList getDibAlarmList(String memberId) { // 해당 유저의 알림 리스트 불러오기 // 패턴 매칭 사용 -> member:memberId:alarm:* // [1] 유저의 찜 목록 불러오기 - Optional member = memberRepository.findByUserId(memberId); + Member member = memberRepository.findByUserId(memberId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER)); List dibList = dibRepository.findAllByConsumer(member); Set keys = new HashSet<>(); // Set을 사용하여 중복된 값 제거 for (Dib dib : dibList) { @@ -160,4 +148,16 @@ public AlarmResponseList getDibAlarmList(String memberId) { // 해당 유저의 List alarmList = Dibskeys.stream().map(k -> redisAlarmTemplate.opsForValue().get(k)).collect(Collectors.toList()); // 해당 키의 알람 리스트 저장 return new AlarmResponseList(alarmList); } + + public void sendToClient(SseEmitter emitter, String key, String name, Object data) { + log.info("key={}, name={}, message={}",key, name, data); + try { + emitter.send(SseEmitter.event() + .id(key) + .name(name) + .data(data)); + } catch (IOException e) { + sseRepository.remove(key); + } + } } diff --git a/src/main/java/com/anywayclear/service/DibService.java b/src/main/java/com/anywayclear/service/DibService.java index 4a736f5..695408b 100644 --- a/src/main/java/com/anywayclear/service/DibService.java +++ b/src/main/java/com/anywayclear/service/DibService.java @@ -1,8 +1,6 @@ package com.anywayclear.service; -import com.anywayclear.dto.request.DibCreateRequest; import com.anywayclear.dto.response.DibResponse; -import com.anywayclear.dto.response.DibResponseList; import com.anywayclear.dto.response.IsDibResponse; import com.anywayclear.dto.response.MultiResponse; import com.anywayclear.entity.Dib; @@ -14,15 +12,15 @@ import com.anywayclear.repository.ProduceRepository; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Sort; -import org.springframework.data.web.PageableDefault; +import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.time.LocalDateTime; import java.util.List; -import java.util.stream.Collectors; -import static com.anywayclear.exception.ExceptionCode.INVALID_MEMBER; -import static com.anywayclear.exception.ExceptionCode.INVALID_PRODUCE_ID; +import static com.anywayclear.exception.ExceptionCode.*; @Service public class DibService { @@ -31,35 +29,53 @@ public class DibService { private final MemberRepository memberRepository; private final ProduceRepository produceRepository; - public DibService(DibRepository dibRepository, MemberRepository memberRepository, ProduceRepository produceRepository) { + private final AlarmService alarmService; + + public DibService(DibRepository dibRepository, MemberRepository memberRepository, ProduceRepository produceRepository, AlarmService alarmService) { this.dibRepository = dibRepository; this.memberRepository = memberRepository; this.produceRepository = produceRepository; + this.alarmService = alarmService; } - public Long createDib(DibCreateRequest request) { - Member member = memberRepository.findByUserId(request.getConsumerId()).orElseThrow(() -> new RuntimeException("아이디가 없습니다.")); - Produce produce = produceRepository.findById(request.getProduceId()).orElseThrow(() -> new RuntimeException("해당 농산물이 없습니다.")); + @Transactional + public SseEmitter createDib(Long topicName, OAuth2User oAuth2User, String lastEventId, LocalDateTime now) { + String userId = (String) oAuth2User.getAttributes().get("userId"); + + Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); + Produce produce = produceRepository.findById(topicName).orElseThrow(() -> new CustomException(INVALID_PRODUCE_ID)); Dib dib = new Dib(member, produce); - return dibRepository.save(dib).getId(); + dibRepository.save(dib); + + return alarmService.createEmitter(topicName.toString(), userId, lastEventId, now); } + @Transactional(readOnly = true) public DibResponse getDib(Long id) { - Dib dib = dibRepository.findById(id).orElseThrow(() -> new RuntimeException("아이디가 없습니다.")); + Dib dib = dibRepository.findById(id).orElseThrow(() -> new CustomException(INVALID_DIB_ID)); return DibResponse.toResponse(dib); } + @Transactional(readOnly = true) public MultiResponse getDibPage(String userId, Pageable pageable) { // 찜 중인 농산물 리스트 반환 - Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new RuntimeException("해당 userId의 유저가 없습니다.")); + Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); Page dibPage = dibRepository.findAllByConsumer(member, pageable); List dibResponseList = dibPage.map(DibResponse::toResponse).getContent(); return new MultiResponse<>(dibResponseList, dibPage); } + @Transactional(readOnly = true) public IsDibResponse getIsDib(String userId, long produceId) { Produce produce = produceRepository.findById(produceId).orElseThrow(() -> new CustomException(INVALID_PRODUCE_ID)); Member consumer = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); boolean isDib = dibRepository.findByConsumerAndProduce(consumer, produce).isPresent(); return new IsDibResponse(isDib); } + + @Transactional + public void deleteDib(Long produceId, String consumerId) { + Member consumer = memberRepository.findByUserId(consumerId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); + Produce produce = produceRepository.findById(produceId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); + dibRepository.deleteByConsumerAndProduce(consumer, produce); + } } diff --git a/src/main/java/com/anywayclear/service/RedisSubscribeService.java b/src/main/java/com/anywayclear/service/RedisSubscribeService.java index b9dc0a2..b05f0e0 100644 --- a/src/main/java/com/anywayclear/service/RedisSubscribeService.java +++ b/src/main/java/com/anywayclear/service/RedisSubscribeService.java @@ -1,13 +1,16 @@ package com.anywayclear.service; import com.anywayclear.entity.Alarm; +import com.anywayclear.repository.SSEInMemoryRepository; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.util.Map; import java.util.concurrent.TimeUnit; @Service @@ -16,6 +19,10 @@ public class RedisSubscribeService implements MessageListener { private final RedisTemplate redisAlarmTemplate; + private final SSEInMemoryRepository sseRepository; + + private final AlarmService alarmService; + // ObjectMaper.readValue를 사용해서 JSON을 파싱해서 자바 객체(ChatMessage.Class)로 바꿔줌 private final ObjectMapper mapper = new ObjectMapper(); @@ -29,8 +36,15 @@ public void onMessage(Message message, byte[] pattern) { try { Alarm alarm = mapper.readValue(message.getBody(), Alarm.class); // 받은 메시지 Alarm 객체로 역직렬화 - // 받은 알람 SSE 전송 ***** 추가 필요 - + // 받은 알람 SSE 전송 + // 해당 토픽의 SseEmitter 모두 가져옴 + Map sseEmitters = sseRepository.getListByKeyPrefix(alarm.getSender()); + sseEmitters.forEach( + (key, emitter) -> { + // 데이터 전송 + alarmService.sendToClient(emitter, key, "Alarm", alarm); + } + ); // 레디스 저장 -> 키(발송인 정보) : 값(알람 객체) String key = "member:" + alarm.getSender() + ":alarm:" + alarm.getId(); // key 설정 -> member:memberId:alarm:alarmId; diff --git a/src/main/java/com/anywayclear/service/SubscribeService.java b/src/main/java/com/anywayclear/service/SubscribeService.java index d169e9d..0ba33cb 100644 --- a/src/main/java/com/anywayclear/service/SubscribeService.java +++ b/src/main/java/com/anywayclear/service/SubscribeService.java @@ -1,6 +1,5 @@ package com.anywayclear.service; -import com.anywayclear.dto.request.SubscribeCreateRequest; import com.anywayclear.dto.response.IsSubResponse; import com.anywayclear.dto.response.SubscribeResponse; import com.anywayclear.dto.response.SubscribeResponseList; @@ -9,39 +8,53 @@ import com.anywayclear.exception.CustomException; import com.anywayclear.repository.MemberRepository; import com.anywayclear.repository.SubscribeRepository; +import org.springframework.security.oauth2.core.user.OAuth2User; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import static com.anywayclear.exception.ExceptionCode.INVALID_MEMBER; +import static com.anywayclear.exception.ExceptionCode.INVALID_SUBSCRIBE_ID; @Service public class SubscribeService { private final SubscribeRepository subscribeRepository; private final MemberRepository memberRepository; - public SubscribeService(SubscribeRepository subscribeRepository, MemberRepository memberRepository) { + private AlarmService alarmService; + + public SubscribeService(SubscribeRepository subscribeRepository, MemberRepository memberRepository, AlarmService alarmService) { this.subscribeRepository = subscribeRepository; this.memberRepository = memberRepository; + this.alarmService = alarmService; } - public Long createSubscribe(SubscribeCreateRequest request) { - Member consumer = memberRepository.findByUserId(request.getConsumerId()).orElseThrow(() -> new RuntimeException("아이디가 없습니다.")); - Member seller = memberRepository.findByUserId(request.getSellerId()).orElseThrow(() -> new RuntimeException("아이디가 없습니다.")); + @Transactional + public SseEmitter createSubscribe(String topicName, OAuth2User oAuth2User, String lastEventId, LocalDateTime now) { + String userId = (String) oAuth2User.getAttributes().get("userId"); + + Member consumer = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); + Member seller = memberRepository.findByUserId(topicName).orElseThrow(() -> new CustomException(INVALID_MEMBER)); Subscribe subscribe = new Subscribe(consumer, seller); - return subscribeRepository.save(subscribe).getId(); + subscribeRepository.save(subscribe); + + return alarmService.createEmitter(topicName, userId, lastEventId, now); } + @Transactional public SubscribeResponse getSubscribe(Long id) { - Subscribe subscribe = subscribeRepository.findById(id).orElseThrow(() -> new RuntimeException()); + Subscribe subscribe = subscribeRepository.findById(id).orElseThrow(() -> new CustomException(INVALID_SUBSCRIBE_ID)); return SubscribeResponse.toResponse(subscribe.getSeller()); } + @Transactional public SubscribeResponseList getSubscribeList(String userId) { // @AuthenticationPrincipal OAuth2User oauthuser - Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new RuntimeException("해당 userId의 유저가 없습니다.")); + Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); List memberList = new ArrayList<>(); if (member.getRole().equals("ROLE_SELLER")) { // 판매자 일 경우 List subscribeList = subscribeRepository.findAllBySeller(member); @@ -57,10 +70,18 @@ public SubscribeResponseList getSubscribeList(String userId) { return new SubscribeResponseList(memberList); } + @Transactional public IsSubResponse getIsSub(String consumerId, String sellerId) { Member consumer = memberRepository.findByUserId(consumerId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); Member seller = memberRepository.findByUserId(sellerId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); boolean isSub = subscribeRepository.findByConsumerAndSeller(consumer, seller).isPresent(); return new IsSubResponse(isSub); } + + @Transactional(readOnly = true) + public void deleteSubscribe(String sellerId, String consumerId) { + Member consumer = memberRepository.findByUserId(consumerId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); + Member seller = memberRepository.findByUserId(sellerId).orElseThrow(() -> new CustomException(INVALID_MEMBER)); + subscribeRepository.deleteByConsumerAndSeller(consumer, seller); + } }