Skip to content

Commit

Permalink
multi: async grpc lnmd+lndmd, update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bitromortac committed Apr 25, 2022
1 parent a7466c5 commit 92a780a
Show file tree
Hide file tree
Showing 15 changed files with 932 additions and 461 deletions.
3 changes: 3 additions & 0 deletions lndmanage/grpc_compiled/build_grpc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ python -m grpc_tools.protoc --proto_path=googleapis:. --python_out=. --grpc_pyth
python -m grpc_tools.protoc --proto_path=googleapis:. --python_out=. --grpc_python_out=. router.proto
python -m grpc_tools.protoc --proto_path=googleapis:. --python_out=. --grpc_python_out=. walletkit.proto
python -m grpc_tools.protoc --proto_path=googleapis:. --python_out=. --grpc_python_out=. signer.proto
python -m grpc_tools.protoc --proto_path=googleapis:. --python_out=. --grpc_python_out=. manager.proto

# fix import paths
sed -i -- 's@import lightning_pb2 as lightning__pb2@from lndmanage.grpc_compiled import lightning_pb2 as lightning__pb2@' lightning_pb2_grpc.py
Expand All @@ -30,3 +31,5 @@ sed -i -- 's@import signer_pb2 as signer__pb2@from lndmanage.grpc_compiled impor

sed -i -- 's@import signer_pb2 as signer__pb2@from lndmanage.grpc_compiled import signer_pb2 as signer__pb2@' walletkit_pb2_grpc.py
sed -i -- 's@import walletkit_pb2 as walletkit__pb2@from lndmanage.grpc_compiled import walletkit_pb2 as walletkit__pb2@' walletkit_pb2_grpc.py

sed -i -- 's@import manager_pb2 as manager__pb2@from lndmanage.grpc_compiled import manager_pb2 as manager__pb2@' manager_pb2_grpc.py
25 changes: 25 additions & 0 deletions lndmanage/grpc_compiled/manager.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";

option objc_class_prefix = "MNG";

package managerpc;

// blah.
service Mangager {
// blah.
rpc RunningServices(RunningServicesRequest) returns (RunningServicesResponse) {}
}

// blah.
message RunningServicesRequest {
}

// blah.
message RunningServicesResponse {
repeated RunningService services = 1;
}

// blah.
message RunningService {
string name = 1;
}
171 changes: 171 additions & 0 deletions lndmanage/grpc_compiled/manager_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions lndmanage/grpc_compiled/manager_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from lndmanage.grpc_compiled import manager_pb2 as manager__pb2


class MangagerStub(object):
"""blah.
"""

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.RunningServices = channel.unary_unary(
'/managerpc.Mangager/RunningServices',
request_serializer=manager__pb2.RunningServicesRequest.SerializeToString,
response_deserializer=manager__pb2.RunningServicesResponse.FromString,
)


class MangagerServicer(object):
"""blah.
"""

def RunningServices(self, request, context):
"""blah.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_MangagerServicer_to_server(servicer, server):
rpc_method_handlers = {
'RunningServices': grpc.unary_unary_rpc_method_handler(
servicer.RunningServices,
request_deserializer=manager__pb2.RunningServicesRequest.FromString,
response_serializer=manager__pb2.RunningServicesResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'managerpc.Mangager', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class Mangager(object):
"""blah.
"""

@staticmethod
def RunningServices(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/managerpc.Mangager/RunningServices',
manager__pb2.RunningServicesRequest.SerializeToString,
manager__pb2.RunningServicesResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
61 changes: 35 additions & 26 deletions lndmanage/lib/chan_acceptor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Implements logic for accepting channels dynamically."""
import asyncio
from typing import TYPE_CHECKING
from google.protobuf import text_format
import textwrap

import lndmanage.grpc_compiled.lightning_pb2 as lnd

Expand Down Expand Up @@ -60,46 +62,53 @@ def configure(self):
)
)

async def manage_channel_openings(self):
async def accept_channels(self):
logger.info("Channel acceptor started.")
response_queue = asyncio.queues.Queue()
try:
# async way to use a bidirectional streaming grpc endpoint
# with an async iterator
async for r in self.node.async_rpc.ChannelAcceptor(
self.request_iterator(response_queue)):
await response_queue.put(r)
except asyncio.CancelledError:
logger.info("channel acceptor cancelled")
return

# Use an async bidirectional streaming grpc endpoint with an async iterator.
# Note: no exceptions escape from there, handle them inside the iterator.
async for r in self.node.async_rpc.ChannelAcceptor(
self.request_iterator(response_queue)):
if isinstance(r, Exception):
raise r
await response_queue.put(r)

async def request_iterator(self, channel_details: asyncio.Queue):
logger.info("channel acceptor started")
while True:
channel_detail = await channel_details.get()
if self.accept_channel(channel_detail):
yield lnd.ChannelAcceptResponse(
accept=True, pending_chan_id=channel_detail.pending_chan_id
)
else:
yield lnd.ChannelAcceptResponse(
accept=False, pending_chan_id=channel_detail.pending_chan_id
)
# Be careful, exceptions don't leave from here, only get logged.
try:
while True:
channel_detail = await channel_details.get()
if self.accept_channel(channel_detail):
yield lnd.ChannelAcceptResponse(
accept=True, pending_chan_id=channel_detail.pending_chan_id
)
else:
yield lnd.ChannelAcceptResponse(
accept=False, pending_chan_id=channel_detail.pending_chan_id
)
except asyncio.CancelledError:
logger.info("canceled")

def accept_channel(self, channel_detail) -> bool:
# be careful, exceptions from here seem to not get raised up to the main
# loop
# TODO: raise exceptions from here
logger.info(
f"about to make a decision about channel:\n{channel_detail}")
f"About to make a decision about a channel:")
logger.info(textwrap.indent(str(channel_detail), " "))

node_pubkey = channel_detail.node_pubkey.hex()
is_private = self.network_analysis.is_private(node_pubkey)
logger.info(f"is private {is_private}")

# We apply different policies for private or public channels.
if is_private:
if (self.min_size_private < channel_detail.funding_amt
< self.max_size_private):
logger.debug(f"Private channel accepted.")
return True
else:
if (self.min_size_public < channel_detail.funding_amt
< self.max_size_public):
logger.debug(f"Public channel accepted.")
return True

logger.debug(f"Channel open rejected.")
return False
Loading

0 comments on commit 92a780a

Please sign in to comment.