Skip to content

Commit

Permalink
working!!
Browse files Browse the repository at this point in the history
  • Loading branch information
mgdenno committed Dec 31, 2023
1 parent 5c49164 commit 2dd43e7
Show file tree
Hide file tree
Showing 7 changed files with 473 additions and 2 deletions.
78 changes: 77 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,79 @@
{
"C_Cpp.default.compilerPath": "/usr/bin/g++"
"C_Cpp.default.compilerPath": "/usr/bin/g++",
"files.associations": {
".env": "shellscript",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"any": "cpp",
"array": "cpp",
"atomic": "cpp",
"strstream": "cpp",
"bit": "cpp",
"*.tcc": "cpp",
"bitset": "cpp",
"chrono": "cpp",
"cinttypes": "cpp",
"codecvt": "cpp",
"compare": "cpp",
"complex": "cpp",
"concepts": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"list": "cpp",
"map": "cpp",
"set": "cpp",
"string": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"algorithm": "cpp",
"functional": "cpp",
"iterator": "cpp",
"memory": "cpp",
"memory_resource": "cpp",
"numeric": "cpp",
"optional": "cpp",
"random": "cpp",
"ratio": "cpp",
"regex": "cpp",
"string_view": "cpp",
"system_error": "cpp",
"tuple": "cpp",
"type_traits": "cpp",
"utility": "cpp",
"fstream": "cpp",
"future": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"mutex": "cpp",
"new": "cpp",
"numbers": "cpp",
"ostream": "cpp",
"semaphore": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"stop_token": "cpp",
"streambuf": "cpp",
"thread": "cpp",
"cfenv": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
"variant": "cpp"
}
}
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
project(${TARGET_NAME})
include_directories(src/include)

set(EXTENSION_SOURCES src/hydro_duck_extension.cpp)
set(EXTENSION_SOURCES
src/hydro_duck_extension.cpp
src/functions/first.cpp
src/functions/nse.cpp
)

build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES})
Expand Down
24 changes: 24 additions & 0 deletions src/functions/aliases.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include "functions/functions.hpp"
#include "duckdb/catalog/catalog_entry/aggregate_function_catalog_entry.hpp"

namespace hydro_duck {
void Aliases::Register(duckdb::Connection &conn, duckdb::Catalog &catalog) {
// Register Volatility
auto &stddev = (duckdb::AggregateFunctionCatalogEntry &)catalog.GetEntry(
*conn.context, duckdb::CatalogType::AGGREGATE_FUNCTION_ENTRY,
DEFAULT_SCHEMA, "stddev_pop");
auto volatility = stddev.functions;
volatility.name = "volatility";
duckdb::CreateAggregateFunctionInfo volatility_info(volatility);
catalog.CreateFunction(*conn.context, volatility_info);

// Register SMA
auto &avg = (duckdb::AggregateFunctionCatalogEntry &)catalog.GetEntry(
*conn.context, duckdb::CatalogType::AGGREGATE_FUNCTION_ENTRY,
DEFAULT_SCHEMA, "avg");
auto sma = avg.functions;
sma.name = "sma";
duckdb::CreateAggregateFunctionInfo sma_info(sma);
catalog.CreateFunction(*conn.context, sma_info);
}
} // namespace hydro_duck
178 changes: 178 additions & 0 deletions src/functions/first.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#include "functions/functions.hpp"
#include "duckdb/function/function_set.hpp"
#include "duckdb/parser/parsed_data/create_aggregate_function_info.hpp"

namespace hydro_duck {

using namespace duckdb;

template <class T> struct FirstScroogeState {
T first;
int64_t earliest_time;
bool executed;
};

struct FirstScroogeOperation {
template <class STATE> static void Initialize(STATE &state) {
state.earliest_time = duckdb::NumericLimits<int64_t>::Maximum();
state.executed = false;
}

template <class A_TYPE, class B_TYPE, class STATE, class OP>
static void
Operation(STATE &state,
const A_TYPE &x_data, const B_TYPE &y_data, duckdb::AggregateBinaryInput &aggr_input_data) {
/*
const auto time = y_data[yidx];
if (!state.executed || time < state.earliest_time) {
state.earliest_time = time;
state.first = x_data[xidx];
state.executed = true;
}
*/
}

template <class STATE, class OP>
static void Combine(const STATE &source, STATE &target,
duckdb::AggregateInputData &aggr_input_data) {
if (!target.executed) {
target = source;
} else if (source.executed) {
if (target.earliest_time > source.earliest_time) {
target.earliest_time = source.earliest_time;
target.first = source.first;
}
}
}

template <class T, class STATE>
static void Finalize(STATE &state, T &target, duckdb::AggregateFinalizeData &finalize_data) {
if (!state.executed) {
// mask.SetInvalid(idx);
} else {
// target[idx] = state.first;
}
}

static bool IgnoreNull() { return true; }
};

duckdb::unique_ptr<duckdb::FunctionData> BindDoubleFirst(
duckdb::ClientContext &context, duckdb::AggregateFunction &bound_function,
duckdb::vector<duckdb::unique_ptr<duckdb::Expression>> &arguments) {
auto &decimal_type = arguments[0]->return_type;
switch (decimal_type.InternalType()) {
case duckdb::PhysicalType::INT16: {
bound_function =
duckdb::AggregateFunction::BinaryAggregate<FirstScroogeState<int16_t>,
int16_t, int64_t, int16_t,
FirstScroogeOperation>(
decimal_type, duckdb::LogicalType::TIMESTAMP_TZ, decimal_type);
break;
}
case duckdb::PhysicalType::INT32: {
bound_function =
duckdb::AggregateFunction::BinaryAggregate<FirstScroogeState<int32_t>,
int32_t, int64_t, int32_t,
FirstScroogeOperation>(
decimal_type, duckdb::LogicalType::TIMESTAMP_TZ, decimal_type);
break;
}
case duckdb::PhysicalType::INT64: {
bound_function =
duckdb::AggregateFunction::BinaryAggregate<FirstScroogeState<int64_t>,
int64_t, int64_t, int64_t,
FirstScroogeOperation>(
decimal_type, duckdb::LogicalType::TIMESTAMP_TZ, decimal_type);
break;
}
default:
bound_function = duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<duckdb::Hugeint>, duckdb::Hugeint, int64_t,
duckdb::Hugeint, FirstScroogeOperation>(
decimal_type, duckdb::LogicalType::TIMESTAMP_TZ, decimal_type);
}
bound_function.name = "first_s";
return nullptr;
}

duckdb::AggregateFunction
GetFirstScroogeFunction(const duckdb::LogicalType &timestamp_type,
const duckdb::LogicalType &type) {
switch (type.id()) {
case duckdb::LogicalTypeId::SMALLINT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<int16_t>, int16_t, int64_t, int16_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::TINYINT:
return duckdb::AggregateFunction::BinaryAggregate<FirstScroogeState<int8_t>,
int8_t, int64_t, int8_t,
FirstScroogeOperation>(
type, timestamp_type, type);
case duckdb::LogicalTypeId::INTEGER:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<int32_t>, int32_t, int64_t, int32_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::BIGINT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<int64_t>, int64_t, int64_t, int64_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::DECIMAL: {
auto decimal_aggregate = duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<duckdb::hugeint_t>, duckdb::hugeint_t, int64_t,
duckdb::hugeint_t, FirstScroogeOperation>(type, timestamp_type, type);
decimal_aggregate.bind = BindDoubleFirst;
return decimal_aggregate;
}
case duckdb::LogicalTypeId::FLOAT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<float>, float, int64_t, float, FirstScroogeOperation>(
type, timestamp_type, type);
case duckdb::LogicalTypeId::DOUBLE:
return duckdb::AggregateFunction::BinaryAggregate<FirstScroogeState<double>,
double, int64_t, double,
FirstScroogeOperation>(
type, timestamp_type, type);
case duckdb::LogicalTypeId::UTINYINT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<uint8_t>, uint8_t, int64_t, uint8_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::USMALLINT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<uint16_t>, uint16_t, int64_t, uint16_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::UINTEGER:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<uint32_t>, uint32_t, int64_t, uint32_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::UBIGINT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<uint64_t>, uint64_t, int64_t, uint64_t,
FirstScroogeOperation>(type, timestamp_type, type);
case duckdb::LogicalTypeId::HUGEINT:
return duckdb::AggregateFunction::BinaryAggregate<
FirstScroogeState<duckdb::hugeint_t>, duckdb::hugeint_t, int64_t,
duckdb::hugeint_t, FirstScroogeOperation>(type, timestamp_type, type);
default:
throw duckdb::InternalException(
"Scrooge First Function only accept Numeric Inputs");
}
}

void FirstScrooge::RegisterFunction(duckdb::Connection &conn,
duckdb::Catalog &catalog) {
// The first aggregate allows you to get the first value of one column as
// ordered by another e.g., first(temperature, time) returns the earliest
// temperature value based on time within an aggregate group.
duckdb::AggregateFunctionSet first("first_s");
for (auto &type : duckdb::LogicalType::Numeric()) {
first.AddFunction(
GetFirstScroogeFunction(duckdb::LogicalType::TIMESTAMP_TZ, type));
first.AddFunction(
GetFirstScroogeFunction(duckdb::LogicalType::TIMESTAMP, type));
}
duckdb::CreateAggregateFunctionInfo first_info(first);
catalog.CreateFunction(*conn.context, first_info);
}

} // namespace hydro_duck
Loading

0 comments on commit 2dd43e7

Please sign in to comment.