Skip to content

Commit

Permalink
Merge pull request #104 from getAlby/task-improv
Browse files Browse the repository at this point in the history
fix: webhook and stop subscriptions logic
  • Loading branch information
im-adithya committed Aug 1, 2024
2 parents 028f79f + a1989c3 commit ab7d91f
Showing 1 changed file with 30 additions and 28 deletions.
58 changes: 30 additions & 28 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
"webhook_url": requestData.WebhookUrl,
}).Debug("Processing request event")

if svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).First(&RequestEvent{}).RowsAffected != 0 {
if svc.db.Where("nostr_id = ?", requestData.SignedEvent.ID).Find(&RequestEvent{}).RowsAffected != 0 {
svc.Logger.WithFields(logrus.Fields{
"request_event_id": requestData.SignedEvent.ID,
"wallet_pubkey": requestData.WalletPubkey,
Expand Down Expand Up @@ -452,9 +452,24 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, requestData.WebhookUrl, requestEvent)

ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second)
defer cancel()

go svc.startSubscription(ctx, &subscription, svc.publishRequestEvent, svc.handleResponseEvent)

go func(){
defer cancel()
select {
case <-ctx.Done():
svc.Logger.WithFields(logrus.Fields{
"request_event_id": requestData.SignedEvent.ID,
"client_pubkey": requestData.SignedEvent.PubKey,
"wallet_pubkey": requestData.WalletPubkey,
"relay_url": requestData.RelayUrl,
}).Error("Stopped subscription without receiving event")
case event := <-subscription.EventChan:
svc.postEventToWebhook(event, subscription.WebhookUrl)
}
}()

return c.JSON(http.StatusOK, NIP47Response{
State: WEBHOOK_RECEIVED,
})
Expand Down Expand Up @@ -736,20 +751,14 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
if isCustomRelay {
relay.Close()
}
// Save the request event state and stop the
// subscription if it's an NIP47 request
// stop the subscription if it's an NIP47 request
if (subscription.RequestEvent != nil) {
if (subscription.RequestEvent.State == "") {
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
}
svc.db.Save(&subscription.RequestEvent)
// stop the subscription as it is one time
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Debug("Stopping subscription")
svc.stopSubscription(subscription)
}
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Debug("Stopping subscription")
break
}
}
Expand All @@ -767,19 +776,21 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
svc.Logger.WithError(err).WithFields(logrus.Fields{
"request_event_id": subscription.RequestEvent.NostrId,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Error("Failed to publish to relay")
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
sub.Unsub()
} else {
svc.Logger.WithFields(logrus.Fields{
"request_event_id": subscription.RequestEvent.NostrId,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
"client_pubkey": clientPubkey,
}).Info("Published request event successfully")
}).Info("Published request event")
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_CONFIRMED
}
svc.db.Save(&subscription.RequestEvent)
}

func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscription) {
Expand All @@ -803,17 +814,8 @@ func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscr
RequestId: &subscription.RequestEvent.ID,
}
svc.db.Save(&responseEvent)
if subscription.WebhookUrl != "" {
svc.postEventToWebhook(event, subscription.WebhookUrl)
} else {
subscription.EventChan <- event
}
svc.subscriptionsMutex.Lock()
sub, exists := svc.subscriptions[subscription.Uuid]
if exists {
sub.Unsub()
}
svc.subscriptionsMutex.Unlock()

subscription.EventChan <- event
}

func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subscription) {
Expand Down Expand Up @@ -920,7 +922,7 @@ func (svc *Service) postEventToWebhook(event *nostr.Event, webhookURL string) {
"event_id": event.ID,
"event_kind": event.Kind,
"webhook_url": webhookURL,
}).Info("Successfully posted event to webhook")
}).Info("Posted event to webhook")
}

func (svc *Service) subscriptionToFilter(subscription *Subscription) (*nostr.Filter){
Expand Down

0 comments on commit ab7d91f

Please sign in to comment.