Memoized hydrate calls underlying function names are incorrectly resolved #737

Merged
merged 11 commits into from
Feb 1, 2024
90 changes: 90 additions & 0 deletions design/memoize.md
@@ -0,0 +1,90 @@
# Memoizing hydrate funcs

When a HydrateFunc is memoized, the result of the hydrate func is cached in memory for the duration of the plugin run. This is useful for hydrate funcs which are expensive to run and are called multiple times during a single plugin run.

When memoizing a func, a new function is returned which wraps the underlying func in cache get/set logic. The original func is not modified.
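
As a minimal sketch of that wrapping, assuming a simplified `hydrateFunc` signature and an unbounded in-memory map (both hypothetical; the SDK's `Memoize` also deals with cache keys, TTLs and pending calls), the idea looks roughly like this:

```go
package main

import (
	"fmt"
	"sync"
)

// hydrateFunc is a simplified, hypothetical stand-in for the SDK's HydrateFunc.
type hydrateFunc func(key string) (string, error)

// memoize returns a NEW function which wraps fn in cache get/set logic.
// fn itself is not modified.
func memoize(fn hydrateFunc) hydrateFunc {
	var mu sync.Mutex
	cache := make(map[string]string)

	return func(key string) (string, error) {
		// holding the lock across the call keeps the sketch simple,
		// at the cost of serialising all calls through this wrapper
		mu.Lock()
		defer mu.Unlock()

		// cache get: return the stored result if there is one
		if v, ok := cache[key]; ok {
			return v, nil
		}
		// cache miss: call the underlying func and store a successful result
		v, err := fn(key)
		if err != nil {
			return "", err
		}
		cache[key] = v
		return v, nil
	}
}

// countingHydrate returns a hydrate func and a pointer to its call counter.
func countingHydrate() (hydrateFunc, *int) {
	calls := 0
	fn := func(key string) (string, error) {
		calls++
		return "value-for-" + key, nil
	}
	return fn, &calls
}

func main() {
	fn, calls := countingHydrate()
	cached := memoize(fn)
	for i := 0; i < 3; i++ {
		v, _ := cached("a")
		fmt.Println(v)
	}
	fmt.Println("underlying calls:", *calls)
}
```

Three calls through the wrapper with the same key reach the underlying function only once.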

A map of original function names is maintained, keyed by the pointer to the wrapped function. This allows retrieval of the underlying function name from the wrapped function.

*** This does not work: the memoized wrappers are all anonymous functions created by the same function literal, so they all have the same pointer and the map will only ever have a single entry ***
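
The collision can be reproduced outside the SDK. The sketch below is an illustration under assumptions: `hydrateFunc`, `wrap`, `getA` and `getB` are hypothetical names, and `wrap` is marked `//go:noinline` so that the closure is compiled exactly once, as it would be for a large function like `Memoize`. For a func value, `reflect.Value.Pointer` returns the underlying code pointer, which is not guaranteed to identify a single function uniquely.

```go
package main

import (
	"fmt"
	"reflect"
)

// hydrateFunc is a simplified, hypothetical stand-in for the SDK's HydrateFunc.
type hydrateFunc func() string

// wrap mimics a memoize-style wrapper: every call returns a closure
// built from the same function literal.
//
//go:noinline
func wrap(fn hydrateFunc) hydrateFunc {
	return func() string { return fn() }
}

func getA() string { return "A" }
func getB() string { return "B" }

// codePointer returns the key that a map[uintptr]string would use for fn.
func codePointer(fn hydrateFunc) uintptr {
	return reflect.ValueOf(fn).Pointer()
}

// buildNameMap registers two wrappers under their code pointers,
// the way a pointer-keyed name map would.
func buildNameMap() map[uintptr]string {
	names := make(map[uintptr]string)
	names[codePointer(wrap(getA))] = "getA"
	names[codePointer(wrap(getB))] = "getB" // overwrites the entry above
	return names
}

func main() {
	wa, wb := wrap(getA), wrap(getB)
	// the wrappers behave differently...
	fmt.Println(wa(), wb())
	// ...but report the same code pointer
	fmt.Println("same pointer:", codePointer(wa) == codePointer(wb))
	fmt.Println("entries in name map:", len(buildNameMap()))
}
```

With the standard Go toolchain the two wrappers are expected to share a pointer, so the second registration replaces the first and every wrapper resolves to the last name stored.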


Who uses the memoized name map / NamedHydrateFunc?

## IsMemoized
Used for rate limiters. This still works, because every memoized function is found in the map (they all resolve to the same key).

## newNamedHydrateFunc
Who creates a named hydrate call? This is crucial, as we MUST NOT call newNamedHydrateFunc with a memoized hydrate func.
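
To illustrate why, here is a sketch under assumptions: `namedFunc`, `resolvedName`, `wrap` and `memoizeNamed` are hypothetical names, not the SDK's API, and name resolution is assumed to go through `runtime.FuncForPC`. Resolving the name of an already wrapped function yields the name of the wrapper's closure, so the name has to be captured before wrapping and carried alongside the function.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// hydrateFunc is a simplified, hypothetical stand-in for the SDK's HydrateFunc.
type hydrateFunc func() string

// namedFunc is a hypothetical pairing of a function with an explicitly recorded name.
type namedFunc struct {
	Func       hydrateFunc
	Name       string
	IsMemoized bool
}

// resolvedName looks up a function's name from its code pointer.
func resolvedName(fn hydrateFunc) string {
	return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

// wrap stands in for a memoizing wrapper (caching omitted for brevity).
//
//go:noinline
func wrap(fn hydrateFunc) hydrateFunc {
	return func() string { return fn() }
}

// memoizeNamed records the name BEFORE wrapping, while it can still be resolved.
func memoizeNamed(fn hydrateFunc) namedFunc {
	return namedFunc{Func: wrap(fn), Name: resolvedName(fn), IsMemoized: true}
}

func getA() string { return "A" }
func getB() string { return "B" }

func main() {
	n := memoizeNamed(getA)
	fmt.Println("recorded name:        ", n.Name)
	// resolving from the wrapper gives the closure's name, not getA's
	fmt.Println("resolved from wrapper:", resolvedName(n.Func))
	// and that closure name is shared by every wrapper
	fmt.Println("wrappers share a name:", resolvedName(wrap(getA)) == resolvedName(wrap(getB)))
}
```

The recorded name identifies `getA`, while the name resolved from the wrapper is that of the closure inside `wrap` and is the same for every wrapped function.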

### hydrate call depends
```
func newHydrateCall(config *HydrateConfig, d *QueryData) (*hydrateCall, error) {
	res := &hydrateCall{
		Config:    config,
		queryData: d,
		// default to empty limiter
		rateLimiter: rate_limiter.EmptyMultiLimiter(),
	}
	res.NamedHydrateFunc = *config.NamedHydrate

	for _, f := range config.Depends {
		res.Depends = append(res.Depends, newNamedHydrateFunc(f))
	}

	return res, nil
}
```

### RetryHydrate
Doesn't seem to be used anywhere
```
func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrate HydrateFunc, retryConfig *RetryConfig) (hydrateResult interface{}, err error) {
	return retryNamedHydrate(ctx, d, hydrateData, newNamedHydrateFunc(hydrate), retryConfig)
}
```

## Name

Who references the name field?

### HydrateConfig tags
GetConfig/ListConfig/HydrateConfig have a map of tags which is auto-populated with the hydrate name.


### hydrateCall canStart
```

// check whether all hydrate functions we depend on have saved their results
for _, dep := range h.Depends {
	if !helpers.StringSliceContains(rowData.getHydrateKeys(), dep.Name) {
		return false
	}
}
```
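
The check above matches dependencies purely by name, so a wrongly resolved name means the dependent call never becomes ready. A minimal standalone sketch, assuming a hypothetical `namedFunc` type and a plain slice of completed keys in place of `rowData.getHydrateKeys()`:

```go
package main

import "fmt"

// namedFunc is a hypothetical stand-in for a named hydrate function.
type namedFunc struct{ Name string }

// canStart reports whether every dependency name appears in completedKeys.
func canStart(depends []namedFunc, completedKeys []string) bool {
	done := make(map[string]struct{}, len(completedKeys))
	for _, k := range completedKeys {
		done[k] = struct{}{}
	}
	for _, dep := range depends {
		if _, ok := done[dep.Name]; !ok {
			return false
		}
	}
	return true
}

func main() {
	completed := []string{"listItems", "getRegion"}

	// dependency recorded under its real name: the call can start
	fmt.Println(canStart([]namedFunc{{Name: "getRegion"}}, completed))

	// dependency recorded under a wrapper's name (illustrative value):
	// no result is ever saved under that key, so the call never starts
	fmt.Println(canStart([]namedFunc{{Name: "Memoize.func1"}}, completed))
}
```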

### hydrateCall start

```
// retrieve the concurrencyDelay for the call
concurrencyDelay := r.getHydrateConcurrencyDelay(h.Name)
```

### ListConfig Validate
```
// ensure that if there is an explicit hydrate config for the list hydrate, it does not declare dependencies
listHydrateName := table.List.namedHydrate.Name
for _, h := range table.HydrateConfig {
	if h.namedHydrate.Name == listHydrateName {
		...
	}
```

### Plugin buildHydrateConfigMap
```
for i := range p.HydrateConfig {
	h := &p.HydrateConfig[i]
	h.initialise(nil)
	funcName := h.namedHydrate.Name
	p.hydrateConfigMap[funcName] = h
```
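
Because this map is keyed by the resolved function name, two configs whose names resolve identically collapse into one entry. A hedged sketch of that effect, using a hypothetical `hydrateConfig` type with only the fields needed for the illustration:

```go
package main

import "fmt"

// hydrateConfig is a hypothetical, cut-down stand-in for the SDK's HydrateConfig.
type hydrateConfig struct {
	Name           string // the resolved hydrate function name
	MaxConcurrency int
}

// buildConfigMap indexes configs by name; a later config with the same
// name silently replaces an earlier one.
func buildConfigMap(configs []hydrateConfig) map[string]*hydrateConfig {
	m := make(map[string]*hydrateConfig, len(configs))
	for i := range configs {
		h := &configs[i]
		m[h.Name] = h
	}
	return m
}

func main() {
	// two different hydrate funcs whose names were both resolved
	// to the wrapper's name (illustrative value)
	configs := []hydrateConfig{
		{Name: "Memoize.func1", MaxConcurrency: 5},
		{Name: "Memoize.func1", MaxConcurrency: 50},
	}
	m := buildConfigMap(configs)
	fmt.Println("entries:", len(m))
	fmt.Println("surviving MaxConcurrency:", m["Memoize.func1"].MaxConcurrency)
}
```

Only the last config survives, so the first hydrate function would be run with settings intended for a different one.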
2 changes: 1 addition & 1 deletion go.mod
@@ -18,7 +18,7 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/sethvargo/go-retry v0.2.4
github.com/stevenle/topsort v0.2.0
github.com/turbot/go-kit v0.9.0-rc.1
github.com/turbot/go-kit v0.9.0
github.com/zclconf/go-cty v1.14.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/metric v1.21.0 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -556,8 +556,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tkrajina/go-reflector v0.5.6 h1:hKQ0gyocG7vgMD2M3dRlYN6WBBOmdoOzJ6njQSepKdE=
github.com/tkrajina/go-reflector v0.5.6/go.mod h1:ECbqLgccecY5kPmPmXg1MrHW585yMcDkVl6IvJe64T4=
github.com/turbot/go-kit v0.9.0-rc.1 h1:6j1IidB4LpTw0TDXY0DSY6UxtrjMr0KIOBk3glO3Xfk=
github.com/turbot/go-kit v0.9.0-rc.1/go.mod h1:BrOy6Xeizj+eBzXOOWuBMSzQosirN+IGw9MksKULvd4=
github.com/turbot/go-kit v0.9.0 h1:7RVIFpHa0vdsh8GMEr4cM+D4jQ7h4pGeFmT2EVG/U5Y=
github.com/turbot/go-kit v0.9.0/go.mod h1:fFQqR59I5z5JeeBLfK1PjSifn4Oprs3NiQx0CxeSJxs=
github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
36 changes: 27 additions & 9 deletions plugin/column.go
@@ -75,24 +75,34 @@ type Column struct {
Description string
// explicitly specify the function which populates this data
// - this is only needed if any of the default hydrate functions will NOT return this column
Hydrate HydrateFunc
Hydrate HydrateFunc
NamedHydrate NamedHydrateFunc
// the default column value
Default interface{}
// a list of transforms to generate the column value
Transform *transform.ColumnTransforms
}

// QueryColumn is struct storing column name and resolved hydrate name
// this is used in the query data when the hydrate function has been resolved
type QueryColumn struct {
*Column
// the name of the hydrate function which will be used to populate this column
// - this may be a default hydrate function
hydrateName string
func (c *Column) initialise() {
if c.Hydrate == nil && c.NamedHydrate.empty() {
return
}
// populate the named hydrate funcs
if c.NamedHydrate.empty() {
// create a named hydrate func, assuming this function is not memoized
c.NamedHydrate = newNamedHydrateFunc(c.Hydrate)
} else {
// a named hydrate was explicitly specified - probably meaning the hydrate is memoized
// call initialize to populate IsMemoized
c.NamedHydrate.initialize()
// be sure to also set the Hydrate property to the underlying func
c.Hydrate = c.NamedHydrate.Func
}

}

// ToColumnValue converts a value of unknown type to a valid protobuf column value.type
func (c Column) ToColumnValue(val any) (*proto.Column, error) {
func (c *Column) ToColumnValue(val any) (*proto.Column, error) {
defer func() {
if r := recover(); r != nil {
panic(fmt.Errorf("%s: %v", c.Name, r))
@@ -213,7 +223,15 @@ func (c Column) ToColumnValue(val any) (*proto.Column, error) {
}

return columnValue, nil
}

// QueryColumn is struct storing column name and resolved hydrate name (including List/Get call)
// this is used in the query data when the hydrate function has been resolved
type QueryColumn struct {
*Column
// the name of the hydrate function which will be used to populate this column
// - this may be a default hydrate function
hydrateName string
}

func NewQueryColumn(column *Column, hydrateName string) *QueryColumn {
15 changes: 12 additions & 3 deletions plugin/get_config.go
@@ -76,7 +76,7 @@ type GetConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate
MaxConcurrency int
namedHydrate namedHydrateFunc
NamedHydrate NamedHydrateFunc
}

// initialise the GetConfig
@@ -106,7 +106,7 @@ func (c *GetConfig) initialise(table *Table) {
c.Tags = make(map[string]string)
}
// add in function name to tags
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.namedHydrate.Name
c.Tags[rate_limiter.RateLimiterScopeFunction] = c.NamedHydrate.Name

// copy the (deprecated) top level ShouldIgnoreError property into the ignore config
if c.IgnoreConfig.ShouldIgnoreError == nil {
@@ -128,7 +128,16 @@ func (c *GetConfig) initialise(table *Table) {
log.Printf("[TRACE] GetConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())

// populate the named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Hydrate)
if c.NamedHydrate.empty() {
// create a named hydrate func, assuming this function is not memoized
c.NamedHydrate = newNamedHydrateFunc(c.Hydrate)
} else {
// a named hydrate was explicitly specified - probably meaning the hydrate is memoized
// call initialize to populate IsMemoized
c.NamedHydrate.initialize()
// be sure to also set the Hydrate property to the underlying func
c.Hydrate = c.NamedHydrate.Func
}

}

43 changes: 6 additions & 37 deletions plugin/hydrate_cache.go
@@ -4,22 +4,12 @@ import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"

"github.com/turbot/go-kit/helpers"
)

// map of memoized functions to the original function name
var memoizedNameMap = make(map[uintptr]string)
var memoizedNameMapLock sync.RWMutex

// map of currently executing memoized hydrate funcs

var memoizedHydrateFunctionsPending = make(map[string]*sync.WaitGroup)
var memoizedHydrateLock sync.RWMutex

/*
HydrateFunc is a function that gathers data to build table rows.
Typically this would make an API call and return the raw API output.
@@ -66,6 +56,11 @@ Use it to reduce the number of API calls if the HydrateFunc is used by multiple
}
*/
func (f HydrateFunc) Memoize(opts ...MemoizeOption) HydrateFunc {
// TODO determine if this is already memoized
// if so, return the existing memoized function

log.Printf("[INFO] Memoize %p %s", f, helpers.GetFunctionName(f))

config := newMemoizeConfiguration(f)
for _, o := range opts {
o(config)
@@ -130,11 +125,9 @@ func (f HydrateFunc) Memoize(opts ...MemoizeOption) HydrateFunc {
log.Printf("[TRACE] Memoize (connection %s, cache key %s) - no pending call found so calling and caching hydrate", d.Connection.Name, cacheKey)
// now call the hydrate function and cache the result
return callAndCacheHydrate(ctx, d, h, f, cacheKey, ttl)

}

// store the memoized func in the name map
f.setMemoizedFuncName(memoizedFunc)
log.Printf("[INFO] Memoize %p %s", f, helpers.GetFunctionName(f))

return memoizedFunc
}
@@ -189,27 +182,3 @@ func callAndCacheHydrate(ctx context.Context, d *QueryData, h *HydrateData, hydr
// return the hydrate data
return hydrateData, nil
}

// return the function name
// if this function has been memoized, return the underlying function name
func (f HydrateFunc) getOriginalFuncName() (name string, isMemoized bool) {
memoizedNameMapLock.RLock()
// check if this is a memoized function, if so get the original name
p := reflect.ValueOf(f).Pointer()
name, isMemoized = memoizedNameMap[p]
memoizedNameMapLock.RUnlock()

if !isMemoized {
name = helpers.GetFunctionName(f)
}

return name, isMemoized
}

func (f HydrateFunc) setMemoizedFuncName(memoizedFunc HydrateFunc) {
// add to map
memoizedNameMapLock.Lock()
p := reflect.ValueOf(memoizedFunc).Pointer()
memoizedNameMap[p] = helpers.GetFunctionName(f)
memoizedNameMapLock.Unlock()
}
10 changes: 5 additions & 5 deletions plugin/hydrate_call.go
@@ -11,9 +11,9 @@ import (

// hydrateCall struct encapsulates a hydrate call, its config and dependencies
type hydrateCall struct {
namedHydrateFunc
NamedHydrateFunc
// the dependencies expressed using function name
Depends []namedHydrateFunc
Depends []NamedHydrateFunc
Config *HydrateConfig

queryData *QueryData
@@ -27,7 +27,7 @@ func newHydrateCall(config *HydrateConfig, d *QueryData) (*hydrateCall, error) {
// default to empty limiter
rateLimiter: rate_limiter.EmptyMultiLimiter(),
}
res.namedHydrateFunc = newNamedHydrateFunc(config.Func)
res.NamedHydrateFunc = config.namedHydrate

for _, f := range config.Depends {
res.Depends = append(res.Depends, newNamedHydrateFunc(f))
@@ -38,7 +38,7 @@ func newHydrateCall(config *HydrateConfig, d *QueryData) (*hydrateCall, error) {

func (h *hydrateCall) shallowCopy() *hydrateCall {
return &hydrateCall{
namedHydrateFunc: namedHydrateFunc{
NamedHydrateFunc: NamedHydrateFunc{
Func: h.Func,
Name: h.Name,
},
@@ -111,7 +111,7 @@ func (h *hydrateCall) start(ctx context.Context, r *rowData, d *QueryData) time.

// call callHydrate async, ignoring return values
go func() {
r.callHydrate(ctx, d, h.namedHydrateFunc, h.Config)
r.callHydrate(ctx, d, h.NamedHydrateFunc, h.Config)
h.onFinished()
}()
// retrieve the concurrencyDelay for the call
3 changes: 2 additions & 1 deletion plugin/hydrate_config.go
@@ -114,7 +114,7 @@ type HydrateConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate

namedHydrate namedHydrateFunc
namedHydrate NamedHydrateFunc
}

func (c *HydrateConfig) String() string {
@@ -137,6 +137,7 @@ ScopeValues: %s`,
}

func (c *HydrateConfig) initialise(table *Table) {
// create a named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Func)

log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", c.namedHydrate.Name, table.Name)
4 changes: 2 additions & 2 deletions plugin/hydrate_error.go
@@ -18,7 +18,7 @@ func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, h
return retryNamedHydrate(ctx, d, hydrateData, newNamedHydrateFunc(hydrate), retryConfig)
}

func retryNamedHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrate namedHydrateFunc, retryConfig *RetryConfig) (hydrateResult interface{}, err error) {
func retryNamedHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrate NamedHydrateFunc, retryConfig *RetryConfig) (hydrateResult interface{}, err error) {
ctx, span := telemetry.StartSpan(ctx, d.Table.Plugin.Name, "RetryHydrate (%s)", d.Table.Name)
span.SetAttributes(
attribute.String("hydrate-func", hydrate.Name),
@@ -108,7 +108,7 @@ func getBackoff(retryConfig *RetryConfig) (retry.Backoff, error) {
}

// WrapHydrate is a higher order function which returns a [HydrateFunc] that handles Ignorable errors.
func WrapHydrate(hydrate namedHydrateFunc, ignoreConfig *IgnoreConfig) namedHydrateFunc {
func WrapHydrate(hydrate NamedHydrateFunc, ignoreConfig *IgnoreConfig) NamedHydrateFunc {
res := hydrate.clone()

res.Func = func(ctx context.Context, d *QueryData, h *HydrateData) (item interface{}, err error) {