Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store remote features in PeersDb #2978

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, NodeInfo}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
Expand Down Expand Up @@ -264,22 +264,22 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-peers").build()))

override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress): Unit = {
runAsync(secondary.addOrUpdatePeer(nodeId, address))
primary.addOrUpdatePeer(nodeId, address)
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = {
runAsync(secondary.addOrUpdatePeer(nodeId, nodeInfo))
primary.addOrUpdatePeer(nodeId, nodeInfo)
}

override def removePeer(nodeId: Crypto.PublicKey): Unit = {
runAsync(secondary.removePeer(nodeId))
primary.removePeer(nodeId)
}

override def getPeer(nodeId: Crypto.PublicKey): Option[NodeAddress] = {
override def getPeer(nodeId: Crypto.PublicKey): Option[NodeInfo] = {
runAsync(secondary.getPeer(nodeId))
primary.getPeer(nodeId)
}

override def listPeers(): Map[Crypto.PublicKey, NodeAddress] = {
override def listPeers(): Map[Crypto.PublicKey, NodeInfo] = {
runAsync(secondary.listPeers())
primary.listPeers()
}
Expand Down
11 changes: 6 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@
package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.TimestampSecond
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol.NodeAddress
import fr.acinq.eclair.wire.protocol.{NodeAddress, NodeInfo}
import scodec.bits.ByteVector

/** The PeersDb contains information about our direct peers, with whom we have or had channels. */
trait PeersDb {

def addOrUpdatePeer(nodeId: PublicKey, address: NodeAddress): Unit
def addOrUpdatePeer(nodeId: PublicKey, nodeInfo: NodeInfo): Unit

def removePeer(nodeId: PublicKey): Unit

def getPeer(nodeId: PublicKey): Option[NodeAddress]
def getPeer(nodeId: PublicKey): Option[NodeInfo]

def listPeers(): Map[PublicKey, NodeAddress]
def listPeers(): Map[PublicKey, NodeInfo]

def addOrUpdateRelayFees(nodeId: PublicKey, fees: RelayFees): Unit

Expand Down
89 changes: 59 additions & 30 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.scalacompat.Crypto
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{MilliSatoshi, TimestampSecond}
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.PeersDb
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, MilliSatoshi, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.{BitVector, ByteVector}
import scodec.bits.ByteVector

import java.sql.Statement
import javax.sql.DataSource

object PgPeersDb {
val DB_NAME = "peers"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5
}

class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {
Expand All @@ -59,16 +59,22 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
}

def migration45(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE local.peers RENAME COLUMN data TO node_address")
statement.executeUpdate("ALTER TABLE local.peers ALTER COLUMN node_address DROP NOT NULL")
statement.executeUpdate("ALTER TABLE local.peers ADD COLUMN node_features BYTEA")
}

using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, node_address BYTEA, node_features BYTEA)")
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE local.peer_storage (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, last_updated_at TIMESTAMP WITH TIME ZONE NOT NULL, removed_peer_at TIMESTAMP WITH TIME ZONE)")

statement.executeUpdate("CREATE INDEX removed_peer_at_idx ON local.peer_storage(removed_peer_at)")
case Some(v@(1 | 2 | 3)) =>
case Some(v@(1 | 2 | 3 | 4)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 2) {
migration12(statement)
Expand All @@ -79,26 +85,47 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
if (v < 4) {
migration34(statement)
}
if (v < 5) {
migration45(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
withLock { pg =>
val data = CommonCodecs.nodeaddress.encode(nodeaddress).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, data)
| VALUES (?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET data = EXCLUDED.data ;
| """.stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, data)
statement.executeUpdate()
nodeInfo.address_opt match {
case Some(address) =>
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, ?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
case None =>
t-bast marked this conversation as resolved.
Show resolved Hide resolved
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, NULL, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
}
}
}
Expand Down Expand Up @@ -126,27 +153,29 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
}
}

override def getPeer(nodeId: PublicKey): Option[NodeAddress] = withMetrics("peers/get", DbBackends.Postgres) {
override def getPeer(nodeId: PublicKey): Option[NodeInfo] = withMetrics("peers/get", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT data FROM local.peers WHERE node_id=?")) { statement =>
using(pg.prepareStatement("SELECT node_address, node_features FROM local.peers WHERE node_id=?")) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.executeQuery()
.mapCodec(CommonCodecs.nodeaddress)
.headOption
statement.executeQuery().map { rs =>
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
}.headOption
}
}
}

override def listPeers(): Map[PublicKey, NodeAddress] = withMetrics("peers/list", DbBackends.Postgres) {
override def listPeers(): Map[PublicKey, NodeInfo] = withMetrics("peers/list", DbBackends.Postgres) {
withLock { pg =>
using(pg.createStatement()) { statement =>
statement.executeQuery("SELECT node_id, data FROM local.peers")
using(pg.prepareStatement("SELECT node_id, node_address, node_features FROM local.peers")) { statement =>
statement.executeQuery()
.map { rs =>
val nodeid = PublicKey(rs.getByteVectorFromHex("node_id"))
val nodeaddress = CommonCodecs.nodeaddress.decode(BitVector(rs.getBytes("data"))).require.value
nodeid -> nodeaddress
}
.toMap
val nodeId = PublicKey(rs.getByteVectorFromHex("node_id"))
val nodeAddress_opt = rs.getBitVectorOpt("node_address").map(CommonCodecs.nodeaddress.decode(_).require.value)
val nodeFeatures_opt = rs.getBitVectorOpt("node_features").map(CommonCodecs.initFeaturesCodec.decode(_).require.value)
nodeId -> NodeInfo(nodeFeatures_opt.getOrElse(Features.empty), nodeAddress_opt)
}.toMap
}
}
}
Expand Down
Loading
Loading