diff --git a/media-proxy/include/proxy_context.h b/media-proxy/include/proxy_context.h index b1e6c59f..4bd295b8 100644 --- a/media-proxy/include/proxy_context.h +++ b/media-proxy/include/proxy_context.h @@ -12,7 +12,6 @@ #include #include -#include "controller.grpc.pb.h" #include "mtl.h" #include "libfabric_dev.h" #include "rdma_session.h" @@ -25,16 +24,6 @@ #define ST_APP_PAYLOAD_TYPE_ST22 (114) #endif -using controller::MemIFOps; -using controller::RxControlRequest; -using controller::St20pRxOps; -using controller::St20pTxOps; -using controller::StInit; -using controller::StopControlRequest; -using controller::StRxPort; -using controller::StTxPort; -using controller::TxControlRequest; - #pragma once class ProxyContext { @@ -81,16 +70,10 @@ class ProxyContext { std::string getTCPListenAddress(void); int getTCPListenPort(void); - void ParseStInitParam(const TxControlRequest* request, struct mtl_init_params* init_param); - void ParseStInitParam(const RxControlRequest* request, struct mtl_init_params* init_param); void ParseStInitParam(const mcm_conn_param* request, struct mtl_init_params* init_param); - void ParseMemIFParam(const TxControlRequest* request, memif_ops_t& memif_ops); - void ParseMemIFParam(const RxControlRequest* request, memif_ops_t& memif_ops); void ParseMemIFParam(const mcm_conn_param* request, memif_ops_t& memif_ops); - void ParseSt20TxOps(const TxControlRequest* request, struct st20p_tx_ops* opts); - void ParseSt20RxOps(const RxControlRequest* request, struct st20p_rx_ops* opts); void ParseSt20TxOps(const mcm_conn_param* request, struct st20p_tx_ops* opts); void ParseSt20RxOps(const mcm_conn_param* request, struct st20p_rx_ops* opts); void ParseSt22TxOps(const mcm_conn_param* request, struct st22p_tx_ops* opts); @@ -99,13 +82,10 @@ class ProxyContext { void ParseSt30RxOps(const mcm_conn_param* request, struct st30_rx_ops* opts); void ParseSt40TxOps(const mcm_conn_param* request, struct st40_tx_ops* opts); void ParseSt40RxOps(const mcm_conn_param* request, struct st40_rx_ops* opts); - int TxStart(const TxControlRequest* request); - int RxStart(const RxControlRequest* request); int TxStart(const mcm_conn_param* request); int RxStart(const mcm_conn_param* request); void TxStop(const int32_t session_id); void RxStop(const int32_t session_id); - void Stop(); private: std::atomic mSessionCount; diff --git a/media-proxy/src/api_server_grpc.cc b/media-proxy/src/api_server_grpc.cc index 81421623..736bab2b 100644 --- a/media-proxy/src/api_server_grpc.cc +++ b/media-proxy/src/api_server_grpc.cc @@ -13,67 +13,31 @@ ConfigureServiceImpl::ConfigureServiceImpl(ProxyContext* ctx) Status ConfigureServiceImpl::TxStart(ServerContext* context, const TxControlRequest* request, ControlReply* reply) { - int session_id = 0; - std::string ret_msg = ""; - std::cout << "\nReceived command: TxStart." << std::endl; - - session_id = m_ctx->TxStart(request); - if (session_id >= 0) { - ret_msg = std::to_string(session_id); - } else { - ret_msg = "Failed"; - } - reply->set_message("Create MTL TX session: " + ret_msg); - return Status::OK; } Status ConfigureServiceImpl::RxStart(ServerContext* context, const RxControlRequest* request, ControlReply* reply) { - int session_id = 0; - std::string ret_msg = ""; - std::cout << "\nReceived command: RxStart." << std::endl; - - session_id = m_ctx->RxStart(request); - if (session_id >= 0) { - ret_msg = std::to_string(session_id); - } else { - ret_msg = "Failed"; - } - reply->set_message("Create MTL RX session: " + ret_msg); - return Status::OK; } Status ConfigureServiceImpl::TxStop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) { std::cout << "\nReceived command: Stop." << std::endl; - - m_ctx->TxStop(request->session_id()); - reply->set_message("gPRC reply: well received."); - return Status::OK; } Status ConfigureServiceImpl::RxStop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) { std::cout << "\nReceived command: Stop." << std::endl; - - m_ctx->RxStop(request->session_id()); - reply->set_message("gPRC reply: well received."); - return Status::OK; } Status ConfigureServiceImpl::Stop(ServerContext* context, const StopControlRequest* request, ControlReply* reply) { std::cout << "\nReceived command: Stop." << std::endl; - - m_ctx->Stop(); - reply->set_message("gPRC reply: well received."); - return Status::OK; } diff --git a/media-proxy/src/media_proxy.cc b/media-proxy/src/media_proxy.cc index 80d2ee87..95dfbd8a 100644 --- a/media-proxy/src/media_proxy.cc +++ b/media-proxy/src/media_proxy.cc @@ -105,13 +105,8 @@ int main(int argc, char* argv[]) ctx->setDevicePort(dev_port); ctx->setDataPlaneAddress(dp_ip); - /* start gRPC server */ - std::thread rpcThread(RunRPCServer, ctx); - /* start TCP server */ std::thread tcpThread(RunTCPServer, ctx); - - rpcThread.join(); tcpThread.join(); delete (ctx); diff --git a/media-proxy/src/proxy_context.cc b/media-proxy/src/proxy_context.cc index 341d0fbc..86b576ff 100644 --- a/media-proxy/src/proxy_context.cc +++ b/media-proxy/src/proxy_context.cc @@ -7,6 +7,7 @@ #include #include #include +#include #include "proxy_context.h" #include @@ -125,92 +126,6 @@ st_frame_fmt ProxyContext::getStFrameFmt(video_pixel_format mcm_frame_fmt) return mtl_frame_fmt; } -void ProxyContext::ParseStInitParam(const TxControlRequest* request, struct mtl_init_params* st_param) -{ - StInit init = request->st_init(); - st_param->num_ports = init.number_ports(); - - std::string port_p = init.primary_port(); - strlcpy(st_param->port[MTL_PORT_P], port_p.c_str(), MTL_PORT_MAX_LEN); - - for (int i = 0; i < init.primary_sip_addr_size(); ++i) { - st_param->sip_addr[MTL_PORT_P][i] = init.primary_sip_addr(i); - } - - st_param->pmd[MTL_PORT_P] = mtl_pmd_by_port_name(st_param->port[MTL_PORT_P]); - st_param->flags = init.flags(); - st_param->log_level = (enum mtl_log_level)init.log_level(); - st_param->priv = NULL; - st_param->ptp_get_time_fn = NULL; - st_param->rx_queues_cnt[MTL_PORT_P] = init.rx_sessions_cnt_max(); - st_param->tx_queues_cnt[MTL_PORT_P] = init.tx_sessions_cnt_max(); - if (init.logical_cores().empty()) { - st_param->lcores = NULL; - } else { - st_param->lcores = (char*)init.logical_cores().c_str(); - } - - INFO("ProxyContext: ParseStInitParam(const TxControlRequest* request, struct mtl_init_params* st_param)"); - INFO("num_ports : %d", st_param->num_ports); - INFO("port : %s", st_param->port[MTL_PORT_P]); - INFO("sip_addr :"); - for (int i = 0; i < init.primary_sip_addr_size(); ++i) { - INFO(" %d, ", st_param->sip_addr[MTL_PORT_P][i]); - } - INFO("\nflags: %ld", st_param->flags); - INFO("log_level : %d", st_param->log_level); - if (st_param->lcores) { - INFO("lcores : %s", st_param->lcores); - } else { - INFO("lcores : NULL"); - } - INFO("rx_sessions_cnt_max : %d", st_param->rx_queues_cnt[MTL_PORT_P]); - INFO("tx_sessions_cnt_max : %d", st_param->tx_queues_cnt[MTL_PORT_P]); -} - -void ProxyContext::ParseStInitParam(const RxControlRequest* request, struct mtl_init_params* st_param) -{ - StInit init = request->st_init(); - st_param->num_ports = init.number_ports(); - - std::string port_p = init.primary_port(); - strlcpy(st_param->port[MTL_PORT_P], port_p.c_str(), MTL_PORT_MAX_LEN); - - for (int i = 0; i < init.primary_sip_addr_size(); ++i) { - st_param->sip_addr[MTL_PORT_P][i] = init.primary_sip_addr(i); - } - - st_param->pmd[MTL_PORT_P] = mtl_pmd_by_port_name(st_param->port[MTL_PORT_P]); - st_param->flags = init.flags(); - st_param->log_level = (enum mtl_log_level)init.log_level(); - st_param->priv = NULL; - st_param->ptp_get_time_fn = NULL; - st_param->rx_queues_cnt[MTL_PORT_P] = init.rx_sessions_cnt_max(); - st_param->tx_queues_cnt[MTL_PORT_P] = init.tx_sessions_cnt_max(); - if (init.logical_cores().empty()) { - st_param->lcores = NULL; - } else { - st_param->lcores = (char*)init.logical_cores().c_str(); - } - - INFO("ProxyContext: ParseStInitParam(const RxControlRequest* request, struct mtl_init_params* st_param)"); - INFO("num_ports : %d", st_param->num_ports); - INFO("port : %s", st_param->port[MTL_PORT_P]); - INFO("sip_addr :"); - for (int i = 0; i < init.primary_sip_addr_size(); ++i) { - INFO(" %d, ", st_param->sip_addr[MTL_PORT_P][i]); - } - INFO("\nflags: %ld", st_param->flags); - INFO("log_level : %d", st_param->log_level); - if (st_param->lcores) { - INFO("lcores : %s", st_param->lcores); - } else { - INFO("lcores : NULL"); - } - INFO("rx_sessions_cnt_max : %d", st_param->rx_queues_cnt[MTL_PORT_P]); - INFO("tx_sessions_cnt_max : %d", st_param->tx_queues_cnt[MTL_PORT_P]); -} - void ProxyContext::ParseStInitParam(const mcm_conn_param* request, struct mtl_init_params* st_param) { strlcpy(st_param->port[MTL_PORT_P], getDevicePort().c_str(), MTL_PORT_MAX_LEN); @@ -252,113 +167,6 @@ void ProxyContext::ParseStInitParam(const mcm_conn_param* request, struct mtl_in INFO("tx_sessions_cnt_max : %d", st_param->tx_queues_cnt[MTL_PORT_P]); } -void ProxyContext::ParseMemIFParam(const TxControlRequest* request, memif_ops_t& memif_ops) -{ - strlcpy(memif_ops.app_name, request->memif_ops().app_name().c_str(), sizeof(memif_ops.app_name)); - strlcpy(memif_ops.interface_name, request->memif_ops().interface_name().c_str(), sizeof(memif_ops.interface_name)); - strlcpy(memif_ops.socket_path, request->memif_ops().socket_path().c_str(), sizeof(memif_ops.socket_path)); -} - -void ProxyContext::ParseMemIFParam(const RxControlRequest* request, memif_ops_t& memif_ops) -{ - strlcpy(memif_ops.app_name, request->memif_ops().app_name().c_str(), sizeof(memif_ops.app_name)); - strlcpy(memif_ops.interface_name, request->memif_ops().interface_name().c_str(), sizeof(memif_ops.interface_name)); - strlcpy(memif_ops.socket_path, request->memif_ops().socket_path().c_str(), sizeof(memif_ops.socket_path)); -} - -void ProxyContext::ParseSt20RxOps(const RxControlRequest* request, struct st20p_rx_ops* ops_rx) -{ - St20pRxOps st20_rx = request->st20_rx(); - StRxPort rx_port = st20_rx.rx_port(); - - std::string port = rx_port.port(); - strlcpy(ops_rx->port.port[MTL_PORT_P], port.c_str(), MTL_PORT_MAX_LEN); - for (int i = 0; i < rx_port.sip_addr_size(); ++i) { - ops_rx->port.ip_addr[MTL_PORT_P][i] = rx_port.sip_addr(i); - } - - ops_rx->port.num_port = rx_port.number_ports(); - ops_rx->port.udp_port[MTL_PORT_P] = rx_port.udp_port(); - ops_rx->port.payload_type = rx_port.payload_type(); - - ops_rx->name = st20_rx.name().c_str(); - ops_rx->width = st20_rx.width(); - ops_rx->height = st20_rx.height(); - ops_rx->fps = (enum st_fps)st20_rx.fps(); - ops_rx->transport_fmt = (enum st20_fmt)st20_rx.transport_fmt(); - ops_rx->output_fmt = (enum st_frame_fmt)st20_rx.output_fmt(); - ops_rx->device = (enum st_plugin_device)st20_rx.device(); - ops_rx->framebuff_cnt = st20_rx.framebuffer_cnt(); - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops_rx->port.port[MTL_PORT_P]); - printf("INFO: ip_addr :"); - for (int i = 0; i < rx_port.sip_addr_size(); ++i) { - printf(" %d ", ops_rx->port.ip_addr[MTL_PORT_P][i]); - } - printf("\n"); - printf("INFO: ip_addr :"); - for (int i = 0; i < MTL_IP_ADDR_LEN; ++i) { - printf(" %d ", ops_rx->port.mcast_sip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops_rx->port.num_port); - INFO("udp_port : %d", ops_rx->port.udp_port[MTL_PORT_P]); - INFO("payload_type : %d", ops_rx->port.payload_type); - INFO("name : %s", ops_rx->name); - INFO("width : %d", ops_rx->width); - INFO("height : %d", ops_rx->height); - INFO("fps : %d", ops_rx->fps); - INFO("transport_fmt : %d", ops_rx->transport_fmt); - INFO("output_fmt : %d", ops_rx->output_fmt); - INFO("device : %d", ops_rx->device); - INFO("framebuff_cnt : %d", ops_rx->framebuff_cnt); -} - -void ProxyContext::ParseSt20TxOps(const TxControlRequest* request, struct st20p_tx_ops* ops_tx) -{ - St20pTxOps st20_tx = request->st20_tx(); - StTxPort tx_port = st20_tx.tx_port(); - - std::string port = tx_port.port(); - strlcpy(ops_tx->port.port[MTL_PORT_P], port.c_str(), MTL_PORT_MAX_LEN); - for (int i = 0; i < tx_port.dip_addr_size(); ++i) { - ops_tx->port.dip_addr[MTL_PORT_P][i] = tx_port.dip_addr(i); - } - - ops_tx->port.num_port = tx_port.number_ports(); - ops_tx->port.udp_port[MTL_PORT_P] = tx_port.udp_port(); - ops_tx->port.payload_type = tx_port.payload_type(); - ops_tx->name = st20_tx.name().c_str(); - ops_tx->width = st20_tx.width(); - ops_tx->height = st20_tx.height(); - ops_tx->fps = (enum st_fps)st20_tx.fps(); - ops_tx->transport_fmt = (enum st20_fmt)st20_tx.transport_fmt(); - ops_tx->input_fmt = (enum st_frame_fmt)st20_tx.input_fmt(); - ops_tx->device = (enum st_plugin_device)st20_tx.device(); - ops_tx->framebuff_cnt = st20_tx.framebuffer_cnt(); - - INFO("ProxyContext: %s...", __func__); - INFO("port : %s", ops_tx->port.port[MTL_PORT_P]); - printf("INFO: dip_addr :"); - for (int i = 0; i < tx_port.dip_addr_size(); ++i) { - printf(" %d ", ops_tx->port.dip_addr[MTL_PORT_P][i]); - } - printf("\n"); - INFO("num_port : %d", ops_tx->port.num_port); - INFO("udp_port : %d", ops_tx->port.udp_port[MTL_PORT_P]); - INFO("udp_src_port : %d", ops_tx->port.udp_src_port[MTL_PORT_P]); - INFO("payload_type : %d", ops_tx->port.payload_type); - INFO("name : %s", ops_tx->name); - INFO("width : %d", ops_tx->width); - INFO("height : %d", ops_tx->height); - INFO("fps : %d", ops_tx->fps); - INFO("transport_fmt : %d", ops_tx->transport_fmt); - INFO("input_fmt : %d", ops_tx->input_fmt); - INFO("device : %d", ops_tx->device); - INFO("framebuff_cnt : %d", ops_tx->framebuff_cnt); -} - void ProxyContext::ParseSt20RxOps(const mcm_conn_param* request, struct st20p_rx_ops* ops_rx) { static int session_id = 0; @@ -715,89 +523,6 @@ void ProxyContext::ParseSt40RxOps(const mcm_conn_param* request, struct st40_rx_ INFO("name : %s", ops->name); } -int ProxyContext::RxStart(const RxControlRequest* request) -{ - INFO("ProxyContext: RxStart(const RxControlRequest* request)"); - dp_session_context_t *st_ctx = NULL; - rx_session_context_t* rx_ctx = NULL; - struct st20p_rx_ops opts = { 0 }; - memif_ops_t memif_ops = { 0 }; - - if (mDevHandle == NULL) { - struct mtl_init_params st_param = {}; - /* set default parameters */ - ParseStInitParam(request, &st_param); - - mDevHandle = inst_init(&st_param); - if (mDevHandle == NULL) { - INFO("%s, Failed to initialize MTL.", __func__); - return -1; - } - } - - ParseSt20RxOps(request, &opts); - - ParseMemIFParam(request, memif_ops); - - rx_ctx = mtl_st20p_rx_session_create(mDevHandle, &opts, &memif_ops); - if (rx_ctx == NULL) { - INFO("%s, Failed to create RX session.", __func__); - return -1; - } - - st_ctx = new (dp_session_context_t); - st_ctx->id = incrementMSessionCount(); - st_ctx->type = RX; - st_ctx->rx_session = rx_ctx; - mDpCtx.push_back(st_ctx); - - /* TODO: to be removed later. */ - mRxCtx.push_back(rx_ctx); - - return (st_ctx->id); -} - -int ProxyContext::TxStart(const TxControlRequest* request) -{ - INFO("ProxyContext: TxStart(const TxControlRequest* request)"); - dp_session_context_t *st_ctx = NULL; - tx_session_context_t* tx_ctx = NULL; - struct st20p_tx_ops opts = { 0 }; - memif_ops_t memif_ops = { 0 }; - - if (!mDevHandle) { - struct mtl_init_params st_param = {}; - /* set default parameters */ - ParseStInitParam(request, &st_param); - - mDevHandle = inst_init(&st_param); - if (!mDevHandle) { - INFO("%s, Failed to initialize MTL.", __func__); - return -1; - } - } - - ParseSt20TxOps(request, &opts); - - ParseMemIFParam(request, memif_ops); - - tx_ctx = mtl_st20p_tx_session_create(mDevHandle, &opts, &memif_ops); - if (tx_ctx == NULL) { - INFO("%s, Failed to create TX session.", __func__); - return -1; - } - - st_ctx = new (dp_session_context_t); - st_ctx->id = incrementMSessionCount(); - st_ctx->type = TX; - st_ctx->tx_session = tx_ctx; - mDpCtx.push_back(st_ctx); - - /* TODO: to be removed later. */ - mTxCtx.push_back(tx_ctx); - - return (st_ctx->id); -} int ProxyContext::RxStart_rdma(const mcm_conn_param *request) { @@ -1243,33 +968,3 @@ void ProxyContext::RxStop(const int32_t session_id) return; } - -void ProxyContext::Stop() -{ - int err; - - for (auto it : mDpCtx) { - if (it->type == TX) { - mtl_st20p_tx_session_stop(it->tx_session); - mtl_st20p_tx_session_destroy(&it->tx_session); - } else { - mtl_st20p_rx_session_stop(it->rx_session); - mtl_st20p_rx_session_destroy(&it->rx_session); - } - - delete (it); - } - - mDpCtx.clear(); - st_pthread_mutex_destroy(&sessions_count_mutex_lock); - - mtl_deinit(mDevHandle); - mDevHandle = NULL; - - if (mDevHandle_rdma) { - err = rdma_deinit(&mDevHandle_rdma); - if (err) { - ERROR("%s, Failed to destroy rdma device.", __func__); - } - } -}