Skip to content

Commit

Permalink
Modify UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
Beasttt committed Dec 15, 2024
1 parent e105ec2 commit 50bd005
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@
public class UDAFIntegral implements UDTF {

private static final String TIME_UNIT_KEY = "unit";
private static final String TIME_UNIT_MS = "1S";
private static final String TIME_UNIT_S = "1s";
private static final String TIME_UNIT_M = "1m";
private static final String TIME_UNIT_H = "1H";
private static final String TIME_UNIT_D = "1d";


long unitTime;
long lastTime = -1;
Expand All @@ -53,41 +50,19 @@ public void validate(UDFParameterValidator validator) throws Exception {
validator
.validateInputSeriesNumber(1)
.validate(
unit ->
TIME_UNIT_D.equals(unit)
|| TIME_UNIT_H.equals(unit)
|| TIME_UNIT_M.equals(unit)
|| TIME_UNIT_S.equals(unit)
|| TIME_UNIT_MS.equals(unit),
"Unknown time unit input",
validator.getParameters().getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S));
x -> (long) x > 0,
"Unknown time unit input.",
Util.parseTime(
validator.getParameters().getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S),
validator.getParameters()));
}

@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws Exception {
configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
switch (parameters.getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S)) {
case TIME_UNIT_MS:
unitTime = 1L;
break;
case TIME_UNIT_S:
unitTime = 1000L;
break;
case TIME_UNIT_M:
unitTime = 60000L;
break;
case TIME_UNIT_H:
unitTime = 3600000L;
break;
case TIME_UNIT_D:
unitTime = 3600000L * 24L;
break;
default:
throw new UDFException(
"Unknown time unit input: "
+ parameters.getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S));
}
unitTime =
Util.parseTime(parameters.getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S), parameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,26 @@
/** This function is used for timestamp repair. */
public class UDTFTimestampRepair implements UDTF {
String intervalMethod;
int interval;
int intervalMode;
long interval;
long intervalMode;

@Override
public void validate(UDFParameterValidator validator) throws Exception {
validator
.validateInputSeriesNumber(1)
.validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64)
.validate(
x -> (Integer) x >= 0,
"Interval should be a positive integer.",
validator.getParameters().getIntOrDefault("interval", 0));
.validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64);
String intervalString = validator.getParameters().getStringOrDefault("interval", null);
if (intervalString != null) {
try {
long parsedInterval = Util.parseTime(intervalString, validator.getParameters());
validator.validate(
x -> parsedInterval > 0,
"Invalid time unit input. Supported units are ns, us, ms, s, m, h, d.");
} catch (Exception e) {
throw new UDFException(
"Invalid time format for interval.");
}
}
}

@Override
Expand All @@ -54,15 +62,20 @@ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurati
.setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
.setOutputDataType(parameters.getDataType(0));
intervalMethod = parameters.getStringOrDefault("method", "Median");
interval = parameters.getIntOrDefault("interval", 0);
String intervalString = parameters.getStringOrDefault("interval", null);
if (intervalString != null) {
interval = Util.parseTime(intervalString, parameters);
} else {
interval = 0;
}
if (interval > 0) {
intervalMode = interval;
} else if ("Median".equalsIgnoreCase(intervalMethod)) {
intervalMode = -1;
intervalMode = -1L;
} else if ("Mode".equalsIgnoreCase(intervalMethod)) {
intervalMode = -2;
intervalMode = -2L;
} else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
intervalMode = -3;
intervalMode = -3L;
} else {
throw new UDFException("Illegal method.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,15 @@ public TimestampInterval(long[] time, double[] original) {

// get standard interval
// -1 median -2 mode -3 cluster
public long getInterval(int mode) {
switch (mode) {
case -1:
this.deltaT = getIntervalByMedian();
break;
case -2:
this.deltaT = getIntervalByMode();
break;
case -3:
this.deltaT = getIntervalByCluster();
break;
default:
this.deltaT = mode;
public long getInterval(long mode) {
if (mode == -1L) {
this.deltaT = getIntervalByMedian();
} else if (mode == -2L) {
this.deltaT = getIntervalByMode();
} else if (mode == -3L) {
this.deltaT = getIntervalByCluster();
} else {
this.deltaT = mode;
}
return this.deltaT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class TimestampRepair {
protected long deltaT;
protected long start0;

public TimestampRepair(RowIterator dataIterator, int intervalMode, int startPointMode)
public TimestampRepair(RowIterator dataIterator, long intervalMode, int startPointMode)
throws Exception {
ArrayList<Long> timeList = new ArrayList<>();
ArrayList<Double> originList = new ArrayList<>();
Expand Down

0 comments on commit 50bd005

Please sign in to comment.