From b9b7e96d999fb9c18fca4a436706bebf93bcf046 Mon Sep 17 00:00:00 2001 From: PangXing Date: Sun, 18 Jun 2023 21:12:14 +0800 Subject: [PATCH 1/2] feat: Auto clean transaction log (#671) --- conf/config.yaml | 2 + integration_test/config/db/config.yaml | 2 + integration_test/config/db_tbl/config.yaml | 2 + integration_test/config/db_tbl_rw/config.yaml | 9 ++++ integration_test/config/tbl/config.yaml | 2 + pkg/config/api.go | 1 + pkg/config/config_reader.go | 6 +++ pkg/config/config_test.go | 8 +++ pkg/config/model.go | 1 + pkg/config/path.go | 6 +++ pkg/runtime/tenant/tenant.go | 2 +- pkg/runtime/transaction/trx_log.go | 50 +++++++++++++++---- pkg/runtime/transaction/trx_manager.go | 9 +++- testdata/fake_config.yaml | 9 ++++ 14 files changed, 98 insertions(+), 11 deletions(-) diff --git a/conf/config.yaml b/conf/config.yaml index d8523af5..c211664b 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -29,6 +29,8 @@ data: password: "123456" # TODO It is recommended that the initialization of the subsequent system library be handled by arana internally database: __arana_sys + weight: r10w10 + parameters: users: - username: root password: "123456" diff --git a/integration_test/config/db/config.yaml b/integration_test/config/db/config.yaml index 8cd78458..3a3be2ee 100644 --- a/integration_test/config/db/config.yaml +++ b/integration_test/config/db/config.yaml @@ -34,6 +34,8 @@ data: password: "123456" # TODO It is recommended that the initialization of the subsequent system library be handled by arana internally database: __arana_sys + weight: r10w10 + parameters: clusters: - name: employees type: mysql diff --git a/integration_test/config/db_tbl/config.yaml b/integration_test/config/db_tbl/config.yaml index 1fbba6d5..456f95d7 100644 --- a/integration_test/config/db_tbl/config.yaml +++ b/integration_test/config/db_tbl/config.yaml @@ -34,6 +34,8 @@ data: password: "123456" # TODO It is recommended that the initialization of the subsequent system library be handled by arana internally database: __arana_sys + weight: r10w10 + parameters: clusters: - name: employees type: mysql diff --git a/integration_test/config/db_tbl_rw/config.yaml b/integration_test/config/db_tbl_rw/config.yaml index acd48452..ab43afee 100644 --- a/integration_test/config/db_tbl_rw/config.yaml +++ b/integration_test/config/db_tbl_rw/config.yaml @@ -34,6 +34,15 @@ data: password: "123456" - username: arana password: "123456" + sys_db: + host: arana-mysql + port: 3306 + username: root + password: "123456" + # TODO It is recommended that the initialization of the subsequent system library be handled by arana internally + database: __arana_sys + weight: r10w10 + parameters: clusters: - name: employees diff --git a/integration_test/config/tbl/config.yaml b/integration_test/config/tbl/config.yaml index b0c20c47..0f09f1fc 100644 --- a/integration_test/config/tbl/config.yaml +++ b/integration_test/config/tbl/config.yaml @@ -34,6 +34,8 @@ data: password: "123456" # TODO It is recommended that the initialization of the subsequent system library be handled by arana internally database: __arana_sys + weight: r10w10 + parameters: clusters: - name: employees type: mysql diff --git a/pkg/config/api.go b/pkg/config/api.go index bf7eda77..0733b07a 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -69,6 +69,7 @@ const ( const ( ConfigItemSpec = "spec" + ConfigItemSysDB = "sys_db" ConfigItemUsers = "users" ConfigItemClusters = "clusters" ConfigItemShardingRule = "sharding_rule" diff --git a/pkg/config/config_reader.go b/pkg/config/config_reader.go index 4b0b4e94..4250eb9e 100644 --- a/pkg/config/config_reader.go +++ b/pkg/config/config_reader.go @@ -172,6 +172,12 @@ func (c *configReader) compositeConfiguration(loadFilter map[PathKey]string) *Te } } + if _, ok := loadFilter[c.pathInfo.DefaultConfigSysDBPath]; ok { + if val := c.holders[c.pathInfo.DefaultConfigSysDBPath].Load(); val != nil { + conf.SysDB = val.(*Tenant).SysDB + } + } + if _, ok := loadFilter[c.pathInfo.DefaultConfigDataNodesPath]; ok { if val := c.holders[c.pathInfo.DefaultConfigDataNodesPath].Load(); val != nil { conf.Nodes = val.(*Tenant).Nodes diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index a41f9a4a..60dae2b7 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -299,6 +299,7 @@ func Test_configReader_LoadAll(t *testing.T) { } data, _ := yaml.Marshal(nodes) + sysdb, _ := yaml.Marshal(config.NewEmptyTenant().SysDB) users, _ := yaml.Marshal(config.NewEmptyTenant().Users) shadow, _ := yaml.Marshal(config.NewEmptyTenant().ShadowRule) sharding, _ := yaml.Marshal(config.NewEmptyTenant().ShardingRule) @@ -311,6 +312,9 @@ func Test_configReader_LoadAll(t *testing.T) { Do(func(interface{}) { atomic.AddInt32(&callCnt, 1) }) + mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigSysDBPath). + AnyTimes(). + Return(sysdb, nil) mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigDataUsersPath). AnyTimes(). Return(users, nil) @@ -357,6 +361,7 @@ func Test_configReader_LoadAll(t *testing.T) { mockStoreOperator.EXPECT().Watch(gomock.Any()).AnyTimes().Return(shareCh, nil) nodes, _ := yaml.Marshal(config.NewEmptyTenant().Nodes) + sysdb, _ := yaml.Marshal(config.NewEmptyTenant().SysDB) users, _ := yaml.Marshal(config.NewEmptyTenant().Users) shadow, _ := yaml.Marshal(config.NewEmptyTenant().ShadowRule) sharding, _ := yaml.Marshal(config.NewEmptyTenant().ShardingRule) @@ -366,6 +371,9 @@ func Test_configReader_LoadAll(t *testing.T) { mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigDataNodesPath). AnyTimes(). Return(nodes, nil) + mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigSysDBPath). + AnyTimes(). + Return(sysdb, nil) mockStoreOperator.EXPECT().Get(config.NewPathInfo("arana").DefaultConfigDataUsersPath). AnyTimes(). Return(users, nil) diff --git a/pkg/config/model.go b/pkg/config/model.go index b396132c..6c9084ec 100644 --- a/pkg/config/model.go +++ b/pkg/config/model.go @@ -419,6 +419,7 @@ func NewEmptyTenant() *Tenant { Spec: Spec{ Metadata: map[string]interface{}{}, }, + SysDB: nil, Users: make([]*User, 0, 1), DataSourceClusters: make([]*DataSourceCluster, 0, 1), ShardingRule: new(ShardingRule), diff --git a/pkg/config/path.go b/pkg/config/path.go index de705b28..265f0c8c 100644 --- a/pkg/config/path.go +++ b/pkg/config/path.go @@ -28,6 +28,7 @@ import ( type PathInfo struct { DefaultConfigSpecPath PathKey DefaultTenantBaseConfigPath PathKey + DefaultConfigSysDBPath PathKey DefaultConfigDataNodesPath PathKey DefaultConfigDataUsersPath PathKey DefaultConfigDataSourceClustersPath PathKey @@ -45,6 +46,7 @@ func NewPathInfo(tenant string) *PathInfo { p.DefaultTenantBaseConfigPath = PathKey(filepath.Join(string(DefaultRootPath), fmt.Sprintf("tenants/%s", tenant))) p.DefaultConfigSpecPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "spec")) + p.DefaultConfigSysDBPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "sys_db")) p.DefaultConfigDataNodesPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "nodes")) p.DefaultConfigDataUsersPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "users")) p.DefaultConfigDataSourceClustersPath = PathKey(filepath.Join(string(p.DefaultTenantBaseConfigPath), "dataSourceClusters")) @@ -63,6 +65,9 @@ func NewPathInfo(tenant string) *PathInfo { p.DefaultConfigSpecPath: func(cfg *Tenant) interface{} { return &cfg.Spec }, + p.DefaultConfigSysDBPath: func(cfg *Tenant) interface{} { + return &cfg.SysDB + }, p.DefaultConfigDataUsersPath: func(cfg *Tenant) interface{} { return &cfg.Users }, @@ -82,6 +87,7 @@ func NewPathInfo(tenant string) *PathInfo { p.ConfigKeyMapping = map[PathKey]string{ p.DefaultConfigSpecPath: ConfigItemSpec, + p.DefaultConfigSysDBPath: ConfigItemSysDB, p.DefaultConfigDataUsersPath: ConfigItemUsers, p.DefaultConfigDataSourceClustersPath: ConfigItemClusters, p.DefaultConfigDataShardingRulePath: ConfigItemShardingRule, diff --git a/pkg/runtime/tenant/tenant.go b/pkg/runtime/tenant/tenant.go index ef978482..6e790b77 100644 --- a/pkg/runtime/tenant/tenant.go +++ b/pkg/runtime/tenant/tenant.go @@ -56,7 +56,7 @@ func LoadSysDB(tenant string) (proto.DB, error) { defer lock.RUnlock() val, ok := _tenantSysDB[tenant] - if ok { + if !ok { return nil, fmt.Errorf("cannot load sysdb: tenant=%s", tenant) } diff --git a/pkg/runtime/transaction/trx_log.go b/pkg/runtime/transaction/trx_log.go index 02fcaa4c..705b8254 100644 --- a/pkg/runtime/transaction/trx_log.go +++ b/pkg/runtime/transaction/trx_log.go @@ -22,6 +22,8 @@ import ( "encoding/json" "fmt" "strings" + "sync" + "time" ) import ( @@ -40,6 +42,8 @@ var ( "start_time": {}, "update_time": {}, } + _initTxLogOnce sync.Once + _txLogCleanTimer *time.Timer ) const ( @@ -71,14 +75,18 @@ type TxLogManager struct { } // init executes create __arana_tx_log table action -func (gm *TxLogManager) init() error { - ctx := context.Background() - res, _, err := gm.sysDB.Call(ctx, _initTxLog) - if err != nil { - return err - } - _, _ = res.RowsAffected() - return nil +func (gm *TxLogManager) Init(delay time.Duration) error { + var err error + _initTxLogOnce.Do(func() { + ctx := context.Background() + res, _, err := gm.sysDB.Call(ctx, _initTxLog) + if err != nil { + return + } + _, _ = res.RowsAffected() + _txLogCleanTimer = time.AfterFunc(delay, gm.runCleanTxLogTask) + }) + return err } // AddOrUpdateTxLog Add or update transaction log @@ -179,5 +187,29 @@ func (gm *TxLogManager) ScanTxLog(pageNo, pageSize uint64, conditions []Conditio // partition table according to the day level or hour level. // the execution of this task requires distributed task preemption based on the metadata DB func (gm *TxLogManager) runCleanTxLogTask() { - + var ( + pageNo uint64 + pageSize uint64 = 50 + conditions = []Condition{ + { + FiledName: "status", + Operation: Equal, + Value: runtime.TrxFinish, + }, + } + ) + var txLogs []TrxLog + for { + total, logs, err := gm.ScanTxLog(pageNo, pageSize, conditions) + if err != nil { + break + } + txLogs = append(txLogs, logs...) + if len(txLogs) >= int(total) { + break + } + } + for _, l := range txLogs { + gm.DeleteTxLog(l) + } } diff --git a/pkg/runtime/transaction/trx_manager.go b/pkg/runtime/transaction/trx_manager.go index e124194d..5b9d683b 100644 --- a/pkg/runtime/transaction/trx_manager.go +++ b/pkg/runtime/transaction/trx_manager.go @@ -20,6 +20,7 @@ package transaction import ( "errors" "sync" + "time" ) import ( @@ -28,6 +29,7 @@ import ( var ( ErrorTrxManagerNotInitialize = errors.New("TrxManager not initialize") + DefaultCleanLogDelay = 1 * time.Hour ) var ( @@ -45,11 +47,16 @@ func CreateTrxManager(tenant string) error { } sysDB, err := aranatenant.LoadSysDB(tenant) - if err == nil { + if err != nil { return err } trxLog := &TxLogManager{sysDB: sysDB} + err = trxLog.Init(DefaultCleanLogDelay) + if err != nil { + return err + } + trxBottomMaker := &TxFaultDecisionExecutor{tm: trxLog} trxMgrs[tenant] = &TrxManager{ diff --git a/testdata/fake_config.yaml b/testdata/fake_config.yaml index cfbad10e..59ddbe3c 100644 --- a/testdata/fake_config.yaml +++ b/testdata/fake_config.yaml @@ -27,6 +27,15 @@ data: password: "123456" - username: arana password: "123456" + sys_db: + host: arana-mysql + port: 3306 + username: root + password: "123456" + # TODO It is recommended that the initialization of the subsequent system library be handled by arana internally + database: __arana_sys + weight: r10w10 + parameters: clusters: - name: employees type: mysql From c6964767a4223945128ff61ef804ae24f622d4e8 Mon Sep 17 00:00:00 2001 From: csynineyang <93956978+csynineyang@users.noreply.github.com> Date: Mon, 19 Jun 2023 02:40:07 +0800 Subject: [PATCH 2/2] [SQL] Support MySQL window function. (#688) * add node config support (#464) * Support MySQL CAST_CHAR function. * format style * Support MySQL CAST_TIME function. (#570) * Support MySQL CAST_DATE function. (#569) * Support MySQL CAST_DATETIME function. (#568) * Support MySQL CAST_TIME/CAST_DATE/CAST_DATETIME function * Resolve Conversation * Support CREATE TABLE * add: IfNotExists * fix: reformat imports * Resolve Conversation * Support window function: CUME_DIST * Support window function: PERCENT_RANK * Support window function: RANK * Support window function: DENSE_RANK * Support window function: FIRST_VALUE/LAST_VALUE/LAG/LEAD * Support window function: NTH_VALUE/NTILE/ROW_NUMBER * support argument(n) in LAG/LEAD * convert Int64 to Float64 in test case --- pkg/runtime/function/cume_dist.go | 72 ++++++++ pkg/runtime/function/cume_dist_test.go | 127 +++++++++++++ pkg/runtime/function/dense_rank.go | 70 ++++++++ pkg/runtime/function/dense_rank_test.go | 127 +++++++++++++ pkg/runtime/function/first_value.go | 95 ++++++++++ pkg/runtime/function/first_value_test.go | 198 +++++++++++++++++++++ pkg/runtime/function/lag.go | 123 +++++++++++++ pkg/runtime/function/lag_test.go | 159 +++++++++++++++++ pkg/runtime/function/last_value.go | 108 +++++++++++ pkg/runtime/function/last_value_test.go | 198 +++++++++++++++++++++ pkg/runtime/function/lead.go | 123 +++++++++++++ pkg/runtime/function/lead_test.go | 159 +++++++++++++++++ pkg/runtime/function/nth_value.go | 123 +++++++++++++ pkg/runtime/function/nth_value_test.go | 207 ++++++++++++++++++++++ pkg/runtime/function/ntile.go | 127 +++++++++++++ pkg/runtime/function/ntile_test.go | 207 ++++++++++++++++++++++ pkg/runtime/function/percent_rank.go | 72 ++++++++ pkg/runtime/function/percent_rank_test.go | 127 +++++++++++++ pkg/runtime/function/rank.go | 68 +++++++ pkg/runtime/function/rank_test.go | 127 +++++++++++++ pkg/runtime/function/row_number.go | 109 ++++++++++++ pkg/runtime/function/row_number_test.go | 152 ++++++++++++++++ 22 files changed, 2878 insertions(+) create mode 100644 pkg/runtime/function/cume_dist.go create mode 100644 pkg/runtime/function/cume_dist_test.go create mode 100644 pkg/runtime/function/dense_rank.go create mode 100644 pkg/runtime/function/dense_rank_test.go create mode 100644 pkg/runtime/function/first_value.go create mode 100644 pkg/runtime/function/first_value_test.go create mode 100644 pkg/runtime/function/lag.go create mode 100644 pkg/runtime/function/lag_test.go create mode 100644 pkg/runtime/function/last_value.go create mode 100644 pkg/runtime/function/last_value_test.go create mode 100644 pkg/runtime/function/lead.go create mode 100644 pkg/runtime/function/lead_test.go create mode 100644 pkg/runtime/function/nth_value.go create mode 100644 pkg/runtime/function/nth_value_test.go create mode 100644 pkg/runtime/function/ntile.go create mode 100644 pkg/runtime/function/ntile_test.go create mode 100644 pkg/runtime/function/percent_rank.go create mode 100644 pkg/runtime/function/percent_rank_test.go create mode 100644 pkg/runtime/function/rank.go create mode 100644 pkg/runtime/function/rank_test.go create mode 100644 pkg/runtime/function/row_number.go create mode 100644 pkg/runtime/function/row_number_test.go diff --git a/pkg/runtime/function/cume_dist.go b/pkg/runtime/function/cume_dist.go new file mode 100644 index 00000000..b07920a4 --- /dev/null +++ b/pkg/runtime/function/cume_dist.go @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncCumeDist is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncCumeDist = "CUME_DIST" + +var _ proto.Func = (*cumedistFunc)(nil) + +func init() { + proto.RegisterFunc(FuncCumeDist, cumedistFunc{}) +} + +type cumedistFunc struct{} + +func (a cumedistFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + first, err := inputs[0].Value(ctx) + if first == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncCumeDist) + } + firstDec, _ := first.Float64() + firstNum := 0 + + for _, it := range inputs[1:] { + val, err := it.Value(ctx) + if val == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncCumeDist) + } + valDec, _ := val.Float64() + + if valDec <= firstDec { + firstNum++ + } + } + + r := 0.0 + if len(inputs) > 1 { + r = float64(firstNum) / float64(len(inputs)-1) + } + return proto.NewValueFloat64(r), nil +} + +func (a cumedistFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/cume_dist_test.go b/pkg/runtime/function/cume_dist_test.go new file mode 100644 index 00000000..23eed956 --- /dev/null +++ b/pkg/runtime/function/cume_dist_test.go @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncCumeDist(t *testing.T) { + fn := proto.MustGetFunc(FuncCumeDist) + type tt struct { + inputs []proto.Value + want string + } + for _, v := range []tt{ + { + []proto.Value{ + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.2222222222222222", + }, + { + []proto.Value{ + proto.NewValueFloat64(2), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.3333333333333333", + }, + { + []proto.Value{ + proto.NewValueFloat64(3), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.6666666666666666", + }, + { + []proto.Value{ + proto.NewValueFloat64(4), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.8888888888888888", + }, + { + []proto.Value{ + proto.NewValueFloat64(5), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "1", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + inputs = append(inputs, proto.ToValuer(v.inputs[i])) + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/dense_rank.go b/pkg/runtime/function/dense_rank.go new file mode 100644 index 00000000..f125fdb7 --- /dev/null +++ b/pkg/runtime/function/dense_rank.go @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncDenseRank is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncDenseRank = "DENSE_RANK" + +var _ proto.Func = (*denserankFunc)(nil) + +func init() { + proto.RegisterFunc(FuncDenseRank, denserankFunc{}) +} + +type denserankFunc struct{} + +func (a denserankFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + first, err := inputs[0].Value(ctx) + if first == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncDenseRank) + } + firstDec, _ := first.Float64() + secondDec := firstDec + firstNum := 0 + + for _, it := range inputs[1:] { + val, err := it.Value(ctx) + if val == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncDenseRank) + } + valDec, _ := val.Float64() + + if valDec < firstDec && valDec != secondDec { + firstNum++ + secondDec = valDec + } + } + + return proto.NewValueInt64(int64(firstNum) + 1), nil +} + +func (a denserankFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/dense_rank_test.go b/pkg/runtime/function/dense_rank_test.go new file mode 100644 index 00000000..646819a6 --- /dev/null +++ b/pkg/runtime/function/dense_rank_test.go @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncDenseRankt(t *testing.T) { + fn := proto.MustGetFunc(FuncDenseRank) + type tt struct { + inputs []proto.Value + want string + } + for _, v := range []tt{ + { + []proto.Value{ + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "1", + }, + { + []proto.Value{ + proto.NewValueFloat64(2), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "2", + }, + { + []proto.Value{ + proto.NewValueFloat64(3), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "3", + }, + { + []proto.Value{ + proto.NewValueFloat64(4), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "4", + }, + { + []proto.Value{ + proto.NewValueFloat64(5), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "5", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + inputs = append(inputs, proto.ToValuer(v.inputs[i])) + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/first_value.go b/pkg/runtime/function/first_value.go new file mode 100644 index 00000000..2883ce60 --- /dev/null +++ b/pkg/runtime/function/first_value.go @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncFirstValue is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncFirstValue = "FIRST_VALUE" + +var _ proto.Func = (*firstvalueFunc)(nil) + +func init() { + proto.RegisterFunc(FuncFirstValue, firstvalueFunc{}) +} + +type firstvalueFunc struct{} + +func (a firstvalueFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 3 { + return proto.NewValueString(""), nil + } + + // partition by this column + firstPartitionColumn, err := inputs[1].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncFirstValue) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[2].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncFirstValue) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + firstValue := 0.0 + startOffset := 3 + + if len(inputs) < 6 { + return proto.NewValueFloat64(firstValueColumnDec), nil + } + + for { + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncFirstValue) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncFirstValue) + } + valueColumnDec, _ := valueColumn.Float64() + if strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 { + firstValue = valueColumnDec + break + } + + startOffset += 3 + if startOffset >= len(inputs) { + break + } + } + + return proto.NewValueFloat64(firstValue), nil +} + +func (a firstvalueFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/first_value_test.go b/pkg/runtime/function/first_value_test.go new file mode 100644 index 00000000..5473e7ab --- /dev/null +++ b/pkg/runtime/function/first_value_test.go @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncFirstValue(t *testing.T) { + fn := proto.MustGetFunc(FuncFirstValue) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "10", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "10", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "10", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "10", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/lag.go b/pkg/runtime/function/lag.go new file mode 100644 index 00000000..39ec052d --- /dev/null +++ b/pkg/runtime/function/lag.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncLag is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncLag = "LAG" + +var _ proto.Func = (*lagFunc)(nil) + +func init() { + proto.RegisterFunc(FuncLag, lagFunc{}) +} + +type lagFunc struct{} + +func (a lagFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 7 { + return proto.NewValueString(""), nil + } + + // lag number + lagNum, err := inputs[0].Value(ctx) + if lagNum == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + lagNumInt, _ := lagNum.Int64() + // order by this column + firstOrderColumn, err := inputs[1].Value(ctx) + if firstOrderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + firstOrderColumnStr := firstOrderColumn.String() + // partition by this column + firstPartitionColumn, err := inputs[2].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[3].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + lagValue := 0.0 + lagIndex := int64(-1) + startOffset := int64(4) + + for { + orderColumn, err := inputs[startOffset].Value(ctx) + if orderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + orderColumnStr := orderColumn.String() + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + valueColumnDec, _ := valueColumn.Float64() + if strings.Compare(firstOrderColumnStr, orderColumnStr) == 0 && + strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 && + firstValueColumnDec == valueColumnDec { + if startOffset >= 4+3*lagNumInt { + lagValueColumn, err := inputs[startOffset+2-3*lagNumInt].Value(ctx) + if lagValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLag) + } + lagValueColumnDec, _ := lagValueColumn.Float64() + lagValue = lagValueColumnDec + lagIndex = startOffset - 1 + } + break + } + + startOffset += 3 + if startOffset >= int64(len(inputs)) { + break + } + } + + if lagIndex < 0 { + return proto.NewValueString(""), nil + } else { + return proto.NewValueFloat64(lagValue), nil + } +} + +func (a lagFunc) NumInput() int { + return 1 +} diff --git a/pkg/runtime/function/lag_test.go b/pkg/runtime/function/lag_test.go new file mode 100644 index 00000000..1315e9e4 --- /dev/null +++ b/pkg/runtime/function/lag_test.go @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncLag(t *testing.T) { + fn := proto.MustGetFunc(FuncLag) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "100", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "125", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "132", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "145", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "140", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "150", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/last_value.go b/pkg/runtime/function/last_value.go new file mode 100644 index 00000000..bbbb27fa --- /dev/null +++ b/pkg/runtime/function/last_value.go @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncLastValue is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncLastValue = "LAST_VALUE" + +var _ proto.Func = (*lastvalueFunc)(nil) + +func init() { + proto.RegisterFunc(FuncLastValue, lastvalueFunc{}) +} + +type lastvalueFunc struct{} + +func (a lastvalueFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 3 { + return proto.NewValueString(""), nil + } + + // order by this column + firstOrderColumn, err := inputs[0].Value(ctx) + if firstOrderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLastValue) + } + firstOrderColumnStr := firstOrderColumn.String() + // partition by this column + firstPartitionColumn, err := inputs[1].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLastValue) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[2].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLastValue) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + lastValue := 0.0 + startOffset := 3 + + if len(inputs) < 6 { + return proto.NewValueFloat64(firstValueColumnDec), nil + } + + for { + orderColumn, err := inputs[startOffset].Value(ctx) + if orderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLastValue) + } + orderColumnStr := orderColumn.String() + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLastValue) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLastValue) + } + valueColumnDec, _ := valueColumn.Float64() + if strings.Compare(firstOrderColumnStr, orderColumnStr) == 0 && + strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 && + firstValueColumnDec == valueColumnDec { + lastValue = valueColumnDec + break + } + + startOffset += 3 + if startOffset >= len(inputs) { + break + } + } + + return proto.NewValueFloat64(lastValue), nil +} + +func (a lastvalueFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/last_value_test.go b/pkg/runtime/function/last_value_test.go new file mode 100644 index 00000000..4ba6c567 --- /dev/null +++ b/pkg/runtime/function/last_value_test.go @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncLastValue(t *testing.T) { + fn := proto.MustGetFunc(FuncLastValue) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "10", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "9", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "25", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "10", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "5", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "30", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "25", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/lead.go b/pkg/runtime/function/lead.go new file mode 100644 index 00000000..dd5d7ac4 --- /dev/null +++ b/pkg/runtime/function/lead.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncLead is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncLead = "LEAD" + +var _ proto.Func = (*leadFunc)(nil) + +func init() { + proto.RegisterFunc(FuncLead, leadFunc{}) +} + +type leadFunc struct{} + +func (a leadFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 7 { + return proto.NewValueString(""), nil + } + + // lag number + leadNum, err := inputs[0].Value(ctx) + if leadNum == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + leadNumInt, _ := leadNum.Int64() + // order by this column + firstOrderColumn, err := inputs[1].Value(ctx) + if firstOrderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + firstOrderColumnStr := firstOrderColumn.String() + // partition by this column + firstPartitionColumn, err := inputs[2].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[3].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + lagValue := 0.0 + lagIndex := int64(-1) + startOffset := int64(4) + + for { + orderColumn, err := inputs[startOffset].Value(ctx) + if orderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + orderColumnStr := orderColumn.String() + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + valueColumnDec, _ := valueColumn.Float64() + if strings.Compare(firstOrderColumnStr, orderColumnStr) == 0 && + strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 && + firstValueColumnDec == valueColumnDec { + if startOffset+2+3*leadNumInt < int64(len(inputs)) { + lagValueColumn, err := inputs[startOffset+2+3*leadNumInt].Value(ctx) + if lagValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncLead) + } + lagValueColumnDec, _ := lagValueColumn.Float64() + lagValue = lagValueColumnDec + lagIndex = startOffset - 1 + } + break + } + + startOffset += 3 + if startOffset >= int64(len(inputs)) { + break + } + } + + if lagIndex < 0 { + return proto.NewValueString(""), nil + } else { + return proto.NewValueFloat64(lagValue), nil + } +} + +func (a leadFunc) NumInput() int { + return 1 +} diff --git a/pkg/runtime/function/lead_test.go b/pkg/runtime/function/lead_test.go new file mode 100644 index 00000000..16f68104 --- /dev/null +++ b/pkg/runtime/function/lead_test.go @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncLead(t *testing.T) { + fn := proto.MustGetFunc(FuncLead) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "125", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "132", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "145", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "140", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "150", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "200", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(1)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/nth_value.go b/pkg/runtime/function/nth_value.go new file mode 100644 index 00000000..574f9c75 --- /dev/null +++ b/pkg/runtime/function/nth_value.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncNthValue is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncNthValue = "NTH_VALUE" + +var _ proto.Func = (*nthvalueFunc)(nil) + +func init() { + proto.RegisterFunc(FuncNthValue, nthvalueFunc{}) +} + +type nthvalueFunc struct{} + +func (a nthvalueFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 7 { + return proto.NewValueString(""), nil + } + + // nth + nth, err := inputs[0].Value(ctx) + if nth == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + nthInt, _ := nth.Int64() + // order by this column + firstOrderColumn, err := inputs[1].Value(ctx) + if firstOrderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + firstOrderColumnStr := firstOrderColumn.String() + // partition by this column + firstPartitionColumn, err := inputs[2].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[3].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + nthIndex := int64(0) + nthValue := 0.0 + startOffset := 4 + + for { + orderColumn, err := inputs[startOffset].Value(ctx) + if orderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + orderColumnStr := orderColumn.String() + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNthValue) + } + valueColumnDec, _ := valueColumn.Float64() + + if strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 { + nthIndex += 1 + if nthIndex == nthInt { + nthValue = valueColumnDec + break + } + } + + if strings.Compare(firstOrderColumnStr, orderColumnStr) == 0 && + strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 && + firstValueColumnDec == valueColumnDec { + break + } + + startOffset += 3 + if startOffset >= len(inputs) { + break + } + } + + if nthIndex < nthInt { + return proto.NewValueString(""), nil + } else { + return proto.NewValueFloat64(nthValue), nil + } +} + +func (a nthvalueFunc) NumInput() int { + return 1 +} diff --git a/pkg/runtime/function/nth_value_test.go b/pkg/runtime/function/nth_value_test.go new file mode 100644 index 00000000..60a895fd --- /dev/null +++ b/pkg/runtime/function/nth_value_test.go @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncNthValue(t *testing.T) { + fn := proto.MustGetFunc(FuncNthValue) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "0", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "30", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(4)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("st113"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("st113"), proto.NewValueFloat64(9)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("st113"), proto.NewValueFloat64(25)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("st113"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(0)}, + {proto.NewValueString("07:15:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(10)}, + {proto.NewValueString("07:30:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(5)}, + {proto.NewValueString("07:45:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(30)}, + {proto.NewValueString("08:00:00"), proto.NewValueString("xh458"), proto.NewValueFloat64(25)}, + }, + "30", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/ntile.go b/pkg/runtime/function/ntile.go new file mode 100644 index 00000000..f75a3d19 --- /dev/null +++ b/pkg/runtime/function/ntile.go @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncNtile is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncNtile = "NTILE" + +var _ proto.Func = (*ntileFunc)(nil) + +func init() { + proto.RegisterFunc(FuncNtile, ntileFunc{}) +} + +type ntileFunc struct{} + +func (a ntileFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 7 { + return proto.NewValueString(""), nil + } + + // bucket number + bucketNum, err := inputs[0].Value(ctx) + if bucketNum == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + bucketNumInt, _ := bucketNum.Int64() + if bucketNumInt <= 0 { + return proto.NewValueString(""), nil + } + // order by this column + firstOrderColumn, err := inputs[1].Value(ctx) + if firstOrderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + firstOrderColumnStr := firstOrderColumn.String() + // partition by this column + firstPartitionColumn, err := inputs[2].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[3].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + startOffset := 4 + bucketSeq := int64(1) + bucketIndex := int64(0) + bucketLeft := int64(0) + bucketDiv := int64((len(inputs)-4)/3) / bucketNumInt + bucketMod := int64((len(inputs)-4)/3) % bucketNumInt + + for { + orderColumn, err := inputs[startOffset].Value(ctx) + if orderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + orderColumnStr := orderColumn.String() + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncNtile) + } + valueColumnDec, _ := valueColumn.Float64() + + bucketIndex += 1 + if bucketIndex > bucketDiv { + if bucketIndex == bucketDiv+1 && bucketLeft < bucketMod { + bucketLeft += 1 + } else { + bucketIndex = int64(1) + bucketSeq += 1 + } + } + + if strings.Compare(firstOrderColumnStr, orderColumnStr) == 0 && + strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 && + firstValueColumnDec == valueColumnDec { + break + } + + startOffset += 3 + if startOffset >= len(inputs) { + break + } + } + + return proto.NewValueInt64(int64(bucketSeq)), nil +} + +func (a ntileFunc) NumInput() int { + return 1 +} diff --git a/pkg/runtime/function/ntile_test.go b/pkg/runtime/function/ntile_test.go new file mode 100644 index 00000000..94b28964 --- /dev/null +++ b/pkg/runtime/function/ntile_test.go @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncNtile(t *testing.T) { + fn := proto.MustGetFunc(FuncNtile) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "1", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "1", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "1", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "1", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "1", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "2", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "2", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "2", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueInt64(2)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("19:00:00"), proto.NewValueString(""), proto.NewValueFloat64(220)}, + {proto.NewValueString("20:00:00"), proto.NewValueString(""), proto.NewValueFloat64(260)}, + }, + "2", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/percent_rank.go b/pkg/runtime/function/percent_rank.go new file mode 100644 index 00000000..eeed8411 --- /dev/null +++ b/pkg/runtime/function/percent_rank.go @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncPercentRank is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncPercentRank = "PERCENT_RANK" + +var _ proto.Func = (*percentrankFunc)(nil) + +func init() { + proto.RegisterFunc(FuncPercentRank, percentrankFunc{}) +} + +type percentrankFunc struct{} + +func (a percentrankFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + first, err := inputs[0].Value(ctx) + if first == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncPercentRank) + } + firstDec, _ := first.Float64() + firstNum := 0 + + for _, it := range inputs[1:] { + val, err := it.Value(ctx) + if val == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncPercentRank) + } + valDec, _ := val.Float64() + + if valDec < firstDec { + firstNum++ + } + } + + r := 0.0 + if len(inputs) > 2 { + r = float64(firstNum) / float64(len(inputs)-2) + } + return proto.NewValueFloat64(r), nil +} + +func (a percentrankFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/percent_rank_test.go b/pkg/runtime/function/percent_rank_test.go new file mode 100644 index 00000000..0621540c --- /dev/null +++ b/pkg/runtime/function/percent_rank_test.go @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestPercentRankDist(t *testing.T) { + fn := proto.MustGetFunc(FuncPercentRank) + type tt struct { + inputs []proto.Value + want string + } + for _, v := range []tt{ + { + []proto.Value{ + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0", + }, + { + []proto.Value{ + proto.NewValueFloat64(2), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.25", + }, + { + []proto.Value{ + proto.NewValueFloat64(3), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.375", + }, + { + []proto.Value{ + proto.NewValueFloat64(4), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "0.75", + }, + { + []proto.Value{ + proto.NewValueFloat64(5), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "1", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + inputs = append(inputs, proto.ToValuer(v.inputs[i])) + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/rank.go b/pkg/runtime/function/rank.go new file mode 100644 index 00000000..9ff7e104 --- /dev/null +++ b/pkg/runtime/function/rank.go @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncRank is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncRank = "RANK" + +var _ proto.Func = (*rankFunc)(nil) + +func init() { + proto.RegisterFunc(FuncRank, rankFunc{}) +} + +type rankFunc struct{} + +func (a rankFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + first, err := inputs[0].Value(ctx) + if first == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRank) + } + firstDec, _ := first.Float64() + firstNum := 0 + + for _, it := range inputs[1:] { + val, err := it.Value(ctx) + if val == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRank) + } + valDec, _ := val.Float64() + + if valDec < firstDec { + firstNum++ + } + } + + return proto.NewValueInt64(int64(firstNum) + 1), nil +} + +func (a rankFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/rank_test.go b/pkg/runtime/function/rank_test.go new file mode 100644 index 00000000..b4befc7f --- /dev/null +++ b/pkg/runtime/function/rank_test.go @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncRank(t *testing.T) { + fn := proto.MustGetFunc(FuncRank) + type tt struct { + inputs []proto.Value + want string + } + for _, v := range []tt{ + { + []proto.Value{ + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "1", + }, + { + []proto.Value{ + proto.NewValueFloat64(2), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "3", + }, + { + []proto.Value{ + proto.NewValueFloat64(3), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "4", + }, + { + []proto.Value{ + proto.NewValueFloat64(4), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "7", + }, + { + []proto.Value{ + proto.NewValueFloat64(5), + proto.NewValueFloat64(1), + proto.NewValueFloat64(1), + proto.NewValueFloat64(2), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(3), + proto.NewValueFloat64(4), + proto.NewValueFloat64(4), + proto.NewValueFloat64(5), + }, + "9", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + inputs = append(inputs, proto.ToValuer(v.inputs[i])) + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +} diff --git a/pkg/runtime/function/row_number.go b/pkg/runtime/function/row_number.go new file mode 100644 index 00000000..db45a3e0 --- /dev/null +++ b/pkg/runtime/function/row_number.go @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "strings" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +// FuncRowNumber is https://dev.mysql.com/doc/refman/8.0/en/window-function-descriptions.html +const FuncRowNumber = "ROW_NUMBER" + +var _ proto.Func = (*rownumberFunc)(nil) + +func init() { + proto.RegisterFunc(FuncRowNumber, rownumberFunc{}) +} + +type rownumberFunc struct{} + +func (a rownumberFunc) Apply(ctx context.Context, inputs ...proto.Valuer) (proto.Value, error) { + if len(inputs) < 6 { + return proto.NewValueString(""), nil + } + + // order by this column + firstOrderColumn, err := inputs[0].Value(ctx) + if firstOrderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRowNumber) + } + firstOrderColumnStr := firstOrderColumn.String() + // partition by this column + firstPartitionColumn, err := inputs[1].Value(ctx) + if firstPartitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRowNumber) + } + firstPartitionColumnStr := firstPartitionColumn.String() + // output by this volumn + firstValueColumn, err := inputs[2].Value(ctx) + if firstValueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRowNumber) + } + firstValueColumnDec, _ := firstValueColumn.Float64() + rowNumber := 0 + startOffset := 3 + + for { + orderColumn, err := inputs[startOffset].Value(ctx) + if orderColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRowNumber) + } + orderColumnStr := orderColumn.String() + partitionColumn, err := inputs[startOffset+1].Value(ctx) + if partitionColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRowNumber) + } + partitionColumnStr := partitionColumn.String() + valueColumn, err := inputs[startOffset+2].Value(ctx) + if valueColumn == nil || err != nil { + return nil, errors.Wrapf(err, "cannot eval %s", FuncRowNumber) + } + valueColumnDec, _ := valueColumn.Float64() + + rowNumber += 1 + if strings.Compare(firstOrderColumnStr, orderColumnStr) == 0 && + strings.Compare(firstPartitionColumnStr, partitionColumnStr) == 0 && + firstValueColumnDec == valueColumnDec { + break + } + + startOffset += 3 + if startOffset >= len(inputs) { + break + } + } + + if rowNumber <= 0 { + return proto.NewValueString(""), nil + } else { + return proto.NewValueInt64(int64(rowNumber)), nil + } +} + +func (a rownumberFunc) NumInput() int { + return 0 +} diff --git a/pkg/runtime/function/row_number_test.go b/pkg/runtime/function/row_number_test.go new file mode 100644 index 00000000..9b4a38ec --- /dev/null +++ b/pkg/runtime/function/row_number_test.go @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package function + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/proto" +) + +func TestFuncRowNumber(t *testing.T) { + fn := proto.MustGetFunc(FuncRowNumber) + type tt struct { + inputs [][]proto.Value + want string + } + for _, v := range []tt{ + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "1", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "2", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "3", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "4", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "5", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "6", + }, + { + [][]proto.Value{ + //order column, partition column, value column + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + {proto.NewValueString("12:00:00"), proto.NewValueString(""), proto.NewValueFloat64(100)}, + {proto.NewValueString("13:00:00"), proto.NewValueString(""), proto.NewValueFloat64(125)}, + {proto.NewValueString("14:00:00"), proto.NewValueString(""), proto.NewValueFloat64(132)}, + {proto.NewValueString("15:00:00"), proto.NewValueString(""), proto.NewValueFloat64(145)}, + {proto.NewValueString("16:00:00"), proto.NewValueString(""), proto.NewValueFloat64(140)}, + {proto.NewValueString("17:00:00"), proto.NewValueString(""), proto.NewValueFloat64(150)}, + {proto.NewValueString("18:00:00"), proto.NewValueString(""), proto.NewValueFloat64(200)}, + }, + "7", + }, + } { + t.Run(v.want, func(t *testing.T) { + var inputs []proto.Valuer + for i := range v.inputs { + for j := range v.inputs[i] { + inputs = append(inputs, proto.ToValuer(v.inputs[i][j])) + } + } + out, err := fn.Apply(context.Background(), inputs...) + assert.NoError(t, err) + assert.Equal(t, v.want, fmt.Sprint(out)) + }) + } +}