Skip to content

Commit

Permalink
chore: subscription fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed Feb 27, 2024
1 parent e46ebbd commit ebfedfd
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
54 changes: 35 additions & 19 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,19 @@ func NewService(ctx context.Context) (*Service, error) {

for _, sub := range openSubscriptions {
go func(sub Subscription) {
ctx, cancel := context.WithCancel(ctx)
svc.mu.Lock()
svc.subscriptions[sub.ID] = cancel
svc.mu.Unlock()
errorChan := make(chan error)
svc.handleSubscription(ctx, &sub, errorChan)

err := <-errorChan
if err != nil {
svc.stopSubscription(&sub)
svc.Logger.Errorf("error opening subscription %d: %v", sub.ID, err)
}
svc.Logger.Infof("opened subscription %d", sub.ID)
ctx, cancel := context.WithCancel(svc.Ctx)
svc.mu.Lock()
svc.subscriptions[sub.ID] = cancel
svc.mu.Unlock()
errorChan := make(chan error)
go svc.handleSubscription(ctx, &sub, errorChan)

err := <-errorChan
if err != nil {
svc.stopSubscription(&sub)
svc.Logger.Errorf("error opening subscription %d: %v", sub.ID, err)
}
svc.Logger.Infof("opened subscription %d", sub.ID)
}(sub)
}

Expand Down Expand Up @@ -330,6 +330,7 @@ func (svc *Service) SubscriptionHandler(c echo.Context) error {

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.ID,
WebhookUrl: requestData.WebhookUrl,
})
}

Expand Down Expand Up @@ -376,6 +377,17 @@ func (svc *Service) handleSubscription(ctx context.Context, subscription *Subscr
errorChan <- nil
go func(){
for event := range sub.Events {
svc.Logger.WithFields(logrus.Fields{
"eventId": event.ID,
"eventKind": event.Kind,
}).Infof("received event on subscription %d", subscription.ID)
responseEvent := ResponseEvent{
SubscriptionId: subscription.ID,
NostrId: event.ID,
Content: event.Content,
RepliedAt: event.CreatedAt.Time(),
}
svc.db.Save(&responseEvent)
svc.postEventToWebhook(event, subscription.WebhookUrl)
}
}()
Expand All @@ -391,12 +403,16 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {
subId := uint(uint64Id)

subscription := Subscription{}
findSubscriptionResult := svc.db.Where("id = ?", subId).Find(&subscription)

if findSubscriptionResult.RowsAffected != 0 {
return c.JSON(http.StatusBadRequest, ErrorResponse{
Message: "subscription does not exist",
})
if err := svc.db.First(&subscription, subId).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return c.JSON(http.StatusBadRequest, ErrorResponse{
Message: "subscription does not exist",
})
} else {
return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: fmt.Sprintf("error occurred while fetching user: %s", err.Error()),
})
}
}

err := svc.stopSubscription(&subscription)
Expand Down
2 changes: 1 addition & 1 deletion migrations/initial_migration_postgres.sql
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ CREATE TABLE response_events (
id bigint NOT NULL,
subscription_id bigint,
request_id bigint NULL,
nostr_id text UNIQUE,
nostr_id text,
content text,
replied_at timestamp with time zone,
created_at timestamp with time zone,
Expand Down
2 changes: 1 addition & 1 deletion migrations/initial_migration_sqlite.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CREATE TABLE "subscriptions" (`id` integer,`relay_url` text,`webhook_url` text,`ids_string` text,`kinds_string` text,`authors_string` text,`tags_string` text,`since` datetime,`until` datetime,`limit` integer,`search` text,`open` boolean,`created_at` datetime,`updated_at` datetime,PRIMARY KEY (`id`));
CREATE TABLE "request_events" (`id` integer,`subscription_id` integer,`nostr_id` text UNIQUE,`content` text,`state` text,`created_at` datetime,`updated_at` datetime,PRIMARY KEY (`id`),CONSTRAINT `fk_request_events_subscription` FOREIGN KEY (`subscription_id`) REFERENCES `subscriptions`(`id`) ON DELETE CASCADE);
CREATE TABLE "response_events" (`id` integer,`subscription_id` integer,`request_id` integer null,`nostr_id` text UNIQUE,`content` text,`replied_at` datetime,`created_at` datetime,`updated_at` datetime,PRIMARY KEY (`id`),CONSTRAINT `fk_response_events_subscription` FOREIGN KEY (`subscription_id`) REFERENCES `subscriptions`(`id`) ON DELETE CASCADE,CONSTRAINT `fk_response_events_request_event` FOREIGN KEY (`request_id`) REFERENCES `request_events`(`id`) ON DELETE CASCADE);
CREATE TABLE "response_events" (`id` integer,`subscription_id` integer,`request_id` integer null,`nostr_id` text,`content` text,`replied_at` datetime,`created_at` datetime,`updated_at` datetime,PRIMARY KEY (`id`),CONSTRAINT `fk_response_events_subscription` FOREIGN KEY (`subscription_id`) REFERENCES `subscriptions`(`id`) ON DELETE CASCADE,CONSTRAINT `fk_response_events_request_event` FOREIGN KEY (`request_id`) REFERENCES `request_events`(`id`) ON DELETE CASCADE);

0 comments on commit ebfedfd

Please sign in to comment.