Skip to content

Commit

Permalink
This way too large commit introduces various features
Browse files Browse the repository at this point in the history
- Upgrade to pillar 2.0.0
- Added cassandra.replicationFactor config option to allow configuring different replication factors than 3. This is used in testing if one runs against a single node.
- Added the Tasks and Config entries to the Test and Default configuration so one can provide different config for testing and "normale" usage
- Auto "USE keyspace" after Pillar.initialize to select the proper keyspace. Could be fixed with comeara/pillar#16
- Version bump to 1.0.0 as it is used in production now
  • Loading branch information
fkoehler committed Jul 14, 2014
1 parent 74bb6c6 commit 2a5f2bf
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 45 deletions.
9 changes: 4 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@ description := "sbt plugin for cassandra schema/data migrations using pillar (ht

organization := "io.ino"

version := "1.0.0-RC2"
version := "1.0.0"

licenses := Seq("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.html"))

scalaVersion := "2.10.4"

scalacOptions += "-target:jvm-1.7"

// pillar intermediately published at bintray, while waiting for
// an official release (see https://github.com/comeara/pillar/issues/12)
resolvers += "bintray magro" at "http://dl.bintray.com/magro/maven"
libraryDependencies += "com.chrisomeara" %% "pillar" % "2.0.0"

libraryDependencies += "com.streamsend" %% "pillar" % "1.0.3-RC1"
// fix broken dependenxy in pillar until https://github.com/comeara/pillar/pull/14 is merged
libraryDependencies += "org.clapper" %% "argot" % "1.0.3"

// Maven publishing info
publishMavenStyle := true
Expand Down
96 changes: 56 additions & 40 deletions src/main/scala/io/ino/sbtpillar/Plugin.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.ino.sbtpillar

import sbt.Keys._
import sbt._
import Keys._
import scala.Some

import scala.util.Try

object Plugin extends sbt.Plugin {

Expand All @@ -14,35 +15,36 @@ object Plugin extends sbt.Plugin {

val pillarConfigFile = settingKey[File]("Path to the configuration file holding the cassandra uri")
val pillarConfigKey = settingKey[String]("Configuration key storing the cassandra url")
val pillarReplicationFactorConfigKey = settingKey[String]("Configuration key storing the replication factor to create keyspaces with")
val pillarMigrationsDir = settingKey[File]("Path to the directory holding migration files")
}

import PillarKeys._
import Pillar.{withCassandraUrl, withSession}
import com.datastax.driver.core.Session
import io.ino.sbtpillar.Plugin.Pillar.{withCassandraUrl, withSession}
import io.ino.sbtpillar.Plugin.PillarKeys._

def pillarSettings: Seq[sbt.Def.Setting[_]] = Seq(
private def taskSettings: Seq[sbt.Def.Setting[_]] = Seq(
createKeyspace := {
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url =>
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) =>
streams.value.log.info(s"Creating keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port}")
Pillar.initialize(url.keyspace, url.hosts(0), url.port)
Pillar.initialize(replicationFactor, url, streams.value.log)
}
},
dropKeyspace := {
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url =>
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) =>
streams.value.log.info(s"Dropping keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port}")
Pillar.destroy(url.keyspace, url.hosts(0), url.port)
Pillar.destroy(url, streams.value.log)
}
},
migrate := {
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url =>
val dir = pillarMigrationsDir.value
streams.value.log.info(s"Migrating keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port} using migrations in $dir")
Pillar.migrate(url.keyspace, url.hosts(0), url.port, dir)
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) =>
val migrationsDir = pillarMigrationsDir.value
streams.value.log.info(s"Migrating keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port} using migrations in $migrationsDir")
Pillar.migrate(migrationsDir, url, streams.value.log)
}
},
cleanMigrate := {
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url =>
withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) =>
val host = url.hosts(0)

withSession(url, streams.value.log) { (url, session) =>
Expand All @@ -51,35 +53,43 @@ object Plugin extends sbt.Plugin {
}

streams.value.log.info(s"Creating keyspace ${url.keyspace} at $host:${url.port}")
Pillar.initialize(url.keyspace, host, url.port)
Pillar.initialize(replicationFactor, url, streams.value.log)

val dir = pillarMigrationsDir.value
streams.value.log.info(s"Migrating keyspace ${url.keyspace} at $host:${url.port} using migrations in $dir")
Pillar.migrate(url.keyspace, url.hosts(0), url.port, dir)
Pillar.migrate(dir, url, streams.value.log)
}
},
pillarConfigKey := "cassandra.url",
pillarReplicationFactorConfigKey := "cassandra.replicationFactor",
pillarConfigFile := file("conf/application.conf"),
pillarMigrationsDir := file("conf/migrations")
)

def pillarSettings: Seq[sbt.Def.Setting[_]] = inConfig(Test)(taskSettings) ++ taskSettings

private case class CassandraUrl(hosts: Seq[String], port: Int, keyspace: String)

private object Pillar {

import java.nio.file.Files
import com.datastax.driver.core.Cluster
import com.streamsend.pillar._
import com.chrisomeara.pillar._
import com.typesafe.config.ConfigFactory
import java.nio.file.Files
import scala.util.control.NonFatal

def withCassandraUrl(configFile: File, configKey: String, logger: Logger)(block: CassandraUrl => Unit): Unit = {
private val DEFAULT_REPLICATION_FACTOR = 3

logger.info(s"Reading config from ${configFile.getAbsolutePath}")
val urlString = ConfigFactory.parseFile(configFile).resolve().getString(configKey)
def withCassandraUrl(configFile: File, configKey: String, repFactorConfigKey: String, logger: Logger)(block: (CassandraUrl, Int) => Unit): Unit = {
val configFileMod = file(sys.env.getOrElse("PILLAR_CONFIG_FILE", configFile.getAbsolutePath))
logger.info(s"Reading config from ${configFileMod.getAbsolutePath}")
val config = ConfigFactory.parseFile(configFileMod).resolve()
val urlString = config.getString(configKey)
val url = parseUrl(urlString)

val replicationFactor = Try(config.getInt(repFactorConfigKey)).getOrElse(DEFAULT_REPLICATION_FACTOR)
try {
block(url)
block(url, replicationFactor)
} catch {
case NonFatal(e) =>
logger.error(s"An error occurred while performing task: $e")
Expand All @@ -101,20 +111,24 @@ object Plugin extends sbt.Plugin {
}
}

def initialize(keyspace: String, host: String, port: Int): Unit = {
val dataStore = DataStore("faker", keyspace, host)
Migrator(Registry(Seq.empty)).initialize(dataStore)
def initialize(replicationFactor: Int, url: CassandraUrl, logger: Logger): Unit = {
withSession(url, logger) { (url, session) =>
Migrator(Registry(Seq.empty)).initialize(session, url.keyspace, replicationOptionsWith(replicationFactor = replicationFactor))
}
}

def destroy(keyspace: String, host: String, port: Int): Unit = {
val dataStore = DataStore("faker", keyspace, host)
Migrator(Registry(Seq.empty)).destroy(dataStore)
def destroy(url: CassandraUrl, logger: Logger): Unit = {
withSession(url, logger) { (url, session) =>
Migrator(Registry(Seq.empty)).destroy(session, url.keyspace)
}
}

def migrate(keyspace: String, host: String, port: Int, migrationsDir: File): Unit = {
val registry = Registry(loadMigrations(migrationsDir))
val dataStore = DataStore("faker", keyspace, host)
Migrator(registry).migrate(dataStore)
def migrate(migrationsDir: File, url: CassandraUrl, logger: Logger): Unit = {
withSession(url, logger) { (url, session) =>
val registry = Registry(loadMigrations(migrationsDir))
session.execute(s"USE ${url.keyspace}")
Migrator(registry).migrate(session)
}
}

private def parseUrl(urlString: String): CassandraUrl = {
Expand All @@ -127,20 +141,22 @@ object Plugin extends sbt.Plugin {
}

private def loadMigrations(migrationsDir: File) = {
val parser = com.streamsend.pillar.Parser()
val parser = com.chrisomeara.pillar.Parser()
Option(migrationsDir.listFiles()) match {
case Some(files) => files.map { f =>
val in = Files.newInputStream(f.toPath)
try {
parser.parse(in)
} finally {
in.close()
}
}.toList
val in = Files.newInputStream(f.toPath)
try {
parser.parse(in)
} finally {
in.close()
}
}.toList
case None => throw new IllegalArgumentException("The pillarMigrationsDir does not contain any migration files - wrong configuration?")
}
}

private def replicationOptionsWith(replicationFactor: Int) =
new ReplicationOptions(Map("class" -> "SimpleStrategy", "replication_factor" -> replicationFactor))
}

}

0 comments on commit 2a5f2bf

Please sign in to comment.