Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
itsneski committed Jun 3, 2022
2 parents 70cf0e4 + 43fe35c commit 553a280
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 91 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@


# Lightning Jet 🚀⚡️, or simply Jet

Fully automated rebalancer for LND Lightning nodes. Helps get an insight into peers' classification based on routing history, missed routing opportunities, and stuck htlcs.
Lightning Jet is a fully automated rebalancer for Lightning nodes. Jet optimizes channel liquidity allocation based on routing volume, missed routing opportunities (htlcs), and other variables.

Jet runs as a daemon (background process) on a broad range of [supported platforms](#supported-platforms). It classifies peers into inbound and outbound based on routing volume; it then rebalances the channels, ensuring sufficient liquidity (inbound and outbound) to route sats.

The mission of Lightning Jet is to help independent node operators compete in the ever-changing landscape of the Lightning Network, especially as big institutional players enter the space.
The mission of Lightning Jet is to help independent node operators compete in the ever-changing landscape of the Lightning Network as big institutional players enter the space.

Join [Lightning Jet telegram chat](https://t.me/lnjet).

Expand Down Expand Up @@ -222,6 +225,7 @@ Settings under `rebalancer` section:
|`enforceProfitability`|When set to true, jet will pause all non profitable automated rebalances, leaving only profitable rebalances. Monitor rebalance status by `jet monitor --status`|
|`minCapacity`|Sets minimum capacity (in sats) for channels to be included in automated rebalancing. For example, `"minCapacity": 500000` means that channels with capacity below or equal to `500000` sats will be excluded from automated rebalancing.|
|`buffer`|Minimum rebalance buffer in sats, overrides default value of `250`. Jet will warn when the delta between local and remote ppm for outbound and balanced peers is below the buffer.|
|`disabled`|When set to true, the setting disables automated rebalancer. You can still rebalance manually via `jet rebalance`. Requires restart of the daddy service `jet restart daddy` followed by `jet stop rebalancer`.|
|`exclude`|A list of nodes to exclude from auto rebalancing. Nodes can be excluded from inbound peers, outbound peers, or both. By default, nodes will be excluded from outbound peers when no further info is provided, meaning that excluded nodes won't be rebalanced into. E.g.`"exclude": ["11111111", "22222222:outbound", "33333333:inbound", "44444444:all"]` excludes nodes with ids `11111111` and `22222222` from outbound peers, node with id `33333333` from inbound peers and node with id `44444444` from both inbound and outbound peers.|

### Example:
Expand Down
3 changes: 2 additions & 1 deletion api/htlc-history.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ module.exports = {
total: withCommas(n.sum),
"%": n.p,
"d%": n.d,
ppm: n.ppm
ppm: n.ppm,
margin: n.margin
})
})
return newList;
Expand Down
34 changes: 27 additions & 7 deletions api/rebalance.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const {recordRebalanceFailure} = require('../db/utils');
const {recordRebalanceAvoid} = require('../db/utils');
const {listRebalanceAvoidSync} = require('../db/utils');
const {recordActiveRebalanceSync} = require('../db/utils');
const {deleteActiveRebalance} = require('../db/utils');
const {deleteActiveRebalanceSync} = require('../db/utils');
const {listPeersMapSync} = require('../lnd-api/utils');
const {getNodeFeeSync} = require('../lnd-api/utils');
const {rebalanceSync} = require('../bos/rebalance');
Expand All @@ -52,7 +52,7 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants
throw new Error('from, to and amount are mandatory arguments');
}

console.log(date.format(new Date, 'MM/DD hh:mm A'));
console.log('\n' + date.format(new Date, 'MM/DD hh:mm:ss A'));
console.log('rebalancer is starting up');

var peerMap = listPeersMapSync(lndClient);
Expand Down Expand Up @@ -164,11 +164,13 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants
console.log('time left:', maxRuntime, 'mins');
if (aggresiveMode) console.log('aggressive mode: on');
if (config.debugMode) console.log('debug mode: enabled');
console.log('----------------------------------------\n')

// record for jet monitor
const rebalanceId = recordActiveRebalanceSync({from: outId, to: inId, amount: AMOUNT, ppm, mins: maxRuntime});
if (rebalanceId === undefined) console.error('rebalance db record id is undefined');
if (rebalanceId) console.log('rebalance id:', rebalanceId);
else console.error('rebalance db record id is undefined');

console.log('----------------------------------------\n')

const startTime = Date.now();

Expand Down Expand Up @@ -462,8 +464,6 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants
} // for
} catch(err) {
console.error('error running rebalance loop:', err);
} finally {
if (rebalanceId != undefined) deleteActiveRebalance(rebalanceId);
}

// record rebalance failure, success has already been recorded
Expand All @@ -474,6 +474,24 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants

printStats(lndClient, nodeStats, nodeInfo);

if (rebalanceId) {
console.log('deleting rebalance record with id:', rebalanceId);
deleteActiveRebalanceSync(rebalanceId);
} else {
console.warn('can not delete rebalance record, id does not exist');
}

// each jet rebalance instance runs in a seraparate process; explicitly exit
// so that processes don't linger; this isn't ideal, but it ensures
// that node operators don't have to manually kill processs that
// are stuck. its unclear why some processes are getting stuck.
// this issue will become moot once jet moves to a single-process
// architecture. note that explicit process exit should not result in
// adverse side effects, e.g no pending db writes that may result
// in a corrupted db once interrupted
// https://github.com/itsneski/lightning-jet/issues/55
process.exit();

// str can either be a tag, a portion of node's alias, or node's pub id
function findId(str) {
if (tags[str]) return tags[str];
Expand All @@ -492,7 +510,7 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants
// format for printing
function printStats() {
getNodesInfoSync(lndClient, Object.keys(nodeStats)).forEach(n => {
nodeInfo[n.node.pub_key] = n;
if (n) nodeInfo[n.node.pub_key] = n;
})
epoch = Math.floor(+new Date() / 1000);
let stats = Object.values(nodeStats);
Expand Down Expand Up @@ -525,6 +543,7 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants
})

console.log('\n-------------------------------------------');
console.log(date.format(new Date, 'MM/DD hh:mm:ss A'));
console.log('finished rebalance from', OUT, 'to', IN);
console.log('last message:', lastMessage);
console.log('amount targeted:', numberWithCommas(AMOUNT));
Expand All @@ -537,6 +556,7 @@ module.exports = ({from, to, amount, ppm = config.rebalancer.maxPpm || constants
console.log('nodes that exceeded per hop ppm:', stringify(sortedMax));
console.log('low fee nodes:', stringify(lowFeeSorted));
console.log('\n-------------------------------------------');
console.log(date.format(new Date, 'MM/DD hh:mm:ss A'));
console.log('finished rebalance from', OUT, 'to', IN);
console.log('last message:', lastMessage);
console.log('amount targeted:', numberWithCommas(AMOUNT));
Expand Down
17 changes: 5 additions & 12 deletions api/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,20 +443,13 @@ module.exports = {
let list = dbUtils.listActiveRebalancesSync();
if (!list || list.length === 0) return;

// clean up the list, remove processes that no longer exist
let updated = [];
let formatted = [];
list.forEach(l => {
if (module.exports.isRunningPidSync(l.pid)) {
// recalculate minites left
let minsLeft = l.mins - Math.round((Date.now() - l.date)/(60 * 1000));
updated.push({from:l.from_node, to:l.to_node, amount:l.amount, ppm:l.ppm, mins:minsLeft});
} else {
// for whatever reason the record lingers even though the process
// is gone. clean up
dbUtils.deleteActiveRebalance(l.rowid);
}
// recalculate minites left
let minsLeft = l.mins - Math.round((Date.now() - l.date)/(60 * 1000));
formatted.push({from:l.from_node, to:l.to_node, amount:l.amount, ppm:l.ppm, mins:minsLeft});
})
return updated;
return formatted;
},
readLastLineSync: function(file) {
let lastLine;
Expand Down
20 changes: 20 additions & 0 deletions bos/connect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// related to https://github.com/itsneski/lightning-jet/issues/79
// jet calls bos rebalance api directly, it does not require bos to be installed
// bos rebalance needs lnd api handle. can't re-use existing lnd handle
// as bos needs the one from ln-service.

const fs = require('fs');
const config = require('../api/config');
const lnService = require('ln-service');

const macaroon = fs.readFileSync(config.macaroonPath).toString('base64');
const tlsCert = fs.readFileSync(config.tlsCertPath).toString('base64');
const address = config.serverAddress || 'localhost:10009';

const {lnd} = lnService.authenticatedLndGrpc({
cert: tlsCert,
macaroon: macaroon,
socket: address
})

module.exports = lnd;
44 changes: 22 additions & 22 deletions bos/rebalance.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
const importLazy = require('import-lazy')(require);
const swaps = importLazy('balanceofsatoshis/swaps');
const {readFile} = require('fs');
const lnd = importLazy('balanceofsatoshis/lnd');
const lndForNode = (logger, node) => lnd.authenticatedLnd({logger, node});
const {parseResult, parseError, parseNodes} = require('./parser');
const lndHandle = importLazy('./connect');

module.exports = {
rebalanceSync(args) {
Expand All @@ -20,7 +19,13 @@ module.exports = {
done = true;
}
}
module.exports.rebalance(args, callback);
// async rebalance returns a promise. make sure to catch exceptions
// and gracefully exit the deasync loop
module.exports.rebalance(args, callback).catch((err) => {
console.error('rebalanceSync: error calling rebalance:', err);
error = err;
done = true;
})
while(done === undefined) {
require('deasync').runLoopOnce();
}
Expand Down Expand Up @@ -64,7 +69,6 @@ module.exports = {
warn: (msg) => { return args.logger.warn(msg) },
error: (msg) => { return args.logger.error(msg) }
}
const lndHandle = await lndForNode(mylogger);

const callback = (err, res) => {
try {
Expand All @@ -77,25 +81,21 @@ module.exports = {
}

return new Promise((resolve, reject) => {
try {
if (global.testModeOn) console.log('rebalance from:', args.from, 'to:', args.to, 'amount:', args.amount, 'max fee:', args.maxFee, 'max fee rate:', args.maxFeeRate, 'mins:', args.mins);
if (global.testModeOn) console.log('avoid:', args.avoid);
if (global.testModeOn) console.log('rebalance from:', args.from, 'to:', args.to, 'amount:', args.amount, 'max fee:', args.maxFee, 'max fee rate:', args.maxFeeRate, 'mins:', args.mins);
if (global.testModeOn) console.log('avoid:', args.avoid);

swaps.manageRebalance({
logger: mylogger,
avoid: args.avoid,
fs: {getFile: readFile},
out_through: args.from,
in_through: args.to,
lnd: lndHandle.lnd,
max_fee: args.maxFee,
max_fee_rate: args.maxFeeRate,
max_rebalance: args.amount,
timeout_minutes: args.mins,
}, callback)
} catch(err) {
reject(mylogger.error({err}));
}
swaps.manageRebalance({
logger: mylogger,
avoid: args.avoid,
fs: {getFile: readFile},
out_through: args.from,
in_through: args.to,
lnd: lndHandle,
max_fee: args.maxFee,
max_fee_rate: args.maxFeeRate,
max_rebalance: args.amount,
timeout_minutes: args.mins,
}, callback);
})
}
}
77 changes: 49 additions & 28 deletions db/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,29 @@ module.exports = {
return error;
}
},
deleteActiveRebalance(rowid) {
const pref = 'deleteActiveRebalance:';
let db = getHandle();
try {
db.serialize(function() {
let cmd = 'DELETE FROM ' + ACTIVE_REBALANCE_TABLE + ' WHERE rowid = ' + rowid;
executeDb(db, cmd, (err) => {
if (err) console.error(pref, 'error:', err);
deleteActiveRebalanceSync(pid) {
const pref = 'deleteActiveRebalanceSync:';
if (doIt()) {
// retry in case of an error
console.log(pref, 'retrying due to an error');
doIt();
}

function doIt() {
let error;
let db = getHandle();
try {
db.serialize(function() {
let cmd = 'DELETE FROM ' + ACTIVE_REBALANCE_TABLE + ' WHERE pid = ' + pid;
error = executeDbSync(db, cmd);
})
})
} catch(error) {
console.error('deleteActiveRebalance:', error.message);
} finally {
closeHandle(db);
} catch(err) {
console.error(pref, err.message);
error = err;
} finally {
closeHandle(db);
}
return error;
}
},
listActiveRebalancesSync() {
Expand All @@ -140,22 +149,33 @@ module.exports = {
},
// pid is optional, used for testing
recordActiveRebalanceSync({from, to, amount, ppm, mins}, pid) {
let db = getHandle();
let rowid;
try {
const proc = pid || require('process').pid;
db.serialize(() => {
const vals = constructInsertString([Date.now(), from, to, amount, ppm, mins, proc]);
const cols = '(date, from_node, to_node, amount, ppm, mins, pid)';
let cmd = 'INSERT INTO ' + ACTIVE_REBALANCE_TABLE + ' ' + cols + ' VALUES (' + vals + ')';
rowid = execInsertDbSync(db, cmd); // rowid
})
} catch(error) {
console.error('recordActiveRebalanceSync:', error.message);
} finally {
closeHandle(db);
const pref = 'recordActiveRebalanceSync:';
let id = doIt();
if (!id) {
console.log(pref, 'retrying due to an error');
id = doIt();
}
return id;

function doIt() {
let db = getHandle();
let ret;
try {
const proc = pid || require('process').pid;
db.serialize(() => {
const vals = constructInsertString([Date.now(), from, to, amount, ppm, mins, proc]);
const cols = '(date, from_node, to_node, amount, ppm, mins, pid)';
let cmd = 'INSERT INTO ' + ACTIVE_REBALANCE_TABLE + ' ' + cols + ' VALUES (' + vals + ')';
executeDbSync(db, cmd);
ret = proc; // process id
})
} catch(error) {
console.error(pref, error.message);
} finally {
closeHandle(db);
}
return ret;
}
return rowid;
},
getValByFilterSync(filter) {
let db = getHandle();
Expand Down Expand Up @@ -701,6 +721,7 @@ function createChannelEventsTable(db) {

function createActiveRebalanceTable(db) {
executeDbSync(db, "CREATE TABLE IF NOT EXISTS " + ACTIVE_REBALANCE_TABLE + " (date INTEGER NOT NULL, from_node TEXT NOT NULL, to_node TEXT NOT NULL, amount INTEGER NOT NULL, ppm INTEGER, mins INTEGER, pid INTEGER NOT NULL, extra TEXT)");
executeDbSync(db, "CREATE UNIQUE INDEX active_rebalance_pid_index ON " + ACTIVE_REBALANCE_TABLE + "(pid)");
}

function createFeeHistoryTable(db) {
Expand Down
Loading

0 comments on commit 553a280

Please sign in to comment.