
Merge pull request #897 from Nitish1814/12SEP24Fixes_verticalCol
added vertical column show support
sonalgoyal authored Sep 17, 2024
2 parents 38f24a6 + c1d3730 commit 4f36064
Showing 5 changed files with 101 additions and 2 deletions.
8 changes: 8 additions & 0 deletions common/client/src/main/java/zingg/common/client/ZFrame.java
@@ -10,6 +10,11 @@ public interface ZFrame<D, R, C> {

public static final String COL_COUNT = "count";
public static final String COL_VALUE = "VALUE";

public static String PIVOT_COLUMN = "field";
public static String VALUE_1 = "value1";
public static String VALUE_2 = "value2";
public static String ORDER = "order";

public ZFrame<D, R, C> cache();
public ZFrame<D, R, C> as(String s);
@@ -179,4 +184,7 @@ public interface ZFrame<D, R, C> {

public C gt(C column1, C column2);

public ZFrame<D, R, C> transpose(String pivotColumn);

public void showVertical();
}
@@ -87,7 +87,8 @@ public String getMsg2(double prediction, double score) {
public void displayRecords(ZFrame<D, R, C> records, String preMessage, String postMessage) {
//System.out.println();
System.out.println(preMessage);
records.show(false);
// records.show(false);
records.showVertical();
System.out.println(postMessage);
System.out.println("\tWhat do you think? Your choices are: ");
System.out.println();
@@ -0,0 +1,14 @@
package zingg.common.core.util;

import scala.collection.JavaConverters;
import scala.collection.Seq;

import java.util.List;

public class ListConverter<C> {
public Seq<C> convertListToSeq(List<C> inputList) {
return JavaConverters.asScalaIteratorConverter(inputList.iterator())
.asScala()
.toSeq();
}
}
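
For reference, a minimal usage sketch of the new ListConverter (the ListConverterExample class and the column names are illustrative, not part of the commit): it wraps scala.collection.JavaConverters so Java callers such as SparkFrame.transpose can hand a java.util.List of column names to Scala code that expects a Seq.

import java.util.Arrays;
import java.util.List;

import scala.collection.Seq;

import zingg.common.core.util.ListConverter;

public class ListConverterExample {
    public static void main(String[] args) {
        // Non-pivot columns to hand to the Scala-side transpose.
        List<String> columns = Arrays.asList("value1", "value2", "order");

        // Convert the Java list into a Scala Seq.
        ListConverter<String> converter = new ListConverter<String>();
        Seq<String> seq = converter.convertListToSeq(columns);

        System.out.println(seq);
    }
}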
60 changes: 59 additions & 1 deletion spark/client/src/main/java/zingg/spark/client/SparkFrame.java
@@ -1,5 +1,7 @@
package zingg.spark.client;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.internal.config.R;
@@ -14,6 +16,8 @@
import zingg.common.client.FieldData;
import zingg.common.client.ZFrame;
import zingg.common.client.util.ColName;
import zingg.common.core.util.ListConverter;
import zingg.spark.client.util.ExtendedFunction;

//Dataset, Row, column
public class SparkFrame implements ZFrame<Dataset<Row>, Row, Column> {
@@ -470,6 +474,60 @@ public Column substr(Column col, int startPos, int len) {
public Column gt(Column column1, Column column2) {
return column1.gt(column2);
}


@Override
public ZFrame<Dataset<Row>, Row, Column> transpose(String pivotColumn) {
ExtendedFunction extendedFunction = new ExtendedFunction();
List<String> columnsExceptPivot = new ArrayList<>(List.of(df.columns()));
columnsExceptPivot.remove(pivotColumn);
ListConverter<String> listConverter = new ListConverter<String>();
Dataset<Row> r = extendedFunction.TransposeDF(df, listConverter.convertListToSeq(columnsExceptPivot), pivotColumn);
return new SparkFrame(r);
}

@Override
public void showVertical() {
ZFrame<Dataset<Row>, Row, Column> headerIncludedFrame = getHeaderIncludedDataFrame(new SparkFrame(df));
ZFrame<Dataset<Row>, Row, Column> vertical = headerIncludedFrame.transpose(PIVOT_COLUMN);
vertical.sortAscending(ORDER).drop(ORDER).show(1000);
}

/***
* Adds an auto-incrementing order row, e.g. {0, 1, 2, 3, 4}, to the dataframe
* @return ZFrame
*/

private ZFrame<Dataset<Row>, Row, Column> addAutoIncrementalRow() {
String[] columns = df.columns();
Dataset<Row> temporaryDF = df.limit(1);
List<String> monotonicIncreasing = new ArrayList<>();
for (int idx = 0; idx < columns.length; idx++) {
monotonicIncreasing.add(String.valueOf(idx));
}
Collections.sort(monotonicIncreasing);
for (int idx = 0; idx < columns.length; idx++) {
temporaryDF = temporaryDF.withColumn(columns[idx], functions.lit(monotonicIncreasing.get(idx)));
}
return new SparkFrame(df.union(temporaryDF));
}

/***
* Returns a new ZFrame with an extra column (PIVOT_COLUMN) added, used as the pivot for transposing the frame
* @param records
* @return header included zFrame
*/
private ZFrame<Dataset<Row>, Row, Column> getHeaderIncludedDataFrame(ZFrame<Dataset<Row>, Row, Column> records) {
ZFrame<Dataset<Row>, Row, Column> orderedRowAdded = addAutoIncrementalRow();

ZFrame<Dataset<Row>, Row, Column> firstRecord = orderedRowAdded.limit(1);
ZFrame<Dataset<Row>, Row, Column> secondRecord = orderedRowAdded.except(firstRecord).limit(1);
ZFrame<Dataset<Row>, Row, Column> thirdRecord = orderedRowAdded.except(firstRecord.union(secondRecord));

//return new ZFrame with Field column added to be used as pivot
return firstRecord.withColumn(PIVOT_COLUMN, VALUE_1).
union(secondRecord.withColumn(PIVOT_COLUMN, VALUE_2)).
union(thirdRecord.withColumn(PIVOT_COLUMN, ORDER));
}

}
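
To recap the flow added above: showVertical() appends an order row via addAutoIncrementalRow(), tags the two data rows and the order row with the "field" pivot column via getHeaderIncludedDataFrame(), transposes on "field", then sorts by the order row and drops it so the original column order survives the pivot; the result is one row per field with value1 and value2 side by side. A hedged usage sketch, assuming a local SparkSession (the Person bean and the sample data are illustrative only, not part of the commit):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import zingg.spark.client.SparkFrame;

public class ShowVerticalExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("showVerticalExample")
                .getOrCreate();

        // A pair of candidate records, as shown during labelling.
        Dataset<Row> pair = spark.createDataFrame(Arrays.asList(
                new Person("Thomas", "Bangalore"),
                new Person("Tomas", "Bengaluru")),
                Person.class);

        // Columns become rows: one line per field, with value1/value2 side by side.
        new SparkFrame(pair).showVertical();

        spark.stop();
    }

    // Simple bean used only for this illustration.
    public static class Person implements java.io.Serializable {
        private String name;
        private String city;
        public Person() {}
        public Person(String name, String city) { this.name = name; this.city = city; }
        public String getName() { return name; }
        public String getCity() { return city; }
        public void setName(String name) { this.name = name; }
        public void setCity(String city) { this.city = city; }
    }
}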

@@ -0,0 +1,18 @@
package zingg.spark.client.util

import org.apache.spark.sql.functions.{col, collect_list, concat_ws}
import org.apache.spark.sql.{Dataset, Row}

class ExtendedFunction {
def TransposeDF(df: Dataset[Row], columns: Seq[String], pivotCol: String): Dataset[Row] = {
val columnsValue = columns.map(x => "'" + x + "', " + x)
val stackCols = columnsValue.mkString(",")
val df_1 = df.selectExpr(pivotCol, "stack(" + columns.size + "," + stackCols + ")")
.select(pivotCol, "col0", "col1")

val final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))
.withColumnRenamed("col0", pivotCol)
final_df
}

}
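
For clarity, TransposeDF first un-pivots every non-pivot column with Spark's stack expression (one (name, value) row per column) and then pivots back on pivotCol. A rough Java equivalent of those two steps, shown only to illustrate the generated expressions (the committed Scala version above is what SparkFrame actually calls; the TransposeSketch class is hypothetical):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.concat_ws;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class TransposeSketch {

    // df has the pivot column (e.g. "field") plus the data columns to transpose.
    public static Dataset<Row> transpose(Dataset<Row> df, String[] dataColumns, String pivotCol) {
        // Build "stack(n, 'c1', c1, 'c2', c2, ...)", turning each column into a (name, value) row.
        StringBuilder stackExpr = new StringBuilder("stack(" + dataColumns.length);
        for (String c : dataColumns) {
            stackExpr.append(", '").append(c).append("', ").append(c);
        }
        stackExpr.append(")");

        // stack() emits columns named col0 (the column name) and col1 (its value).
        Dataset<Row> unpivoted = df.selectExpr(pivotCol, stackExpr.toString())
                .select(pivotCol, "col0", "col1");

        // Re-pivot: one row per original column name, one column per pivot value.
        return unpivoted.groupBy(col("col0"))
                .pivot(pivotCol)
                .agg(concat_ws("", collect_list(col("col1"))))
                .withColumnRenamed("col0", pivotCol);
    }
}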
