diff --git a/pkg/c3/defs.h b/pkg/c3/defs.h index dac060a816..e5274726cb 100644 --- a/pkg/c3/defs.h +++ b/pkg/c3/defs.h @@ -117,7 +117,7 @@ inline c3_s c3_sift_short(c3_y buf_y[2]) { - return (buf_y[1] << 8 | buf_y[0]); + return ((c3_s)buf_y[1] << 8 | (c3_s)buf_y[0]); } inline c3_w diff --git a/pkg/vere/io/mesa.c b/pkg/vere/io/mesa.c index 442dbd0aba..0f0cc41607 100644 --- a/pkg/vere/io/mesa.c +++ b/pkg/vere/io/mesa.c @@ -155,6 +155,7 @@ typedef struct _u3_mesa { typedef struct _u3_peer { u3_mesa* sam_u; // backpointer + u3_ship her_u; // who is this peer c3_o ful_o; // has this been initialized? u3_lane dan_u; // direct lane (nullable) u3_lane_state dir_u; // direct lane state @@ -970,7 +971,7 @@ static void _mesa_send_buf(u3_mesa* sam_u, u3_lane lan_u, c3_y* buf_y, c3_w len_ #ifdef MESA_DEBUG c3_c* sip_c = inet_ntoa(add_u.sin_addr); - /* u3l_log("mesa: sending packet to %s:%u", sip_c, por_s); */ + // u3l_log("mesa: sending packet to %s:%u", sip_c, por_s); #endif uv_buf_t buf_u = uv_buf_init((c3_c*)buf_y, len_w); @@ -1069,22 +1070,17 @@ _mesa_send_modal(u3_peer* per_u, c3_y* buf_y, c3_w len_w) #ifdef MESA_DEBUG c3_c* gal_c = u3_ship_to_string(gal_u); - u3l_log("mesa: sending to %s", gal_c); + // u3l_log("mesa: sending to %s", gal_c); c3_free(gal_c); #endif - // XX if we have lanes in arvo, send it also there? - // otherwise after a peer turns indirect because we haven't contacted them, - // we never achieve a direct route since we only send - // to the sponsor, and all pages will come forwarded, and per_u->dir_u.her_d - // only gets updated when pages come directly, or if it's the first time - // after a restart of the driver // u3_lane imp_u = _mesa_get_czar_lane(sam_u, per_u->imp_y); _mesa_send_buf(sam_u, imp_u, sen_y, len_w); per_u->ind_u.sen_d = now_d; - if ( (c3n == _mesa_is_lane_zero(&per_u->dan_u)) && - (per_u->dir_u.sen_d + DIRECT_ROUTE_RETRY_MICROS > now_d)) { + if ( (c3n == _mesa_is_lane_zero(&per_u->dan_u)) + // && (per_u->dir_u.sen_d + DIRECT_ROUTE_RETRY_MICROS > now_d) // XX same check as _mesa_is_direct_mode + ) { c3_y* san_y = c3_calloc(len_w); memcpy(san_y, buf_y, len_w); _mesa_send_buf(sam_u, per_u->dan_u, san_y, len_w); @@ -1099,7 +1095,7 @@ _mesa_send_request(u3_mesa_request_data* dat_u) { u3_peer* per_u = _mesa_get_peer(dat_u->sam_u, dat_u->her_u); if ( !per_u ) { - u3l_log("mesa: send_bufs()"); + // u3l_log("mesa: send_bufs()"); _mesa_send_bufs(dat_u->sam_u, NULL, dat_u->buf_y, @@ -1113,23 +1109,25 @@ _mesa_send_request(u3_mesa_request_data* dat_u) } static void -_try_resend(u3_pend_req* req_u, c3_d ack_d) +_try_resend(u3_pend_req* req_u, c3_d nex_d) { c3_o los_o = c3n; c3_d now_d = _get_now_micros(); u3_mesa_pact *pac_u = &req_u->pic_u->pac_u; c3_y buf_y[PACT_SIZE]; - for ( c3_d i_d = req_u->lef_d; i_d < ack_d; i_d++ ) { + for ( c3_d i_d = req_u->lef_d; i_d < nex_d; i_d++ ) { // TODO: make fast recovery different from slow // TODO: track skip count but not dupes, since dupes are meaningless if ( (c3y == bitset_has(&req_u->was_u, i_d)) && (now_d - req_u->wat_u[i_d].sen_d > req_u->gag_u->rto_w) ) { los_o = c3y; - _mesa_req_pact_resent(req_u, &pac_u->pek_u.nam_u); pac_u->pek_u.nam_u.fra_d = i_d; + // u3l_log("resend fra_w: %llu", i_d); + c3_w len_w = mesa_etch_pact_to_buf(buf_y, PACT_SIZE, pac_u); _mesa_send_modal(req_u->per_u, buf_y, len_w); + _mesa_req_pact_resent(req_u, &pac_u->pek_u.nam_u); } } @@ -1159,8 +1157,8 @@ _update_resend_timer(u3_pend_req *req_u) for ( c3_d i = req_u->lef_d; i < req_u->nex_d; i++ ) { // u3l_log("fra %u (%u)", i, __LINE__); if ( c3y == bitset_has(&req_u->was_u, i) && - wen_d > req_u->wat_u[i].sen_d - ) { + wen_d > req_u->wat_u[i].sen_d + ) { wen_d = req_u->wat_u[i].sen_d; idx_d = i; } @@ -1177,6 +1175,7 @@ _update_resend_timer(u3_pend_req *req_u) // now_d - req_u->wat_u[idx_d].sen_d; c3_d next_expiry = req_u->gag_u->rto_w; // u3l_log("next_expiry %llu", next_expiry / 1000); + // u3l_log("DUE %llu", uv_timer_get_due_in(&req_u->tim_u)); uv_timer_start(&req_u->tim_u, _mesa_packet_timeout, next_expiry / 1000, 0); } @@ -1185,7 +1184,7 @@ _update_resend_timer(u3_pend_req *req_u) static void _mesa_packet_timeout(uv_timer_t* tim_u) { u3_pend_req* req_u = (u3_pend_req*)tim_u->data; - /* u3l_log("%"PRIu64" packet timed out", req_u->old_d); */ + // u3l_log("old %llu nex %llu packet timed out", req_u->old_d, req_u->nex_d); _try_resend(req_u, req_u->nex_d); _update_resend_timer(req_u); } @@ -1254,6 +1253,7 @@ _mesa_req_pact_done(u3_pend_req* req_u, // received duplicate if ( c3n == bitset_has(&req_u->was_u, nam_u->fra_d) ) { // MESA_LOG(sam_u, DUPE); + _update_resend_timer(req_u); return; } @@ -1271,7 +1271,7 @@ _mesa_req_pact_done(u3_pend_req* req_u, c3_free(par_u); return; } else if ( nam_u->fra_d >= req_u->los_u->counter + (sizeof(req_u->mis_u)/sizeof(u3_misord_buf)) ) { - // u3l_log("fragment number too high: %"PRIu64, nam_u->fra_d); + // u3l_log("fragment number too high: %llu counter %u", nam_u->fra_d, req_u->los_u->counter ); c3_free(par_u); return; } else { @@ -1281,15 +1281,15 @@ _mesa_req_pact_done(u3_pend_req* req_u, buf_u->len_w = dat_u->len_w; memcpy(buf_u->fra_y, dat_u->fra_y, dat_u->len_w); buf_u->par_u = par_u; -#ifdef U3_OS_osx - u3l_log("insert into misordered queue fra: [%llu] = %llu [counter %u]", -#else - u3l_log("insert into misordered queue fra: [%lu] = %lu [counter %u]", -#endif - nam_u->fra_d - req_u->los_u->counter - 1, - nam_u->fra_d, - req_u->los_u->counter); + // u3l_log("insert into misordered queue fra: [%llu] = %llu [counter %u]", + // nam_u->fra_d - (c3_d)req_u->los_u->counter - 1, + // nam_u->fra_d, + // req_u->los_u->counter); + bitset_del(&req_u->was_u, nam_u->fra_d); + _mesa_handle_ack(req_u->gag_u, &req_u->wat_u[nam_u->fra_d]); + _try_resend(req_u, nam_u->fra_d); + _update_resend_timer(req_u); return; } } @@ -1315,10 +1315,10 @@ _mesa_req_pact_done(u3_pend_req* req_u, req_u->ack_d = nam_u->fra_d; } - bitset_del(&req_u->was_u, nam_u->fra_d); // XX also for out of order frags? + bitset_del(&req_u->was_u, nam_u->fra_d); #ifdef MESA_DEBUG - // u3l_log("fragment %llu counter %llu hav_d %llu nex_d %llu ack_d %llu lef_d %llu old_d %llu", nam_u->fra_d, req_u->los_u->counter, req_u->hav_d, req_u->nex_d, req_u->ack_d, req_u->lef_d, req_u->old_d); + // u3l_log("fragment %llu counter %u hav_d %llu nex_d %llu ack_d %llu lef_d %llu old_d %llu", nam_u->fra_d, req_u->los_u->counter, req_u->hav_d, req_u->nex_d, req_u->ack_d, req_u->lef_d, req_u->old_d); #endif u3_lane_state* sat_u; @@ -1339,7 +1339,7 @@ _mesa_req_pact_done(u3_pend_req* req_u, memcpy(req_u->dat_y + (siz_w * nam_u->fra_d), dat_u->fra_y, dat_u->len_w); // XX FIXME? - _try_resend(req_u, nam_u->fra_d); // _try_resend(req_u, req_u->nex_d); + _try_resend(req_u, nam_u->fra_d); _update_resend_timer(req_u); } @@ -1352,7 +1352,7 @@ _realise_lane(u3_noun lan) { if ( c3y == u3a_is_cat(lan) ) { // u3_assert( lan < 256 ); if ( (c3n == u3_Host.ops_u.net) ) { - lan_u.pip_w = 0x7f000001 ; + lan_u.pip_w = 0x7f000001 ; // XX get the real galaxy IP lan_u.por_s = _ames_czar_port(lan); } } else { @@ -1732,6 +1732,12 @@ _mesa_ef_send(u3_mesa* sam_u, u3_noun las, u3_noun pac) c3_o _ames_kick_newt(void* sam_u, u3_noun tag, u3_noun dat); +u3_atom +u3_ames_encode_lane(u3_lane lan); + +u3_lane +u3_ames_decode_lane(u3_atom lan); + static void _meet_peer(u3_mesa* sam_u, u3_peer* per_u, u3_ship her_u); static void _init_peer(u3_mesa* sam_u, u3_peer* per_u); @@ -1773,6 +1779,28 @@ static c3_o _mesa_kick(u3_mesa* sam_u, u3_noun tag, u3_noun dat) } u3_ship who_u; u3_ship_of_noun(who_u ,who); + + // XX the format of the lane %nail gives is (list (each @p address)) + // + u3_noun las = u3do("tail", u3k(dat)); + u3m_p("las nail", las); + + if ( las == u3_nul ) { + per_u->dan_u = (u3_lane){0,0}; // delete lane + } + else { + u3_noun lan = u3h(las); + // we either have a direct route, and a galaxy, or just one lane + if ( c3n == u3h(lan) ) { + u3_lane lan_u = u3_ames_decode_lane(u3k(u3t(lan))); + _log_lane(&lan_u); + per_u->dan_u = lan_u; + } else { + // delete direct lane if galaxy + per_u->dan_u = (u3_lane){0,0}; + } + } + _meet_peer(sam_u, per_u, who_u); ret_o = _ames_kick_newt(u3_Host.sam_u, u3k(tag), u3k(dat)); @@ -1897,10 +1925,12 @@ _name_to_jumbo_scry(u3_mesa_name* nam_u) u3_noun boq = _dire_etch_ud(31); // XX make configurable u3_noun fag = _dire_etch_ud(0); // XX 1 u3_noun pax = _mesa_encode_path(nam_u->pat_s, (c3_y*)nam_u->pat_c); - u3_noun wer = nam_u->nit_o == c3y - ? u3nc(c3__init, pax) - : u3nt(nam_u->aut_o == c3y ? c3__auth : c3__data, fag, pax); + // u3_noun wer = nam_u->nit_o == c3y + // ? u3nc(c3__init, pax) + // : u3nt(nam_u->aut_o == c3y ? c3__auth : c3__data, fag, pax); + // XX only boq_y of MAX JUMBO allowed + u3_noun wer = u3nt(c3__data, fag, pax); u3_noun res = u3nc(c3__mess, u3nq(rif, c3__pact, boq, u3nc(c3__etch, wer))); return res; @@ -1929,6 +1959,8 @@ _mesa_put_jumbo_cache(u3_mesa* sam_u, u3_mesa_name* nam_u, u3_mesa_line* lin_u) _mesa_free_line(u3a_into(u3t(del))); u3z(del); } + u3_weak res = u3h_get(sam_u->pac_p, pax); + u3_mesa_line* mesa_line = u3a_into(res); u3z(pax); } @@ -2099,7 +2131,7 @@ _mesa_page_scry_jumbo_cb(void* vod_p, u3_noun res) } #ifdef MESA_DEBUG - // u3l_log("mesa: scry_jumbo_cb()"); + u3l_log("mesa: scry_jumbo_cb()"); // log_pact(pac_u); #endif @@ -2199,13 +2231,70 @@ _saxo_cb(void* vod_p, u3_noun nun) u3z(nun); } +static void +_forward_lanes_cb(void* vod_p, u3_noun nun) +{ + u3_peer* per_u = vod_p; + u3_mesa* sam_u = per_u->sam_u; + + u3_weak las = u3r_at(7, nun); + u3m_p("_forward_lanes_cb", las); + if ( las != u3_none ) { + u3_peer* new_u = _mesa_get_peer(per_u->sam_u, per_u->her_u); + if ( new_u != NULL ) { + per_u = new_u; + } + u3_noun gal = u3do("head", u3k(las)); + u3_assert( c3y == u3a_is_cat(gal) && gal < 256 ); + // // both atoms guaranteed to be cats, bc we don't call unless forwarding + per_u->ful_o = c3y; + per_u->imp_y = gal; + u3l_log("putting ship"); + u3_noun sal = u3do("tail", u3k(las)); + u3_noun lan; + u3_lane zer_u = {0, 0}; + while ( sal != u3_nul ) { + u3x_cell(sal, &lan, &sal); + + if ( c3n == u3a_is_cat(lan) ) { + // there should be only one lane that is not a direct atom + // + u3_lane lan_u = _realise_lane(u3k(lan)); + per_u->dan_u = lan_u; + } + } + u3z(lan); + _mesa_put_peer(per_u->sam_u, per_u->her_u, per_u); + } + + u3z(nun); +} + static void _meet_peer(u3_mesa* sam_u, u3_peer* per_u, u3_ship her_u) { u3_noun her = u3_ship_to_noun(her_u); u3_noun gan = u3nc(u3_nul, u3_nul); + + per_u->her_u[0] = her_u[0]; + per_u->her_u[1] = her_u[1]; + u3_noun pax = u3nc(u3dc("scot", c3__p, her), u3_nul); u3_pier_peek_last(sam_u->pir_u, gan, c3__j, c3__saxo, pax, per_u, _saxo_cb); + +} + +static void +_get_peer_lanes(u3_mesa* sam_u, u3_peer* per_u) +{ + u3_noun her = u3_ship_to_noun(per_u->her_u); + u3_noun gan = u3nc(u3_nul, u3_nul); + u3_noun pax = u3nq(u3i_string("chums"), + u3dc("scot", 'p', her), + u3i_string("lanes"), + u3_nul); + u3m_p("pax", pax); + u3_pier_peek_last(sam_u->pir_u, gan, c3__ax, u3_nul, pax, per_u, _forward_lanes_cb); } static void @@ -2232,6 +2321,7 @@ _mesa_request_next_fragments(u3_mesa* sam_u, if ( fra_w >= req_u->tof_d ) { break; } + // u3l_log("next fra_w: %u", fra_w); nex_u->pac_u.pek_u.nam_u.fra_d = nex_d + i; _mesa_add_our_to_pit(sam_u, &nex_u->pac_u.pek_u.nam_u); _mesa_send(nex_u, &lan_u); @@ -2450,6 +2540,8 @@ _mesa_hear_page(u3_mesa_pict* pic_u, u3_lane lan_u) u3_mesa_pact* pac_u = &pic_u->pac_u; u3_mesa_name* nam_u = &pac_u->pek_u.nam_u; c3_s fra_s; + // u3l_log("heard fra %llu", pac_u->pag_u.nam_u.fra_d); + c3_o our_o = u3_ships_equal(nam_u->her_u, sam_u->pir_u->who_d); @@ -2470,11 +2562,13 @@ _mesa_hear_page(u3_mesa_pict* pic_u, u3_lane lan_u) } c3_o dir_o = __(pac_u->hed_u.hop_y == 0); - if ( pac_u->hed_u.hop_y == 0 ) { - _hear_peer(sam_u, per_u, lan_u, dir_o); - } else { - u3l_log(" received forwarded page"); - } + // if ( pac_u->hed_u.hop_y == 0 ) { + // u3l_log(" received direct page"); + // } else { + // u3l_log(" received forwarded page"); + // } + _hear_peer(sam_u, per_u, lan_u, dir_o); + if ( new_o == c3y ) { //u3l_log("new lane is direct %c", c3y == dir_o ? 'y' : 'n'); //_log_lane(&lan_u); @@ -2513,6 +2607,7 @@ _mesa_hear_page(u3_mesa_pict* pic_u, u3_lane lan_u) return; } if ( c3n == our ) { + u3l_log(" not our"); // TODO: free pact and pict _mesa_free_pict(pic_u); u3z(pin); @@ -2563,6 +2658,8 @@ _mesa_hear_page(u3_mesa_pict* pic_u, u3_lane lan_u) if ( 0 == nam_u->fra_d ) { _mesa_req_pact_init(sam_u, pic_u, &lan_u); } + u3l_log(" _mesa_req_pact_init"); + _mesa_del_pit(sam_u, nam_u); _mesa_free_pict(pic_u); // XX leaks packet u3z(pin); @@ -2634,6 +2731,7 @@ _mesa_hear_page(u3_mesa_pict* pic_u, u3_lane lan_u) u3_auto_plan(&sam_u->car_u, u3_ovum_init(0, c3__ames, u3nc(c3__ames, u3_nul), cad)); } else if ( req_u->hav_d < lev_d ) { + _mesa_request_next_fragments(sam_u, req_u, lan_u); _mesa_free_pict(pic_u); u3z(pin); @@ -2652,24 +2750,33 @@ _mesa_forward_request(u3_mesa* sam_u, u3_mesa_pict* pic_u, u3_lane lan_u) if ( !per_u ) { #ifdef MESA_DEBUG c3_c* mes = u3_ship_to_string(pac_u->pek_u.nam_u.her_u); - u3l_log("mesa: alien forward for %s", mes); + u3l_log("mesa: alien forward for %s; meeting ship", mes); c3_free(mes); #endif + per_u = c3_calloc(sizeof(u3_peer)); + _init_peer(sam_u, per_u); + per_u->her_u[0] = pac_u->pek_u.nam_u.her_u[0]; + per_u->her_u[1] = pac_u->pek_u.nam_u.her_u[1]; + + _get_peer_lanes(sam_u, per_u); // forward-lanes _mesa_free_pict(pic_u); return; } if ( c3y == sam_u->for_o && sam_u->pir_u->who_d[0] == per_u->imp_y ) { + // if ( c3y == sam_u->for_o ) { u3_lane lin_u = _mesa_get_direct_lane(sam_u, pac_u->pek_u.nam_u.her_u); u3_lane zer_u = {0, 0}; if ( _mesa_lanes_equal(&zer_u, &lin_u) == c3y) { + u3l_log("zero lanes ip: %u port: %u", lin_u.pip_w , lin_u.por_s ); _mesa_free_pict(pic_u); return; } inc_hopcount(&pac_u->hed_u); #ifdef MESA_DEBUG u3l_log("mesa: forward_request()"); - log_pact(pac_u); + _log_lane(&lan_u); #endif + _mesa_add_lane_to_pit(sam_u, &pac_u->pek_u.nam_u, lan_u); _mesa_send(pic_u, &lin_u); } @@ -2692,6 +2799,7 @@ _mesa_hear_peek(u3_mesa_pict* pic_u, u3_lane lan_u) _mesa_forward_request(sam_u, pic_u, lan_u); return; } + // record interest _mesa_add_lane_to_pit(sam_u, &pac_u->pek_u.nam_u, lan_u); @@ -2700,6 +2808,8 @@ _mesa_hear_peek(u3_mesa_pict* pic_u, u3_lane lan_u) pac_u->pek_u.nam_u.fra_d = bat_d; + // u3l_log("hear peek fra %llu", fra_d); + // if we have the page, send it u3_mesa_line* lin_u = _mesa_get_jumbo_cache(sam_u, &pac_u->pek_u.nam_u); if ( ( NULL != lin_u ) && ( CTAG_ITEM == lin_u->typ_y )) { @@ -2708,7 +2818,7 @@ _mesa_hear_peek(u3_mesa_pict* pic_u, u3_lane lan_u) return; } - // if we are waiting, no-op + // // if we are waiting, no-op if ( ( NULL != lin_u ) && ( CTAG_WAIT == lin_u->typ_y )) { _mesa_free_pict(pic_u); return; @@ -2717,7 +2827,7 @@ _mesa_hear_peek(u3_mesa_pict* pic_u, u3_lane lan_u) if ( NULL == lin_u ) { u3l_log("lin_u NULL"); } // otherwise, if blocked or NULL scry - lin_u = c3_calloc(sizeof(u3_mesa_line)); + lin_u = u3a_malloc(sizeof(u3_mesa_line)); lin_u->typ_y = CTAG_WAIT; _mesa_copy_name(&lin_u->nam_u, &pac_u->pek_u.nam_u); // XX @@ -2792,8 +2902,17 @@ _mesa_hear_poke(u3_mesa_pict* pic_u, u3_lane* lan_u) // u3l_log("new lane is direct %c", c3y == dir_o ? 'y' : 'n'); // _log_lane(lan_u); } + // XX _meet_peer, in the _saxo_cb, is already putting the peer in her_p + // _mesa_put_peer(sam_u, pac_u->pok_u.pay_u.her_u, per_u); + u3_pend_req* req_u = _mesa_get_request(sam_u, &pac_u->pok_u.pay_u); + if ( req_u != NULL) { + // u3l_log("req pending"); + _mesa_free_pict(pic_u); + return; + } + u3_ovum_peer nes_f; u3_ovum_bail bal_f; void* ptr_v; @@ -2815,6 +2934,7 @@ _mesa_hear_poke(u3_mesa_pict* pic_u, u3_lane* lan_u) // XX check request state for *payload* (in-progress duplicate) assert(pac_u->pok_u.dat_u.tob_d); + u3_mesa_lane_cb_data* dat_u = c3_malloc(sizeof(u3_mesa_lane_cb_data)); { dat_u->nam_u = c3_malloc(sizeof(u3_mesa_name)); diff --git a/pkg/vere/io/mesa/mesa.h b/pkg/vere/io/mesa/mesa.h index 02e08243ae..5750acb8b3 100644 --- a/pkg/vere/io/mesa/mesa.h +++ b/pkg/vere/io/mesa/mesa.h @@ -11,7 +11,7 @@ #define HEAD_SIZE 4 // header size in bytes #define PACT_SIZE 1472 -static c3_w MESA_COOKIE = 0x51ad1d5e; +static c3_w MESA_COOKIE = 0x67e00200; typedef enum _u3_mesa_ptag { PACT_RESV = 0, diff --git a/pkg/vere/io/mesa/pact.c b/pkg/vere/io/mesa/pact.c index 5d2eaf0b6e..edc873e3bc 100644 --- a/pkg/vere/io/mesa/pact.c +++ b/pkg/vere/io/mesa/pact.c @@ -458,7 +458,7 @@ _etch_short(u3_etcher* ech_u, c3_s val_s) c3_etch_short(_etch_next(ech_u, 2), val_s); } -static c3_y +static c3_s _sift_short(u3_sifter* sif_u) { c3_y *res_y = _sift_next(sif_u, 2);