Skip to content

Commit

Permalink
[CELEBORN-1609] Support SSL for celeborn RESTful service
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Support SSL for celeborn RESTful service.

### Why are the changes needed?
For HTTP SSL connection requirements.

### Does this PR introduce _any_ user-facing change?
No, SSL is disabled by defaults.

### How was this patch tested?

Integration testing.

```
celeborn.master.http.ssl.enabled=true
celeborn.master.http.ssl.keystore.path=/hadoop/keystore.jks
celeborn.master.http.ssl.keystore.password=xxxxxxx
```
<img width="1143" alt="image" src="https://github.com/user-attachments/assets/2334561d-1de3-4b38-bc80-5d5d86d3b8ff">

<img width="695" alt="image" src="https://github.com/user-attachments/assets/e3877468-cc3b-4a4a-bf75-2994f557a104">

Closes apache#2756 from turboFei/HADP_1609_ssl2.

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
turboFei authored and RexXiong committed Sep 25, 2024
1 parent d880979 commit b2aa359
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 16 deletions.
116 changes: 116 additions & 0 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2376,6 +2376,64 @@ object CelebornConf extends Logging {
.toSequence
.createWithDefault(Seq.empty)

val MASTER_HTTP_SSL_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.http.ssl.enabled")
.categories("master")
.version("0.6.0")
.doc("Set this to true for using SSL encryption in http server.")
.booleanConf
.createWithDefault(false)

val MASTER_HTTP_SSL_KEYSTORE_PATH: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.path")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore location.")
.stringConf
.createOptional

val MASTER_HTTP_SSL_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.password")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore password.")
.stringConf
.createOptional

val MASTER_HTTP_SSL_KEYSTORE_TYPE: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.type")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore type.")
.stringConf
.createOptional

val MASTER_HTTP_SSL_KEYSTORE_ALGORITHM: OptionalConfigEntry[String] =
buildConf("celeborn.master.http.ssl.keystore.algorithm")
.categories("master")
.version("0.6.0")
.doc("SSL certificate keystore algorithm.")
.stringConf
.createOptional

val MASTER_HTTP_SSL_DISALLOWED_PROTOCOLS: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.ssl.disallowed.protocols")
.categories("master")
.version("0.6.0")
.doc("SSL versions to disallow.")
.stringConf
.toSequence
.createWithDefault(Seq("SSLv2", "SSLv3"))

val MASTER_HTTP_SSL_INCLUDE_CIPHER_SUITES: ConfigEntry[Seq[String]] =
buildConf("celeborn.master.http.ssl.include.ciphersuites")
.categories("master")
.version("0.6.0")
.doc("A comma-separated list of include SSL cipher suite names.")
.stringConf
.toSequence
.createWithDefault(Nil)

val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
Expand Down Expand Up @@ -3123,6 +3181,64 @@ object CelebornConf extends Logging {
.toSequence
.createWithDefault(Seq.empty)

val WORKER_HTTP_SSL_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.http.ssl.enabled")
.categories("worker")
.version("0.6.0")
.doc("Set this to true for using SSL encryption in http server.")
.booleanConf
.createWithDefault(false)

val WORKER_HTTP_SSL_KEYSTORE_PATH: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.path")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore location.")
.stringConf
.createOptional

val WORKER_HTTP_SSL_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.password")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore password.")
.stringConf
.createOptional

val WORKER_HTTP_SSL_KEYSTORE_TYPE: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.type")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore type.")
.stringConf
.createOptional

val WORKER_HTTP_SSL_KEYSTORE_ALGORITHM: OptionalConfigEntry[String] =
buildConf("celeborn.worker.http.ssl.keystore.algorithm")
.categories("worker")
.version("0.6.0")
.doc("SSL certificate keystore algorithm.")
.stringConf
.createOptional

val WORKER_HTTP_SSL_DISALLOWED_PROTOCOLS: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.ssl.disallowed.protocols")
.categories("worker")
.version("0.6.0")
.doc("SSL versions to disallow.")
.stringConf
.toSequence
.createWithDefault(Seq("SSLv2", "SSLv3"))

val WORKER_HTTP_SSL_INCLUDE_CIPHER_SUITES: ConfigEntry[Seq[String]] =
buildConf("celeborn.worker.http.ssl.include.ciphersuites")
.categories("worker")
.version("0.6.0")
.doc("A comma-separated list of include SSL cipher suite names.")
.stringConf
.toSequence
.createWithDefault(Nil)

val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/master.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ license: |
| celeborn.master.http.proxy.client.ip.header | X-Real-IP | false | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that, because the header value can be specified to any IP address, so it will not be used for authentication. | 0.6.0 | |
| celeborn.master.http.spnego.keytab | &lt;undefined&gt; | false | The keytab file for SPNego authentication. | 0.6.0 | |
| celeborn.master.http.spnego.principal | &lt;undefined&gt; | false | SPNego service principal, typical value would look like HTTP/_[email protected]. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 | |
| celeborn.master.http.ssl.disallowed.protocols | SSLv2,SSLv3 | false | SSL versions to disallow. | 0.6.0 | |
| celeborn.master.http.ssl.enabled | false | false | Set this to true for using SSL encryption in http server. | 0.6.0 | |
| celeborn.master.http.ssl.include.ciphersuites | | false | A comma-separated list of include SSL cipher suite names. | 0.6.0 | |
| celeborn.master.http.ssl.keystore.algorithm | &lt;undefined&gt; | false | SSL certificate keystore algorithm. | 0.6.0 | |
| celeborn.master.http.ssl.keystore.password | &lt;undefined&gt; | false | SSL certificate keystore password. | 0.6.0 | |
| celeborn.master.http.ssl.keystore.path | &lt;undefined&gt; | false | SSL certificate keystore location. | 0.6.0 | |
| celeborn.master.http.ssl.keystore.type | &lt;undefined&gt; | false | SSL certificate keystore type. | 0.6.0 | |
| celeborn.master.http.stopTimeout | 5s | false | Master http server stop timeout. | 0.5.0 | |
| celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.persist.workerNetworkLocation | false | false | | 0.6.0 | |
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ license: |
| celeborn.worker.http.proxy.client.ip.header | X-Real-IP | false | The HTTP header to record the real client IP address. If your server is behind a load balancer or other proxy, the server will see this load balancer or proxy IP address as the client IP address, to get around this common issue, most load balancers or proxies offer the ability to record the real remote IP address in an HTTP header that will be added to the request for other devices to use. Note that, because the header value can be specified to any IP address, so it will not be used for authentication. | 0.6.0 | |
| celeborn.worker.http.spnego.keytab | &lt;undefined&gt; | false | The keytab file for SPNego authentication. | 0.6.0 | |
| celeborn.worker.http.spnego.principal | &lt;undefined&gt; | false | SPNego service principal, typical value would look like HTTP/_[email protected]. SPNego service principal would be used when celeborn http authentication is enabled. This needs to be set only if SPNEGO is to be used in authentication. | 0.6.0 | |
| celeborn.worker.http.ssl.disallowed.protocols | SSLv2,SSLv3 | false | SSL versions to disallow. | 0.6.0 | |
| celeborn.worker.http.ssl.enabled | false | false | Set this to true for using SSL encryption in http server. | 0.6.0 | |
| celeborn.worker.http.ssl.include.ciphersuites | | false | A comma-separated list of include SSL cipher suite names. | 0.6.0 | |
| celeborn.worker.http.ssl.keystore.algorithm | &lt;undefined&gt; | false | SSL certificate keystore algorithm. | 0.6.0 | |
| celeborn.worker.http.ssl.keystore.password | &lt;undefined&gt; | false | SSL certificate keystore password. | 0.6.0 | |
| celeborn.worker.http.ssl.keystore.path | &lt;undefined&gt; | false | SSL certificate keystore location. | 0.6.0 | |
| celeborn.worker.http.ssl.keystore.type | &lt;undefined&gt; | false | SSL certificate keystore type. | 0.6.0 | |
| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop timeout. | 0.5.0 | |
| celeborn.worker.internal.port | 0 | false | Internal server port on the Worker where the master nodes connect. | 0.5.0 | |
| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling via async_profiler in workers. | 0.5.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,14 @@ abstract class HttpService extends Service with Logging {
httpPort(),
httpMaxWorkerThreads(),
httpStopTimeout(),
httpIdleTimeout())
httpIdleTimeout(),
httpSslEnabled(),
httpSslKeyStorePath(),
httpSslKeyStorePassword(),
httpSslKeyStoreType(),
httpSslKeyStoreAlgorithm(),
httpSslDisallowedProtocols(),
httpSslIncludedCipherSuites())
httpServer.start()
startInternal()
// block until the HTTP server is started, otherwise, we may get
Expand Down Expand Up @@ -261,6 +268,69 @@ abstract class HttpService extends Service with Logging {
}
}

private[celeborn] def httpSslEnabled(): Boolean = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_ENABLED)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_ENABLED)
}
}

private def httpSslKeyStorePath(): Option[String] = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_PATH)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_PATH)
}
}

private def httpSslKeyStorePassword(): Option[String] = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_PASSWORD)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_PASSWORD)
}
}

private def httpSslKeyStoreType(): Option[String] = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_TYPE)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_TYPE)
}
}

private def httpSslKeyStoreAlgorithm(): Option[String] = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_KEYSTORE_ALGORITHM)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_KEYSTORE_ALGORITHM)
}
}

private def httpSslDisallowedProtocols(): Seq[String] = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_DISALLOWED_PROTOCOLS)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_DISALLOWED_PROTOCOLS)
}
}

private def httpSslIncludedCipherSuites(): Seq[String] = {
serviceName match {
case Service.MASTER =>
conf.get(CelebornConf.MASTER_HTTP_SSL_INCLUDE_CIPHER_SUITES)
case Service.WORKER =>
conf.get(CelebornConf.WORKER_HTTP_SSL_INCLUDE_CIPHER_SUITES)
}
}

def connectionUrl: String = {
httpServer.getServerUri
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ package org.apache.celeborn.server.common.http
import scala.util.Try

import org.apache.commons.lang3.SystemUtils
import org.eclipse.jetty.server.{Handler, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
import org.eclipse.jetty.http.HttpVersion
import org.eclipse.jetty.server.{Handler, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler}
import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.ssl.SslContextFactory
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}

import org.apache.celeborn.common.internal.Logging
Expand Down Expand Up @@ -105,15 +107,22 @@ private[celeborn] case class HttpServer(
def getState: String = server.getState
}

object HttpServer {
object HttpServer extends Logging {

def apply(
role: String,
host: String,
port: Int,
poolSize: Int,
stopTimeout: Long,
idleTimeout: Long): HttpServer = {
idleTimeout: Long,
sslEnabled: Boolean,
keyStorePath: Option[String],
keyStorePassword: Option[String],
keyStoreType: Option[String],
keyStoreAlgorithm: Option[String],
sslDisallowedProtocols: Seq[String],
sslIncludeCipherSuites: Seq[String]): HttpServer = {
val pool = new QueuedThreadPool(math.max(poolSize, 8))
pool.setName(s"$role-JettyThreadPool")
pool.setDaemon(true)
Expand All @@ -130,14 +139,51 @@ object HttpServer {

val serverExecutor = new ScheduledExecutorScheduler(s"$role-JettyScheduler", true)
val httpConf = new HttpConfiguration()
val connector = new ServerConnector(
server,
null,
serverExecutor,
null,
-1,
-1,
new HttpConnectionFactory(httpConf))

val connector =
if (sslEnabled) {
if (keyStorePath.isEmpty) {
throw new IllegalArgumentException("KeyStorePath is not provided for SSL connection.")
}
if (keyStorePassword.isEmpty) {
throw new IllegalArgumentException("KeyStorePassword is not provided for SSL connection.")
}

val sslContextFactory = new SslContextFactory.Server()
logInfo(
"HTTP Server SSL: adding excluded protocols: " + sslDisallowedProtocols.mkString(","))
sslContextFactory.addExcludeProtocols(sslDisallowedProtocols: _*)
logInfo(s"HTTP Server SSL: SslContextFactory.getExcludeProtocols = ${sslContextFactory.getExcludeProtocols.mkString(",")}")
logInfo(
"HTTP Server SSL: adding included cipher suites: " + sslIncludeCipherSuites.mkString(","))
sslContextFactory.setIncludeCipherSuites(sslIncludeCipherSuites: _*)
logInfo(s"HTTP Server SSL: SslContextFactory.getIncludeCipherSuites = ${sslContextFactory.getIncludeCipherSuites.mkString(",")}")

sslContextFactory.setKeyStorePath(keyStorePath.get)
sslContextFactory.setKeyStorePassword(keyStorePassword.get)
keyStoreType.foreach(sslContextFactory.setKeyStoreType)
keyStoreAlgorithm.foreach(sslContextFactory.setKeyManagerFactoryAlgorithm)

new ServerConnector(
server,
null,
serverExecutor,
null,
-1,
-1,
new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.toString),
new HttpConnectionFactory(httpConf))
} else {
new ServerConnector(
server,
null,
serverExecutor,
null,
-1,
-1,
new HttpConnectionFactory(httpConf))
}

connector.setHost(host)
connector.setPort(port)
connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.celeborn.server.common.http.api

import java.net.URI
import javax.servlet.ServletConfig
import javax.ws.rs.{GET, Path, PathParam, Produces}
import javax.ws.rs.core.{Application, Context, HttpHeaders, MediaType, Response, UriInfo}
Expand Down Expand Up @@ -60,7 +61,7 @@ class CelebornOpenApiResource extends BaseOpenApiResource with ApiRequestContext
.ctxId(ctxId)
.buildContext(true)

val openApi = setCelebornOpenAPIDefinition(ctx.read(), uriInfo.getBaseUri.toString)
val openApi = setCelebornOpenAPIDefinition(ctx.read(), uriInfo.getBaseUri)

if (StringUtils.isNotBlank(tpe) && tpe.trim().equalsIgnoreCase("yaml")) {
Response.status(Response.Status.OK)
Expand All @@ -81,9 +82,10 @@ class CelebornOpenApiResource extends BaseOpenApiResource with ApiRequestContext
}
}

private def setCelebornOpenAPIDefinition(openApi: OpenAPI, requestBaseUrl: String): OpenAPI = {
// TODO: to improve when https is enabled.
val apiUrls = List(requestBaseUrl, s"http://${httpService.connectionUrl}/").distinct
private def setCelebornOpenAPIDefinition(openApi: OpenAPI, requestBaseUri: URI): OpenAPI = {
val httpScheme = if (httpService.httpSslEnabled()) "https:" else "http:"
val requestBaseUrl = s"$httpScheme${requestBaseUri.getSchemeSpecificPart}"
val apiUrls = List(requestBaseUrl, s"$httpScheme//${httpService.connectionUrl}/").distinct
openApi.info(
new Info().title(
s"Apache Celeborn REST API Documentation")
Expand Down

0 comments on commit b2aa359

Please sign in to comment.