Spark 2.1 support, add tests and implement CollReduce for Dataset #35

Open. Wants to merge 27 commits into base: sql.

Changes from all commits (27 commits):
0d6c5df
Move with-resources to fixtures, use in core-test
viesti Mar 19, 2017
51d10de
Extract example-asserts from core-test, use them in integration test
viesti Mar 20, 2017
f18164c
Add assert for keg/by-key
viesti Mar 20, 2017
d197cd1
Fix for #24, choose type hint based on Spark version
viesti Mar 22, 2017
110178b
Choose type hint based on signature of leftOuterJoin
viesti Mar 24, 2017
59993aa
Use Clojure’s DynamicClassLoader to make keg/join test work
viesti Mar 24, 2017
4ea9331
Collect to map in order to not rely on ordering
viesti Mar 24, 2017
9d65ddf
Rename has-class? to has-method?
viesti Mar 24, 2017
d1affd4
Narrow scope of compile-cond
viesti Mar 24, 2017
fff8dc2
failing test for #27
cgrand Mar 28, 2017
84289fa
Fixing #27: all fns have unique class names now
cgrand Mar 28, 2017
3306cf2
Merge pull request #28 from viesti/fix/support-spark-optional
cgrand Mar 28, 2017
8c141d0
proper exception handling
cgrand Mar 28, 2017
03d92ac
make signature matching more expressive
cgrand Mar 28, 2017
a26a0f8
fix compile-cond to really throw when no match and fix the failing te…
cgrand Mar 28, 2017
38dbde1
Merge branch 'master' of https://github.com/viesti/powderkeg into vie…
cgrand Mar 28, 2017
af03ec2
Use clojure-dynamic-classloader fixture for keg/join (and move cloj…
cgrand Mar 28, 2017
36853cc
Merge branch 'viesti-master'
cgrand Mar 28, 2017
af414a0
0.5.1
cgrand Mar 28, 2017
3f7598c
Move fixtures out of deftest forms into runner ns
viesti Mar 28, 2017
1c258b6
Log Spark docker startup/shutdown
viesti Mar 29, 2017
43a5df4
Remove asserts namespace, bring asserts back to core-test
viesti Mar 29, 2017
25003e0
Merge branch 'master' into sql-test-into
viesti Apr 1, 2017
c72c41c
Format ns form, strip trailing whitespace
viesti Mar 18, 2017
662eda8
Make it work on Spark 2.1
viesti Mar 18, 2017
20f36cb
Add tests for sql namespace (df, spec-of and exec)
viesti Mar 22, 2017
79919ba
Make Dataset/DataFrame implement CollReduce
viesti Mar 21, 2017
4 changes: 2 additions & 2 deletions README.md
@@ -11,14 +11,14 @@ First, add Powderkeg and Spark (1.5 to 2.1 are supported, any scala version) to
E.g.

```clj
:dependencies [[hcadatalab/powderkeg "0.5.0"]
:dependencies [[hcadatalab/powderkeg "0.5.1"]
[org.apache.spark/spark-core_2.11 "2.1.0"]
[org.apache.spark/spark-streaming_2.11 "2.1.0"]]
```
or

```clj
:dependencies [[hcadatalab/powderkeg "0.5.0"]
:dependencies [[hcadatalab/powderkeg "0.5.1"]
[org.apache.spark/spark-core_2.10 "1.5.2"]
[org.apache.spark/spark-streaming_2.10 "1.5.2"]]
```
6 changes: 3 additions & 3 deletions circle.yml
@@ -6,7 +6,7 @@ machine:

test:
override:
- lein test-all
- lein test-all-local
post:
- lein with-profile spark2 test :only powderkeg.integration-test/rdd-spark-2.1.0
- lein with-profile spark1.5 test :only powderkeg.integration-test/rdd-spark-1.5.2
- lein test-all-1.5
- lein test-all-2.1
6 changes: 4 additions & 2 deletions project.clj
@@ -1,9 +1,11 @@
(defproject hcadatalab/powderkeg "0.5.0"
(defproject hcadatalab/powderkeg "0.5.1"
:description "Live-coding Spark clusters!"
:url "https://github.com/HCADatalab/powderkeg"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:aliases {"test-all" ["with-profile" "spark1.5:spark2" "test"]}
:aliases {"test-all-local" ["with-profile" "spark1.5:spark2" "run" "-m" "powderkeg.runner/run-tests-local-spark"]
"test-all-1.5" ["with-profile" "spark1.5" "run" "-m" "powderkeg.runner/run-tests-foreign-spark-1.5"]
"test-all-2.1" ["with-profile" "spark2" "run" "-m" "powderkeg.runner/run-tests-foreign-spark-2.1"]}
:test-selectors {:default (complement :integration)
:integration :integration}
:profiles {:default [:spark2]
110 changes: 89 additions & 21 deletions src/main/clojure/powderkeg/core.clj
@@ -5,6 +5,7 @@
[powderkeg.ouroboros :as ou] ; must be first
[powderkeg.kryo :as kryo]
[powderkeg.pool :as pool]
[powderkeg.macros :refer [compile-cond]]
[cemerick.pomegranate.aether :as mvn]
[clojure.core :as clj]
[clojure.java.io :as io]
@@ -13,25 +14,74 @@
[clojure.core.reducers :as r]
[net.cgrand.xforms :as x]))

(defn- generic-type [t]
(condp instance? t
java.lang.Class (symbol (.getName t))
java.lang.reflect.TypeVariable (if-some [bounds (seq (remove #{Object} (.getBounds t)))]
(list* 'extends (keyword (.getName t))
(map generic-type bounds))
(keyword (.getName t)))
java.lang.reflect.WildcardType (if-some [bounds (seq (remove #{Object} (.getUpperBounds t)))]
(list* 'extends '? (map generic-type bounds))
(if-some [bounds (seq (.getLowerBounds t))]
(list* 'super '? (map generic-type bounds))
'?))
java.lang.reflect.ParameterizedType (clj/into [(generic-type (.getRawType t))]
(map generic-type)
(.getActualTypeArguments t))
java.lang.reflect.GenericArrayType (list 'array-of (generic-type (.getGenericComponentType t)))
(throw (IllegalArgumentException. (pr-str t)))))

(defn- type-order [t]
(cond
(nil? t) [-1 0]
(= t '?) [0 0]
(keyword? t) [1 t]
(symbol? t) [2 t]
(vector? t) [3 (mapv type-order t)]
:else (case (first t)
super [4 (type-order (second t))]
extends [5 (mapv type-order t)]
array-of [6 (type-order (second t))])))

(defn- lookup [s k]
(if-some [x (s k)]
(if (keyword? x)
(recur s x)
x)
k))

(defn- unify-types
([a b]
(unify-types {} a b))
([s a b]
; todo: most advanced stuff (bounds) is skipped
(let [[a b] (sort-by type-order (map #(lookup s %) [a b]))]
(if (= a b)
s
(cond
(nil? a) nil
(= a '?) s
(keyword? a) (assoc s a b)
(symbol? a) (when (vector? b) (recur s a (first b)))
(vector? a) (when (and (vector? b) (= (count a) (count b)))
(reduce (fn [s [a b]]
(or (unify-types s a b)
(reduced nil))) s (map vector a b))))))))

(defn- unify-sigs [[ra na aa] [rb nb ab]]
(when (= na nb)
(some-> (unify-types ra rb) (unify-types aa ab))))

(defn- sig [x]
(cond
(instance? java.lang.reflect.Method x)
[(.getReturnType x) (.getName x) (vec (.getParameterTypes x))]
(instance? java.lang.reflect.Method x)
[(generic-type (.getGenericReturnType x)) (.getName x) (clj/into [] (map generic-type) (.getGenericParameterTypes x))]
(instance? java.lang.reflect.Constructor x)
[nil "<init>" (vec (.getParameterTypes x))]))

(defn- has-class? [^Class class msig]
(some #(= msig (sig %)) (concat (.getMethods class) (.getConstructors class))))
[nil "<init>" (clj/into [] (map generic-type) (.getGenericParameterTypes x))]))

(defmacro ^:private compile-cond [& choices]
(let [x (Object.)
expr
(reduce (fn [_ [test expr]]
(when (eval test) (reduced expr)))
x (partition 2 choices))]
(when (= x expr)
(throw (ex-info "No valid choice." {:form &form})))
expr))
(defn- has-method? [^Class class msig]
(some #(unify-sigs msig (sig %)) (concat (.getMethods class) (.getConstructors class))))

(defn- all-files
"Returns a map of relative paths (as Strings) to Files for all files (not directories) below the argument."
@@ -281,9 +331,9 @@
(reify org.apache.spark.api.java.function.FlatMapFunction ; todo: skip api.java.* go to spark
(call [_ it]
(compile-cond
(has-class? org.apache.spark.api.java.function.FlatMapFunction [java.util.Iterator "call" [Object]])
(has-method? org.apache.spark.api.java.function.FlatMapFunction '[java.util.Iterator "call" [?]])
(.iterator ((df) it))
(has-class? org.apache.spark.api.java.function.FlatMapFunction [Iterable "call" [Object]])
(has-method? org.apache.spark.api.java.function.FlatMapFunction '[java.lang.Iterable "call" [?]])
((df) it))))
preserve-partitioning)]
rdd))
@@ -384,7 +434,16 @@
(x/kvrf
([] (rf))
([acc] (rf acc))
([acc left right] (rf acc (.or ^com.google.common.base.Optional left not-found) right))))))
([acc left right] (rf acc (.or (compile-cond
(has-method? org.apache.spark.api.java.JavaPairRDD
'[[org.apache.spark.api.java.JavaPairRDD ? [scala.Tuple2 ? org.apache.spark.api.java.Optional]]
"leftOuterJoin" [?]])
^org.apache.spark.api.java.Optional left
(has-method? org.apache.spark.api.java.JavaPairRDD
'[[org.apache.spark.api.java.JavaPairRDD ? [scala.Tuple2 ? com.google.common.base.Optional]]
"leftOuterJoin" [?]])
^com.google.common.base.Optional left)
not-found) right))))))

(defn- default-right
"Returns a stateless transducer on pairs which expects Optionals in value position, unwraps their values or return not-found when no value."
@@ -394,7 +453,16 @@
(x/kvrf
([] (rf))
([acc] (rf acc))
([acc left right] (rf acc left (.or ^com.google.common.base.Optional right not-found)))))))
([acc left right] (rf acc left (.or (compile-cond
(has-method? org.apache.spark.api.java.JavaPairRDD
'[[org.apache.spark.api.java.JavaPairRDD ? [scala.Tuple2 ? org.apache.spark.api.java.Optional]]
"leftOuterJoin" [?]])
^org.apache.spark.api.java.Optional right
(has-method? org.apache.spark.api.java.JavaPairRDD
'[[org.apache.spark.api.java.JavaPairRDD ? [scala.Tuple2 ? com.google.common.base.Optional]]
"leftOuterJoin" [?]])
^com.google.common.base.Optional right)
not-found)))))))

(defn ^org.apache.spark.api.java.JavaRDD join
"Performs a join between two rdds, each rdd may be followed by ':or default-value'.
@@ -439,12 +507,12 @@
partitioner (org.apache.spark.Partitioner/defaultPartitioner
rdd (scala-seq rdds))]
(by-key (compile-cond
(has-class? org.apache.spark.rdd.CoGroupedRDD [nil "<init>" [scala.collection.Seq org.apache.spark.Partitioner scala.reflect.ClassTag]] )
(has-method? org.apache.spark.rdd.CoGroupedRDD '[nil "<init>" [scala.collection.Seq org.apache.spark.Partitioner scala.reflect.ClassTag]] )
(org.apache.spark.rdd.CoGroupedRDD.
(scala-seq (cons rdd rdds))
partitioner
(.AnyRef scala.reflect.ClassTag$/MODULE$))
(has-class? org.apache.spark.rdd.CoGroupedRDD [nil "<init>" [scala.collection.Seq org.apache.spark.Partitioner]] )
(has-method? org.apache.spark.rdd.CoGroupedRDD '[nil "<init>" [scala.collection.Seq org.apache.spark.Partitioner]] )
(org.apache.spark.rdd.CoGroupedRDD.
(scala-seq (cons rdd rdds))
partitioner))
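The rewritten matcher above is driven by `java.lang.reflect`'s generic type model. As a standalone probe (not part of the PR) of the raw data that `generic-type` translates into its vector/keyword notation:

```clj
;; Reflect on Iterable.iterator, whose generic return type is the
;; parameterized java.util.Iterator<T>.
(let [m (.getMethod Iterable "iterator" (into-array Class []))]
  {:name   (.getName m)                               ; "iterator"
   :return (str (.getGenericReturnType m))            ; "java.util.Iterator<T>"
   :params (mapv str (.getGenericParameterTypes m))}) ; []
;; generic-type renders that return type as [java.util.Iterator :T], a vector of
;; raw class plus type arguments, with type variables as keywords. has-method?
;; can then unify it against a pattern such as '[[java.util.Iterator ?] "iterator" []],
;; since '? is a wildcard that unifies with anything.
```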
12 changes: 12 additions & 0 deletions src/main/clojure/powderkeg/macros.clj
@@ -0,0 +1,12 @@
(ns powderkeg.macros)

(defmacro compile-cond [& choices]
(let [x (Object.)
expr
(reduce (fn [x [test expr]]
(if (eval test) (reduced expr) x))
x (partition 2 choices))]
(when (= x expr)
(throw (ex-info "No valid choice." {:form &form})))
expr))
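As a sketch of how this macro behaves: the tests are evaluated at macroexpansion time and only the first matching branch is emitted, so the unused branch never reaches the compiler. The example below is hypothetical, not from the PR; it assumes `powderkeg.macros` is on the classpath and dispatches on which `Optional` class is available (Spark 2.x ships `org.apache.spark.api.java.Optional`, Spark 1.x relies on Guava's).

```clj
(require '[powderkeg.macros :refer [compile-cond]])

;; Helper (hypothetical): true when the named class can be loaded.
(defn class-exists? [^String class-name]
  (try (Class/forName class-name) true
       (catch ClassNotFoundException _ false)))

;; Expands to just :spark-2.x or :spark-1.x depending on the classpath;
;; if neither test passes, macroexpansion throws "No valid choice."
(compile-cond
  (class-exists? "org.apache.spark.api.java.Optional") :spark-2.x
  (class-exists? "com.google.common.base.Optional")    :spark-1.x)
```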

92 changes: 84 additions & 8 deletions src/main/clojure/powderkeg/ouroboros.clj
@@ -125,13 +125,86 @@
(.accept rdr class-visitor 0)
(.toByteArray cw)))

(def var-transform
(defmacro ^:private proxy-unique-fns-method-visitor [mv]
(let [methods+arglists
(into [] (keep #(when (and (.startsWith (.getName %) "visit") (= Void/TYPE (.getReturnType %)))
[(.getName %) (repeatedly (count (.getParameterTypes %)) gensym)]))
(.getMethods clojure.asm.MethodVisitor))
sm (gensym "sm")]
`(let [insns# ~(into #{}
(comp (map first) (filter #(.endsWith % "Insn")) (map keyword))
methods+arglists)
mv# ^clojure.asm.MethodVisitor ~mv
q# (java.util.ArrayDeque.)
meths# ~(into {}
(for [[name arglist] methods+arglists]
[(keyword name)
`(fn [mv# [~@arglist]]
(~(symbol (str \. name)) mv# ~@arglist))]))
flush!# (fn flush!#
([] (flush!# #{}))
([black-list#]
(when-some [[meth# args#] (.poll q#)]
(when-not (black-list# meth#) ((meths# meth#) mv# args#))
(recur black-list#))))
~sm (fn [meth# args#]
(->
(->> q# (map first) (keep insns#) count)
(case
0 (and (= meth# :visitVarInsn)
(let [[op# idx#] args#]
(and (= op# clojure.asm.Opcodes/ALOAD)
(= idx# 7)))) ; brittle
1 (or (not (insns# meth#))
(and (= meth# :visitJumpInsn)
(let [[op# label#] args#]
(= op# clojure.asm.Opcodes/IFNULL))))
2 (or (not (insns# meth#))
(when
(and (= meth# :visitTypeInsn)
(let [[op# type#] args#]
(and (= op# clojure.asm.Opcodes/NEW)
(= type# "java/lang/StringBuilder"))))
:done))
nil)
(case
(nil false) (do
(flush!#)
((meths# meth#) mv# args#))
:done (do (flush!# insns#) ((meths# meth#) mv# args#))
true (.add q# [meth# args#]))))]
(proxy [clojure.asm.MethodVisitor] [clojure.asm.Opcodes/ASM4 mv#]
~@(for [[name arglist] methods+arglists]
`(~(symbol name) [~@arglist]
(~sm ~(keyword name) [~@arglist])))))))

(defn unique-fns [bytes]
(let [rdr (clojure.asm.ClassReader. bytes)
cw (clojure.asm.ClassWriter. 0)
class-visitor
(proxy [clojure.asm.ClassVisitor] [clojure.asm.Opcodes/ASM4 cw]
(visitMethod [access method-name mdesc sig exs]
(let [mv (.visitMethod cw access method-name mdesc sig exs)]
(if (= method-name "parse")
(proxy-unique-fns-method-visitor mv)
mv))))]
(.accept rdr class-visitor 0)
(.toByteArray cw)))

(defn transform-classes-once
"Runs a transformation (function from bytes to bytes) on the specified classes once."
[instrumentation classes bytes-transform]
"ClassFileTransfomer which instruments clojure.lang.Var."
(reify java.io.Serializable
java.lang.instrument.ClassFileTransformer
(transform [_ loader classname class domain bytes]
(when (= "clojure/lang/Var" classname)
(watch-vars bytes)))))
(let [classes (set classes)
transformer (reify java.io.Serializable
java.lang.instrument.ClassFileTransformer
(transform [_ loader classname class domain bytes]
(when (classes class) (bytes-transform bytes))))]
(.addTransformer instrumentation transformer true)
(try
(.retransformClasses instrumentation (into-array classes))
(finally
(.removeTransformer instrumentation transformer)))))
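The refactor generalizes the old Var-specific transformer into a helper that installs a `ClassFileTransformer`, forces retransformation of just the listed classes, and always uninstalls the transformer afterwards, so later class loads are untouched. Its usage in this PR's bootstrap reduces to two calls (mirroring the diff further down):

```clj
;; Instrument clojure.lang.Var with the watch-vars bytecode rewrite, then
;; patch the compiler's FnExpr so every fn gets a unique class name (fix #27).
(transform-classes-once powderkeg.Agent/instrumentation
                        [clojure.lang.Var] watch-vars)
(transform-classes-once powderkeg.Agent/instrumentation
                        [clojure.lang.Compiler$FnExpr] unique-fns)
```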

(defn ^java.util.jar.Manifest manifest
"Creates an MANIFEST.MF out of a map"
@@ -206,6 +279,9 @@
(println "done!"))

(print "Instrumenting clojure.lang.Var... ")
(.addTransformer powderkeg.Agent/instrumentation var-transform true)
(.retransformClasses powderkeg.Agent/instrumentation (into-array [clojure.lang.Var]))
(transform-classes-once powderkeg.Agent/instrumentation [clojure.lang.Var] watch-vars)
(println "done!")

(print "Patching clojure.lang.Compiler... ")
(transform-classes-once powderkeg.Agent/instrumentation [clojure.lang.Compiler$FnExpr] unique-fns)
(println "done!")))