Merge pull request #51 from clojure-finance/main
release 1.1.0
hkulyc authored Jan 8, 2022
2 parents f27c6bf + 2b5a725 commit a6488fd
Showing 22 changed files with 657 additions and 528 deletions.
135 changes: 80 additions & 55 deletions doc/documentation.md
@@ -207,46 +207,8 @@ You can also group by the combination of keys. (Use the above two rules together
;; get the min of the two columns grouped by ...
```



- sort

**Immediately** sort the dataframe

| Argument | Type | Function | Remarks |
| ------------------ | ----------------------- | ------------------------ | ------------------------------------------------------------ |
| `dataframe` | Clojask.DataFrame | The operated object | |
| `trending list` | Collection (seq vector) | Indicates the sort order | Example: ["Salary" "+" "Employee" "-"] sorts by Salary in ascending order and, when salaries are equal, by Employee in descending order |
| `output-directory` | String | The output path | |

**Example**

```clojure
(sort y ["+" "Salary"] "resources/sort.csv")
;; sort by Salary in ascending order
```



- compute

Compute the result. The pre-defined lazy operations are executed as a pipeline, i.e. the result of each operation becomes the argument of the next.

| Argument | Type | Function | Remarks |
| ---------------- | ----------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| `dataframe` | Clojask.DataFrame | The operated object | |
| `num of workers` | int (max 8) | The number of worker instances (except the input and output nodes) | If this argument >= 2, will use [onyx](http://www.onyxplatform.org/) as the distributed platform |
| `output path` | String | The path of the output csv file | Could exist or not. |
| [`exception`] | boolean | Whether an exception during calculation will cause termination | Is useful for debugging or detecting empty fields |

**Example**

```clojure
(compute x 8 "../resources/test.csv" :exception true)
;; computes all the pre-registered operations
```



- inner-join / left-join / right-join

@@ -258,40 +220,52 @@ You can also group by the combination of keys. (Use the above two rules together

*Like `compute`, a join automatically pipelines the registered operations and filters. You can think of a join as first computing the two dataframes and then joining them.*

| Argument | Type | Function | Remarks |
| ------------------- | ------------------- | ------------------------------------------------------------ | ------------------------------------------------- |
| `dataframe a` | Clojask.DataFrame | The operated object | |
| `dataframe b` | Clojask.DataFrame | The operated object | |
| `a join keys` | String / Collection | The keys of a to be aligned | Find the specification [here](#groupby-keys) |
| `b join keys` | String / Collection | The keys of b to be aligned | Find the specification [here](#groupby-keys) |
| `number of workers` | int (max 8) | Number of worker nodes doing the joining | |
| `destination file` | string | The file path to the destination | Will be emptied first |
| [`exception`] | boolean | Whether an exception during calculation will cause termination | Is useful for debugging or detecting empty fields |
| Argument | Type | Function | Remarks |
| ------------- | ------------------- | --------------------------- | -------------------------------------------- |
| `dataframe a` | Clojask.DataFrame | The operated object | |
| `dataframe b` | Clojask.DataFrame | The operated object | |
| `a join keys` | String / Collection | The keys of a to be aligned | Find the specification [here](#groupby-keys) |
| `b join keys` | String / Collection | The keys of b to be aligned | Find the specification [here](#groupby-keys) |

**Example**
**Return**

A Clojask.JoinedDataFrame

- Unlike Clojask.DataFrame, it only supports three operations:
- `print-df`
- `get-col-names`
- `compute`
- This means you cannot further apply complicated operations to a joined dataframe. An alternative is to first compute the result, then read it in as a new dataframe.

**Example**

```clojure
(def x (dataframe "path/to/a"))
(def y (dataframe "path/to/b"))

(inner-join x y ["col a 1" "col a 2"] ["col b 1" "col b 2"] 8 "path/to/destination" :exception true)
(def z (inner-join x y ["col a 1" "col a 2"] ["col b 1" "col b 2"]))
(compute z 8 "path/to/output")
;; inner join x and y

(left-join x y ["col a 1" "col a 2"] ["col b 1" "col b 2"] 8 "path/to/destination" :exception true)
(def z (left-join x y ["col a 1" "col a 2"] ["col b 1" "col b 2"]))
(compute z 8 "path/to/output")
;; left join x and y

(right-join x y ["col a 1" "col a 2"] ["col b 1" "col b 2"] 8 "path/to/destination" :exception true)
(def z (right-join x y ["col a 1" "col a 2"] ["col b 1" "col b 2"]))
(compute z 8 "path/to/output")
;; right join x and y
```
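
To make the role of the join keys concrete, here is a minimal in-memory sketch of inner and left join semantics. It is not Clojask code and does not reflect how Clojask joins files on disk; `join-rows`, `employees` and `budgets` are hypothetical names, and rows are plain maps keyed by column name.

```clojure
(defn join-rows
  "Hypothetical illustration only. Joins two collections of row maps.
  `kind` is :inner or :left; `a-keys` and `b-keys` are the columns to align."
  [kind a b a-keys b-keys]
  (let [;; index the rows of b by the values of their join keys
        b-index (group-by (fn [row-b] (mapv row-b b-keys)) b)]
    (mapcat (fn [row-a]
              (let [matches (get b-index (mapv row-a a-keys))]
                (cond
                  (seq matches)  (map #(merge row-a %) matches)
                  (= kind :left) [row-a]   ; keep unmatched rows of a
                  :else          [])))     ; inner join drops them
            a)))

(def employees [{"Employee" 1 "Dept" "HR"} {"Employee" 2 "Dept" "IT"}])
(def budgets   [{"Department" "HR" "Budget" 100}])

(join-rows :inner employees budgets ["Dept"] ["Department"])
;; => ({"Employee" 1, "Dept" "HR", "Department" "HR", "Budget" 100})

(count (join-rows :left employees budgets ["Dept"] ["Department"]))
;; => 2
```

In this sketch a right join is simply a left join with the two inputs and their key lists swapped.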



- reorderCol / renameCol

Reorder the columns / rename the column names in the dataframe

| Argument | Type | Function | Remarks |
| ------------------- | ------------------ | ------------------------------------------------------------ | ------------------------------------------------- |
| `dataframe a` | Clojask.DataFrame | The operated object | |
| `a columns` | Clojure.collection | The new set of column names | Should be existing headers in dataframe a if it is `reorderCol` |
| Argument | Type | Function | Remarks |
| ------------- | ------------------ | --------------------------- | ------------------------------------------------------------ |
| `dataframe a` | Clojask.DataFrame | The operated object | |
| `a columns` | Clojure.collection | The new set of column names | Should be existing headers in dataframe a if it is `reorderCol` |


**Example**
@@ -301,3 +275,54 @@ You can also group by the combination of keys. (Use the above two rules together
(.renameCol y ["Employee" "new-Department" "EmployeeName" "Salary"])
```




- sort

**Immediately** sort the dataframe

| Argument | Type | Function | Remarks |
| ------------------ | ----------------------- | ------------------------ | ------------------------------------------------------------ |
| `dataframe` | Clojask.DataFrame | The operated object | |
| `trending list` | Collection (seq vector) | Indicates the sort order | Example: ["Salary" "+" "Employee" "-"] sorts by Salary in ascending order and, when salaries are equal, by Employee in descending order |
| `output-directory` | String | The output path | |

**Example**

```clojure
(sort y ["+" "Salary"] "resources/sort.csv")
;; sort by Salary in ascending order
```



- compute

Compute the result. The pre-defined lazy operations are executed as a pipeline, i.e. the result of each operation becomes the argument of the next.

| Argument | Type | Function | Remarks |
| ---------------- | ------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ |
| `dataframe` | Clojask.DataFrame | The operated object | |
| `num of workers` | int (max 8) | The number of worker instances (except the input and output nodes) | Uses [onyx](http://www.onyxplatform.org/) as the distributed platform |
| `output path` | String | The path of the output csv file | May or may not already exist. |
| [`exception`] | boolean | Whether an exception during calculation will cause termination | Is useful for debugging or detecting empty fields |
| [`select`] | String / Collection of strings | The names of the columns to select. Call `get-col-names` first to see all available names. (Similar to `SELECT` in SQL) | Only one of `select` and `exclude` may be specified |
| [`exclude`] | String / Collection of strings | The names of the columns to exclude | Only one of `select` and `exclude` may be specified |

**Example**

```clojure
(compute x 8 "../resources/test.csv" :exception true)
;; computes all the pre-registered operations

(compute x 8 "../resources/test.csv" :select "col a")
;; only select column a

(compute x 8 "../resources/test.csv" :select ["col b" "col a"])
;; select two columns, column b and column a in order

(compute x 8 "../resources/test.csv" :exclude ["col b" "col a"])
;; select all columns except column b and column a; the remaining columns keep their order
```
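
The effect of `:select` and `:exclude` can be pictured with a small in-memory sketch. This is not Clojask's implementation: `pick-columns` is a hypothetical helper that works on a header vector and plain row vectors, and it only illustrates the column ordering rules described in the table above.

```clojure
(defn pick-columns
  "Hypothetical helper, not part of Clojask. Keeps the columns named by
  :select (in the order given) or every column except those named by
  :exclude. Only one of the two options may be supplied."
  [header rows & {:keys [select exclude]}]
  (assert (not (and select exclude)) "Specify either :select or :exclude, not both")
  (let [->coll   (fn [x] (if (string? x) [x] x))   ; accept a single name too
        position (zipmap header (range))           ; column name -> index
        wanted   (cond
                   select  (->coll select)
                   exclude (remove (set (->coll exclude)) header)
                   :else   header)
        indices  (mapv position wanted)]
    (mapv (fn [row] (mapv #(nth row %) indices)) rows)))

(def header ["col a" "col b" "col c"])
(def rows   [[1 2 3] [4 5 6]])

(pick-columns header rows :select "col a")             ;; => [[1] [4]]
(pick-columns header rows :select ["col b" "col a"])   ;; => [[2 1] [5 4]]
(pick-columns header rows :exclude ["col b" "col a"])  ;; => [[3] [6]]
```

Resolving names to positions once, before touching any row, keeps the per-row work to a few `nth` lookups.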

15 changes: 0 additions & 15 deletions examples/multi-threading.clj

This file was deleted.

28 changes: 18 additions & 10 deletions src/main/clojure/aggregate/aggre_onyx_comps.clj
@@ -8,7 +8,7 @@
[onyx.test-helper :refer [with-test-env feedback-exception!]]
;; [tech.v3.dataset :as ds]
[clojure.data.csv :as csv]
[clojask.utils :refer [eval-res eval-res-ne filter-check]]
[clojask.utils :as u]
[clojure.set :as set]
[clojask.groupby :refer [read-csv-seq]])
(:import (java.io BufferedReader FileReader BufferedWriter FileWriter)))
@@ -38,10 +38,11 @@


(defn worker-func-gen
[df exception]
[df exception aggre-funcs index formatter]
(reset! dataframe df)
(let [aggre-funcs (.getAggreFunc (.row-info (deref dataframe)))
formatters (.getFormatter (.col-info (deref dataframe)))
(let [
;; aggre-funcs (.getAggreFunc (.row-info (deref dataframe)))
formatters formatter
;; key-index (.getKeyIndex (.col-info (deref dataframe)))
;; formatters (set/rename-keys formatters key-index)
]
@@ -52,7 +53,10 @@
(let [data (read-csv-seq (:file seq))
pre (:d seq)
data-map (-> (iterate inc 0)
(zipmap (apply map vector data)))]
(zipmap (apply map vector data)))
reorder (fn [a b]
;; (println [a b])
(u/gets (concat a b) index))]
;; (mapv (fn [_]
;; (let [func (first _)
;; index (nth _ 1)]
@@ -62,7 +66,9 @@
res []]
(if (= aggre-funcs [])
;; {:d (vec (concat pre res))}
{:d (mapv concat (repeat pre) (apply map vector res))}
(if (= res [])
{:d [(u/gets pre index)]}
{:d (mapv reorder (repeat pre) (apply map vector res))})
(let [func (first (first aggre-funcs))
index (nth (first aggre-funcs) 1)
res-funcs (rest aggre-funcs)
@@ -223,15 +229,17 @@
{:zookeeper/address "127.0.0.1:2188"
:zookeeper/server? true
:zookeeper.server/port 2188
:onyx/tenancy-id id})
:onyx/tenancy-id id
:onyx.log/file "_clojask/clojask.log"})

(def peer-config
{:zookeeper/address "127.0.0.1:2188"
:onyx/tenancy-id id
:onyx.peer/job-scheduler :onyx.job-scheduler/balanced
:onyx.messaging/impl :aeron
:onyx.messaging/peer-port 40200
:onyx.messaging/bind-addr "localhost"})
:onyx.messaging/bind-addr "localhost"
:onyx.log/file "_clojask/clojask.log"})

(def env (onyx.api/start-env env-config))

@@ -250,11 +258,11 @@

(defn start-onyx-aggre
"start the onyx cluster with the specification inside dataframe"
[num-work batch-size dataframe dist exception]
[num-work batch-size dataframe dist exception aggre-func index formatter]
(try
(workflow-gen num-work)
(config-env)
(worker-func-gen dataframe exception) ;;need some work
(worker-func-gen dataframe exception aggre-func index formatter) ;;need some work
(catalog-gen num-work batch-size)
(lifecycle-gen "./_clojask/grouped" dist)
(flow-cond-gen num-work)
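
The new `reorder` function in `worker-func-gen` concatenates the group-by key values (`pre`) with one row of aggregation results and then picks elements by position through `u/gets`. The source of `clojask.utils/gets` is not part of this diff, so the sketch below assumes it returns the elements of a collection at the given positions; the sample data is made up.

```clojure
(defn gets
  "Assumed behaviour of clojask.utils/gets (not shown in this diff):
  return the elements of `coll` at the positions listed in `indices`."
  [coll indices]
  (let [v (vec coll)]
    (mapv #(nth v %) indices)))

;; Group-by key values, followed by one row of aggregation results.
(def pre     ["HR"])
(def res-row [1200 3])

;; An index of [2 0] puts the second aggregate first, then the group key,
;; and drops the first aggregate.
(gets (concat pre res-row) [2 0])
;; => [3 "HR"]
```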
2 changes: 1 addition & 1 deletion src/main/clojure/clojask/ColInfo.clj
@@ -107,7 +107,7 @@

(getKeys
[this]
col-keys)
(mapv (fn [index] (get index-key index)) (take (count index-key) (iterate inc 0))))

(getKeyIndex
[this]
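
The new body of `getKeys` rebuilds the list of column names from `index-key` instead of returning `col-keys`. A stand-alone sketch of that expression follows, assuming `index-key` is a map from column position to column name with contiguous positions starting at 0; `ordered-keys` and the sample map are hypothetical.

```clojure
(defn ordered-keys
  "Hypothetical stand-alone version of the new getKeys body: look up the
  name stored for positions 0, 1, 2, ... in ascending order."
  [index-key]
  (mapv (fn [index] (get index-key index))
        (take (count index-key) (iterate inc 0))))

;; The insertion order of the map does not matter; positions drive the result.
(def index-key {0 "Employee" 2 "Salary" 1 "new-Department"})

(ordered-keys index-key)
;; => ["Employee" "new-Department" "Salary"]

;; An equivalent, shorter form of the same lookup:
(mapv index-key (range (count index-key)))
;; => ["Employee" "new-Department" "Salary"]
```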
25 changes: 17 additions & 8 deletions src/main/clojure/clojask/clojask_aggre.clj
@@ -4,14 +4,19 @@
[clojure.java.io :as io]
[taoensso.timbre :refer [debug info] :as timbre]
[clojure.string :as string]
[clojask.api.aggregate :refer [start]])
[clojask.api.aggregate :refer [start]]
[clojask.utils :as u])
(:import (java.io BufferedReader FileReader BufferedWriter FileWriter)))

(def df (atom nil))
(def aggre-func (atom nil))
(def select (atom nil))

(defn inject-dataframe
[dataframe]
[dataframe a b]
(reset! df dataframe)
(reset! aggre-func a)
(reset! select b)
)

(defn c-count
@@ -39,7 +44,9 @@
:lifecycle/after-task-stop close-writer})

(defrecord ClojaskOutput
[memo]
[memo
aggre-func
select]
p/Plugin
(start [this event]
;; Initialize the plugin, generally by assoc'ing any initial state.
@@ -52,7 +59,7 @@
(let [data (mapv (fn [_] (if (coll? _) _ [_])) (deref memo))]
;; (.write (:clojask/wtr event) (str data "\n"))
(if (apply = (map count data))
(mapv #(.write (:clojask/wtr event) (str (string/join "," %) "\n")) (apply map vector data))
(mapv #(.write (:clojask/wtr event) (str (string/join "," (u/gets % select)) "\n")) (apply map vector data))
(throw (Exception. "aggregation result is not of the same length"))))
this)

@@ -86,7 +93,7 @@
;; before write-batch is called repeatedly.
true)

(write-batch [this {:keys [onyx.core/write-batch clojask/wtr clojask/aggre-func]} replica messenger]
(write-batch [this {:keys [onyx.core/write-batch clojask/wtr]} replica messenger]
;; keys [:Departement]
;; Write the batch to your datasink.
;; In this case we are conjoining elements onto a collection.
@@ -111,6 +118,8 @@
;; from your task-map here, in order to improve the performance of your plugin
;; Extending the function below is likely good for most use cases.
(defn output [pipeline-data]
(let [aggre-func (.getAggreFunc (:row-info (deref df)))]
(->ClojaskOutput (volatile! (doall (take (count aggre-func)
(repeat start)))))))
(let []
(->ClojaskOutput (volatile! (doall (take (count (deref aggre-func))
(repeat start))))
(deref aggre-func)
(deref select))))
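
The changed write step transposes the per-aggregate results with `(apply map vector data)`, keeps only the selected positions, and joins each row with commas. A minimal sketch of that logic without the Onyx writer is shown below; `format-rows` is a hypothetical helper, and `nth` stands in for `u/gets`, whose source is not in this diff.

```clojure
(require '[clojure.string :as string])

(defn format-rows
  "Hypothetical helper mirroring the write step. `data` holds one vector per
  aggregation function; `select` lists the column positions to keep."
  [data select]
  (if (apply = (map count data))
    (mapv (fn [row]
            (str (string/join "," (mapv #(nth row %) select)) "\n"))
          (apply map vector data))   ; transpose columns into rows
    (throw (Exception. "aggregation result is not of the same length"))))

(format-rows [[1 2] ["a" "b"]] [1 0])
;; => ["a,1\n" "b,2\n"]
```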
12 changes: 7 additions & 5 deletions src/main/clojure/clojask/clojask_groupby.clj
@@ -8,11 +8,13 @@

(def dataframe (atom nil))
(def groupby-keys (atom nil))
(def write-index (atom nil))

(defn inject-dataframe
[df groupby-key]
[df groupby-key index]
(reset! dataframe df)
(reset! groupby-keys groupby-key))
(reset! groupby-keys groupby-key)
(reset! write-index index))

(defn- inject-into-eventmap
[event lifecycle]
@@ -35,7 +37,7 @@
(def writer-aggre-calls
{:lifecycle/before-task-start inject-into-eventmap})

(defrecord ClojaskGroupby []
(defrecord ClojaskGroupby [write-index]
p/Plugin
(start [this event]
;; Initialize the plugin, generally by assoc'ing any initial state.
@@ -90,7 +92,7 @@
;(.write wtr (str msg "\n"))
;; !! define argument (debug)
;; (def groupby-keys [:Department :EmployeeName])
(output-groupby dist (:d msg) groupby-keys key-index formatter)))
(output-groupby dist (:d msg) groupby-keys key-index formatter write-index)))

(recur (rest batch)))))
true))
@@ -101,4 +103,4 @@
;; from your task-map here, in order to improve the performance of your plugin
;; Extending the function below is likely good for most use cases.
(defn groupby [pipeline-data]
(->ClojaskGroupby))
(->ClojaskGroupby (deref write-index)))
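
The plugin builder receives only `pipeline-data`, so the commit passes `write-index` through a namespace-level atom that `inject-dataframe` fills before the job starts. A reduced sketch of that pattern is given below; `GroupbySketch`, `inject!` and `groupby-sketch` are hypothetical stand-ins, not the Clojask definitions.

```clojure
;; Namespace-level state that is set before the plugin is constructed.
(def write-index (atom nil))

(defrecord GroupbySketch [write-index])

(defn inject!
  "Stand-in for inject-dataframe: store the column positions to write."
  [index]
  (reset! write-index index))

(defn groupby-sketch
  "Stand-in for the plugin builder: it takes only pipeline data, so it
  reads the injected value from the atom when the record is created."
  [_pipeline-data]
  (->GroupbySketch (deref write-index)))

(inject! [1 0])
(:write-index (groupby-sketch {}))
;; => [1 0]
```

The record captures the value at construction time, so a later `reset!` on the atom does not affect records that already exist.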