[PROPOSAL]: HyperspaceOneRule #405
Comments
I'm relatively new to Hyperspace, so my view could be biased or based on a misunderstanding - please take my words with a grain of salt.

First of all, I think it's good to have a single Spark optimizer rule that orchestrates applying the different index-related rules. One minor thing is that the name "HyperspaceOneRule" could be a little confusing to someone without background knowledge of Hyperspace, because the name only makes sense once you know there were originally two rules, hence the word "One."

About HyperspaceIndexCheck and CandidateIndexCollector

It's just my gut feeling, but I think it's too early to assume a specific way of finding the applicable indices and make it the norm. The current way of doing it - loading all active indices and incrementally filtering them - might not be the best fit for other types of indices. For example, for certain data sources we could load only the indices defined for a specific table, using just the table name and maybe a version, without even loading all other indices. If there are many indices in the system, that could be a good thing. For such relations and indices, ColumnSchemaCheck is not necessary - and it doesn't seem very cheap, because it has to resolve column names for all indices against all relations.

Instead, how about keeping things simple and less regular until we finally grasp how applicable indices should be found? We can try to find applicable indices for each relation in the plan (or each node in the plan, considering indexed views in the future):

```scala
def findApplicableIndices(
    plan: LogicalPlan,
    indexProvider: IndexProvider.type): Map[LogicalPlan, Seq[IndexLogEntry]] = {
  plan.map(node => (node, indexProvider.findApplicableIndices(node))).toMap
}

object IndexProvider {
  val indexManager = Hyperspace
    .getContext(spark)
    .indexCollectionManager

  def findApplicableIndices(node: LogicalPlan): Seq[IndexLogEntry] = {
    indexManager
      .getIndexes(ACTIVE)
      .filter(checkColumns(node, _))
      .filter(checkSignature(node, _))
  }
}
```

It has the same computational complexity as the original proposal, because it involves iterating over all indices and relations. In case we proceed with the original proposal, here are two minor things to note:
```scala
def apply(
    plan: LogicalPlan,
    allIndexes: Seq[IndexLogEntry]): Map[LogicalPlan, Seq[IndexLogEntry]] = {
  val planToIndexes = initializePlanToIndexes(plan, allIndexes)
  planToIndexes.flatMap { case (node, indexes) =>
    Some((node, checkBatch.foldLeft(indexes) { (idxs, check) => check(node, idxs) }))
      .filterNot(_._2.isEmpty)
  }
}
```

It looks less clean than the original one, but if we should have this rule-applying pattern, it is better to simplify each rule at the expense of some complexity added to the orchestration logic. One additional benefit of this change is that now …
About scoring

To me, it seems more natural to put IndexPriorityCheck in the scoring step (ScoreBasedIndexApplication) rather than in the candidate collection step. I'd appreciate it if you could elaborate more on the scoring step and, if possible, give some examples demonstrating the complexity of the problem.
One problem is that there's no metadata like a table name/id, and no way to check whether the given DataFrame is applicable for indexes other than comparing all source data files. Comparing column schemas is less expensive than listing and comparing all source files. In other words, we need to check and validate all relations in a given plan for each index; it's the same behavior as the current master.
Yea, I tried this way at first 😁, but realized that if we don't track the (relation -> index) pairs, we need to match each relation and its indexes again later in the application step. The problem is that there can be multiple relations in the plan.
The failure reason is for the other API - called whyNot (#253). Yea, actually it might be better to create a "mode" for each check to break down the error message. For scoring, it's a greedy heuristic for the first version. The reason why we need memoization + recursion is that an index rule (batch) can affect multiple relations (like JoinIndexRule), and after applying it, it blocks the chance to apply another type of index. IndexPriorityCheck mainly limits the total number of indexes. Yes, you're right, it's better to locate it in ScoreBasedIndexApplication. 👍
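Purely as an illustration of the recursion + memoization idea described here (not the actual code in the PR), the search could look roughly like this; `IndexBatch` and its `apply` signature are made-up names:

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical: a batch tries to apply one kind of index and reports the
// transformed plan plus the score it gained, or None if it doesn't apply.
trait IndexBatch {
  def apply(plan: LogicalPlan): Option[(LogicalPlan, Int)]
}

class ScoreBasedSearch(batches: Seq[IndexBatch]) {
  // Memoize by plan so that plans reachable through different application
  // orders are only explored once.
  private val memo = mutable.Map.empty[LogicalPlan, (LogicalPlan, Int)]

  def bestPlan(plan: LogicalPlan): (LogicalPlan, Int) =
    memo.getOrElseUpdate(plan, {
      val results = batches.flatMap { batch =>
        batch(plan).map { case (transformed, gained) =>
          // Applying one batch (e.g. a join index) may block others later,
          // so recurse and compare total scores instead of applying batches
          // greedily in a fixed order.
          val (finalPlan, rest) = bestPlan(transformed)
          (finalPlan, gained + rest)
        }
      }
      if (results.isEmpty) (plan, 0) else results.maxBy(_._2)
    })
}
```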
I think the current way of matching relations and indices is not scalable, even if you do the column check first - it's still linear in the number of indices at best (or quadratic if you treat the number of relations in a query plan as a variable too), and it requires all indices to be loaded into memory although most of them might not be needed in the first place. A query plan with hundreds of relations does come up from time to time, and having hundreds or thousands of indices in a system doesn't seem unusual. We should get help from metadata stores whenever we can.

There are some types of data sources for which we don't have to compare every file name. For example, you can compare just a single path and maybe a version to match Delta or Iceberg relations. Or we can check if the …

For relations for which such metadata exists, we can build a table of all the indices we have, keyed by relation type, identifier (such as a table name or a directory path), and version (or an equivalent, like the last DDL time), eliminating the need to load all indices at once and compare them one by one.

Of course we can and should still support relations without such metadata, but I believe there is a better way. First, we don't have to check indices created for relations for which metadata exists; that alone can greatly reduce the number of indices to be searched if most relations have metadata. Then we can check rootPaths first, instead of loading and comparing all file names, to quickly get rid of unrelated indices. If there is only a single root path, we can even use it to treat the relation as if it has metadata, using the root path as an identifier. We can override the version-matching scheme so that such relations have versions that never match each other.
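To make the "metadata store" idea a bit more concrete, here is a minimal sketch; `IndexCatalogKey` and `IndexCatalog` are hypothetical names, not existing Hyperspace classes:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry

// Key made of the relation's type, identifier and version (or an equivalent
// such as the last DDL time), as described above.
case class IndexCatalogKey(
    relationType: String, // e.g. "delta", "iceberg", "parquet"
    identifier: String,   // table name or root path
    version: String)

class IndexCatalog(entries: Map[IndexCatalogKey, Seq[IndexLogEntry]]) {
  // Load only the indices created for this exact relation/version instead of
  // loading every active index in the system and filtering one by one.
  def lookup(key: IndexCatalogKey): Seq[IndexLogEntry] =
    entries.getOrElse(key, Nil)
}
```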
Um, I don't get it.
Like I said, I think (I might be wrong) we can work on each relation independently of the others. All we have to do is check which columns are used in predicates and which columns are projected to the parent, and find the index that matches those columns and has the best benefit. So instead of running transformation rules where each rule works on the entire plan, it would be like running each rule per relation. Additionally, I don't really see the reason for transforming the plan during the scoring step. Can't we just apply the transformation after we have determined which indices to apply?
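A rough sketch of that per-relation matching, assuming IndexLogEntry exposes `indexedColumns`/`includedColumns` accessors and with the "benefit" measure reduced to a trivial placeholder:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry

// filterColumns: columns used in predicates over this relation.
// projectColumns: columns projected up to the parent.
def pickBestIndex(
    filterColumns: Set[String],
    projectColumns: Set[String],
    candidates: Seq[IndexLogEntry]): Option[IndexLogEntry] = {
  candidates
    .filter { index =>
      // Indexed columns must cover the predicates, and indexed + included
      // columns must cover everything the parent needs.
      filterColumns.subsetOf(index.indexedColumns.toSet) &&
        (filterColumns ++ projectColumns)
          .subsetOf((index.indexedColumns ++ index.includedColumns).toSet)
    }
    // Placeholder "benefit": prefer the narrowest covering index.
    .sortBy(index => index.indexedColumns.size + index.includedColumns.size)
    .headOption
}
```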
We do this in the Delta Lake and Iceberg signature calculation. But comparing the column schema is still required before it, because 1) comparing the column schema is not expensive and 2) we need to load all index log entries anyway to find applicable indexes.
For the rootPaths filter, there are two problems: 1) rootPaths can be all of the file paths, depending on how the user passed the paths to the DataFrame, and 2) the content inside the root paths may have changed since index creation time, so they cannot be used as an identifier.
If we don't trace it, how could we determine which index is the final candidate for a relation in the plan? E.g. a query plan has 3 relations (r1, r2, r3) and the candidate indexes are (i1, i2, i6, i7)?
The join index rule involves 2 relations, so how can we handle them individually?
This is done in CandidateIndexCollector.
We can't, because there can be rules involving 2 or more relations (later?).
Because if the join index rule is applied, the filter index rule cannot be applied after that. We need to consider all the cases after applying a rule.
Maybe there's something I don't know, but why do we need to load all indices? As I said, for relations with metadata, we can use the identifier + version approach to load only the exactly matching index (or maybe a few versions of it).
2) is not an issue, since root paths will only constitute the identifier part. By defining the version part of a relation without metadata as the current timestamp, the version will never match. When versions don't match, we examine the latest version of the index and see if it can be applied, with or without hybrid scan, by comparing signatures or file names. Example: you have a relation with columns (…).

1) is the interesting part. As long as users provide paths in a consistent way, it doesn't matter whether there are too many root paths, because once we get the identifier we don't care about its actual value (we can even do some simple path compression to make the identifier part compact). The only possible problem is when the user loads the same data in an inconsistent way, like using different paths between queries. However, do we really need to support inconsistent use of file paths between queries? Is that really the usual way Spark is used in the field? I think any serious data analysis should involve creating metadata (it seems you can easily do that even with native Spark, by creating Hive external tables). Maybe this is because I'm coming from the DBMS field, but I really believe that scanning every index, even if the filtering step is as cheap as a simple column check, is not scalable and should be avoided.
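A tiny sketch of that version-matching idea; all names are illustrative, and whether hybrid scan can apply is abstracted away as a parameter:

```scala
// Identifier + version fingerprint of a relation, or of an index's source data.
case class RelationFingerprint(identifier: String, version: String)

def indexMatches(
    relation: RelationFingerprint,
    indexSource: RelationFingerprint,
    signatureOrFilesMatch: => Boolean): Boolean = {
  if (relation.identifier != indexSource.identifier) {
    false // Different data source: the index can be skipped outright.
  } else if (relation.version == indexSource.version) {
    true // Exact match, possible for relations backed by Delta/Iceberg metadata.
  } else {
    // Relations without metadata get the current timestamp as their version,
    // so they always fall through to signature/file comparison (hybrid scan).
    signatureOrFilesMatch
  }
}
```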
If you look at my example code:

```scala
def apply(
    plan: LogicalPlan,
    allIndexes: Seq[IndexLogEntry]): Map[LogicalPlan, Seq[IndexLogEntry]] = {
  val planToIndexes = initializePlanToIndexes(plan, allIndexes)
  planToIndexes.flatMap { case (node, indexes) =>
    Some((node, checkBatch.foldLeft(indexes) { (idxs, check) => check(node, idxs) }))
      .filterNot(_._2.isEmpty)
  }
}
```

you can see that it gives the same result as your code. Only the working unit is different: here, each check works on a single relation at a time.
I'd like to propose a new perspective here. Indices are defined for relations, or maybe for an arbitrary tree of logical plan nodes. There is no such thing as a join index or a filter index (in Hyperspace). So why can't we apply indices for each relation or node? Example: there is a query … over relations such as:

R1: a, b, …

Now look at the relation …: check which of its columns are used in predicates and which are projected to the parent, and find the index that matches those columns with the best benefit. We can do the same thing for the other relations. For …

If we want to support indices for arbitrary trees of logical plan nodes, this step can be modified to visit each node top-down, depth-first. If we find a matching index for the current node, we can replace the node and the entire subtree with an index scan. With the above example, suppose we have defined an index on …
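If it helps, the top-down, depth-first variant could be sketched like this; `findIndexFor` and the returned index scan are hypothetical:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def applyIndexes(
    plan: LogicalPlan,
    findIndexFor: LogicalPlan => Option[LogicalPlan]): LogicalPlan = {
  findIndexFor(plan) match {
    // An index covers this node and its subtree: replace the whole subtree
    // with the corresponding index scan.
    case Some(indexScan) => indexScan
    // Otherwise keep the node and try its children.
    case None =>
      plan.withNewChildren(plan.children.map(applyIndexes(_, findIndexFor)))
  }
}
```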
As accessing remote storage is expensive, Hyperspace loads all index log entries when they are required and caches them for reuse. I think the index log entry is also a kind of metadata, so creating a new metadata type is not a proper design. Instead, we could reduce the size of the index log entry, as I said in another comment.
The problem is that not all relations in a query have such metadata. There's no timestamp or table name, just root paths.
Yes, we need to support that inconsistency. Spark just uses the rootPaths to list all source files inside HadoopFsRelation. At the time of reading, the source files under the root paths may differ, so we cannot use rootPaths as an identifier; but we could add some filter on root paths - e.g. check whether there are common root paths or not.
But not all users define and utilize such metadata, and there can be a lot of inconsistency between apps, platforms, etc.; that's why Delta Lake and Iceberg were built.
Yep, but it actually does the same thing, doesn't it? Iterating inside each check (mine) vs. iterating nodes in …
One note about the join index: it involves 2 indexes, and if their bucket numbers are the same and the join condition columns are the same as the "indexed columns" of both candidate indexes, we can remove the shuffle stage, which is very expensive in Spark query processing. Therefore, Hyperspace tries to find a pair of indexes that can be applied to both the left and right children and have the same bucket number, so that we can eliminate the shuffle stage for the join.
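A simplified sketch of that pair selection, assuming IndexLogEntry exposes `indexedColumns` and abstracting the bucket count as a function since its exact location in the metadata isn't shown here:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry

def findShuffleFreePair(
    leftCandidates: Seq[IndexLogEntry],
    rightCandidates: Seq[IndexLogEntry],
    leftJoinCols: Seq[String],
    rightJoinCols: Seq[String],
    numBuckets: IndexLogEntry => Int): Option[(IndexLogEntry, IndexLogEntry)] = {
  val pairs = for {
    l <- leftCandidates if l.indexedColumns == leftJoinCols
    r <- rightCandidates if r.indexedColumns == rightJoinCols
    // Same bucketing on the join keys on both sides => no shuffle needed.
    if numBuckets(l) == numBuckets(r)
  } yield (l, r)
  pairs.headOption
}
```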
Caching is irrelevant here, because it can also be applied to the logic I described. In the current implementation the caching unit is the entire set of indices in the system, which is not very efficient: if you create a new index or modify a single index, all indices are loaded again. With more fine-grained caching, it can be naturally applied to the logic I described.

An index log entry is metadata, but it's currently scattered across the file system, and there is no way to look up a specific index efficiently other than loading all indices and checking them one by one - which is basically what the current implementation and this PR do. This PR doesn't change what was already being done, but I'm raising a concern here because it might impact the scalability of this project now and in the future.

What I'm proposing is not new metadata for metadata; rather, it's about the structure in which the metadata is laid out for efficient retrieval. For example, we could put the entire index log entry data in a single DataFrame and store it in the file system. Being a DataFrame, we can also cache() it. It would be much faster than reading 1000 different JSON files to create 1000 index log entries and iterating over them to find the candidates.

Maybe rootPaths is not adequate, as you said, but I think we can find a way. Also, for relations with metadata like Delta and Iceberg tables, the benefit is clear.
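For illustration, such a single-DataFrame layout could be as simple as the following; the path and column names here are made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

// One Parquet dataset with a row per index log entry, instead of one JSON
// file per index version.
val indexCatalog = spark.read.parquet("/hyperspace/indexCatalog").cache()

// Narrow down candidates with cheap predicates before touching any
// per-index files.
val candidates = indexCatalog
  .filter(col("state") === "ACTIVE")
  .filter(col("relationId") === "/data/tableA")
```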
The "version" for relations without metadata is the "current time", representing when the query is executed or an index is created. Therefore the version will never match, and we will end up comparing the files. This is the same as the current approach, but the difference is that now we can skip indices for relations with metadata.
So what you mean is that users sometimes use paths like ('/a/b/c') and sometimes use ('/a/b/c/d', '/a/b/c/e', '/a/b/c/f') to represent the same data? It's unfortunate if this is true and if it's the usual way Spark is used in the field for data analysis. Then I see there is indeed no way other than scanning every index (for relations without metadata). However, it really doesn't feel right that we have to check every index. It could be fine if Hyperspace is designed for a small system with a moderate number of tables and indices, though, and users can split their system into parts, each equipped with its own Hyperspace instance - so maybe not a huge issue.
I believe it's better, for three reasons:
First, as each entry in the map is independent of the others, there is no reason for each check to do the same .map(...).flatten.toMap repeatedly. It is also better in terms of locality, because you iterate the map only once. It is similar to how …
Is it planned work or already working? I couldn't find bucket size matching in the current JoinIndexRule implementation. Anyway, I do agree that local optimizations are not enough; I just wanted to say that there are simple local optimizations we could try. Could you elaborate more on the scoring step and, if possible, give some examples demonstrating the complexity of the problem? Looking at your description and the code in the PR, I couldn't understand how it works. For example, it seems the algorithm always stops if the applicable filter is deep down in the tree, because then the condition on line 79 will never be true. Maybe I'm missing something?
It's in JoinIndexRanker, and to remove the shuffle we also try to find (join condition columns = indexed columns) pairs in JoinIndexRule.
Actually, plan.equals is a somewhat expensive operation - it also compares the children. But we can replace it with a check for score == 0.
Thanks! I missed that there is a ranker for each rule.
An example of the scoring algorithm would be really helpful. What I don't quite understand is the part …
The batch is supposed to work from the top node down. In FilterNodeCheck, I created NoOpBatch to move down without any application. Right now there's no eligible index for the children after applying an index to the parent node, but later we might have one.
Oh, I didn't know you pushed a new commit between my comments. Now the condition makes sense. So is that the purpose of NoOpBatch? I didn't quite understand the role of NoOpBatch in the algorithm.
I think I can move on to the PR review, as the points I raised can be addressed later (e.g. CandidateIndexCollector). Please feel free to let me know if you want a review of the draft.
@sezruby Thanks for this great proposal! I have a couple of thoughts/comments:
@wentaowu Thanks for the feedback! Actually, this proposal became a bit outdated while working on the PRs, and some of the PRs are already merged.
Let me know if you have any other comments or concerns :)
NOTE: This proposal is outdated and most of the PRs are merged. Please refer to the PRs.
Problem Statement
Currently, Hyperspace supports one type of index - the non-clustered CoveringIndex - and there are two optimizer rules: FilterIndexRule and JoinIndexRule.
However, as we're going to introduce other types of indexes (e.g. a bloom filter index), it's hard to apply different types of indexes using only a sequence of optimizer rules, because …
Background and Motivation
TBD
Proposed Solution
Define one rule and handle all indexes in that rule - HyperspaceOneRule:
Step 1) Index candidate collection - using only the relation
Step 2) Score-based index application
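A high-level sketch of how the two steps could be wired together; the component names come from this proposal, but the interfaces and signatures below are illustrative only:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical interfaces for the two steps; the real signatures may differ.
trait CandidateIndexCollector {
  def collect(plan: LogicalPlan): Map[LogicalPlan, Seq[IndexLogEntry]]
}

trait ScoreBasedIndexApplication {
  def apply(
      plan: LogicalPlan,
      candidates: Map[LogicalPlan, Seq[IndexLogEntry]]): LogicalPlan
}

class HyperspaceOneRule(
    collector: CandidateIndexCollector,
    applier: ScoreBasedIndexApplication) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Step 1: per relation, collect the indexes that could possibly apply.
    val candidates = collector.collect(plan)
    // Step 2: choose and apply the best-scoring combination of indexes.
    if (candidates.isEmpty) plan else applier(plan, candidates)
  }
}
```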
Structure overview
Batches
FilterIndexRule & JoinIndexRule can be represented as a "Batch". A batch consists of a sequence of HyperspacePlanCheck. Batches:
- NoOpBatch
- FilterIndexBatch
- JoinIndexBatch
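A possible shape for a batch, sketched under assumed signatures; only the batch names above come from the proposal:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait HyperspaceBatch {
  // Try to apply this kind of index to the given plan; return the transformed
  // plan plus a score, or None if nothing applies.
  def apply(
      plan: LogicalPlan,
      candidates: Map[LogicalPlan, Seq[IndexLogEntry]]): Option[(LogicalPlan, Int)]
}

// Does not transform the plan; it only lets the optimizer keep walking down
// the tree so indexes can still be applied to the children.
object NoOpBatch extends HyperspaceBatch {
  def apply(
      plan: LogicalPlan,
      candidates: Map[LogicalPlan, Seq[IndexLogEntry]]): Option[(LogicalPlan, Int)] =
    Some((plan, 0))
}
```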
Alternatives
None
Known/Potential Compatibility Issues
None
Design
"Check" based candidate index filters - for WhyNot API
HyperspaceSourceCheck
HyperspacePlanCheck
reason
field for each check.HyperspaceIndexCheck
asHyperspaceSourceCheck
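A minimal sketch of a check carrying a reason for the WhyNot API; the trait shape is assumed, not the PR's actual interface:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait HyperspaceCheck {
  // Human-readable reason reported through the whyNot API when this check
  // filters an index out.
  def reason: String

  // Keep only the candidate indexes that pass this check for the given node.
  def apply(node: LogicalPlan, candidates: Seq[IndexLogEntry]): Seq[IndexLogEntry]
}
```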
Implementation
Performance Implications (if applicable)
Backtracking might take a long time if the query plan is complex and there are many candidate indexes.
We could limit the total number of candidate indexes in the index candidate collection step.
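For example, the cap could be as simple as the following sketch; the ranking criterion is a placeholder and assumes IndexLogEntry exposes `indexedColumns`:

```scala
import com.microsoft.hyperspace.index.IndexLogEntry
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def limitCandidates(
    candidates: Map[LogicalPlan, Seq[IndexLogEntry]],
    maxPerRelation: Int): Map[LogicalPlan, Seq[IndexLogEntry]] =
  candidates.map { case (node, indexes) =>
    // Placeholder ranking: keep the indexes covering the most columns.
    node -> indexes.sortBy(-_.indexedColumns.size).take(maxPerRelation)
  }
```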
Open issues (if applicable)
A discussion of issues relating to this proposal for which the author does not know the solution. If you have already opened the corresponding issues, please link to them here. This section may be omitted if there are none.
Not sure a ranker is possible - in the current design, it's hard to collect more than one candidate selection
(possible, but it would increase the complexity).
Additional context (if applicable)
n/a