
rubanm/ignite-scala


ignite scala

Scala API for distributed closures on Apache Ignite. Inspired by Scalding.

http://apacheignite.readme.io/v1.0/docs/distributed-closures

example 0 - cluster setup

import org.apache.ignite._
import org.apache.ignite.configuration._
import ignite.scala._

val cfg = new IgniteConfiguration // configure as appropriate
val ignite = Ignition.start(cfg)
val compute = ignite.compute(ignite.cluster)
implicit val cr = ComputeRunner(compute)

example 1 - character count

import com.twitter.algebird.Semigroup

implicit val sg = Semigroup.intSemigroup

val chain = IgnitePipe.from("The quick brown fox jumps over the lazy dog.".split(" "))
  .map(_.length) // fork
  .reduce // join

chain.execute // Option[Int]
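The `Option[Int]` result is consistent with reducing over a semigroup: a semigroup has no identity element, so an empty input has nothing to return. A minimal local sketch of the same computation, using only the standard library (no Ignite or Algebird involved; `charCount` is a hypothetical helper, not part of ignite-scala):

```scala
// Local, single-process analogue of the pipeline above.
// `charCount` is a hypothetical name used only for illustration.
def charCount(words: List[String]): Option[Int] =
  words
    .map(_.length)        // corresponds to the distributed map (fork)
    .reduceOption(_ + _)  // corresponds to the semigroup reduce (join)

val words = "The quick brown fox jumps over the lazy dog.".split(" ").toList

val total = charCount(words)      // Some(36)
val empty = charCount(List.empty) // None: nothing to combine
```

Spaces are dropped by the split, so the count covers only the characters inside the words, including the trailing period.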

example 2 - working with distributed cache

val cache = {
  val cfg = new CacheConfiguration[K, V]
  cfg.setName(name)
  ignite.getOrCreateCache(cfg)
}

val process: V => R // computation
val isValid: R => Boolean

IgnitePipe.from(keys)
  .map { k =>
    val v = cache.get(k)
    process(v)
  }
  .filter(isValid)
  .execute // Iterable[R]

example 3 - collocating compute with cache

Ignite can route a computation to the node where its data is cached, so the value can be read locally instead of being fetched over the network.

IgnitePipe.collocated(cache, keys) { (c, k) =>
  val v = c.localPeek(k)
  process(v)
}
.filter(isValid)
.execute

example 4 - more chaining

val cache: IgniteCache[K, V]
val db: CacheJdbcBlobStore[K, V]

val cacheResults = IgnitePipe.from(keys)
  .map { k => cacheGetAndCompute(cache, k) }

val dbResults = IgnitePipe.from(keys)
  .map { k => dbGetAndCompute(db, k) }
  
val combined = (cacheResults ++ dbResults)
  .reduce // e.g. consolidate the cache and db results
  .toPipe // continuation
  
combined.map { exportResults(_) }.execute

installing

Add the following to your build.sbt (the artifact is fetched from Sonatype):

resolvers += Resolver.sonatypeRepo("releases")
libraryDependencies += "com.github.rubanm" %% "ignite-scala" % "0.0.1"

core api

/* Provides composable distributed closures that can run on Apache Ignite. */
trait IgnitePipe[T] {
  
  def map[U](f: T => U): IgnitePipe[U]
  
  def flatMap[U](f: T => TraversableOnce[U]): IgnitePipe[U]
  
  def ++(p: IgnitePipe[T]): IgnitePipe[T]
  
  def reduce(implicit sg: Semigroup[T]): Reduction[T]
  
  def execute: Iterable[T]
}

/* Represents a reduction of the distributed closure results. */
trait Reduction[T] {

  def execute: Option[T]

  def toPipe: IgnitePipe[T]
}
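
To get a feel for how these operations compose, here is a hypothetical in-memory model of the two traits. `LocalPipe`, `LocalReduction`, and the local `Semigroup` trait are illustrative stand-ins and not the library's implementation; nothing here is distributed, and `flatMap` takes an `Iterable` rather than a `TraversableOnce` to keep the sketch version-neutral.

```scala
// Stand-in for com.twitter.algebird.Semigroup so the sketch has no dependencies.
trait Semigroup[T] { def plus(a: T, b: T): T }

// Hypothetical in-process model of IgnitePipe: every step runs locally.
final case class LocalPipe[T](values: Iterable[T]) {
  def map[U](f: T => U): LocalPipe[U] = LocalPipe(values.map(f))
  def flatMap[U](f: T => Iterable[U]): LocalPipe[U] = LocalPipe(values.flatMap(f))
  def ++(p: LocalPipe[T]): LocalPipe[T] = LocalPipe(values ++ p.values)
  // Combines all values pairwise; an empty pipe yields None.
  def reduce(implicit sg: Semigroup[T]): LocalReduction[T] =
    LocalReduction(values.reduceOption(sg.plus))
  def execute: Iterable[T] = values
}

// Hypothetical in-process model of Reduction.
final case class LocalReduction[T](result: Option[T]) {
  def execute: Option[T] = result
  // Continuation: a pipe holding zero or one element.
  def toPipe: LocalPipe[T] = LocalPipe(result.toList)
}

implicit val intSg: Semigroup[Int] = new Semigroup[Int] {
  def plus(a: Int, b: Int): Int = a + b
}

val a = LocalPipe(List(1, 2, 3)).map(_ * 2) // 2, 4, 6
val b = LocalPipe(List(10, 20))

val total = (a ++ b).reduce.execute                              // Some(42)
val continued = (a ++ b).reduce.toPipe.map(_ + 1).execute.toList // List(43)
```

The two case classes refer to each other, so they need to be compiled together (for example in one file, or with `:paste` in the REPL).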
