diff --git a/velox/functions/remote/client/Remote.cpp b/velox/functions/remote/client/Remote.cpp index 7614a9dec662b..79ca26cc00856 100644 --- a/velox/functions/remote/client/Remote.cpp +++ b/velox/functions/remote/client/Remote.cpp @@ -52,8 +52,8 @@ class RemoteFunction : public exec::VectorFunction { const std::vector& inputArgs, const RemoteVectorFunctionMetadata& metadata) : functionName_(functionName), - serdeFormat_(metadata.serdeFormat), - serde_(getSerde(serdeFormat_)) { + metadata_(metadata), + serde_(getSerde(metadata_.serdeFormat)) { if (metadata.location.type() == typeid(SocketAddress)) { location_ = boost::get(metadata.location); thriftClient_ = getThriftClient(location_, &eventBase_); @@ -108,7 +108,7 @@ class RemoteFunction : public exec::VectorFunction { rows.end(), std::move(args)); - /// construct json request + // Construct JSON request folly::dynamic remoteFunctionHandle = folly::dynamic::object; remoteFunctionHandle["functionName"] = functionName_; remoteFunctionHandle["returnType"] = serializeType(outputType); @@ -118,8 +118,8 @@ class RemoteFunction : public exec::VectorFunction { } folly::dynamic inputs = folly::dynamic::object; - inputs["pageFormat"] = static_cast(serdeFormat_); - // use existing serializer(Prestopage or Sparkunsaferow) + inputs["pageFormat"] = static_cast(metadata_.serdeFormat); + // Use existing serializer (PrestoPage or SparkUnsafeRow) inputs["payload"] = iobufToString(rowVectorToIOBuf( remoteRowVector, rows.end(), *context.pool(), serde_.get())); inputs["rowCount"] = remoteRowVector->size(); @@ -129,11 +129,23 @@ class RemoteFunction : public exec::VectorFunction { jsonObject["inputs"] = inputs; jsonObject["throwOnError"] = context.throwOnError(); - // call Rest client to send request + // URL format - {endpoint}/v1/functions/{schema}/{functionName}/{functionId}/{version} + std::string fullUrl = fmt::format( + "{}/v1/functions/{}/{}/{}/{}", + url_.getUrl(), + metadata_.schema.value_or("default_schema"), + functionName_, + metadata_.functionId.value_or("default_function_id"), + metadata_.version.value_or("default_version")); + + // Set the full URL on the REST client. + restClient_->setUrl(fullUrl); + + // Call Rest client to send request restClient_->invoke_function(folly::toJson(jsonObject), responseBody); LOG(INFO) << responseBody; - // parse json response + // Parse JSON response auto responseJsonObj = parseJson(responseBody); if (responseJsonObj.count("err") > 0) { VELOX_NYI(responseJsonObj["err"].asString()); @@ -142,7 +154,7 @@ class RemoteFunction : public exec::VectorFunction { auto payloadIObuf = folly::IOBuf::copyBuffer( responseJsonObj["result"]["payload"].asString()); - // use existing deserializer(Prestopage or Sparkunsaferow) + // Use existing deserializer (PrestoPage or SparkUnsafeRow) auto outputRowVector = IOBufToRowVector( *payloadIObuf, ROW({outputType}), *context.pool(), serde_.get()); result = outputRowVector->childAt(0); @@ -182,7 +194,7 @@ class RemoteFunction : public exec::VectorFunction { auto requestInputs = request.inputs_ref(); requestInputs->rowCount_ref() = remoteRowVector->size(); - requestInputs->pageFormat_ref() = serdeFormat_; + requestInputs->pageFormat_ref() = metadata_.serdeFormat; // TODO: serialize only active rows. requestInputs->payload_ref() = rowVectorToIOBuf( @@ -215,12 +227,13 @@ class RemoteFunction : public exec::VectorFunction { std::unique_ptr restClient_; proxygen::URL url_; - remote::PageFormat serdeFormat_; std::unique_ptr serde_; // Structures we construct once to cache: RowTypePtr remoteInputType_; std::vector serializedInputTypes_; + + const RemoteVectorFunctionMetadata metadata_; }; std::shared_ptr createRemoteFunction( diff --git a/velox/functions/remote/client/Remote.h b/velox/functions/remote/client/Remote.h index fd90009da457f..88b5544c172be 100644 --- a/velox/functions/remote/client/Remote.h +++ b/velox/functions/remote/client/Remote.h @@ -31,8 +31,23 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata { /// SocketAddress::makeFromPath()). boost::variant location; - /// The serialization format to be used + /// The serialization format to be used when sending data to the remote. remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE}; + + /// Optional schema defining the structure of the data or input/output types + /// involved in the remote function. This may include details such as column + /// names and data types. + std::optional schema; + + /// Optional identifier for the specific remote function to be invoked. + /// This can be useful when the same server hosts multiple functions, + /// and the client needs to specify which function to call. + std::optional functionId; + + /// Optional version information to be used when calling the remote function. + /// This can help in ensuring compatibility with a particular version of the + /// function if multiple versions are available on the server. + std::optional version; }; /// Registers a new remote function. It will use the meatadata defined in @@ -41,8 +56,8 @@ struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata { // /// Remote functions are registered as regular statufull functions (using the /// same internal catalog), and hence conflict if there already exists a -/// (non-remote) function registered with the same name. The `overwrite` flag -/// controls whether to overwrite in these cases. +/// (non-remote) function registered with the same name. The `overwrite` +/// flagwrite controls whether to overwrite in these cases. void registerRemoteFunction( const std::string& name, std::vector signatures, diff --git a/velox/functions/remote/client/RestClient.cpp b/velox/functions/remote/client/RestClient.cpp index f21bb53485498..f647c34dd45a9 100644 --- a/velox/functions/remote/client/RestClient.cpp +++ b/velox/functions/remote/client/RestClient.cpp @@ -27,7 +27,7 @@ RestClient::RestClient(const std::string& url) : url_(url) { void RestClient::invoke_function( const std::string& requestBody, std::string& responseBody) { - httpClient_->send(requestBody); + httpClient_->post(requestBody); responseBody = httpClient_->getResponseBody(); LOG(INFO) << responseBody; };