Skip to content

Commit

Permalink
websocket notification now takes entry filtering into account (#1191)
Browse files Browse the repository at this point in the history
  • Loading branch information
Athou committed Jan 24, 2024
1 parent 9354fb8 commit c624955
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public FeedEntryDAO(SessionFactory sessionFactory) {
super(sessionFactory);
}

public Long findExisting(String guidHash, Feed feed) {
return query().select(entry.id).from(entry).where(entry.guidHash.eq(guidHash), entry.feed.eq(feed)).limit(1).fetchOne();
public FeedEntry findExisting(String guidHash, Feed feed) {
return query().select(entry).from(entry).where(entry.guidHash.eq(guidHash), entry.feed.eq(feed)).limit(1).fetchOne();
}

public List<FeedCapacity> findFeedsExceedingCapacity(long maxCapacity, long max) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
Expand All @@ -20,6 +23,7 @@
import com.commafeed.backend.feed.parser.FeedParserResult.Content;
import com.commafeed.backend.feed.parser.FeedParserResult.Entry;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.User;
import com.commafeed.backend.service.FeedEntryService;
Expand Down Expand Up @@ -75,6 +79,7 @@ public FeedRefreshUpdater(UnitOfWork unitOfWork, FeedService feedService, FeedEn
private AddEntryResult addEntry(final Feed feed, final Entry entry, final List<FeedSubscription> subscriptions) {
boolean processed = false;
boolean inserted = false;
Set<FeedSubscription> subscriptionsForWhichEntryIsUnread = new HashSet<>();

// lock on feed, make sure we are not updating the same feed twice at
// the same time
Expand All @@ -96,10 +101,21 @@ private AddEntryResult addEntry(final Feed feed, final Entry entry, final List<F
locked2 = lock2.tryLock(1, TimeUnit.MINUTES);
if (locked1 && locked2) {
processed = true;
inserted = unitOfWork.call(() -> feedEntryService.addEntry(feed, entry, subscriptions));
if (inserted) {
entryInserted.mark();
}
inserted = unitOfWork.call(() -> {
Instant now = Instant.now();
FeedEntry feedEntry = feedEntryService.findOrCreate(feed, entry);
boolean newEntry = !feedEntry.getInserted().isBefore(now);
if (newEntry) {
entryInserted.mark();
for (FeedSubscription sub : subscriptions) {
boolean unread = feedEntryService.applyFilter(sub, feedEntry);
if (unread) {
subscriptionsForWhichEntryIsUnread.add(sub);
}
}
}
return newEntry;
});
} else {
log.error("lock timeout for " + feed.getUrl() + " - " + key1);
}
Expand All @@ -113,12 +129,13 @@ private AddEntryResult addEntry(final Feed feed, final Entry entry, final List<F
lock2.unlock();
}
}
return new AddEntryResult(processed, inserted);
return new AddEntryResult(processed, inserted, subscriptionsForWhichEntryIsUnread);
}

public boolean update(Feed feed, List<Entry> entries) {
boolean processed = true;
long inserted = 0;
Map<FeedSubscription, Long> unreadCountBySubscription = new HashMap<>();

if (!entries.isEmpty()) {
Set<String> lastEntries = cache.getLastEntries(feed);
Expand All @@ -135,6 +152,7 @@ public boolean update(Feed feed, List<Entry> entries) {
AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions);
processed &= addEntryResult.processed;
inserted += addEntryResult.inserted ? 1 : 0;
addEntryResult.subscriptionsForWhichEntryIsUnread.forEach(sub -> unreadCountBySubscription.merge(sub, 1L, Long::sum));

entryCacheMiss.mark();
} else {
Expand All @@ -153,7 +171,7 @@ public boolean update(Feed feed, List<Entry> entries) {
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
cache.invalidateUserRootCategory(users.toArray(new User[0]));

notifyOverWebsocket(subscriptions, inserted);
notifyOverWebsocket(unreadCountBySubscription);
}
}

Expand All @@ -171,14 +189,16 @@ public boolean update(Feed feed, List<Entry> entries) {
return processed;
}

private void notifyOverWebsocket(List<FeedSubscription> subscriptions, long inserted) {
subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub, inserted)));
private void notifyOverWebsocket(Map<FeedSubscription, Long> unreadCountBySubscription) {
unreadCountBySubscription.forEach((sub, unreadCount) -> webSocketSessions.sendMessage(sub.getUser(),
WebSocketMessageBuilder.newFeedEntries(sub, unreadCount)));
}

@AllArgsConstructor
private static class AddEntryResult {
private final boolean processed;
private final boolean inserted;
private final Set<FeedSubscription> subscriptionsForWhichEntryIsUnread;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.commafeed.backend.model.FeedEntryStatus;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.User;
import com.commafeed.backend.service.FeedEntryFilteringService.FeedEntryFilterException;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -38,33 +39,34 @@ public class FeedEntryService {
/**
* this is NOT thread-safe
*/
public boolean addEntry(Feed feed, Entry entry, List<FeedSubscription> subscriptions) {
public FeedEntry findOrCreate(Feed feed, Entry entry) {
String guid = FeedUtils.truncate(entry.guid(), 2048);
String guidHash = DigestUtils.sha1Hex(entry.guid());
Long existing = feedEntryDAO.findExisting(guidHash, feed);
FeedEntry existing = feedEntryDAO.findExisting(guidHash, feed);
if (existing != null) {
return false;
return existing;
}

FeedEntry feedEntry = buildEntry(feed, entry, guid, guidHash);
feedEntryDAO.saveOrUpdate(feedEntry);
return feedEntry;
}

public boolean applyFilter(FeedSubscription sub, FeedEntry entry) {
boolean matches = true;
try {
matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), entry);
} catch (FeedEntryFilterException e) {
log.error("could not evaluate filter {}", sub.getFilter(), e);
}

// if filter does not match the entry, mark it as read
for (FeedSubscription sub : subscriptions) {
boolean matches = true;
try {
matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), feedEntry);
} catch (FeedEntryFilteringService.FeedEntryFilterException e) {
log.error("could not evaluate filter {}", sub.getFilter(), e);
}
if (!matches) {
FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, feedEntry);
status.setRead(true);
feedEntryStatusDAO.saveOrUpdate(status);
}
if (!matches) {
FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, entry);
status.setRead(true);
feedEntryStatusDAO.saveOrUpdate(status);
}

return true;
return matches;
}

private FeedEntry buildEntry(Feed feed, Entry e, String guid, String guidHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
@ExtendWith(MockServerExtension.class)
public abstract class BaseIT {

private static final HttpRequest FEED_REQUEST = HttpRequest.request().withMethod("GET").withPath("/");

private final CommaFeedDropwizardAppExtension extension = buildExtension();

private Client client;
Expand All @@ -50,6 +52,8 @@ public abstract class BaseIT {

private String webSocketUrl;

private MockServerClient mockServerClient;

protected CommaFeedDropwizardAppExtension buildExtension() {
return new CommaFeedDropwizardAppExtension() {
@Override
Expand All @@ -61,9 +65,10 @@ protected JerseyClientBuilder clientBuilder() {

@BeforeEach
void init(MockServerClient mockServerClient) throws IOException {
this.mockServerClient = mockServerClient;

URL resource = Objects.requireNonNull(getClass().getResource("/feed/rss.xml"));
mockServerClient.when(HttpRequest.request().withMethod("GET").withPath("/"))
.respond(HttpResponse.response().withBody(IOUtils.toString(resource, StandardCharsets.UTF_8)));
mockServerClient.when(FEED_REQUEST).respond(HttpResponse.response().withBody(IOUtils.toString(resource, StandardCharsets.UTF_8)));

this.client = extension.client();
this.feedUrl = "http://localhost:" + mockServerClient.getPort() + "/";
Expand All @@ -77,6 +82,13 @@ void cleanup() {
this.client.close();
}

protected void feedNowReturnsMoreEntries() throws IOException {
mockServerClient.clear(FEED_REQUEST);

URL resource = Objects.requireNonNull(getClass().getResource("/feed/rss_2.xml"));
mockServerClient.when(FEED_REQUEST).respond(HttpResponse.response().withBody(IOUtils.toString(resource, StandardCharsets.UTF_8)));
}

protected String login() {
LoginRequest req = new LoginRequest();
req.setName("admin");
Expand Down Expand Up @@ -112,4 +124,8 @@ protected Entries getFeedEntries(long subscriptionId) {
.get();
return response.readEntity(Entries.class);
}

protected void forceRefreshAllFeeds() {
client.target(apiBaseUrl + "feed/refreshAll").request().get(Void.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.commafeed.frontend.model.request.FeedModificationRequest;

import jakarta.websocket.ClientEndpointConfig;
import jakarta.websocket.CloseReason;
import jakarta.websocket.ContainerProvider;
import jakarta.websocket.DeploymentException;
import jakarta.websocket.Endpoint;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;
import jakarta.ws.rs.client.Entity;

class WebSocketIT extends BaseIT {

@Test
void sessionClosedIfNotLoggedIn() throws DeploymentException, IOException {
ClientEndpointConfig config = buildConfig("fake-session-id");

AtomicBoolean connected = new AtomicBoolean();
AtomicReference<CloseReason> closeReasonRef = new AtomicReference<>();
try (Session ignored = ContainerProvider.getWebSocketContainer().connectToServer(new Endpoint() {
Expand All @@ -39,7 +40,7 @@ public void onOpen(Session session, EndpointConfig config) {
public void onClose(Session session, CloseReason closeReason) {
closeReasonRef.set(closeReason);
}
}, config, URI.create(getWebSocketUrl()))) {
}, buildConfig("fake-session-id"), URI.create(getWebSocketUrl()))) {
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected);

Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> closeReasonRef.get() != null);
Expand All @@ -50,7 +51,6 @@ public void onClose(Session session, CloseReason closeReason) {
@Test
void subscribeAndGetsNotified() throws DeploymentException, IOException {
String sessionId = login();
ClientEndpointConfig config = buildConfig(sessionId);

AtomicBoolean connected = new AtomicBoolean();
AtomicReference<String> messageRef = new AtomicReference<>();
Expand All @@ -60,7 +60,7 @@ public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(String.class, messageRef::set);
connected.set(true);
}
}, config, URI.create(getWebSocketUrl()))) {
}, buildConfig(sessionId), URI.create(getWebSocketUrl()))) {
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected);

Long subscriptionId = subscribe(getFeedUrl());
Expand All @@ -70,10 +70,40 @@ public void onOpen(Session session, EndpointConfig config) {
}
}

@Test
void notNotifiedForFilteredEntries() throws DeploymentException, IOException {
String sessionId = login();
Long subscriptionId = subscribeAndWaitForEntries(getFeedUrl());

FeedModificationRequest req = new FeedModificationRequest();
req.setId(subscriptionId);
req.setName("feed-name");
req.setFilter("!title.contains('item 4')");
getClient().target(getApiBaseUrl() + "feed/modify").request().post(Entity.json(req), Void.class);

AtomicBoolean connected = new AtomicBoolean();
AtomicReference<String> messageRef = new AtomicReference<>();
try (Session ignored = ContainerProvider.getWebSocketContainer().connectToServer(new Endpoint() {
@Override
public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(String.class, messageRef::set);
connected.set(true);
}
}, buildConfig(sessionId), URI.create(getWebSocketUrl()))) {
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected);

feedNowReturnsMoreEntries();
forceRefreshAllFeeds();

Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> messageRef.get() != null);
Assertions.assertEquals("new-feed-entries:" + subscriptionId + ":1", messageRef.get());
}

}

@Test
void pingPong() throws DeploymentException, IOException {
String sessionId = login();
ClientEndpointConfig config = buildConfig(sessionId);

AtomicBoolean connected = new AtomicBoolean();
AtomicReference<String> messageRef = new AtomicReference<>();
Expand All @@ -83,7 +113,7 @@ public void onOpen(Session session, EndpointConfig config) {
session.addMessageHandler(String.class, messageRef::set);
connected.set(true);
}
}, config, URI.create(getWebSocketUrl()))) {
}, buildConfig(sessionId), URI.create(getWebSocketUrl()))) {
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilTrue(connected);

session.getAsyncRemote().sendText("ping");
Expand Down
32 changes: 32 additions & 0 deletions commafeed-server/src/test/resources/feed/rss_2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>CommaFeed test feed</title>
<link>https://hostname.local/commafeed</link>
<description>CommaFeed test feed description</description>
<item>
<title>Item 4</title>
<link>https://hostname.local/commafeed/4</link>
<description>Item 4 description</description>
<pubDate>Sun, 31 Dec 2023 15:00:00 +0100</pubDate>
</item>
<item>
<title>Item 3</title>
<link>https://hostname.local/commafeed/3</link>
<description>Item 3 description</description>
<pubDate>Sat, 30 Dec 2023 15:00:00 +0100</pubDate>
</item>
<item>
<title>Item 2</title>
<link>https://hostname.local/commafeed/2</link>
<description>Item 2 description</description>
<pubDate>Fri, 29 Dec 2023 15:02:00 +0100</pubDate>
</item>
<item>
<title>Item 1</title>
<link>https://hostname.local/commafeed/1</link>
<description>Item 1 description</description>
<pubDate>Wed, 27 Dec 2023 22:24:00 +0100</pubDate>
</item>
</channel>
</rss>

0 comments on commit c624955

Please sign in to comment.