From c624955ea4d9478a5778f970d2c20ddfe793e726 Mon Sep 17 00:00:00 2001 From: Athou Date: Wed, 24 Jan 2024 12:49:20 +0100 Subject: [PATCH] websocket notification now takes entry filtering into account (#1191) --- .../commafeed/backend/dao/FeedEntryDAO.java | 4 +- .../backend/feed/FeedRefreshUpdater.java | 36 +++++++++++---- .../backend/service/FeedEntryService.java | 36 ++++++++------- .../com/commafeed/integration/BaseIT.java | 20 ++++++++- .../commafeed/integration/WebSocketIT.java | 44 ++++++++++++++++--- .../src/test/resources/feed/rss_2.xml | 32 ++++++++++++++ 6 files changed, 136 insertions(+), 36 deletions(-) create mode 100644 commafeed-server/src/test/resources/feed/rss_2.xml diff --git a/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java b/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java index 04494f926..0c57fd40d 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java @@ -26,8 +26,8 @@ public FeedEntryDAO(SessionFactory sessionFactory) { super(sessionFactory); } - public Long findExisting(String guidHash, Feed feed) { - return query().select(entry.id).from(entry).where(entry.guidHash.eq(guidHash), entry.feed.eq(feed)).limit(1).fetchOne(); + public FeedEntry findExisting(String guidHash, Feed feed) { + return query().select(entry).from(entry).where(entry.guidHash.eq(guidHash), entry.feed.eq(feed)).limit(1).fetchOne(); } public List findFeedsExceedingCapacity(long maxCapacity, long max) { diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java index bacb65a76..ed1fc0323 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java @@ -3,8 +3,11 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -20,6 +23,7 @@ import com.commafeed.backend.feed.parser.FeedParserResult.Content; import com.commafeed.backend.feed.parser.FeedParserResult.Entry; import com.commafeed.backend.model.Feed; +import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.User; import com.commafeed.backend.service.FeedEntryService; @@ -75,6 +79,7 @@ public FeedRefreshUpdater(UnitOfWork unitOfWork, FeedService feedService, FeedEn private AddEntryResult addEntry(final Feed feed, final Entry entry, final List subscriptions) { boolean processed = false; boolean inserted = false; + Set subscriptionsForWhichEntryIsUnread = new HashSet<>(); // lock on feed, make sure we are not updating the same feed twice at // the same time @@ -96,10 +101,21 @@ private AddEntryResult addEntry(final Feed feed, final Entry entry, final List feedEntryService.addEntry(feed, entry, subscriptions)); - if (inserted) { - entryInserted.mark(); - } + inserted = unitOfWork.call(() -> { + Instant now = Instant.now(); + FeedEntry feedEntry = feedEntryService.findOrCreate(feed, entry); + boolean newEntry = !feedEntry.getInserted().isBefore(now); + if (newEntry) { + entryInserted.mark(); + for (FeedSubscription sub : subscriptions) { + boolean unread = feedEntryService.applyFilter(sub, feedEntry); + if (unread) { + subscriptionsForWhichEntryIsUnread.add(sub); + } + } + } + return newEntry; + }); } else { log.error("lock timeout for " + feed.getUrl() + " - " + key1); } @@ -113,12 +129,13 @@ private AddEntryResult addEntry(final Feed feed, final Entry entry, final List entries) { boolean processed = true; long inserted = 0; + Map unreadCountBySubscription = new HashMap<>(); if (!entries.isEmpty()) { Set lastEntries = cache.getLastEntries(feed); @@ -135,6 +152,7 @@ public boolean update(Feed feed, List entries) { AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions); processed &= addEntryResult.processed; inserted += addEntryResult.inserted ? 1 : 0; + addEntryResult.subscriptionsForWhichEntryIsUnread.forEach(sub -> unreadCountBySubscription.merge(sub, 1L, Long::sum)); entryCacheMiss.mark(); } else { @@ -153,7 +171,7 @@ public boolean update(Feed feed, List entries) { cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0])); cache.invalidateUserRootCategory(users.toArray(new User[0])); - notifyOverWebsocket(subscriptions, inserted); + notifyOverWebsocket(unreadCountBySubscription); } } @@ -171,14 +189,16 @@ public boolean update(Feed feed, List entries) { return processed; } - private void notifyOverWebsocket(List subscriptions, long inserted) { - subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub, inserted))); + private void notifyOverWebsocket(Map unreadCountBySubscription) { + unreadCountBySubscription.forEach((sub, unreadCount) -> webSocketSessions.sendMessage(sub.getUser(), + WebSocketMessageBuilder.newFeedEntries(sub, unreadCount))); } @AllArgsConstructor private static class AddEntryResult { private final boolean processed; private final boolean inserted; + private final Set subscriptionsForWhichEntryIsUnread; } } diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java index 98b708a9b..1dc831989 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java @@ -17,6 +17,7 @@ import com.commafeed.backend.model.FeedEntryStatus; import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.User; +import com.commafeed.backend.service.FeedEntryFilteringService.FeedEntryFilterException; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -38,33 +39,34 @@ public class FeedEntryService { /** * this is NOT thread-safe */ - public boolean addEntry(Feed feed, Entry entry, List subscriptions) { + public FeedEntry findOrCreate(Feed feed, Entry entry) { String guid = FeedUtils.truncate(entry.guid(), 2048); String guidHash = DigestUtils.sha1Hex(entry.guid()); - Long existing = feedEntryDAO.findExisting(guidHash, feed); + FeedEntry existing = feedEntryDAO.findExisting(guidHash, feed); if (existing != null) { - return false; + return existing; } FeedEntry feedEntry = buildEntry(feed, entry, guid, guidHash); feedEntryDAO.saveOrUpdate(feedEntry); + return feedEntry; + } + + public boolean applyFilter(FeedSubscription sub, FeedEntry entry) { + boolean matches = true; + try { + matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), entry); + } catch (FeedEntryFilterException e) { + log.error("could not evaluate filter {}", sub.getFilter(), e); + } - // if filter does not match the entry, mark it as read - for (FeedSubscription sub : subscriptions) { - boolean matches = true; - try { - matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), feedEntry); - } catch (FeedEntryFilteringService.FeedEntryFilterException e) { - log.error("could not evaluate filter {}", sub.getFilter(), e); - } - if (!matches) { - FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, feedEntry); - status.setRead(true); - feedEntryStatusDAO.saveOrUpdate(status); - } + if (!matches) { + FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, entry); + status.setRead(true); + feedEntryStatusDAO.saveOrUpdate(status); } - return true; + return matches; } private FeedEntry buildEntry(Feed feed, Entry e, String guid, String guidHash) { diff --git a/commafeed-server/src/test/java/com/commafeed/integration/BaseIT.java b/commafeed-server/src/test/java/com/commafeed/integration/BaseIT.java index 969f96524..ee990a953 100644 --- a/commafeed-server/src/test/java/com/commafeed/integration/BaseIT.java +++ b/commafeed-server/src/test/java/com/commafeed/integration/BaseIT.java @@ -38,6 +38,8 @@ @ExtendWith(MockServerExtension.class) public abstract class BaseIT { + private static final HttpRequest FEED_REQUEST = HttpRequest.request().withMethod("GET").withPath("/"); + private final CommaFeedDropwizardAppExtension extension = buildExtension(); private Client client; @@ -50,6 +52,8 @@ public abstract class BaseIT { private String webSocketUrl; + private MockServerClient mockServerClient; + protected CommaFeedDropwizardAppExtension buildExtension() { return new CommaFeedDropwizardAppExtension() { @Override @@ -61,9 +65,10 @@ protected JerseyClientBuilder clientBuilder() { @BeforeEach void init(MockServerClient mockServerClient) throws IOException { + this.mockServerClient = mockServerClient; + URL resource = Objects.requireNonNull(getClass().getResource("/feed/rss.xml")); - mockServerClient.when(HttpRequest.request().withMethod("GET").withPath("/")) - .respond(HttpResponse.response().withBody(IOUtils.toString(resource, StandardCharsets.UTF_8))); + mockServerClient.when(FEED_REQUEST).respond(HttpResponse.response().withBody(IOUtils.toString(resource, StandardCharsets.UTF_8))); this.client = extension.client(); this.feedUrl = "http://localhost:" + mockServerClient.getPort() + "/"; @@ -77,6 +82,13 @@ void cleanup() { this.client.close(); } + protected void feedNowReturnsMoreEntries() throws IOException { + mockServerClient.clear(FEED_REQUEST); + + URL resource = Objects.requireNonNull(getClass().getResource("/feed/rss_2.xml")); + mockServerClient.when(FEED_REQUEST).respond(HttpResponse.response().withBody(IOUtils.toString(resource, StandardCharsets.UTF_8))); + } + protected String login() { LoginRequest req = new LoginRequest(); req.setName("admin"); @@ -112,4 +124,8 @@ protected Entries getFeedEntries(long subscriptionId) { .get(); return response.readEntity(Entries.class); } + + protected void forceRefreshAllFeeds() { + client.target(apiBaseUrl + "feed/refreshAll").request().get(Void.class); + } } diff --git a/commafeed-server/src/test/java/com/commafeed/integration/WebSocketIT.java b/commafeed-server/src/test/java/com/commafeed/integration/WebSocketIT.java index 30e3cb6a5..271c2a43a 100644 --- a/commafeed-server/src/test/java/com/commafeed/integration/WebSocketIT.java +++ b/commafeed-server/src/test/java/com/commafeed/integration/WebSocketIT.java @@ -13,6 +13,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import com.commafeed.frontend.model.request.FeedModificationRequest; + import jakarta.websocket.ClientEndpointConfig; import jakarta.websocket.CloseReason; import jakarta.websocket.ContainerProvider; @@ -20,13 +22,12 @@ import jakarta.websocket.Endpoint; import jakarta.websocket.EndpointConfig; import jakarta.websocket.Session; +import jakarta.ws.rs.client.Entity; class WebSocketIT extends BaseIT { @Test void sessionClosedIfNotLoggedIn() throws DeploymentException, IOException { - ClientEndpointConfig config = buildConfig("fake-session-id"); - AtomicBoolean connected = new AtomicBoolean(); AtomicReference closeReasonRef = new AtomicReference<>(); try (Session ignored = ContainerProvider.getWebSocketContainer().connectToServer(new Endpoint() { @@ -39,7 +40,7 @@ public void onOpen(Session session, EndpointConfig config) { public void onClose(Session session, CloseReason closeReason) { closeReasonRef.set(closeReason); } - }, config, URI.create(getWebSocketUrl()))) { + }, buildConfig("fake-session-id"), URI.create(getWebSocketUrl()))) { Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected); Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> closeReasonRef.get() != null); @@ -50,7 +51,6 @@ public void onClose(Session session, CloseReason closeReason) { @Test void subscribeAndGetsNotified() throws DeploymentException, IOException { String sessionId = login(); - ClientEndpointConfig config = buildConfig(sessionId); AtomicBoolean connected = new AtomicBoolean(); AtomicReference messageRef = new AtomicReference<>(); @@ -60,7 +60,7 @@ public void onOpen(Session session, EndpointConfig config) { session.addMessageHandler(String.class, messageRef::set); connected.set(true); } - }, config, URI.create(getWebSocketUrl()))) { + }, buildConfig(sessionId), URI.create(getWebSocketUrl()))) { Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected); Long subscriptionId = subscribe(getFeedUrl()); @@ -70,10 +70,40 @@ public void onOpen(Session session, EndpointConfig config) { } } + @Test + void notNotifiedForFilteredEntries() throws DeploymentException, IOException { + String sessionId = login(); + Long subscriptionId = subscribeAndWaitForEntries(getFeedUrl()); + + FeedModificationRequest req = new FeedModificationRequest(); + req.setId(subscriptionId); + req.setName("feed-name"); + req.setFilter("!title.contains('item 4')"); + getClient().target(getApiBaseUrl() + "feed/modify").request().post(Entity.json(req), Void.class); + + AtomicBoolean connected = new AtomicBoolean(); + AtomicReference messageRef = new AtomicReference<>(); + try (Session ignored = ContainerProvider.getWebSocketContainer().connectToServer(new Endpoint() { + @Override + public void onOpen(Session session, EndpointConfig config) { + session.addMessageHandler(String.class, messageRef::set); + connected.set(true); + } + }, buildConfig(sessionId), URI.create(getWebSocketUrl()))) { + Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected); + + feedNowReturnsMoreEntries(); + forceRefreshAllFeeds(); + + Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> messageRef.get() != null); + Assertions.assertEquals("new-feed-entries:" + subscriptionId + ":1", messageRef.get()); + } + + } + @Test void pingPong() throws DeploymentException, IOException { String sessionId = login(); - ClientEndpointConfig config = buildConfig(sessionId); AtomicBoolean connected = new AtomicBoolean(); AtomicReference messageRef = new AtomicReference<>(); @@ -83,7 +113,7 @@ public void onOpen(Session session, EndpointConfig config) { session.addMessageHandler(String.class, messageRef::set); connected.set(true); } - }, config, URI.create(getWebSocketUrl()))) { + }, buildConfig(sessionId), URI.create(getWebSocketUrl()))) { Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected); session.getAsyncRemote().sendText("ping"); diff --git a/commafeed-server/src/test/resources/feed/rss_2.xml b/commafeed-server/src/test/resources/feed/rss_2.xml new file mode 100644 index 000000000..8b169494c --- /dev/null +++ b/commafeed-server/src/test/resources/feed/rss_2.xml @@ -0,0 +1,32 @@ + + + + CommaFeed test feed + https://hostname.local/commafeed + CommaFeed test feed description + + Item 4 + https://hostname.local/commafeed/4 + Item 4 description + Sun, 31 Dec 2023 15:00:00 +0100 + + + Item 3 + https://hostname.local/commafeed/3 + Item 3 description + Sat, 30 Dec 2023 15:00:00 +0100 + + + Item 2 + https://hostname.local/commafeed/2 + Item 2 description + Fri, 29 Dec 2023 15:02:00 +0100 + + + Item 1 + https://hostname.local/commafeed/1 + Item 1 description + Wed, 27 Dec 2023 22:24:00 +0100 + + + \ No newline at end of file