Skip to content

Commit

Permalink
Feat/sse (#107)
Browse files Browse the repository at this point in the history
* Fix: 오류 수정

* Fix: 구독 목록 불러오기 수정

* Fix: transactional readonly 삭제

* Fix: sse 연결 방식 수정
  • Loading branch information
flowerdonk authored Aug 14, 2023
1 parent b049ff3 commit 507af31
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 72 deletions.
18 changes: 15 additions & 3 deletions src/main/java/com/anywayclear/controller/AlarmController.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletResponse;
import java.time.LocalDateTime;

@RestController
@RequiredArgsConstructor
Expand All @@ -15,10 +21,16 @@ public class AlarmController {
// 알림 서비스
private final AlarmService alarmService;

@PostMapping("/topic/{topicName}")
@GetMapping(produces = "text/event-stream")
public ResponseEntity<SseEmitter> createEmitter(@AuthenticationPrincipal OAuth2User oAuth2User,
@RequestHeader(value = "Last-Event_ID", required = false) String lastEventId, HttpServletResponse response) {
return new ResponseEntity<>(alarmService.createEmitter(oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK);
}

@PostMapping("/{type}/{topicName}")
@ResponseStatus(HttpStatus.CREATED)
public void pushAlarm(@PathVariable String topicName, @RequestParam(name = "context") String context) {
alarmService.pushAlarm(topicName, context);
public void pushAlarm(@PathVariable("type") String type, @PathVariable("topiceName") String topicName) {
alarmService.pushAlarm(topicName, type);
}

@GetMapping("/{memberId}/subs")
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/anywayclear/controller/DibController.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletResponse;
import java.net.URI;
import java.time.LocalDateTime;

@RestController
Expand All @@ -25,10 +26,15 @@ public DibController(DibService dibService) {
this.dibService = dibService;
}

@GetMapping(value = "/{produceId}/dib", produces = "text/event-stream")
public ResponseEntity<SseEmitter> createDib(@PathVariable Long produceId, @AuthenticationPrincipal OAuth2User oAuth2User,
@RequestHeader(value = "Last-Event_ID", required = false) String lastEventId, HttpServletResponse response) {
return new ResponseEntity<>(dibService.createDib(produceId, oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK);
@PostMapping(value = "/{produceId}/dib")
public ResponseEntity<Void> createDib(@PathVariable Long produceId, @AuthenticationPrincipal OAuth2User oAuth2User) {
Long id = dibService.createDib(produceId, oAuth2User);
return ResponseEntity.created(URI.create("/api/dibs/" + id)).build();
}

@GetMapping("/{dib-id}")
public ResponseEntity<DibResponse> getDib(@PathVariable("dib-id") Long dibId) {
return ResponseEntity.ok(dibService.getDib(dibId));
}

@GetMapping("/{userId}")
Expand Down
16 changes: 11 additions & 5 deletions src/main/java/com/anywayclear/controller/SubscribeController.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.anywayclear.controller;

import com.anywayclear.dto.response.IsSubResponse;
import com.anywayclear.dto.response.SubscribeResponse;
import com.anywayclear.dto.response.SubscribeResponseList;
import com.anywayclear.service.SubscribeService;
import org.springframework.http.HttpStatus;
Expand All @@ -11,6 +12,7 @@
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletResponse;
import java.net.URI;
import java.time.LocalDateTime;

@RestController
Expand All @@ -22,11 +24,15 @@ public SubscribeController(SubscribeService subscribeService) {
this.subscribeService = subscribeService;
}

@GetMapping(value = "/{userId}/subscribe", produces = "text/event-stream")
public ResponseEntity<SseEmitter> createSubscribe(@PathVariable String userId, @AuthenticationPrincipal OAuth2User oAuth2User,
@RequestHeader(value = "Last-Event_ID", required = false) String lastEventId, HttpServletResponse response) {
System.out.println("로그인 유저 = " + oAuth2User);
return new ResponseEntity<>(subscribeService.createSubscribe(userId, oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK);
@GetMapping(value = "/{userId}/subscribe")
public ResponseEntity<SseEmitter> createSubscribe(@PathVariable String userId, @AuthenticationPrincipal OAuth2User oAuth2User) {
Long id = subscribeService.createSubscribe(userId, oAuth2User);
return ResponseEntity.created(URI.create("/api/subscribes/" + id)).build();
}

@GetMapping("/{subscribe-id}")
public ResponseEntity<SubscribeResponse> getSubscribe(@PathVariable("subscribe-id") Long subscribeId) {
return ResponseEntity.ok(subscribeService.getSubscribe(subscribeId));
}

@GetMapping
Expand Down
54 changes: 27 additions & 27 deletions src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,39 @@ public Optional<SseEmitter> get(String key) {
return Optional.ofNullable(sseEmitterMap.get(key));
}

public Map<String, SseEmitter> getListByKeyPrefix(String keyPrefix){
// public Map<String, SseEmitter> getListByKeyPrefix(String keyPrefix){
//// return sseEmitterMap.keySet().stream()
//// .filter(key -> key.startsWith(keyPrefix))
//// .map(sseEmitterMap::get)
//// .collect(Collectors.toList());
// return sseEmitterMap.entrySet().stream()
// .filter(entry -> entry.getKey().startsWith(keyPrefix))
// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// }

// public List<String> getKeyListByKeyPrefix(String keyPrefix){
// return sseEmitterMap.keySet().stream()
// .filter(key -> key.startsWith(keyPrefix))
// .map(sseEmitterMap::get)
// .collect(Collectors.toList());
return sseEmitterMap.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(keyPrefix))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public List<String> getKeyListByKeyPrefix(String keyPrefix){
return sseEmitterMap.keySet().stream()
.filter(key -> key.startsWith(keyPrefix))
.collect(Collectors.toList());
}

public Map<String, SseEmitter> getListByKeySuffix(String keySuffic){
// return sseEmitterMap.keySet().stream()
// .filter(key -> key.startsWith(keyPrefix))
// .map(sseEmitterMap::get)
// .collect(Collectors.toList());
return sseEmitterMap.entrySet().stream()
.filter(entry -> entry.getKey().endsWith(keySuffic))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
// }

// public Map<String, SseEmitter> getListByKeySuffix(String keySuffic){
//// return sseEmitterMap.keySet().stream()
//// .filter(key -> key.startsWith(keyPrefix))
//// .map(sseEmitterMap::get)
//// .collect(Collectors.toList());
// return sseEmitterMap.entrySet().stream()
// .filter(entry -> entry.getKey().endsWith(keySuffic))
// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// }

public void remove(String key) {
sseEmitterMap.remove(key);
}

public void deleteAllByKeyPrefix(String keyPrefix) {
sseEmitterMap.forEach((key, emitter) -> {
if (key.startsWith(keyPrefix)) sseEmitterMap.remove(key);
});
}
// public void deleteAllByKeyPrefix(String keyPrefix) {
// sseEmitterMap.forEach((key, emitter) -> {
// if (key.startsWith(keyPrefix)) sseEmitterMap.remove(key);
// });
// }
}
78 changes: 49 additions & 29 deletions src/main/java/com/anywayclear/service/AlarmService.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
package com.anywayclear.service;

import com.anywayclear.dto.response.AlarmResponseList;
import com.anywayclear.entity.Alarm;
import com.anywayclear.entity.Dib;
import com.anywayclear.entity.Member;
import com.anywayclear.entity.Subscribe;
import com.anywayclear.entity.*;
import com.anywayclear.exception.CustomException;
import com.anywayclear.exception.ExceptionCode;
import com.anywayclear.repository.DibRepository;
import com.anywayclear.repository.MemberRepository;
import com.anywayclear.repository.SSEInMemoryRepository;
import com.anywayclear.repository.SubscribeRepository;
import com.anywayclear.repository.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -47,54 +39,82 @@ public class AlarmService {

private final SSEInMemoryRepository sseRepository;

public SseEmitter createEmitter(String topicName, String userId, String lastEventId, LocalDateTime now) {
private final ProduceRepository produceRepository;

public SseEmitter createEmitter(OAuth2User oAuth2User, String lastEventId, LocalDateTime now) {
// 로그인 유저 userId
String userId = (String) oAuth2User.getAttributes().get("userId");

// SSE 연결
// emitter에 키에 토픽, 유저 정보를 담기
// 알람 내용에 토픽 정보를 넣어 해당 토픽으로 에미터를 검색해 sse 데이터 보내기
System.out.println("AlarmService.createEmitter 진입");
SseEmitter emitter = new SseEmitter(Long.parseLong("100000"));
System.out.println("emitter 생성 : " + emitter);
String key = topicName + "_" + now.toString() + "_" + userId;
System.out.println("key = " + key);
sseRepository.put(key, emitter);
System.out.println("userId = " + userId);
sseRepository.put(userId, emitter);
System.out.println("emitter 저장");

emitter.onCompletion(() -> {
System.out.println("onCompletion callback");
sseRepository.remove(key); // 만료되면 리스트에서 삭제
sseRepository.remove(userId); // 만료되면 리스트에서 삭제
});
emitter.onTimeout(() -> {
System.out.println("onTimeout callback");
// 만료시 Repository에서 삭제
emitter.complete();
});

sendToClient(emitter, key, "Connected", "subscribe");
sendToClient(emitter, userId, "Connected", "subscribe");
System.out.println("sse 알림 발송");

if (lastEventId!=null) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊켰거나 미수신된 데이터를 다 찾아서 보내준다.
Map<String, SseEmitter> events = sseRepository.getListByKeySuffix(userId);
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue()));
if (lastEventId!=null) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊겼거나 미수신된 데이터를 다 찾아서 보내준다.
// Map<String, SseEmitter> events = sseRepository.getListByKeySuffix(userId);
// events.entrySet().stream()
// .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
// .forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue()));
SseEmitter event = sseRepository.get(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
sendToClient(emitter, userId, "Alarm", event);
}

return emitter;
}

public void pushAlarm(String topicName, String context) { // 알람 전송
Alarm alarm = Alarm.builder().sender(topicName).context(context).build(); // 알람 객체 생성
public void pushAlarm(String type, String topicName) { // 알람 전송

// 알림 수신 목록 불러오기
Alarm alarm;
List<String> receiverKeyList;
if (type.equals("dib")) {
alarm = Alarm.builder().sender(topicName).context("경매가 시작되었습니다!").build(); // 알람 객체 생성
Produce produce = produceRepository.findById(Long.parseLong(topicName)).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_PRODUCE_ID));
receiverKeyList = dibRepository.findAllByProduce(produce).stream().map(p -> p.getConsumer().getUserId()).collect(Collectors.toList());
System.out.println("receiverKeyList = " + receiverKeyList);
} else {
alarm = Alarm.builder().sender(topicName).context("새로운 경매 글이 올라왔습니다!").build(); // 알람 객체 생성
Member seller = memberRepository.findByUserId(topicName).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
receiverKeyList = subscribeRepository.findAllBySeller(seller).stream().map(m -> m.getConsumer().getUserId()).collect(Collectors.toList());
System.out.println("receiverKeyList = " + receiverKeyList);
}

// 받은 알람 SSE 전송
// 해당 토픽의 SseEmitter 모두 가져옴
Map<String, SseEmitter> sseEmitters = sseRepository.getListByKeyPrefix(topicName);
sseEmitters.forEach(
(key, emitter) -> {
// 데이터 전송
receiverKeyList.forEach(
key -> {
SseEmitter emitter = sseRepository.get(key).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
sendToClient(emitter, key, "Alarm", alarm);
}

);

// Map<String, SseEmitter> sseEmitters = sseRepository.getListByKeyPrefix(topicName);
// sseEmitters.forEach(
// (key, emitter) -> {
// // 데이터 전송
// sendToClient(emitter, key, "Alarm", alarm);
// }
// );

// 레디스 저장 -> 키(발송인 정보) : 값(알람 객체)
String key = "member:" + alarm.getSender() + ":alarm:" + alarm.getId(); // key 설정 -> member:memberId:alarm:alarmId;
redisAlarmTemplate.opsForValue().set(key, alarm); // 레디스에 저장
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/anywayclear/service/DibService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ public DibService(DibRepository dibRepository, MemberRepository memberRepository
}

@Transactional
public SseEmitter createDib(Long topicName, OAuth2User oAuth2User, String lastEventId, LocalDateTime now) {
public Long createDib(Long topicName, OAuth2User oAuth2User) {
String userId = (String) oAuth2User.getAttributes().get("userId");

Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(INVALID_MEMBER));
Produce produce = produceRepository.findById(topicName).orElseThrow(() -> new CustomException(INVALID_PRODUCE_ID));
Dib dib = new Dib(member, produce);
dibRepository.save(dib);

return alarmService.createEmitter(topicName.toString(), userId, lastEventId, now);
return dib.getId();
}

@Transactional(readOnly = true)
Expand Down

0 comments on commit 507af31

Please sign in to comment.