Support REST API v1.2 #11

Open · wants to merge 6 commits into base: master
11 changes: 7 additions & 4 deletions build.sbt
@@ -1,17 +1,20 @@
 name := "libricks"
 
-version := "0.6"
+version := "0.8.3"
 
 //scalaVersion := "2.12.4"
-scalaVersion := "2.10.7"
+scalaVersion := "2.12.8"
 //scalaVersion := "2.11.12"
+//scalaVersion := "2.10.7"
 
 licenses := Seq("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0"))
 
 libraryDependencies ++= Seq(
   "com.typesafe" % "config" % "1.3.2",
   "org.apache.httpcomponents" % "httpclient" % "4.5.5",
   "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.9.4",
-  "org.scalatest" %% "scalatest" % "3.0.4" % "test"
+  "org.scalatest" %% "scalatest" % "3.0.4" % "test",
+  "org.scala-lang.modules" %% "scala-parser-combinators" % "1.1.1",
+  "org.apache.httpcomponents" % "httpmime" % "4.5.6"
 )
 
 bintrayPackageLabels := Seq("databricks", "rest", "dbfs", "scala")
2 changes: 1 addition & 1 deletion project/plugins.sbt
@@ -1,3 +1,3 @@
-addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.2")
+addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4")
 
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
110 changes: 110 additions & 0 deletions src/main/scala/com/databricks/CommandExecution.scala
@@ -0,0 +1,110 @@
package com.databricks

import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import com.fasterxml.jackson.annotation.JsonSubTypes.Type

/**
* Trait for API command result
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "resultType")
@JsonSubTypes(Array(
new Type(value = classOf[ApiTableResult], name = "table"),
new Type(value = classOf[ApiTextResult], name = "text"),
new Type(value = classOf[ApiImageResult], name = "image"),
new Type(value = classOf[ApiErrorResult], name = "error")
))
sealed trait ApiCommandResult

/**
* API command table result
* @param data Data in table
* @param schema Schema of the table
* @param truncated Whether partial results are returned
* @param isJsonSchema True if we are sending a JSON schema instead of a string representation of
* the Hive type.
*/
case class ApiTableResult(data: List[List[Any]], schema: List[Map[String, Any]],
truncated: Boolean, isJsonSchema: Boolean) extends ApiCommandResult

/**
* API command text result
* @param data The text result
*/
case class ApiTextResult(data: String) extends ApiCommandResult

/**
* API command image result
* @param fileName Name of the image file
*/
case class ApiImageResult(fileName: String) extends ApiCommandResult

/**
 * API command error result
 * @param summary Short summary of the error, if available
 * @param cause The cause of the error
 */
case class ApiErrorResult(summary: Option[String], cause: String) extends ApiCommandResult

/**
* API command status and result
* @param id Command id
* @param status status of the command: {"Queued", "Running", "Cancelling",
* "Finished", "Cancelled", "Error"}
* @param results Result of the command
*/
case class CommandResult(id: String, status: String, results: ApiCommandResult)

/**
* Access point for Command Execution API
* @param client - connection settings to user's shard
*/
class CommandExecution(client: ShardClient) extends Endpoint {
/** Common prefix of paths to Command Execution endpoints */
override def url: String = client.url + "/1.2/commands"

/**
* Runs a command or file.
*
* @param language - one of: "scala", "python", "sql", "r"
* @param clusterId - cluster identifier.
* @param contextId - identifier of an execution context
* @param command - command to run in the execution context
*/
def execute(language: String, clusterId: String, contextId: String, command: String): IdResult = {
val resp = client.postFile(s"$url/execute", "command", command,
Map("language" -> language, "clusterId" -> clusterId, "contextId" -> contextId))
client.extract[IdResult](resp)
}

/**
* Shows the status of an existing execution context.
*
* @param clusterId - cluster identifier.
* @param contextId - identifier of a created context
* @param commandId - identifier of executed command
*/
def status(clusterId: String, contextId: String, commandId: String): CommandResult = {
val resp = client.req(
endpoint = s"$url/status?clusterId=${clusterId}&contextId=${contextId}&commandId=${commandId}",
"get", ""
)
client.extract[CommandResult](resp)
}

/**
* Cancels one command.
*
* @param clusterId - identifier of a cluster
* @param contextId - identifier of a created context
* @param commandId - identifier of executed command
*/
def cancel(clusterId: String, contextId: String, commandId: String): IdResult = {
val resp = client.req(s"$url/cancel", "post",
s"""{
| "contextId": "${contextId}",
| "clusterId": "${clusterId}",
| "commandId": "${commandId}"
| }""".stripMargin
)
client.extract[IdResult](resp)
}
}
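Not part of the diff: a minimal end-to-end sketch of the workflow this endpoint enables. It assumes a connected ShardClient built as in the tests below and a placeholder cluster id, and it uses the ExecutionContext endpoint added in the next file, since a command needs a context to run in.

import com.typesafe.config.ConfigFactory
import com.databricks._

object CommandExecutionSketch extends App {
  // Assumed setup: shard credentials in test-shard.conf and an existing cluster.
  val shard = Shard(ConfigFactory.load("test-shard")).connect
  val clusterId = "1213-124217-drays466" // placeholder cluster id

  val IdResult(contextId) = shard.ec.create("scala", clusterId)
  val IdResult(commandId) = shard.command.execute("scala", clusterId, contextId, "1 + 1")

  // Poll until the command leaves the "Queued"/"Running" states.
  var res = shard.command.status(clusterId, contextId, commandId)
  while (res.status == "Queued" || res.status == "Running") {
    Thread.sleep(1000)
    res = shard.command.status(clusterId, contextId, commandId)
  }

  res.results match {
    case ApiTextResult(data) => println(s"result: $data")
    case ApiErrorResult(summary, cause) => println(s"failed: ${summary.getOrElse(cause)}")
    case other => println(s"other result type: $other")
  }

  shard.ec.destroy(clusterId, contextId)
}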
62 changes: 62 additions & 0 deletions src/main/scala/com/databricks/ExecutionContext.scala
@@ -0,0 +1,62 @@
package com.databricks

/**
* The result with an id
* @param id The id for context or command
*/
case class IdResult(id: String)

/**
* Context id and status
* @param id Context id
* @param status Status of context: {"Pending", "Running", "Error"}
*/
case class ContextIdStatusResult(id: String, status: String)

/**
* Access point for Execution Context API
* @param client - connection settings to user's shard
*/
class ExecutionContext(client: ShardClient) extends Endpoint {
/** Common prefix of paths to Execution Context endpoints */
override def url: String = client.url + "/1.2/contexts"

/**
* Creates an execution context on a specified cluster for a given programming language.
*
* @param language - one of: "scala", "python", "sql"
* @param clusterId - cluster identifier.
*/
def create(language: String, clusterId: String): IdResult = {
val resp = client.req(s"$url/create", "post",
s"""{"language": "$language", "clusterId": "${clusterId}"}"""
)
client.extract[IdResult](resp)
}

/**
* Shows the status of an existing execution context.
*
* @param contextId - identifier of a created context
*/
def status(clusterId: String, contextId: String): ContextIdStatusResult = {
val resp = client.req(
endpoint = s"$url/status?clusterId=${clusterId}&contextId=${contextId}", "get",
""
)
client.extract[ContextIdStatusResult](resp)
}

/**
* Destroys an execution context.
*
* @param clusterId - identifier of a cluster
* @param contextId - identifier of a created context
*/
def destroy(clusterId: String, contextId: String): IdResult = {
val resp = client.req(s"$url/destroy", "post",
s"""{"contextId": "${contextId}", "clusterId": "${clusterId}"}"""
)
client.extract[IdResult](resp)
}
}
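Again not part of the diff: a lifecycle sketch for the context endpoints alone, under the same assumptions (placeholder cluster id, shard config as in the tests).

import com.typesafe.config.ConfigFactory
import com.databricks._

object ExecutionContextSketch extends App {
  val shard = Shard(ConfigFactory.load("test-shard")).connect
  val clusterId = "1213-124217-drays466" // placeholder cluster id

  // create -> status -> destroy roundtrip
  val IdResult(contextId) = shard.ec.create("scala", clusterId)
  val ContextIdStatusResult(id, status) = shard.ec.status(clusterId, contextId)
  println(s"context $id is $status") // one of "Pending", "Running", "Error"
  shard.ec.destroy(clusterId, contextId)
}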
49 changes: 41 additions & 8 deletions src/main/scala/com/databricks/ShardClient.scala
@@ -6,7 +6,9 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
 import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.HttpUriRequest
 import org.apache.http.entity.StringEntity
+import org.apache.http.entity.mime.MultipartEntityBuilder
 import org.apache.http.util.EntityUtils
 
 /**
@@ -41,6 +43,30 @@ case class ShardClient(client: HttpClient, shard: String) extends Endpoint {
    * */
   lazy val lib = new Libraries(this)
 
+  /**
+   * Entry point of Execution Context API:
+   * https://docs.databricks.com/api/1.2/index.html#execution-context
+   */
+  lazy val ec = new ExecutionContext(this)
+  /**
+   * Entry point of Command Execution API:
+   * https://docs.databricks.com/api/1.2/index.html#command-execution
+   */
+  lazy val command = new CommandExecution(this)
+
+  def sendRequest(request: HttpUriRequest): String = {
+    val response = client.execute(request)
+    val statusCode = response.getStatusLine.getStatusCode
+
+    statusCode match {
+      case 200 => EntityUtils.toString(response.getEntity)
+      case 400 =>
+        val body = EntityUtils.toString(response.getEntity)
+        mapper.readValue[BricksException](body).throwException
+      case _ => throw new HttpException(statusCode)
+    }
+  }
+
   /**
    * Makes a REST request to specific endpoint
    *
@@ -67,17 +93,24 @@ case class ShardClient(client: HttpClient, shard: String) extends Endpoint {
       request.addHeader("Expect", "100-continue")
     }
     request.setEntity(new StringEntity(data))
+    sendRequest(request)
+  }
 
-    val response = client.execute(request)
-    val statusCode = response.getStatusLine.getStatusCode
+  def postFile(
+      endpoint: String,
+      fileFieldName: String,
+      fileData: String,
+      fields: Map[String, String]): String = {
+    val post = new org.apache.http.client.methods.HttpPost(endpoint)
 
-    statusCode match {
-      case 200 => EntityUtils.toString(response.getEntity)
-      case 400 =>
-        val body = EntityUtils.toString(response.getEntity)
-        mapper.readValue[BricksException](body).throwException
-      case _ => throw new HttpException(statusCode)
+    val entityBuilder = MultipartEntityBuilder.create()
+    entityBuilder.addBinaryBody(fileFieldName, fileData.getBytes("UTF-8"))
+    fields.foreach { case (key, value) =>
+      entityBuilder.addTextBody(key, value)
     }
 
+    post.setEntity(entityBuilder.build())
+    sendRequest(post)
   }
 
   def extract[A](json: String)(implicit mf: scala.reflect.Manifest[A]): A = {
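A note on the refactoring above: both req and the new postFile now delegate to the extracted sendRequest, so the 200/400/other status handling is defined once. For illustration only, a sketch of calling postFile directly, mirroring what CommandExecution.execute does internally (shard, clusterId and contextId are assumed to exist as in the sketches above):

// The command source travels as a multipart binary part named "command",
// alongside plain-text fields for language, cluster and context.
val json = shard.postFile(
  endpoint = shard.url + "/1.2/commands/execute",
  fileFieldName = "command",
  fileData = "1 + 1",
  fields = Map("language" -> "scala", "clusterId" -> clusterId, "contextId" -> contextId)
)
val IdResult(commandId) = shard.extract[IdResult](json)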
50 changes: 50 additions & 0 deletions src/test/scala/com/databricks/CommandExecutionTests.scala
@@ -0,0 +1,50 @@
package com.databricks

import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

class CommandExecutionTests extends FlatSpec with Matchers with BeforeAndAfter {
var shard: ShardClient = _
var contextId: String = _
val clusterId: String = "1213-124217-drays466"

before {
shard = Shard(ConfigFactory.load("test-shard")).connect
val IdResult(id) = shard.ec.create("scala", clusterId)
contextId = id
}

after {
shard.ec.destroy(clusterId, contextId)
}

it should "command execution" in {
val IdResult(commandId) = shard.command.execute("scala", clusterId, contextId,
"1 + 1")
var status: String = ""
var res:CommandResult = null
do {
res = shard.command.status(clusterId, contextId, commandId)
println("command: " + res)
status = res.status
Thread.sleep(1000)
} while (status == "Running")

assert(res.results.asInstanceOf[ApiTextResult].data == "res0: Int = 2")
}

it should "command execution failure" in {
val IdResult(commandId) = shard.command.execute("scala", clusterId, contextId,
"""throw new IllegalArgumentException("Oops")""")
var status: String = ""
var res:CommandResult = null
do {
res = shard.command.status(clusterId, contextId, commandId)
println("command: " + res)
status = res.status
Thread.sleep(1000)
} while (status == "Running")

assert(res.results.asInstanceOf[ApiErrorResult].summary == Some("java.lang.IllegalArgumentException: Oops"))
}
}
21 changes: 21 additions & 0 deletions src/test/scala/com/databricks/ExecutionContextTests.scala
@@ -0,0 +1,21 @@
package com.databricks

import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

class ExecutionContextTests extends FlatSpec with Matchers with BeforeAndAfter {
var shard: ShardClient = _

before {
shard = Shard(ConfigFactory.load("test-shard")).connect
}

it should "roundtrip test" in {
val clusterId = "1213-124217-drays466"
val IdResult(contextId) = shard.ec.create("scala", clusterId)
val ContextIdStatusResult(id, status) = shard.ec.status(clusterId, contextId)
assert(id == contextId)
println("status: " + status)
shard.ec.destroy(clusterId, contextId)
}
}