Skip to content

Commit

Permalink
Merge pull request disney#149 from disney/tpch-support
Browse files Browse the repository at this point in the history
Tpch support to develop for RC-5
  • Loading branch information
guymolinari authored Sep 2, 2024
2 parents caeed7b + a90c1e9 commit 607a5f1
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 31 deletions.
32 changes: 25 additions & 7 deletions core/projector.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,6 @@ func (p *Projector) fetchStrings(columnIDs []uint64, bsiResults map[string]map[s
// use FK IntBSI to transpose to parent columnID set
if _, ok := bsiResults[p.childTable][key]; !ok {
goto nochild
//continue
}
trxColumnIDs = p.transposeFKColumnIDs(bsiResults[p.childTable][key], columnIDs)
nochild: if v.MappingStrategy == "ParentRelation" {
Expand Down Expand Up @@ -530,11 +529,11 @@ func (p *Projector) fetchStrings(columnIDs []uint64, bsiResults map[string]map[s
if v.MappingStrategy == "ParentRelation" ||
(v.Parent.Name != p.childTable && p.childTable != "" && !p.negate && p.innerJoin) {
lBatch, err = p.getPartitionedStrings(lookupAttribute, trxColumnIDs)
//u.Debugf("TRANSLATING PROJ %v, LEFT = %v, CHILD = %v, INNER = %v", v.Parent.Name, p.leftTable,
//u.Errorf("TRANSLATING PROJ %v, LEFT = %v, CHILD = %v, INNER = %v", v.Parent.Name, p.leftTable,
// p.childTable, p.innerJoin)
} else {
lBatch, err = p.getPartitionedStrings(lookupAttribute, columnIDs)
//u.Debugf("NOT TRANSLATING PROJ %v, LEFT = %v, CHILD = %v, INNER = %v", v.Parent.Name, p.leftTable,
//u.Errorf("NOT TRANSLATING PROJ %v, LEFT = %v, CHILD = %v, INNER = %v", v.Parent.Name, p.leftTable,
// p.childTable, p.innerJoin)
}
if err != nil {
Expand Down Expand Up @@ -619,6 +618,20 @@ func (p *Projector) getRow(colID uint64, strMap map[string]map[interface{}]inter
return
}
*/
// Foreign key is @rownum
if strings.HasSuffix(v.ForeignKey, "@rownum") {
rs, ok := bsiResults[v.Parent.Name][v.FieldName]
if !ok {
row[i] = "NULL"
} else {
if val, ok := rs.GetValue(colID); !ok {
row[i] = "NULL"
} else {
row[i] = fmt.Sprintf("%10d", val)
}
}
continue
}
var pv *Attribute
if pka[0].Parent.TimeQuantumType == "" {
pv = pka[0]
Expand Down Expand Up @@ -751,11 +764,16 @@ func (p *Projector) checkColumnID(v *Attribute, cID, child uint64,
}
}
}
if (p.innerJoin || child > 0) && p.childTable != "" && v.Parent.Name != p.childTable {
if (p.innerJoin || child > 0) && p.childTable != "" && v.Parent.Name != p.childTable && !p.negate {
//if (p.innerJoin || child > 0) && p.childTable != "" && v.Parent.Name != p.leftTable {
if child == 0 && !p.innerJoin {
colID = 0
return
}
if v.Parent.Name == p.childTable {
colID = cID
return
}
if r, ok := p.findRelationLink(v.Parent.Name); !ok {
err = fmt.Errorf("findRelationLink failed for %s", v.Parent.Name)
} else {
Expand All @@ -767,10 +785,10 @@ func (p *Projector) checkColumnID(v *Attribute, cID, child uint64,
val, found := b.GetValue(cID)
if found {
colID = uint64(val)
//u.Debugf("FOUND %s.%s - COLID = %d, CHILD = %d", v.Parent.Name, v.FieldName, colID, child)
//u.Errorf("CHECK COLID FOUND %s.%s - COLID = %d, CHILD = %d", v.Parent.Name, v.FieldName, colID, child)
} else {
colID = cID
//u.Debugf("NOT FOUND %s.%s - COLID = %d", v.Parent.Name, v.FieldName, colID)
//u.Errorf("CHECK NOT FOUND %s.%s - COLID = %d", v.Parent.Name, v.FieldName, colID)
}
}
}
Expand All @@ -779,7 +797,7 @@ func (p *Projector) checkColumnID(v *Attribute, cID, child uint64,
if child == 0 && v.Parent.Name == p.childTable && !p.innerJoin {
colID = 0
}
//u.Debugf("SKIPPING %s.%s - COLID = %d, CHILD = %d", v.Parent.Name, v.FieldName, colID, child)
//u.Errorf("SKIPPING %s.%s - COLID = %d, CHILD = %d", v.Parent.Name, v.FieldName, colID, child)
}
return
}
Expand Down
8 changes: 8 additions & 0 deletions server/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,14 @@ func (m *BitmapIndex) OfflinePartitions(ctx context.Context, req *pb.PartitionIn
return nil, fmt.Errorf("Time must be specified.")
}

ts := time.Unix(0, req.Time)

if req.Index != "" {
u.Info("Offline partition request for %v, table = %s", ts.Format(timeFmt), req.Index)
} else {
u.Info("Offline partition request for %v, all partitioned tables", ts.Format(timeFmt))
}

// Iterate over shard cache insert into partition operation queue
m.iterateBSICache(func(p *Partition) error {

Expand Down
18 changes: 9 additions & 9 deletions server/bitmapio.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,17 @@ func (po *PartitionOperation) perform(path string, info os.FileInfo, err error)
}

if info.IsDir() {
//u.Warn("Partition operation not allowed against directory [%v]", path)
return nil
}
if po.RemoveOnly {
u.Infof("Partition remove only for [%v]", path)
return os.Remove(path)
}
dest := po.newPath + sep + info.Name()
err2 := os.Rename(path, dest)
if err2 == nil {
u.Infof("Partition rename move [%v] to [%v]", path, dest)
return nil
}
if !strings.HasSuffix(err2.Error(), "invalid cross-device link") {
Expand All @@ -182,6 +185,7 @@ func (po *PartitionOperation) perform(path string, info os.FileInfo, err error)
if err4 != nil {
return err4
}
u.Infof("Partition copy move [%v] to [%v]", path, dest)
return os.Remove(path)
}

Expand Down Expand Up @@ -472,27 +476,23 @@ func (m *BitmapIndex) readBitmapFiles(fragQueue chan *BitmapFragment) error {
func (m *BitmapIndex) purgePartition(aop *Partition) {

t := aop.Time.UnixNano()
// atw - TODO - understand
// The condition is wrong for bools.
// Since the keys are unique we can just delete the key.
// from both caches.
//if aop.RowIDOrBits > 0
{
// phoneType, city, isActive, rownum, isLegalAge
if aop.RowIDOrBits >= 0 {
rowID := uint64(aop.RowIDOrBits)
m.bitmapCacheLock.Lock()
defer m.bitmapCacheLock.Unlock()
_, ok := m.bitmapCache[aop.Index][aop.Field][rowID][t]
if ok {
delete(m.bitmapCache[aop.Index][aop.Field][rowID], t)
u.Infof("Purged standard bitmap %s.%s, ts = %v, rowID = %d", aop.Index, aop.Field,
aop.Time.Format(timeFmt), rowID)
}
} // else
{ // numFamilyMembers, first_name, address, hashedCurtid. age, height, last_name
} else {
m.bsiCacheLock.Lock()
defer m.bsiCacheLock.Unlock()
_, ok := m.bsiCache[aop.Index][aop.Field][t]
if ok {
delete(m.bsiCache[aop.Index][aop.Field], t)
u.Infof("Purged BSI %s.%s, ts = %v", aop.Index, aop.Field, aop.Time.Format(timeFmt))
}
}
}
27 changes: 13 additions & 14 deletions source/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@ func decorateRow(row []driver.Value, proj *rel.Projection, rowCols map[string]in
}
ctx := datasource.NewSqlDriverMessageMap(columnID, cpyRow, rowCols)
for i, v := range proj.Columns {
ri, rok := rowCols[v.As]
if rok {
ri1, aok := rowCols[v.As]
ri2, fok := rowCols[v.Name]
if fok || aok {
ri := ri1
if fok {
ri = ri2
}
if ri < len(row) {
newRow[i] = fmt.Sprintf("%s", row[ri])
} else {
Expand Down Expand Up @@ -377,19 +382,16 @@ func createProjection(orig *rel.SqlSelect, sch *schema.Schema, driverTable strin
for _, v := range orig.Columns {
_, isFunc := v.Expr.(*expr.FuncNode)
var table *schema.Table
l, r, isAliased := v.LeftRight()
//l, r, isAliased := v.LeftRight()
l, _, isAliased := expr.LeftRight(v.SourceOriginal)
if isAliased && !isFunc {
table = tableMap[aliasMap[l].Source.From[0].Name]
} else {
table = tableMap[orig.From[0].Name]
}
_, isIdent := v.Expr.(*expr.IdentityNode)
if vt, ok := table.Column(v.SourceField); ok && isIdent {
if isAliased {
ret.AddColumn(rel.NewColumn(fmt.Sprintf("%s.%s", l, r)), vt)
} else {
ret.AddColumn(v, vt)
}
ret.AddColumn(v, vt)
p := fmt.Sprintf("%s.%s", table.Name, v.SourceField)
if _, ok := projColsMap[p]; !ok {
projCols = append(projCols, p)
Expand Down Expand Up @@ -418,7 +420,8 @@ func createProjection(orig *rel.SqlSelect, sch *schema.Schema, driverTable strin
i := 0
for _, z := range ret.Columns {
v := z.Col
l, r, isAliased := v.LeftRight()
//l, r, isAliased := v.LeftRight()
l, r, isAliased := expr.LeftRight(v.SourceOriginal)
var table *schema.Table
_, isFunc := v.Expr.(*expr.FuncNode)
if isAliased && !isFunc {
Expand Down Expand Up @@ -446,11 +449,7 @@ func createProjection(orig *rel.SqlSelect, sch *schema.Schema, driverTable strin
}
} else {
v := z.Col
l, r, isAliased := v.LeftRight()
colName := r
if isAliased {
colName = fmt.Sprintf("%s.%s", l, r)
}
colName := v.As
colNames[colName] = i + rownumOffset
if colName == "@rownum" {
rownumOffset++
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
quanta 0.9.14-rc-4
quanta 0.9.14-rc-5

0 comments on commit 607a5f1

Please sign in to comment.