Skip to content

Commit

Permalink
Feat/sse (#112)
Browse files Browse the repository at this point in the history
* Fix: 오류 수정

* Fix: 구독 목록 불러오기 수정

* Fix: transactional readonly 삭제

* Fix: sse 연결 방식 수정

* Remove: pub/sub 파일 삭제

* Feat: ExecutorService 추가

* Fix: api 수정

* Fix: sse 데이터 전달 테스트

* Fix: sse 데이터 전달 수정

* Fix: 포인트 조회 및 수정 api
  • Loading branch information
flowerdonk authored Aug 14, 2023
1 parent ef3ba2a commit 5ac5009
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class AlarmController {

@GetMapping(produces = "text/event-stream")
public ResponseEntity<SseEmitter> createEmitter(@AuthenticationPrincipal OAuth2User oAuth2User,
@RequestHeader(value = "Last-Event_ID", required = false) String lastEventId) {
@RequestHeader(value = "Last-Event_ID", required = false, defaultValue = "") String lastEventId) {

return new ResponseEntity<>(alarmService.createEmitter(oAuth2User, lastEventId, LocalDateTime.now()), HttpStatus.OK);
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/anywayclear/controller/PointController.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.anywayclear.controller;

import com.anywayclear.dto.request.PointUpdateRequest;
import com.anywayclear.dto.response.PointResponse;
import com.anywayclear.service.PointService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.validation.Valid;

@RestController
@RequestMapping("/api/points")
public class PointController {
Expand All @@ -14,8 +17,13 @@ public PointController(PointService pointService) {
this.pointService = pointService;
}

@GetMapping
public ResponseEntity<PointResponse> getPoint(@RequestParam(name = "userId") String userId) {
@GetMapping("/{userId}")
public ResponseEntity<PointResponse> getPoint(@PathVariable String userId) {
return ResponseEntity.ok(pointService.getPoint(userId));
}

@PatchMapping("/{userId}")
public ResponseEntity<PointResponse> updatePoint(@PathVariable String userId, @Valid @RequestBody PointUpdateRequest request) {
return ResponseEntity.ok(pointService.updatePoint(userId, request));
}
}
14 changes: 14 additions & 0 deletions src/main/java/com/anywayclear/dto/request/PointUpdateRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.anywayclear.dto.request;

import lombok.*;

import javax.validation.constraints.NotNull;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class PointUpdateRequest {
@NotNull
private int balance;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,17 @@
public class PointResponse {
private int balance;
private LocalDateTime updatedAt;
private Member member;

@Builder
public PointResponse(int balance, LocalDateTime updatedAt, Member member) {
public PointResponse(int balance, LocalDateTime updatedAt) {
this.balance = balance;
this.updatedAt = updatedAt;
this.member = member;
}

public static PointResponse toResponse(Point point) {
return PointResponse.builder()
.balance(point.getBalance())
.updatedAt(point.getUpdatedAt())
.member(point.getMember())
.build();
}
}
61 changes: 40 additions & 21 deletions src/main/java/com/anywayclear/repository/SSEInMemoryRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -13,41 +14,42 @@
public class SSEInMemoryRepository{
private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

public void put(String key, SseEmitter sseEmitter) {
// 유실된 데이터 파악을 위한 캐시 저장 { 'userId_now' : alarm() }
private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

public SseEmitter save(String key, SseEmitter sseEmitter) {
sseEmitterMap.put(key, sseEmitter);
return sseEmitter;
}

public void saveEventCache(String eventCacheId, Object event) {
eventCache.put(eventCacheId, event);
}

public Optional<SseEmitter> get(String key) {
return Optional.ofNullable(sseEmitterMap.get(key));
}

// public Map<String, SseEmitter> getListByKeyPrefix(String keyPrefix){
//// return sseEmitterMap.keySet().stream()
//// .filter(key -> key.startsWith(keyPrefix))
//// .map(sseEmitterMap::get)
//// .collect(Collectors.toList());
// return sseEmitterMap.entrySet().stream()
// .filter(entry -> entry.getKey().startsWith(keyPrefix))
// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// }
public Map<String, Object> findAllEventCacheByUserId(String userId) {
return eventCache.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(userId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<String, SseEmitter> findAllEmitterByUserId(String userId){
return sseEmitterMap.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(userId))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}


// public List<String> getKeyListByKeyPrefix(String keyPrefix){
// return sseEmitterMap.keySet().stream()
// .filter(key -> key.startsWith(keyPrefix))
// .collect(Collectors.toList());
// }

// public Map<String, SseEmitter> getListByKeySuffix(String keySuffic){
//// return sseEmitterMap.keySet().stream()
//// .filter(key -> key.startsWith(keyPrefix))
//// .map(sseEmitterMap::get)
//// .collect(Collectors.toList());
// return sseEmitterMap.entrySet().stream()
// .filter(entry -> entry.getKey().endsWith(keySuffic))
// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// }

public void remove(String key) {
public void delete(String key) {
sseEmitterMap.remove(key);
}

Expand All @@ -56,4 +58,21 @@ public void remove(String key) {
// if (key.startsWith(keyPrefix)) sseEmitterMap.remove(key);
// });
// }

public void deleteAllEmitterByUserId(String userId) {
sseEmitterMap.forEach((key, emitter) -> {
if (key.startsWith(userId)) {
sseEmitterMap.remove(key);
}
});
}

public void deleteAllEventCacheByUserId(String userId) {
eventCache.forEach((key, emitter) -> {
if (key.startsWith(userId)) {
eventCache.remove(key);
}
});
}

}
72 changes: 43 additions & 29 deletions src/main/java/com/anywayclear/service/AlarmService.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,46 @@ public SseEmitter createEmitter(OAuth2User oAuth2User, String lastEventId, Local
// SSE 연결
// emitter에 키에 토픽, 유저 정보를 담기
// 알람 내용에 토픽 정보를 넣어 해당 토픽으로 에미터를 검색해 sse 데이터 보내기
System.out.println("AlarmService.createEmitter 진입");
SseEmitter emitter = new SseEmitter(Long.parseLong("100000"));
System.out.println("emitter 생성 : " + emitter);
System.out.println("userId = " + userId);
sseRepository.put(userId, emitter);
System.out.println("emitter 저장");

String key = userId + "_" + now;
SseEmitter emitter;

if (sseRepository.get(userId).isPresent()) {
sseRepository.delete(userId);
emitter = sseRepository.save(key, new SseEmitter(100000L * 4500L));
} else {
emitter = sseRepository.save(key, new SseEmitter(100000L * 4500L));
}


// 오류 발생 시 emitter 삭제
emitter.onCompletion(() -> {
System.out.println("onCompletion callback");
sseRepository.remove(userId); // 만료되면 리스트에서 삭제

sseRepository.delete(userId); // 만료되면 리스트에서 삭제

});
emitter.onTimeout(() -> {
System.out.println("onTimeout callback");
// 만료시 Repository에서 삭제
emitter.complete();
sseRepository.delete(userId);
});
emitter.onError((e) -> {
System.out.println("onError callback");
sseRepository.delete(userId);
});


// 503 에러 방지 - 더미 이벤트 전송
sendToClient(emitter, userId, "Connected", "subscribe");
System.out.println("sse 알림 발송");

if (lastEventId!=null) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊겼거나 미수신된 데이터를 다 찾아서 보내준다.
// Map<String, SseEmitter> events = sseRepository.getListByKeySuffix(userId);
// events.entrySet().stream()
// .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
// .forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue()));
SseEmitter event = sseRepository.get(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
sendToClient(emitter, userId, "Alarm", event);
if (!lastEventId.isEmpty()) { // 클라이언트가 미수신한 Event 유실 예방, 연결이 끊겼거나 미수신된 데이터를 다 찾아서 보내준다.
Map<String, Object> eventCaches = sseRepository.findAllEventCacheByUserId(userId);
eventCaches.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), "Alarm", entry.getValue()));

}

return emitter;
Expand All @@ -103,21 +116,18 @@ public void pushAlarm(String type, String topicName) { // 알람 전송
// 받은 알람 SSE 전송
// 해당 토픽의 SseEmitter 모두 가져옴
receiverKeyList.forEach(
key -> {
SseEmitter emitter = sseRepository.get(key).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
sendToClient(emitter, key, "Alarm", alarm);
receiverKey -> {
Map<String, SseEmitter> sseEmitters = sseRepository.findAllEmitterByUserId(receiverKey);
sseEmitters.forEach(
(key, emitter) -> {
sseRepository.saveEventCache(key, alarm);
sendToClient(emitter, key, "Alarm", alarm);
}
);
}

);

// Map<String, SseEmitter> sseEmitters = sseRepository.getListByKeyPrefix(topicName);
// sseEmitters.forEach(
// (key, emitter) -> {
// // 데이터 전송
// sendToClient(emitter, key, "Alarm", alarm);
// }
// );

// 레디스 저장 -> 키(발송인 정보) : 값(알람 객체)
String key = "member:" + alarm.getSender() + ":alarm:" + alarm.getId(); // key 설정 -> member:memberId:alarm:alarmId;
redisAlarmTemplate.opsForValue().set(key, alarm); // 레디스에 저장
Expand Down Expand Up @@ -180,15 +190,19 @@ public void sendToClient(SseEmitter emitter, String key, String name, Object dat
emitter.send(SseEmitter.event()
.id(key)
.name(name)
.data(data));
.data(data)
.reconnectTime(0));
log.info("data 전달 성공");
emitter.send("더미 데이터");
log.info("더미데이터 전달 성공");
emitter.complete();
sseRepository.delete(key);
} catch (IOException e) {
sseRepository.remove(key);
sseRepository.delete(key);
emitter.completeWithError(e);
log.info("예외 발생");
}
emitter.complete();

});
}
}
15 changes: 14 additions & 1 deletion src/main/java/com/anywayclear/service/PointService.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.anywayclear.service;

import com.anywayclear.dto.request.PointUpdateRequest;
import com.anywayclear.dto.response.PointResponse;
import com.anywayclear.entity.Member;
import com.anywayclear.entity.Point;
import com.anywayclear.exception.CustomException;
import com.anywayclear.exception.ExceptionCode;
import com.anywayclear.repository.MemberRepository;
import com.anywayclear.repository.PointRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class PointService {
Expand All @@ -17,12 +21,21 @@ public PointService(PointRepository pointRepository, MemberRepository memberRepo
this.memberRepository = memberRepository;
}

@Transactional(readOnly = true)
public PointResponse getPoint(String userId) {
// 닉네임으로 검색해야 하기 때문에 멤버 리포지토리에서 멤버 객체를 불러옴
Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new RuntimeException("해당 userId의 유저가 없습니다."));
Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
// 불러온 멤버 객체에서 포인트 객체 가져오기
Point point = member.getPoint();
return PointResponse.toResponse(point);
}

@Transactional
public PointResponse updatePoint(String userId, PointUpdateRequest request) {
Member member = memberRepository.findByUserId(userId).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
// 불러온 멤버 객체에서 포인트 객체 가져오기
Point point = member.getPoint();
point.setBalance(request.getBalance());
return PointResponse.toResponse(pointRepository.save(point));
}
}

0 comments on commit 5ac5009

Please sign in to comment.