Skip to content

Commit

Permalink
Back out "Add spark input_file_name function support" (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#9956)

Summary:
Pull Request resolved: facebookincubator#9956

See discussion in facebookincubator#9870

Original commit changeset: 3e50e70d5807

Original Phabricator Diff: D57161283

Reviewed By: Yuhta

Differential Revision: D57863331

fbshipit-source-id: a3c3e4532ece089f201af16fd91e6f263717a7ac
  • Loading branch information
mbasmanova authored and facebook-github-bot committed May 28, 2024
1 parent 5ef1409 commit 114e024
Show file tree
Hide file tree
Showing 17 changed files with 22 additions and 266 deletions.
4 changes: 0 additions & 4 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ struct ConnectorSplit {
virtual std::string toString() const {
return fmt::format("[split: {}]", connectorId);
}

virtual std::string getFileName() const {
return "";
}
};

class ColumnHandle : public ISerializable {
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
return fmt::format("Hive: {} {} - {}", filePath, start, length);
}

std::string getFileName() const override {
std::string getFileName() const {
auto i = filePath.rfind('/');
return i == std::string::npos ? filePath : filePath.substr(i + 1);
}
Expand Down
6 changes: 0 additions & 6 deletions velox/docs/functions/spark/misc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,3 @@ Miscellaneous Functions
``seed`` must be constant. ::

SELECT uuid(0); -- "8c7f0aac-97c4-4a2f-b716-a675d821ccc0"

.. spark:function:: input_file_name() => string
Returns the name of the file being read, or empty string if not available.
The returned file name align with vinalla spark implementation is
always valid url-encoded string if available.
1 change: 0 additions & 1 deletion velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ struct DriverCtx {
/// Id of the partition to use by this driver. For local exchange, for
/// instance.
const uint32_t partitionId;
std::string inputFileName;

std::shared_ptr<Task> task;
Driver* driver;
Expand Down
6 changes: 1 addition & 5 deletions velox/exec/FilterProject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ RowVectorPtr FilterProject::getOutput() {
auto* rows = localRows.get();
VELOX_DCHECK_NOT_NULL(rows)
rows->setAll();
EvalCtx evalCtx(
operatorCtx_->execCtx(),
exprs_.get(),
input_.get(),
operatorCtx_->driverCtx());
EvalCtx evalCtx(operatorCtx_->execCtx(), exprs_.get(), input_.get());

// Pre-load lazy vectors which are referenced by both expressions and identity
// projections.
Expand Down
6 changes: 0 additions & 6 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ RowVectorPtr TableScan::getOutput() {
(getCurrentTimeMicro() - addSplitStartMicros) * 1'000,
RuntimeCounter::Unit::kNanos));
}
setInputFileName(connectorSplit);
curStatus_ = "getOutput: updating stats_.numSplits";
++stats_.wlock()->numSplits;

Expand Down Expand Up @@ -352,11 +351,6 @@ void TableScan::checkPreload() {
}
}

void TableScan::setInputFileName(
std::shared_ptr<connector::ConnectorSplit> split) {
driverCtx_->inputFileName = split->getFileName();
}

bool TableScan::isFinished() {
return noMoreSplits_;
}
Expand Down
1 change: 0 additions & 1 deletion velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class TableScan : public SourceOperator {
const std::shared_ptr<common::Filter>& filter) override;

private:
void setInputFileName(std::shared_ptr<connector::ConnectorSplit> split);
// Checks if this table scan operator needs to yield before processing the
// next split.
bool shouldYield(StopReason taskStopReason, size_t startTimeMs) const;
Expand Down
8 changes: 1 addition & 7 deletions velox/expression/EvalCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@ using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::exec {

EvalCtx::EvalCtx(
core::ExecCtx* execCtx,
ExprSet* exprSet,
const RowVector* row,
DriverCtx* driverCtx)
EvalCtx::EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row)
: execCtx_(execCtx),
exprSet_(exprSet),
row_(row),
driverCtx_(driverCtx),
cacheEnabled_(execCtx->exprEvalCacheEnabled()),
maxSharedSubexprResultsCached_(
execCtx->queryCtx()
Expand Down Expand Up @@ -61,7 +56,6 @@ EvalCtx::EvalCtx(core::ExecCtx* execCtx)
: execCtx_(execCtx),
exprSet_(nullptr),
row_(nullptr),
driverCtx_(nullptr),
cacheEnabled_(execCtx->exprEvalCacheEnabled()),
maxSharedSubexprResultsCached_(
execCtx->queryCtx()
Expand Down
12 changes: 1 addition & 11 deletions velox/expression/EvalCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

namespace facebook::velox::exec {

struct DriverCtx;
class Expr;
class ExprSet;
class LocalDecodedVector;
Expand All @@ -37,11 +36,7 @@ class PeeledEncoding;
// flags for Expr interpreter.
class EvalCtx {
public:
EvalCtx(
core::ExecCtx* execCtx,
ExprSet* exprSet,
const RowVector* row,
DriverCtx* driverCtx = nullptr);
EvalCtx(core::ExecCtx* execCtx, ExprSet* exprSet, const RowVector* row);

/// For testing only.
explicit EvalCtx(core::ExecCtx* execCtx);
Expand Down Expand Up @@ -260,10 +255,6 @@ class EvalCtx {
return exprSet_;
}

DriverCtx* driverCtx() const {
return driverCtx_;
}

VectorEncoding::Simple wrapEncoding() const;

void setPeeledEncoding(std::shared_ptr<PeeledEncoding>& peel) {
Expand Down Expand Up @@ -372,7 +363,6 @@ class EvalCtx {
core::ExecCtx* const execCtx_;
ExprSet* const exprSet_;
const RowVector* row_;
DriverCtx* const driverCtx_;
const bool cacheEnabled_;
const uint32_t maxSharedSubexprResultsCached_;
bool inputFlatNoNulls_;
Expand Down
4 changes: 2 additions & 2 deletions velox/functions/prestosql/URLFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <optional>
#include "velox/type/Type.h"

namespace facebook::velox::functions {
namespace facebook::velox::functions::detail {

std::optional<StringView> matchAuthorityAndPath(
StringView authorityAndPath,
Expand Down Expand Up @@ -54,4 +54,4 @@ std::optional<StringView> matchAuthorityAndPath(
return std::nullopt;
}

} // namespace facebook::velox::functions
} // namespace facebook::velox::functions::detail
52 changes: 15 additions & 37 deletions velox/functions/prestosql/URLFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,58 +101,36 @@ FOLLY_ALWAYS_INLINE void charEscape(unsigned char c, char* output) {
output[2] = toHex(c % 16);
}

FOLLY_ALWAYS_INLINE bool isDigit(char c) {
return c >= '0' && c <= '9';
}

FOLLY_ALWAYS_INLINE bool isAlphaNumeric(char c) {
return isDigit(c) || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
}

FOLLY_ALWAYS_INLINE bool shouldEncode(char c) {
switch (c) {
case '-':
case '_':
case '.':
case '*':
return false;
};
return !isAlphaNumeric(c);
}

/// Escapes ``input`` by encoding it so that it can be safely included in
/// URL query parameter names and values:
///
/// * Alphanumeric characters are not encoded.
/// * The characters ``.``, ``-``, ``*`` and ``_`` are not encoded.
/// * The ASCII space character is encoded as ``+`` if usePlusForSpace is true.
/// * The ASCII space character is encoded as ``+``.
/// * All other characters are converted to UTF-8 and the bytes are encoded
/// as the string ``%XX`` where ``XX`` is the uppercase hexadecimal
/// value of the UTF-8 byte.
template <typename TOutString, typename TInString>
FOLLY_ALWAYS_INLINE void urlEscape(
TOutString& output,
const TInString& input,
bool usePlusForSpace = true,
const uint64_t* doNotEncodeSymbolsBits = nullptr) {
FOLLY_ALWAYS_INLINE void urlEscape(TOutString& output, const TInString& input) {
auto inputSize = input.size();
output.resize(inputSize * 3);
output.reserve(inputSize * 3);

auto inputBuffer = input.data();
auto outputBuffer = output.data();

size_t outIndex = 0;
for (auto i = 0; i < inputSize; ++i) {
unsigned char p = inputBuffer[i];
if (p == ' ' && usePlusForSpace) {

if ((p >= 'a' && p <= 'z') || (p >= 'A' && p <= 'Z') ||
(p >= '0' && p <= '9') || p == '-' || p == '_' || p == '.' ||
p == '*') {
outputBuffer[outIndex++] = p;
} else if (p == ' ') {
outputBuffer[outIndex++] = '+';
} else if (
shouldEncode(p) &&
(!doNotEncodeSymbolsBits ||
!bits::isBitSet(doNotEncodeSymbolsBits, static_cast<size_t>(p)))) {
} else {
charEscape(p, outputBuffer + outIndex);
outIndex += 3;
} else {
outputBuffer[outIndex++] = p;
}
}
output.resize(outIndex);
Expand Down Expand Up @@ -227,8 +205,6 @@ FOLLY_ALWAYS_INLINE void urlUnescape(
output.resize(outputBuffer - output.data());
}

} // namespace detail

/// Matches the authority (i.e host[:port], ipaddress), and path from a string
/// representing the authority and path. Returns true if the regex matches, and
/// sets the appropriate groups matching authority in authorityMatch.
Expand All @@ -237,6 +213,8 @@ std::optional<StringView> matchAuthorityAndPath(
boost::cmatch& authorityMatch,
int subGroup);

} // namespace detail

template <typename T>
struct UrlExtractProtocolFunction {
VELOX_DEFINE_FUNCTION_TYPES(T);
Expand Down Expand Up @@ -313,7 +291,7 @@ struct UrlExtractHostFunction {
}
boost::cmatch authorityMatch;

if (auto host = matchAuthorityAndPath(
if (auto host = detail::matchAuthorityAndPath(
authAndPath.value(), authorityMatch, detail::kHost)) {
result.setNoCopy(host.value());
} else {
Expand All @@ -338,7 +316,7 @@ struct UrlExtractPortFunction {
}

boost::cmatch authorityMatch;
if (auto port = matchAuthorityAndPath(
if (auto port = detail::matchAuthorityAndPath(
authAndPath.value(), authorityMatch, detail::kPort)) {
if (!port.value().empty()) {
try {
Expand Down
3 changes: 1 addition & 2 deletions velox/functions/sparksql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ add_library(
Size.cpp
SplitFunctions.cpp
String.cpp
UnscaledValueFunction.cpp
InputFileName.cpp)
UnscaledValueFunction.cpp)

# GCC 12 has a bug where it does not respect "pragma ignore" directives and ends
# up failing compilation in an openssl header included by a hash-related
Expand Down
71 changes: 0 additions & 71 deletions velox/functions/sparksql/InputFileName.cpp

This file was deleted.

22 changes: 0 additions & 22 deletions velox/functions/sparksql/InputFileName.h

This file was deleted.

7 changes: 0 additions & 7 deletions velox/functions/sparksql/Register.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "velox/functions/sparksql/DateTimeFunctions.h"
#include "velox/functions/sparksql/Hash.h"
#include "velox/functions/sparksql/In.h"
#include "velox/functions/sparksql/InputFileName.h"
#include "velox/functions/sparksql/LeastGreatest.h"
#include "velox/functions/sparksql/MightContain.h"
#include "velox/functions/sparksql/MonotonicallyIncreasingId.h"
Expand Down Expand Up @@ -455,12 +454,6 @@ void registerFunctions(const std::string& prefix) {

registerFunction<UuidFunction, Varchar, Constant<int64_t>>({prefix + "uuid"});

exec::registerVectorFunction(
prefix + "input_file_name",
inputFileNameSignatures(),
makeInputFileName(),
{.deterministic = false});

registerFunction<
ArrayFlattenFunction,
Array<Generic<T1>>,
Expand Down
1 change: 0 additions & 1 deletion velox/functions/sparksql/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ add_executable(
StringToMapTest.cpp
UnscaledValueFunctionTest.cpp
UuidTest.cpp
InputFileNameTest.cpp
XxHash64Test.cpp)

add_test(velox_functions_spark_test velox_functions_spark_test)
Expand Down
Loading

0 comments on commit 114e024

Please sign in to comment.