Skip to content

Commit

Permalink
Introduce ports (local and external)
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Guyot <[email protected]>
  • Loading branch information
pguyot committed Nov 25, 2024
1 parent 598f8c3 commit 45af8b6
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 68 deletions.
3 changes: 2 additions & 1 deletion src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,11 @@ static struct ResourceMonitor *context_monitors_handle_terminate(Context *ctx)
term_put_tuple_element(info_tuple, 1, ref);
if (ctx->native_handler != NULL) {
term_put_tuple_element(info_tuple, 2, PORT_ATOM);
term_put_tuple_element(info_tuple, 3, term_port_from_local_process_id(ctx->process_id));
} else {
term_put_tuple_element(info_tuple, 2, PROCESS_ATOM);
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
}
term_put_tuple_element(info_tuple, 3, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(info_tuple, 4, ctx->exit_reason);

mailbox_send(target, info_tuple);
Expand Down
57 changes: 56 additions & 1 deletion src/libAtomVM/externalterm.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#define EXPORT_EXT 113
#define MAP_EXT 116
#define SMALL_ATOM_UTF8_EXT 119
#define V4_PORT_EXT 120
#define INVALID_TERM_SIZE -1

#define NEW_FLOAT_EXT_SIZE 9
Expand Down Expand Up @@ -422,6 +423,31 @@ static int serialize_term(uint8_t *buf, term t, GlobalContext *glb)
WRITE_32_UNALIGNED(buf + k + 8, term_to_external_node_creation(t));
}
return k + 12;
} else if (term_is_local_port(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = V4_PORT_EXT;
}
size_t k = 1;
term node_name = glb->node_name;
uint32_t creation = node_name == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node_name, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_64_UNALIGNED(buf + k, term_to_local_process_id(t));
WRITE_32_UNALIGNED(buf + k + 8, creation); // creation
}
return k + 12;
} else if (term_is_external_port(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = V4_PORT_EXT;
}
size_t k = 1;
term node = term_to_external_node(t);
k += serialize_term(IS_NULL_PTR(buf) ? NULL : buf + k, node, glb);
if (!IS_NULL_PTR(buf)) {
WRITE_64_UNALIGNED(buf + k, term_to_external_port_number(t));
WRITE_32_UNALIGNED(buf + k + 8, term_to_external_node_creation(t));
}
return k + 12;
} else if (term_is_local_reference(t)) {
if (!IS_NULL_PTR(buf)) {
buf[0] = NEWER_REFERENCE_EXT;
Expand Down Expand Up @@ -759,6 +785,34 @@ static term parse_external_terms(const uint8_t *external_term_buf, size_t *eterm
}
}

case V4_PORT_EXT: {
size_t node_size;
term node = parse_external_terms(external_term_buf + 1, &node_size, copy, heap, glb);
if (UNLIKELY(!term_is_atom(node))) {
return term_invalid_term();
}
uint64_t number = READ_64_UNALIGNED(external_term_buf + node_size + 1);
uint32_t creation = READ_32_UNALIGNED(external_term_buf + node_size + 9);
*eterm_size = node_size + 13;
if (node != NONODE_AT_NOHOST_ATOM) {
term this_node = glb->node_name;
uint32_t this_creation = this_node == NONODE_AT_NOHOST_ATOM ? 0 : glb->creation;
if (node == this_node && creation == this_creation) {
if (UNLIKELY(number > TERM_MAX_LOCAL_PROCESS_ID)) {
return term_invalid_term();
}
return term_port_from_local_process_id(number);
} else {
return term_from_external_port_number(node, number, creation, heap);
}
} else {
if (UNLIKELY(number > TERM_MAX_LOCAL_PROCESS_ID || creation != 0)) {
return term_invalid_term();
}
return term_port_from_local_process_id(number);
}
}

case NEWER_REFERENCE_EXT: {
uint16_t len = READ_16_UNALIGNED(external_term_buf + 1);
if (UNLIKELY(len > 5)) {
Expand Down Expand Up @@ -1079,7 +1133,8 @@ static int calculate_heap_usage(const uint8_t *external_term_buf, size_t remaini
return 0;
}

case NEW_PID_EXT: {
case NEW_PID_EXT:
case V4_PORT_EXT: {
if (UNLIKELY(remaining < 1)) {
return INVALID_TERM_SIZE;
}
Expand Down
14 changes: 12 additions & 2 deletions src/libAtomVM/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ unsigned long memory_estimate_usage(term t)
} else if (term_is_local_pid(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_external_pid(t)) {
acc += EXTERNAL_PID_SIZE;
} else if (term_is_local_port(t)) {
t = temp_stack_pop(&temp_stack);

} else if (term_is_nonempty_list(t)) {
Expand Down Expand Up @@ -628,6 +627,10 @@ static void memory_scan_and_copy(HeapFragment *old_fragment, term *mem_start, co
TRACE("- Found external pid.\n");
break;

case TERM_BOXED_EXTERNAL_PORT:
TRACE("- Found external port.\n");
break;

case TERM_BOXED_EXTERNAL_REF:
TRACE("- Found external ref.\n");
break;
Expand Down Expand Up @@ -756,6 +759,10 @@ static void memory_scan_and_rewrite(size_t count, term *terms, const term *old_s
ptr += EXTERNAL_PID_SIZE - 1;
break;

case TERM_BOXED_EXTERNAL_PORT:
ptr += EXTERNAL_PORT_SIZE - 1;
break;

case TERM_BOXED_EXTERNAL_REF:
ptr += EXTERNAL_REF_SIZE - 1;
break;
Expand Down Expand Up @@ -833,6 +840,9 @@ HOT_FUNC static term memory_shallow_copy_term(HeapFragment *old_fragment, term t
} else if (term_is_local_pid(t)) {
return t;

} else if (term_is_local_port(t)) {
return t;

} else if (term_is_cp(t)) {
// CP is valid only on stack
return t;
Expand Down
40 changes: 32 additions & 8 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ static term nif_ets_lookup(Context *ctx, int argc, term argv[]);
static term nif_ets_lookup_element(Context *ctx, int argc, term argv[]);
static term nif_ets_delete(Context *ctx, int argc, term argv[]);
static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_port_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_ref_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_fun_to_list(Context *ctx, int argc, term argv[]);
static term nif_erlang_function_exported(Context *ctx, int argc, term argv[]);
Expand Down Expand Up @@ -583,6 +584,12 @@ static const struct Nif pid_to_list_nif =
.nif_ptr = nif_erlang_pid_to_list
};

static const struct Nif port_to_list_nif =
{
.base.type = NIFFunctionType,
.nif_ptr = nif_erlang_port_to_list
};

static const struct Nif ref_to_list_nif =
{
.base.type = NIFFunctionType,
Expand Down Expand Up @@ -985,7 +992,7 @@ static term nif_erlang_open_port_2(Context *ctx, int argc, term argv[])
if (!new_ctx) {
RAISE_ERROR(BADARG_ATOM);
} else {
return term_from_local_process_id(new_ctx->process_id);
return term_port_from_local_process_id(new_ctx->process_id);
}
}

Expand All @@ -996,7 +1003,7 @@ static term nif_erlang_register_2(Context *ctx, int argc, term argv[])
term reg_name_term = argv[0];
VALIDATE_VALUE(reg_name_term, term_is_atom);
term pid_or_port_term = argv[1];
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid);
VALIDATE_VALUE(pid_or_port_term, term_is_local_pid_or_port);

int atom_index = term_to_atom_index(reg_name_term);
int32_t pid = term_to_local_process_id(pid_or_port_term);
Expand Down Expand Up @@ -1062,7 +1069,7 @@ static NativeHandlerResult process_echo_mailbox(Context *ctx)
}
result = NativeTerminate;
term reply = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(reply, 0, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 0, term_port_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 1, CLOSED_ATOM);
port_send_message(ctx->global, pid, reply);
} else {
Expand Down Expand Up @@ -1094,7 +1101,7 @@ static NativeHandlerResult process_console_message(Context *ctx, term msg)
result = NativeTerminate;
term pid = term_get_tuple_element(msg, 0);
term reply = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(reply, 0, term_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 0, term_port_from_local_process_id(ctx->process_id));
term_put_tuple_element(reply, 1, CLOSED_ATOM);
port_send_message(ctx->global, pid, reply);
} else if (is_tagged_tuple(msg, IO_REQUEST_ATOM, 4)) {
Expand Down Expand Up @@ -1400,7 +1407,7 @@ static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
term target = argv[0];
GlobalContext *glb = ctx->global;

if (term_is_local_pid(target)) {
if (term_is_local_pid_or_port(target)) {
int32_t local_process_id = term_to_local_process_id(target);

globalcontext_send_message(glb, local_process_id, argv[1]);
Expand Down Expand Up @@ -3454,6 +3461,23 @@ static term nif_erlang_pid_to_list(Context *ctx, int argc, term argv[])
return make_list_from_ascii_buf((uint8_t *) buf, str_len, ctx);
}

static term nif_erlang_port_to_list(Context *ctx, int argc, term argv[])
{
UNUSED(argc);

term t = argv[0];
VALIDATE_VALUE(t, term_is_port);
size_t max_len = term_is_external(t) ? EXTERNAL_PORT_AS_CSTRING_LEN : LOCAL_PORT_AS_CSTRING_LEN;

char buf[max_len];
int str_len = term_snprint(buf, max_len, t, ctx->global);
if (UNLIKELY(str_len < 0)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}

return make_list_from_ascii_buf((uint8_t *) buf, str_len, ctx);
}

static term nif_erlang_ref_to_list(Context *ctx, int argc, term argv[])
{
UNUSED(argc);
Expand Down Expand Up @@ -3715,7 +3739,7 @@ static term nif_erlang_monitor(Context *ctx, int argc, term argv[])
RAISE_ERROR(BADARG_ATOM);
}

VALIDATE_VALUE(target_pid, term_is_local_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid_or_port);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3783,7 +3807,7 @@ static term nif_erlang_link(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_local_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid_or_port);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down Expand Up @@ -3814,7 +3838,7 @@ static term nif_erlang_unlink(Context *ctx, int argc, term argv[])

term target_pid = argv[0];

VALIDATE_VALUE(target_pid, term_is_local_pid);
VALIDATE_VALUE(target_pid, term_is_local_pid_or_port);

int local_process_id = term_to_local_process_id(target_pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/nifs.gperf
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ erlang:throw/1, &throw_nif
erlang:raise/3, &raise_nif
erlang:unlink/1, &unlink_nif
erlang:pid_to_list/1, &pid_to_list_nif
erlang:port_to_list/1, &port_to_list_nif
erlang:ref_to_list/1, &ref_to_list_nif
erlang:fun_to_list/1, &fun_to_list_nif
erlang:function_exported/3, &function_exported_nif
Expand Down
15 changes: 2 additions & 13 deletions src/libAtomVM/opcodesswitch.h
Original file line number Diff line number Diff line change
Expand Up @@ -2384,7 +2384,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
term recipient_term = x_regs[0];
int local_process_id;
if (term_is_local_pid(recipient_term)) {
if (term_is_local_pid_or_port(recipient_term)) {
local_process_id = term_to_local_process_id(recipient_term);
} else if (term_is_atom(recipient_term)) {
local_process_id = globalcontext_get_registered_process(ctx->global, term_to_atom_index(recipient_term));
Expand Down Expand Up @@ -2984,18 +2984,7 @@ HOT_FUNC int scheduler_entry_point(GlobalContext *glb)
#ifdef IMPL_EXECUTE_LOOP
TRACE("is_port/2, label=%i, arg1=%lx\n", label, arg1);

if (term_is_local_pid(arg1)) {
int local_process_id = term_to_local_process_id(arg1);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
bool is_port_driver = false;
if (target) {
is_port_driver = context_is_port_driver(target);
globalcontext_get_process_unlock(ctx->global, target);
}
if (!is_port_driver) {
pc = mod->labels[label];
}
} else {
if (!term_is_port(arg1)) {
pc = mod->labels[label];
}
#endif
Expand Down
25 changes: 19 additions & 6 deletions src/libAtomVM/term.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ int term_funprint(PrinterFun *fun, term t, const GlobalContext *global)
// creation is not printed
return fun->print(fun, "<%" PRIu32 ".%" PRIu32 ".%" PRIu32 ">", node_atom_index, number, serial);

} else if (term_is_local_port(t)) {
int32_t process_id = term_to_local_process_id(t);
return fun->print(fun, "#Port<0.%" PRIu32 ".0>", process_id);

} else if (term_is_external_port(t)) {
uint32_t node_atom_index = term_to_atom_index(term_to_external_node(t));
uint64_t number = term_to_external_port_number(t);
// creation is not printed
return fun->print(fun, "#Port<%" PRIu32 ".%" PRIu64 ">", node_atom_index, number);

} else if (term_is_function(t)) {
const term *boxed_value = term_to_const_term_ptr(t);

Expand Down Expand Up @@ -447,24 +457,27 @@ static int term_type_to_index(term t)
} else if (term_is_function(t)) {
return 5;

} else if (term_is_pid(t)) {
} else if (term_is_port(t)) {
return 6;

} else if (term_is_tuple(t)) {
} else if (term_is_pid(t)) {
return 7;

} else if (term_is_nil(t)) {
} else if (term_is_tuple(t)) {
return 8;

} else if (term_is_nonempty_list(t)) {
} else if (term_is_nil(t)) {
return 9;

} else if (term_is_binary(t)) {
} else if (term_is_nonempty_list(t)) {
return 10;

} else if (term_is_map(t)) {
} else if (term_is_binary(t)) {
return 11;

} else if (term_is_map(t)) {
return 12;

} else {
AVM_ABORT();
}
Expand Down
Loading

0 comments on commit 45af8b6

Please sign in to comment.