diff --git a/build.sbt b/build.sbt index 1404fee..2c9c203 100644 --- a/build.sbt +++ b/build.sbt @@ -6,7 +6,7 @@ description := "sbt plugin for cassandra schema/data migrations using pillar (ht organization := "io.ino" -version := "1.0.0-RC2" +version := "1.0.0" licenses := Seq("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.html")) @@ -14,11 +14,10 @@ scalaVersion := "2.10.4" scalacOptions += "-target:jvm-1.7" -// pillar intermediately published at bintray, while waiting for -// an official release (see https://github.com/comeara/pillar/issues/12) -resolvers += "bintray magro" at "http://dl.bintray.com/magro/maven" +libraryDependencies += "com.chrisomeara" %% "pillar" % "2.0.0" -libraryDependencies += "com.streamsend" %% "pillar" % "1.0.3-RC1" +// fix broken dependenxy in pillar until https://github.com/comeara/pillar/pull/14 is merged +libraryDependencies += "org.clapper" %% "argot" % "1.0.3" // Maven publishing info publishMavenStyle := true diff --git a/src/main/scala/io/ino/sbtpillar/Plugin.scala b/src/main/scala/io/ino/sbtpillar/Plugin.scala index 6ee1fdb..bf357c9 100644 --- a/src/main/scala/io/ino/sbtpillar/Plugin.scala +++ b/src/main/scala/io/ino/sbtpillar/Plugin.scala @@ -1,8 +1,9 @@ package io.ino.sbtpillar +import sbt.Keys._ import sbt._ -import Keys._ -import scala.Some + +import scala.util.Try object Plugin extends sbt.Plugin { @@ -14,35 +15,36 @@ object Plugin extends sbt.Plugin { val pillarConfigFile = settingKey[File]("Path to the configuration file holding the cassandra uri") val pillarConfigKey = settingKey[String]("Configuration key storing the cassandra url") + val pillarReplicationFactorConfigKey = settingKey[String]("Configuration key storing the replication factor to create keyspaces with") val pillarMigrationsDir = settingKey[File]("Path to the directory holding migration files") } - import PillarKeys._ - import Pillar.{withCassandraUrl, withSession} import com.datastax.driver.core.Session + import io.ino.sbtpillar.Plugin.Pillar.{withCassandraUrl, withSession} + import io.ino.sbtpillar.Plugin.PillarKeys._ - def pillarSettings: Seq[sbt.Def.Setting[_]] = Seq( + private def taskSettings: Seq[sbt.Def.Setting[_]] = Seq( createKeyspace := { - withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url => + withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) => streams.value.log.info(s"Creating keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port}") - Pillar.initialize(url.keyspace, url.hosts(0), url.port) + Pillar.initialize(replicationFactor, url, streams.value.log) } }, dropKeyspace := { - withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url => + withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) => streams.value.log.info(s"Dropping keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port}") - Pillar.destroy(url.keyspace, url.hosts(0), url.port) + Pillar.destroy(url, streams.value.log) } }, migrate := { - withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url => - val dir = pillarMigrationsDir.value - streams.value.log.info(s"Migrating keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port} using migrations in $dir") - Pillar.migrate(url.keyspace, url.hosts(0), url.port, dir) + withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) => + val migrationsDir = pillarMigrationsDir.value + streams.value.log.info(s"Migrating keyspace ${url.keyspace} at ${url.hosts(0)}:${url.port} using migrations in $migrationsDir") + Pillar.migrate(migrationsDir, url, streams.value.log) } }, cleanMigrate := { - withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, streams.value.log) { url => + withCassandraUrl(pillarConfigFile.value, pillarConfigKey.value, pillarReplicationFactorConfigKey.value, streams.value.log) { (url, replicationFactor) => val host = url.hosts(0) withSession(url, streams.value.log) { (url, session) => @@ -51,35 +53,43 @@ object Plugin extends sbt.Plugin { } streams.value.log.info(s"Creating keyspace ${url.keyspace} at $host:${url.port}") - Pillar.initialize(url.keyspace, host, url.port) + Pillar.initialize(replicationFactor, url, streams.value.log) val dir = pillarMigrationsDir.value streams.value.log.info(s"Migrating keyspace ${url.keyspace} at $host:${url.port} using migrations in $dir") - Pillar.migrate(url.keyspace, url.hosts(0), url.port, dir) + Pillar.migrate(dir, url, streams.value.log) } }, pillarConfigKey := "cassandra.url", + pillarReplicationFactorConfigKey := "cassandra.replicationFactor", pillarConfigFile := file("conf/application.conf"), pillarMigrationsDir := file("conf/migrations") ) + def pillarSettings: Seq[sbt.Def.Setting[_]] = inConfig(Test)(taskSettings) ++ taskSettings + private case class CassandraUrl(hosts: Seq[String], port: Int, keyspace: String) private object Pillar { + import java.nio.file.Files import com.datastax.driver.core.Cluster - import com.streamsend.pillar._ + import com.chrisomeara.pillar._ import com.typesafe.config.ConfigFactory - import java.nio.file.Files import scala.util.control.NonFatal - def withCassandraUrl(configFile: File, configKey: String, logger: Logger)(block: CassandraUrl => Unit): Unit = { + private val DEFAULT_REPLICATION_FACTOR = 3 - logger.info(s"Reading config from ${configFile.getAbsolutePath}") - val urlString = ConfigFactory.parseFile(configFile).resolve().getString(configKey) + def withCassandraUrl(configFile: File, configKey: String, repFactorConfigKey: String, logger: Logger)(block: (CassandraUrl, Int) => Unit): Unit = { + val configFileMod = file(sys.env.getOrElse("PILLAR_CONFIG_FILE", configFile.getAbsolutePath)) + logger.info(s"Reading config from ${configFileMod.getAbsolutePath}") + val config = ConfigFactory.parseFile(configFileMod).resolve() + val urlString = config.getString(configKey) val url = parseUrl(urlString) + + val replicationFactor = Try(config.getInt(repFactorConfigKey)).getOrElse(DEFAULT_REPLICATION_FACTOR) try { - block(url) + block(url, replicationFactor) } catch { case NonFatal(e) => logger.error(s"An error occurred while performing task: $e") @@ -101,20 +111,24 @@ object Plugin extends sbt.Plugin { } } - def initialize(keyspace: String, host: String, port: Int): Unit = { - val dataStore = DataStore("faker", keyspace, host) - Migrator(Registry(Seq.empty)).initialize(dataStore) + def initialize(replicationFactor: Int, url: CassandraUrl, logger: Logger): Unit = { + withSession(url, logger) { (url, session) => + Migrator(Registry(Seq.empty)).initialize(session, url.keyspace, replicationOptionsWith(replicationFactor = replicationFactor)) + } } - def destroy(keyspace: String, host: String, port: Int): Unit = { - val dataStore = DataStore("faker", keyspace, host) - Migrator(Registry(Seq.empty)).destroy(dataStore) + def destroy(url: CassandraUrl, logger: Logger): Unit = { + withSession(url, logger) { (url, session) => + Migrator(Registry(Seq.empty)).destroy(session, url.keyspace) + } } - def migrate(keyspace: String, host: String, port: Int, migrationsDir: File): Unit = { - val registry = Registry(loadMigrations(migrationsDir)) - val dataStore = DataStore("faker", keyspace, host) - Migrator(registry).migrate(dataStore) + def migrate(migrationsDir: File, url: CassandraUrl, logger: Logger): Unit = { + withSession(url, logger) { (url, session) => + val registry = Registry(loadMigrations(migrationsDir)) + session.execute(s"USE ${url.keyspace}") + Migrator(registry).migrate(session) + } } private def parseUrl(urlString: String): CassandraUrl = { @@ -127,20 +141,22 @@ object Plugin extends sbt.Plugin { } private def loadMigrations(migrationsDir: File) = { - val parser = com.streamsend.pillar.Parser() + val parser = com.chrisomeara.pillar.Parser() Option(migrationsDir.listFiles()) match { case Some(files) => files.map { f => - val in = Files.newInputStream(f.toPath) - try { - parser.parse(in) - } finally { - in.close() - } - }.toList + val in = Files.newInputStream(f.toPath) + try { + parser.parse(in) + } finally { + in.close() + } + }.toList case None => throw new IllegalArgumentException("The pillarMigrationsDir does not contain any migration files - wrong configuration?") } } + private def replicationOptionsWith(replicationFactor: Int) = + new ReplicationOptions(Map("class" -> "SimpleStrategy", "replication_factor" -> replicationFactor)) } } \ No newline at end of file