Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offers without extra plugin #2976

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment, OutgoingPaymentStatus}
import fr.acinq.eclair.io.Peer.{GetPeerInfo, OpenChannelResponse, PeerInfo}
import fr.acinq.eclair.io._
import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient}
import fr.acinq.eclair.message.{OnionMessages, Postman}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.offer.{OfferCreator, OfferManager}
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceiveStandardPayment
import fr.acinq.eclair.payment.relay.Relayer.{ChannelBalance, GetOutgoingChannels, OutgoingChannels, RelayFees}
import fr.acinq.eclair.payment.send.PaymentInitiator._
import fr.acinq.eclair.payment.send.{ClearRecipient, OfferPayment, PaymentIdentifier}
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer
import fr.acinq.eclair.wire.protocol.OfferTypes.{Offer, OfferAbsoluteExpiry, OfferIssuer, OfferQuantityMax, OfferTlv}
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
Expand Down Expand Up @@ -120,6 +122,12 @@ trait Eclair {

def receive(description: Either[String, ByteVector32], amount_opt: Option[MilliSatoshi], expire_opt: Option[Long], fallbackAddress_opt: Option[String], paymentPreimage_opt: Option[ByteVector32], privateChannelIds_opt: Option[List[ByteVector32]])(implicit timeout: Timeout): Future[Bolt11Invoice]

def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey], hideNodeId: Boolean)(implicit timeout: Timeout): Future[Offer]

def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit

def listOffers(onlyActive: Boolean = true)(implicit timeout: Timeout): Future[Seq[Offer]]

def newAddress(): Future[String]

def receivedInfo(paymentHash: ByteVector32)(implicit timeout: Timeout): Future[Option[IncomingPayment]]
Expand Down Expand Up @@ -370,6 +378,24 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
}
}

override def createOffer(description_opt: Option[String], amount_opt: Option[MilliSatoshi], expiry_opt: Option[TimestampSecond], issuer_opt: Option[String], firstNodeId_opt: Option[PublicKey], hideNodeId: Boolean)(implicit timeout: Timeout): Future[Offer] = {
val offerCreator = appKit.system.spawnAnonymous(OfferCreator(appKit.nodeParams, appKit.router, appKit.offerManager, appKit.defaultOfferHandler))
offerCreator.ask[Either[String, Offer]](replyTo => OfferCreator.Create(replyTo, description_opt, amount_opt, expiry_opt, issuer_opt, firstNodeId_opt, hideNodeId))
.flatMap {
case Left(errorMessage) => Future.failed(new Exception(errorMessage))
case Right(offer) => Future.successful(offer)
}
}

override def disableOffer(offer: Offer)(implicit timeout: Timeout): Unit = {
appKit.offerManager ! OfferManager.DisableOffer(offer)
appKit.nodeParams.db.managedOffers.disableOffer(offer)
}

override def listOffers(onlyActive: Boolean = true)(implicit timeout: Timeout): Future[Seq[Offer]] = Future {
appKit.nodeParams.db.managedOffers.listOffers(onlyActive).map(_.offer)
}

override def newAddress(): Future[String] = {
appKit.wallet match {
case w: BitcoinCoreClient => w.getReceiveAddress()
Expand Down
6 changes: 5 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler, PeerStorageCleaner}
import fr.acinq.eclair.io._
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.payment.offer.{DefaultHandler, OfferManager}
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
Expand Down Expand Up @@ -362,6 +362,8 @@ class Setup(val datadir: File,
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
defaultOfferHandler = system.spawn(Behaviors.supervise(DefaultHandler(nodeParams, router)).onFailure(typed.SupervisorStrategy.resume), name = "default-offer-handler")
_ = for (offer <- nodeParams.db.managedOffers.listOffers(onlyActive = true)) offerManager ! OfferManager.RegisterOffer(offer.offer, None, offer.pathId_opt, defaultOfferHandler)
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
peerReadyManager = system.spawn(Behaviors.supervise(PeerReadyManager()).onFailure(typed.SupervisorStrategy.restart), name = "peer-ready-manager")
Expand Down Expand Up @@ -402,6 +404,7 @@ class Setup(val datadir: File,
balanceActor = balanceActor,
postman = postman,
offerManager = offerManager,
defaultOfferHandler = defaultOfferHandler,
wallet = bitcoinClient)

zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
Expand Down Expand Up @@ -471,6 +474,7 @@ case class Kit(nodeParams: NodeParams,
balanceActor: typed.ActorRef[BalanceActor.Command],
postman: typed.ActorRef[Postman.Command],
offerManager: typed.ActorRef[OfferManager.Command],
defaultOfferHandler: typed.ActorRef[OfferManager.HandlerCommand],
wallet: OnChainWallet with OnchainPubkeyCache)

object Kit {
Expand Down
5 changes: 5 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ trait Databases {
def channels: ChannelsDb
def peers: PeersDb
def payments: PaymentsDb
def managedOffers: OffersDb
def pendingCommands: PendingCommandsDb
def liquidity: LiquidityDb
//@formatter:on
Expand All @@ -66,6 +67,7 @@ object Databases extends Logging {
channels: SqliteChannelsDb,
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
managedOffers: SqliteOffersDb,
pendingCommands: SqlitePendingCommandsDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
Expand All @@ -85,6 +87,7 @@ object Databases extends Logging {
channels = new SqliteChannelsDb(eclairJdbc),
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
managedOffers = new SqliteOffersDb(eclairJdbc),
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
backupConnection = eclairJdbc
)
Expand All @@ -97,6 +100,7 @@ object Databases extends Logging {
channels: PgChannelsDb,
peers: PgPeersDb,
payments: PgPaymentsDb,
managedOffers: PgOffersDb,
pendingCommands: PgPendingCommandsDb,
dataSource: HikariDataSource,
lock: PgLock) extends Databases with ExclusiveLock {
Expand Down Expand Up @@ -157,6 +161,7 @@ object Databases extends Logging {
channels = new PgChannelsDb,
peers = new PgPeersDb,
payments = new PgPaymentsDb,
managedOffers = new PgOffersDb,
pendingCommands = new PgPendingCommandsDb,
dataSource = ds,
lock = lock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement, OfferTypes}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
Expand All @@ -36,6 +36,7 @@ case class DualDatabases(primary: Databases, secondary: Databases) extends Datab
override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels)
override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers)
override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments)
override val managedOffers: OffersDb = DualOffersDb(primary.managedOffers, secondary.managedOffers)
override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)
override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity)

Expand Down Expand Up @@ -400,6 +401,26 @@ case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends Pa
}
}

case class DualOffersDb(primary: OffersDb, secondary: OffersDb) extends OffersDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-offers").build()))

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = {
runAsync(secondary.addOffer(offer, pathId_opt))
primary.addOffer(offer, pathId_opt)
}

override def disableOffer(offer: OfferTypes.Offer): Unit = {
runAsync(secondary.disableOffer(offer))
primary.disableOffer(offer)
}

override def listOffers(onlyActive: Boolean): Seq[OfferData] = {
runAsync(secondary.listOffers(onlyActive))
primary.listOffers(onlyActive)
}
}

case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingCommandsDb) extends PendingCommandsDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))
Expand Down
45 changes: 45 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/OffersDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.wire.protocol.OfferTypes.Offer

case class OfferData(offer: Offer, pathId_opt: Option[ByteVector32])

/**
* Database for offers fully managed by eclair, as opposed to offers managed by a plugin.
*/
trait OffersDb {
/**
* Add an offer managed by eclair.
* @param pathId_opt If the offer uses a blinded path, this is the corresponding pathId.
*/
def addOffer(offer: Offer, pathId_opt: Option[ByteVector32]): Unit

/**
* Disable an offer. The offer is still stored but new invoice requests and new payment attempts for already emitted
* invoices will be rejected. To reenable an offer, use `addOffer`.
*/
def disableOffer(offer: Offer): Unit

/**
* List offers managed by eclair.
* @param onlyActive Whether to return only active offers or also disabled ones.
*/
def listOffers(onlyActive: Boolean): Seq[OfferData]
}
76 changes: 76 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgOffersDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2024 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.{OfferData, OffersDb}
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.protocol.OfferTypes
import grizzled.slf4j.Logging

import java.sql.Statement
import javax.sql.DataSource

object PgOffersDb {
val DB_NAME = "offers"
val CURRENT_VERSION = 1
}

class PgOffersDb(implicit ds: DataSource, lock: PgLock) extends OffersDb with Logging {

import PgPaymentsDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import lock._

inTransaction { pg =>
using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA offers")

statement.executeUpdate("CREATE TABLE offers.managed (offer_id TEXT NOT NULL PRIMARY KEY, offer TEXT NOT NULL, path_id TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, is_active BOOLEAN NOT NULL)")

statement.executeUpdate("CREATE INDEX offer_is_active_idx ON offers.managed(is_active)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = withMetrics("offers/add", DbBackends.Postgres){
withLock { pg =>
using(pg.prepareStatement("INSERT INTO offers.managed (offer_id, offer, path_id, created_at, is_active) VALUES (?, ?, ?, NOW, TRUE)")) { statement =>
statement.setString(1, offer.offerId.toHex)
statement.setString(2, offer.toString)
pathId_opt match {
case Some(pathId) => statement.setString(3, pathId.toHex)
case None => statement.setNull(3, java.sql.Types.VARCHAR)
}

statement.executeUpdate()
}
}
}

override def disableOffer(offer: OfferTypes.Offer): Unit = ???

override def listOffers(onlyActive: Boolean): Seq[OfferData] = ???
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db.sqlite

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.db.{OfferData, OffersDb}
import fr.acinq.eclair.wire.protocol.OfferTypes
import grizzled.slf4j.Logging

import java.sql.Connection

class SqliteOffersDb(val sqlite: Connection) extends OffersDb with Logging {

override def addOffer(offer: OfferTypes.Offer, pathId_opt: Option[ByteVector32]): Unit = ???

override def disableOffer(offer: OfferTypes.Offer): Unit = ???

override def listOffers(onlyActive: Boolean): Seq[OfferData] = ???
}
Loading
Loading