Skip to content

Commit

Permalink
Added auxiliary methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Adriano Santos committed Oct 22, 2023
1 parent 58018cd commit 982bdeb
Show file tree
Hide file tree
Showing 11 changed files with 325 additions and 48 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ message Reply {

```dart
import 'package:spawn_app_example/src/generated/protos/domain.pb.dart';
import 'spawn_dart/spawn_dart.dart';
import 'package:spawn_dart/spawn_dart.dart';
@StatefulNamedActor(
'joe',
Expand All @@ -66,7 +66,7 @@ class JoeActor {

```dart
import 'package:spawn_app_example/joe_actor.dart';
import 'spawn_dart/spawn_dart.dart';
import 'package:spawn_dart/spawn_dart.dart';
void main() {
SpawnSystem()
Expand Down
1 change: 1 addition & 0 deletions example/spawn_app_example/bin/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ void main() {
SpawnSystem()
.create("spawn-system")
.withPort(8091)
.withProxyPort(9003)
.withStatefulNamedActor(JoeActor)
.start();
}
32 changes: 32 additions & 0 deletions lib/src/actor_handler.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,38 @@
import 'package:spawn_dart/src/protocol/eigr/functions/protocol/actors/protocol.pb.dart';

enum ActorKind { named, unnamed, pooled, proxy }

enum ActionDefinitionKind { normal, timer }

class ActionDefinition {
final ActionDefinitionKind kind;
final String name;
int seconds;

ActionDefinition(this.kind, this.name, {this.seconds = 0});

String get actionName {
return name;
}

ActionDefinitionKind get actionDefinitionKind {
return kind;
}

int get timerSeconds {
return seconds;
}
}

abstract class ActorHandler {
ActorKind getActorKind();
List<ActionDefinition> getActionDefinitions();
String getChannel();
int getDeactivatedTimeout();
String getRegisteredName();
int getSnapshotTimeout();
int getMaxPoolSize();
int getMinPoolSize();
bool isStateful();
ActorInvocationResponse handleInvoke(ActorInvocation invocation);
}
68 changes: 58 additions & 10 deletions lib/src/spawn_client.dart
Original file line number Diff line number Diff line change
@@ -1,27 +1,44 @@
import 'dart:io';

import 'dart:typed_data';
import 'package:http/http.dart' as http;
import 'package:spawn_dart/spawn_dart.dart';
import 'package:spawn_dart/src/actor_handler.dart';
import 'package:spawn_dart/src/protocol/eigr/functions/protocol/actors/actor.pb.dart';
import 'package:spawn_dart/src/protocol/eigr/functions/protocol/actors/protocol.pb.dart';

class SpawnClient {
static final _client = HttpClient();
static final _dartVersion = () {
final version = Platform.version;
return version.substring(0, version.indexOf(' '));
}();
static final _logger = Logger(
filter: SpawnLogFilter(),
printer: LogfmtPrinter(),
output: SimpleConsoleOutput(),
);

static final http.Client _client = http.Client();
static final _dartVersion = "Dart VM ${Platform.version}";
static const _headers = {
'content-type': 'application/octet-stream',
};
static final String _registerUrlPath = "/api/v1/system";
Map<String, ActorHandler> _actorHandlers = {};
static final String supportLibraryName = "spawn_dart";
static final String supportLibraryVersion = "1.0.0";
static final int protocolMinorVersion = 1;
static final int protocolMajorVersion = 1;

late String _spawnSystem;
Map<String, ActorHandler> _actorHandlers = {};
String _proxyHost = Platform.environment["PROXY_HTTP_HOST"] ?? "0.0.0.0";
int _proxyPort = int.parse(Platform.environment["PROXY_HTTP_PORT"] ?? "9001");

SpawnClient(Map<String, ActorHandler> actorHandlers) {
SpawnClient(String spawnSystem, Map<String, ActorHandler> actorHandlers) {
_spawnSystem = spawnSystem;
_actorHandlers = actorHandlers;
}

SpawnClient.withConnectionParams(
String host, int port, Map<String, ActorHandler> actorHandlers) {
SpawnClient.withConnectionParams(String host, int port, String spawnSystem,
Map<String, ActorHandler> actorHandlers) {
_proxyHost = host;
_proxyPort = port;
_spawnSystem = spawnSystem;
_actorHandlers = actorHandlers;
}

Expand All @@ -32,4 +49,35 @@ class SpawnClient {
int get proxyPort {
return _proxyPort;
}

Future<RegistrationResponse> register() async {
try {
ActorSystem actorSystem = ActorSystem.create();
ServiceInfo serviceInfo = ServiceInfo.create()
..protocolMinorVersion = protocolMinorVersion
..protocolMajorVersion = protocolMajorVersion
..serviceName = _spawnSystem
..serviceVersion = "UNKNOWN"
..serviceRuntime = _dartVersion
..supportLibraryName = supportLibraryName
..supportLibraryVersion = supportLibraryVersion;

RegistrationRequest request = RegistrationRequest.create()
..serviceInfo = serviceInfo
..actorSystem = actorSystem;

Uint8List requestBuffer = request.writeToBuffer();

_logger.i("Registering ActorSystem $_spawnSystem with $request");
var response = await _client.post(
Uri.http("$_proxyHost:$_proxyPort", _registerUrlPath),
headers: _headers,
body: requestBuffer);

_logger.i("Registration response status: $response.statusCode");
return RegistrationResponse.fromBuffer(response.bodyBytes);
} finally {
_client.close();
}
}
}
45 changes: 33 additions & 12 deletions lib/src/spawn_dart_base.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:spawn_dart/spawn_dart.dart';
import 'package:spawn_dart/src/actor_handler.dart';
import 'package:spawn_dart/src/protocol/eigr/functions/protocol/actors/protocol.pb.dart';
import 'package:spawn_dart/src/service.dart';
import 'package:spawn_dart/src/spawn_client.dart';
import 'package:spawn_dart/src/stateful_named_actor_handler.dart';
import 'package:spawn_dart/src/stateful_unnamed_actor_handler.dart';
import 'package:spawn_dart/src/stateless_named_actor_handler.dart';
Expand All @@ -19,71 +21,90 @@ class SpawnSystem {
);

final _watch = Stopwatch();
late int serverPort = int.parse(Platform.environment['PORT'] ?? '8080');
late String actorSystem;
Map<String, ActorHandler> actorHandlers = {};
String _proxyHost = Platform.environment["PROXY_HTTP_HOST"] ?? "0.0.0.0";
int _proxyPort = int.parse(Platform.environment["PROXY_HTTP_PORT"] ?? "9001");
late int _serverPort = int.parse(Platform.environment['PORT'] ?? '8080');
late String _actorSystem;
Map<String, ActorHandler> _actorHandlers = {};

SpawnSystem create(String system) {
actorSystem = system;
_actorSystem = system;
return this;
}

SpawnSystem withPort(int port) {
serverPort = port;
_serverPort = port;
return this;
}

SpawnSystem withProxyHost(String host) {
_proxyHost = host;
return this;
}

SpawnSystem withProxyPort(int port) {
_proxyPort = port;
return this;
}

SpawnSystem withStatefulNamedActor(Type entity) {
_logger.d('Registering StatefulNamedActor...');
ActorHandler actorHandler = StatefulNamedActorHandler(entity);
String actorName = actorHandler.getRegisteredName();
actorHandlers[actorName] = actorHandler;
_actorHandlers[actorName] = actorHandler;
return this;
}

SpawnSystem withStatefulUnNamedActor(Type entity) {
_logger.d('Registering StatefulUnNamedActor...');
ActorHandler actorHandler = StatefulUnNamedActorHandler(entity);
String actorName = actorHandler.getRegisteredName();
actorHandlers[actorName] = actorHandler;
_actorHandlers[actorName] = actorHandler;
return this;
}

SpawnSystem withStatelessNamedActor(Type entity) {
_logger.d('Registering StatelessNamedActor...');
ActorHandler actorHandler = StatelessNamedActorHandler(entity);
String actorName = actorHandler.getRegisteredName();
actorHandlers[actorName] = actorHandler;
_actorHandlers[actorName] = actorHandler;
return this;
}

SpawnSystem withStatelessUnNamedActor(Type entity) {
_logger.d('Registering StatelessUnNamedActor...');
ActorHandler actorHandler = StatelessUnNamedActorHandler(entity);
String actorName = actorHandler.getRegisteredName();
actorHandlers[actorName] = actorHandler;
_actorHandlers[actorName] = actorHandler;
return this;
}

SpawnSystem withStatelessPooledActor(Type entity) {
_logger.d('Registering StatelessPooledActor...');
ActorHandler actorHandler = StatelessPooledActorHandler(entity);
String actorName = actorHandler.getRegisteredName();
actorHandlers[actorName] = actorHandler;
_actorHandlers[actorName] = actorHandler;
return this;
}

Future<void> start() async {
final controller = Service(actorSystem, actorHandlers);
final SpawnClient spawnClient = SpawnClient.withConnectionParams(
_proxyHost, _proxyPort, _actorSystem, _actorHandlers);

final controller = Service(_actorSystem, _actorHandlers);

final server = await shelf_io.serve(
logRequests().addHandler(controller.handler),
InternetAddress.anyIPv4,
serverPort,
_serverPort,
);

_logger.i('Serving at http://${server.address.host}:${server.port}');

final RegistrationResponse registrationResponse =
await spawnClient.register();
_logger.i("Registration status $registrationResponse");

// Used for tracking uptime of the server.
_watch.start();
}
Expand Down
34 changes: 34 additions & 0 deletions lib/src/stateful_named_actor_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,40 @@ class StatefulNamedActorHandler implements ActorHandler {
return name;
}

@override
List<ActionDefinition> getActionDefinitions() {
var normalActions = actions.entries
.map((name) =>
ActionDefinition(ActionDefinitionKind.normal, name as String))
.toList();

// TODO: implement timer actions (normalActions + timerActions)
return normalActions;
}

@override
ActorKind getActorKind() => ActorKind.named;

@override
String getChannel() => _statefulNamedActorAnnotationInstance!.channel;

@override
int getDeactivatedTimeout() =>
_statefulNamedActorAnnotationInstance!.deactivatedTimeout;

@override
int getSnapshotTimeout() =>
_statefulNamedActorAnnotationInstance!.snapshotTimeout;

@override
int getMaxPoolSize() => 0;

@override
int getMinPoolSize() => 0;

@override
bool isStateful() => true;

@override
spawn_protocol.ActorInvocationResponse handleInvoke(
spawn_protocol.ActorInvocation invocation) {
Expand Down
48 changes: 42 additions & 6 deletions lib/src/stateful_unnamed_actor_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ class StatefulUnNamedActorHandler implements ActorHandler {
as StatefulUnNamedActor);
}

@override
ActorInvocationResponse handleInvoke(ActorInvocation invocation) {
// TODO: implement invoke
throw UnimplementedError();
}

@override
String getRegisteredName() {
String? name = statefulUnNamedActorAnnotationInstance?.name;
Expand All @@ -41,4 +35,46 @@ class StatefulUnNamedActorHandler implements ActorHandler {

return name;
}

@override
List<ActionDefinition> getActionDefinitions() {
// TODO: implement getActionDefinitions
throw UnimplementedError();
}

@override
ActorKind getActorKind() => ActorKind.unnamed;

@override
String getChannel() {
// TODO: implement getChannel
throw UnimplementedError();
}

@override
int getDeactivatedTimeout() {
// TODO: implement getDeactivatedTimeout
throw UnimplementedError();
}

@override
int getSnapshotTimeout() {
// TODO: implement getSnapshotTimeout
throw UnimplementedError();
}

@override
int getMaxPoolSize() => 0;

@override
int getMinPoolSize() => 0;

@override
bool isStateful() => true;

@override
ActorInvocationResponse handleInvoke(ActorInvocation invocation) {
// TODO: implement invoke
throw UnimplementedError();
}
}
Loading

0 comments on commit 982bdeb

Please sign in to comment.