Skip to content

Commit

Permalink
Remove quantity
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Jan 8, 2025
1 parent 849ae96 commit a2c6e0b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 75 deletions.
6 changes: 3 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ trait Eclair {

def receive(description: Either[String, ByteVector32], amount_opt: Option[MilliSatoshi], expire_opt: Option[Long], fallbackAddress_opt: Option[String], paymentPreimage_opt: Option[ByteVector32], privateChannelIds_opt: Option[List[ByteVector32]])(implicit timeout: Timeout): Future[Bolt11Invoice]

def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], availableQuantity_opt: Option[Long], firstNodeId_opt: Option[PublicKey], hideNodeId: Boolean)(implicit timeout: Timeout): Future[Offer]
def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey], hideNodeId: Boolean)(implicit timeout: Timeout): Future[Offer]

def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit

Expand Down Expand Up @@ -378,9 +378,9 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
}
}

override def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], availableQuantity_opt: Option[Long], firstNodeId_opt: Option[PublicKey], hideNodeId: Boolean)(implicit timeout: Timeout): Future[Offer] = {
override def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey], hideNodeId: Boolean)(implicit timeout: Timeout): Future[Offer] = {
val offerCreator = appKit.system.spawnAnonymous(OfferCreator(appKit.nodeParams, appKit.router, appKit.offerManager, appKit.defaultOfferHandler))
offerCreator.ask[Either[String, Offer]](replyTo => OfferCreator.Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, availableQuantity_opt, firstNodeId_opt, hideNodeId))
offerCreator.ask[Either[String, Offer]](replyTo => OfferCreator.Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, firstNodeId_opt, hideNodeId))
.flatMap {
case Left(errorMessage) => Future.failed(new Exception(errorMessage))
case Right(offer) => Future.successful(offer)
Expand Down
16 changes: 3 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -405,26 +405,16 @@ case class DualOffersDb(primary: OffersDb, secondary: OffersDb) extends OffersDb

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-offers").build()))

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32], quantityAvailable: Long): Unit = {
runAsync(secondary.addOffer(offer, pathId_opt, quantityAvailable))
primary.addOffer(offer, pathId_opt, quantityAvailable)
override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = {
runAsync(secondary.addOffer(offer, pathId_opt))
primary.addOffer(offer, pathId_opt)
}

override def disableOffer(offer: OfferTypes.Offer): Unit = {
runAsync(secondary.disableOffer(offer))
primary.disableOffer(offer)
}

override def getAvailableQuantity(offer: OfferTypes.Offer): Long = {
runAsync(secondary.getAvailableQuantity(offer))
primary.getAvailableQuantity(offer)
}

override def setAvailableQuantity(offer: OfferTypes.Offer, quantityAvailable: Long): Unit = {
runAsync(secondary.setAvailableQuantity(offer, quantityAvailable))
primary.setAvailableQuantity(offer, quantityAvailable)
}

override def listOffers(onlyActive: Boolean): Seq[OfferData] = {
runAsync(secondary.listOffers(onlyActive))
primary.listOffers(onlyActive)
Expand Down
9 changes: 2 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/OffersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@ package fr.acinq.eclair.db
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer

case class OfferData(offer: Offer, pathId_opt: Option[ByteVector32], quantityAvailable: Long)
case class OfferData(offer: Offer, pathId_opt: Option[ByteVector32])

trait OffersDb {
def addOffer(offer: Offer, pathId_opt: Option[ByteVector32], quantityAvailable: Long): Unit
def addOffer(offer: Offer, pathId_opt: Option[ByteVector32]): Unit

def disableOffer(offer: Offer): Unit

def getAvailableQuantity(offer: Offer): Long

// Must only be called from the `DefaultHandler` actor to prevent data races.
def setAvailableQuantity(offer: Offer, quantityAvailable: Long): Unit

def listOffers(onlyActive: Boolean): Seq[OfferData]
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PgOffersDb(implicit ds: DataSource, lock: PgLock) extends OffersDb with Lo
case None =>
statement.executeUpdate("CREATE SCHEMA offers")

statement.executeUpdate("CREATE TABLE offers.managed (offer_id TEXT NOT NULL PRIMARY KEY, offer TEXT NOT NULL, path_id TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, is_active BOOLEAN NOT NULL, quantity_available BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE offers.managed (offer_id TEXT NOT NULL PRIMARY KEY, offer TEXT NOT NULL, path_id TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, is_active BOOLEAN NOT NULL)")

statement.executeUpdate("CREATE INDEX offer_is_active_idx ON offers.managed(is_active)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
Expand All @@ -55,16 +55,15 @@ class PgOffersDb(implicit ds: DataSource, lock: PgLock) extends OffersDb with Lo
}
}

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32], quantityAvailable: Long): Unit = withMetrics("offers/add", DbBackends.Postgres){
override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = withMetrics("offers/add", DbBackends.Postgres){
withLock { pg =>
using(pg.prepareStatement("INSERT INTO offers.managed (offer_id, offer, path_id, created_at, is_active, quantity_available) VALUES (?, ?, ?, NOW, TRUE, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO offers.managed (offer_id, offer, path_id, created_at, is_active) VALUES (?, ?, ?, NOW, TRUE)")) { statement =>
statement.setString(1, offer.offerId.toHex)
statement.setString(2, offer.toString)
pathId_opt match {
case Some(pathId) => statement.setString(3, pathId.toHex)
case None => statement.setNull(3, java.sql.Types.VARCHAR)
}
statement.setLong(4, quantityAvailable)

statement.executeUpdate()
}
Expand All @@ -73,9 +72,5 @@ class PgOffersDb(implicit ds: DataSource, lock: PgLock) extends OffersDb with Lo

override def disableOffer(offer: OfferTypes.Offer): Unit = ???

override def getAvailableQuantity(offer: OfferTypes.Offer): Long = ???

override def setAvailableQuantity(offer: OfferTypes.Offer, quantityAvailable: Long): Unit = ???

override def listOffers(onlyActive: Boolean): Seq[OfferData] = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@ import java.sql.Connection

class SqliteOffersDb(val sqlite: Connection) extends OffersDb with Logging {

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32], quantityAvailable: Long): Unit = ???
override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = ???

override def disableOffer(offer: OfferTypes.Offer): Unit = ???

override def getAvailableQuantity(offer: OfferTypes.Offer): Long = ???

override def setAvailableQuantity(offer: OfferTypes.Offer, quantityAvailable: Long): Unit = ???

override def listOffers(onlyActive: Boolean): Seq[OfferData] = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,21 @@ object DefaultHandler {
Behaviors.setup(context =>
Behaviors.receiveMessage {
case OfferManager.HandleInvoiceRequest(replyTo, invoiceRequest) =>
if (nodeParams.db.offers.getAvailableQuantity(invoiceRequest.offer) >= invoiceRequest.quantity) {
val amount = invoiceRequest.amount.getOrElse(10_000_000.msat)
invoiceRequest.offer.contactInfos.head match {
case OfferTypes.RecipientNodeId(nodeId) =>
val route = ReceivingRoute(Seq(nodeId), nodeParams.channelConf.maxExpiryDelta)
replyTo ! OfferManager.InvoiceRequestActor.ApproveRequest(amount, Seq(route), None)
case OfferTypes.BlindedPath(BlindedRoute(firstNodeId: EncodedNodeId.WithPublicKey, _, _)) =>
val routeParams = nodeParams.routerConf.pathFindingExperimentConf.getRandomConf().getDefaultRouteParams
router ! BlindedRouteRequest(context.spawnAnonymous(waitForRoute(nodeParams, replyTo, amount)), firstNodeId.publicKey, nodeParams.nodeId, amount, routeParams, pathsToFind = 2)
case OfferTypes.BlindedPath(BlindedRoute(_: EncodedNodeId.ShortChannelIdDir, _, _)) =>
context.log.error("unexpected managed offer with compact first node id")
replyTo ! OfferManager.InvoiceRequestActor.RejectRequest("internal error")
}
} else {
replyTo ! OfferManager.InvoiceRequestActor.RejectRequest("quantity unavailable for this offer")
val amount = invoiceRequest.amount.getOrElse(10_000_000.msat)
invoiceRequest.offer.contactInfos.head match {
case OfferTypes.RecipientNodeId(nodeId) =>
val route = ReceivingRoute(Seq(nodeId), nodeParams.channelConf.maxExpiryDelta)
replyTo ! OfferManager.InvoiceRequestActor.ApproveRequest(amount, Seq(route), None)
case OfferTypes.BlindedPath(BlindedRoute(firstNodeId: EncodedNodeId.WithPublicKey, _, _)) =>
val routeParams = nodeParams.routerConf.pathFindingExperimentConf.getRandomConf().getDefaultRouteParams
router ! BlindedRouteRequest(context.spawnAnonymous(waitForRoute(nodeParams, replyTo, amount)), firstNodeId.publicKey, nodeParams.nodeId, amount, routeParams, pathsToFind = 2)
case OfferTypes.BlindedPath(BlindedRoute(_: EncodedNodeId.ShortChannelIdDir, _, _)) =>
context.log.error("unexpected managed offer with compact first node id")
replyTo ! OfferManager.InvoiceRequestActor.RejectRequest("internal error")
}
Behaviors.same
case OfferManager.HandlePayment(replyTo, offer, invoiceData) =>
val availableQuantity = nodeParams.db.offers.getAvailableQuantity(offer)
if (availableQuantity >= invoiceData.quantity) {
// Only this actor reads and writes the available quantity so there is no data race.
nodeParams.db.offers.setAvailableQuantity(offer, availableQuantity - invoiceData.quantity)
replyTo ! OfferManager.PaymentActor.AcceptPayment()
} else {
replyTo ! OfferManager.PaymentActor.RejectPayment("quantity unavailable for this offer")
}
case OfferManager.HandlePayment(replyTo, _, _) =>
replyTo ! OfferManager.PaymentActor.AcceptPayment()
Behaviors.same
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ object OfferCreator {
amount_opt: Option[MilliSatoshi],
expiry_opt: Option[TimestampSecond],
issuer_opt: Option[String],
availableQuantity_opt: Option[Long],
firstNodeId_opt: Option[PublicKey],
hideNodeId: Boolean) extends Command

Expand All @@ -46,8 +45,8 @@ object OfferCreator {

def apply(nodeParams: NodeParams, router: ActorRef, offerManager: typed.ActorRef[OfferManager.Command], defaultOfferHandler: typed.ActorRef[OfferManager.HandlerCommand]): Behavior[Command] =
Behaviors.receivePartial {
case (context, Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, availableQuantity_opt, firstNodeId, hideNodeId)) =>
new OfferCreator(context, replyTo, nodeParams, router, offerManager, defaultOfferHandler).init(description_opt, amount_opt, expiry_opt, issuer_opt, availableQuantity_opt, firstNodeId, hideNodeId)
case (context, Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, firstNodeId, hideNodeId)) =>
new OfferCreator(context, replyTo, nodeParams, router, offerManager, defaultOfferHandler).init(description_opt, amount_opt, expiry_opt, issuer_opt, firstNodeId, hideNodeId)
}
}

Expand All @@ -63,7 +62,6 @@ private class OfferCreator(context: ActorContext[OfferCreator.Command],
amount_opt: Option[MilliSatoshi],
expiry_opt: Option[TimestampSecond],
issuer_opt: Option[String],
availableQuantity_opt: Option[Long],
firstNodeId_opt: Option[PublicKey],
hideNodeId: Boolean): Behavior[Command] = {
if (amount_opt.nonEmpty && description_opt.isEmpty) {
Expand All @@ -76,47 +74,45 @@ private class OfferCreator(context: ActorContext[OfferCreator.Command],
description_opt.map(OfferDescription),
expiry_opt.map(OfferAbsoluteExpiry),
issuer_opt.map(OfferIssuer),
availableQuantity_opt.map(_ => OfferQuantityMax(0)),
).flatten
val quantityAvailable = availableQuantity_opt.getOrElse(Long.MaxValue)
firstNodeId_opt match {
case Some(firstNodeId) =>
router ! Router.MessageRouteRequest(context.messageAdapter(RouteResponseWrapper(_)), firstNodeId, nodeParams.nodeId, Set.empty)
waitForRoute(tlvs, quantityAvailable)
waitForRoute(tlvs)
case None if hideNodeId =>
router ! Router.GetCentralNode(context.messageAdapter(FirstNodeWrapper(_)))
waitForFirstNode(tlvs, quantityAvailable)
waitForFirstNode(tlvs)
case None =>
val offer = Offer(TlvStream(tlvs + OfferNodeId(nodeParams.nodeId)))
registerOffer(offer, None, quantityAvailable)
registerOffer(offer, None)

}
}
}

private def waitForFirstNode(tlvs: Set[OfferTlv], quantityAvailable: Long): Behavior[Command] = {
private def waitForFirstNode(tlvs: Set[OfferTlv]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case FirstNodeWrapper(firstNodeId) =>
router ! Router.MessageRouteRequest(context.messageAdapter(RouteResponseWrapper(_)), firstNodeId, nodeParams.nodeId, Set.empty)
waitForRoute(tlvs, quantityAvailable)
waitForRoute(tlvs)
}
}

private def waitForRoute(tlvs: Set[OfferTlv], quantityAvailable: Long): Behavior[Command] = {
private def waitForRoute(tlvs: Set[OfferTlv]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case RouteResponseWrapper(Router.MessageRoute(intermediateNodes, _)) =>
val pathId = randomBytes32()
val paths = Seq(OnionMessages.buildRoute(randomKey(), intermediateNodes.map(IntermediateNode(_)), Recipient(nodeParams.nodeId, Some(pathId))).route)
val offer = Offer(TlvStream(tlvs + OfferPaths(paths)))
registerOffer(offer, Some(pathId), quantityAvailable)
registerOffer(offer, Some(pathId))
case RouteResponseWrapper(Router.MessageRouteNotFound(_)) =>
replyTo ! Left("No route found")
Behaviors.stopped
}
}

private def registerOffer(offer: Offer, pathId_opt: Option[ByteVector32], quantityAvailable: Long): Behavior[Command] = {
nodeParams.db.offers.addOffer(offer, pathId_opt, quantityAvailable)
private def registerOffer(offer: Offer, pathId_opt: Option[ByteVector32]): Behavior[Command] = {
nodeParams.db.offers.addOffer(offer, pathId_opt)
offerManager ! OfferManager.RegisterOffer(offer, None, pathId_opt, defaultOfferHandler)
replyTo ! Right(offer)
Behaviors.stopped
Expand Down

0 comments on commit a2c6e0b

Please sign in to comment.