Skip to content

Commit

Permalink
【源码阅读1.1.0】sse
Browse files Browse the repository at this point in the history
  • Loading branch information
liujian committed Sep 25, 2024
1 parent 69b7a77 commit 57b5870
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void appendLog(String processName, String stepPid, String logLine, boolea
// /TOPIC/PROCESS_CONSOLE/FlinkSubmit/12
String topic = StrFormatter.format("{}/{}", SseTopic.PROCESS_CONSOLE.getValue(), processName);
CompletableFuture.runAsync(() -> {
//todo 后端一些有用日志推送到前端,如maven编译日志
SseSessionContextHolder.sendTopic(topic, process);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

//todo SSE: Server-Sent Events
@Slf4j
@Data
public class SseSessionContextHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@
@RequiredArgsConstructor
@SaCheckLogin
public class SseController {

//todo sse 订阅
@PostMapping(value = "/subscribeTopic")
@ApiOperation("subscribeTopic")
@ApiImplicitParam(name = "topics", value = "topics", required = true, dataType = "List")
public Result<Set<String>> subscribeTopic(@RequestBody SseSubscribeDTO subscribeDTO) {
Set<String> b = SseSessionContextHolder.subscribeTopic(subscribeDTO.getSessionKey(), subscribeDTO.getTopics());
return Result.succeed(b, Status.SUCCESS);
}

//todo sse 连接
@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ApiOperation("Connect Sse")
@ApiImplicitParam(name = "sessionKey", value = "Session unique key", required = true, dataType = "String")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public static String getUtcCondition(Date startTime, Date endTime) {
return MessageFormat.format(
"''{0}'' <= {2} AND {2} <= ''{1}''", startLdt.format(formatter), endLdt.format(formatter), HEART_TIME);
}

//todo 用sse做一些推送信息
@Override
public SseEmitter sendJvmInfo() {
String sessionKey = UUID.randomUUID().toString();
Expand Down
4 changes: 2 additions & 2 deletions dinky-web/src/models/Sse.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ export default () => {
const lastHeartTime = useRef<number>(0);
const subscriberRef = useRef<SubscriberData[]>([]);
const [eventSource, setEventSource] = useState<EventSource>();

//todo sse连接
const reconnectSse = () => {
lastHeartTime.current = Date.now();
uuidRef.current = uuidv4();
const sseUrl = 'api/sse/connect?sessionKey=' + uuidRef.current;
eventSource?.close();
setEventSource(new EventSource(sseUrl));
};

//todo sse订阅
const subscribe = async () => {
const topics: string[] = [];
subscriberRef.current.forEach((sub) => topics.push(...sub.topic));
Expand Down

0 comments on commit 57b5870

Please sign in to comment.