@@ -3,6 +3,7 @@ package config
33import (
44 "context"
55 "errors"
6+ "reflect"
67 "strings"
78 "sync"
89 "time"
@@ -29,6 +30,7 @@ type task struct {
2930}
3031
3132var _ server.Task = (* task )(nil )
33+ var _ server.PGQueue = (* task )(nil )
3234
3335////////////////////////////////////////////////////////////////////////////////
3436// LIFECYCLE
@@ -38,7 +40,13 @@ func NewTask(manager *pgqueue.Manager, threads uint) (server.Task, error) {
3840 self .manager = manager
3941 self .taskpool = pgqueue .NewTaskPool (threads )
4042 self .callbacks = make (map [string ]server.PGCallback , 100 )
41- self .decoder = marshaler .NewDecoder ("json" , marshaler .ConvertTime , marshaler .ConvertDuration , marshaler .ConvertIntUint )
43+ self .decoder = marshaler .NewDecoder ("json" ,
44+ convertPtr ,
45+ convertFloatToIntUint ,
46+ marshaler .ConvertTime ,
47+ marshaler .ConvertDuration ,
48+ marshaler .ConvertIntUint ,
49+ )
4250 return self , nil
4351}
4452
@@ -154,6 +162,11 @@ FOR_LOOP:
154162////////////////////////////////////////////////////////////////////////////////
155163// PUBLIC METHODS
156164
165+ // Conn returns the underlying connection pool object.
166+ func (t * task ) Conn () pg.PoolConn {
167+ return t .manager .Conn ()
168+ }
169+
157170// RegisterTicker registers a periodic task (ticker) with a callback function.
158171// It returns the metadata of the registered ticker.
159172func (t * task ) RegisterTicker (ctx context.Context , meta schema.TickerMeta , fn server.PGCallback ) (* schema.Ticker , error ) {
@@ -283,3 +296,53 @@ func joinName(parts ...string) string {
283296func splitName (name string , n int ) []string {
284297 return strings .SplitN (name , namespaceSeparator , n )
285298}
299+
300+ // //////////////////////////////////////////////////////////////////////////////
301+ // PRIVATE METHODS
302+ var (
303+ nilValue = reflect .ValueOf (nil )
304+ )
305+
306+ // convertPtr returns value if pointer
307+ func convertPtr (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
308+ // Pass value through
309+ if src .Type () == dest {
310+ return src , nil
311+ }
312+
313+ // Convert src to elem
314+ if dest .Kind () == reflect .Ptr {
315+ if dest .Elem () == src .Type () {
316+ if src .CanAddr () {
317+ return src .Addr (), nil
318+ } else {
319+ src_ := reflect .New (dest .Elem ())
320+ src_ .Elem ().Set (src )
321+ return src_ , nil
322+ }
323+ }
324+ }
325+
326+ // Skip
327+ return nilValue , nil
328+ }
329+
330+ // convert float types to int or uint
331+ func convertFloatToIntUint (src reflect.Value , dest reflect.Type ) (reflect.Value , error ) {
332+ // Pass value through
333+ if src .Type () == dest {
334+ return src , nil
335+ }
336+ switch dest .Kind () {
337+ case reflect .Int , reflect .Int8 , reflect .Int16 , reflect .Int32 , reflect .Int64 :
338+ if src .Kind () == reflect .Float64 || src .Kind () == reflect .Float32 {
339+ return reflect .ValueOf (int64 (src .Float ())), nil
340+ }
341+ case reflect .Uint , reflect .Uint8 , reflect .Uint16 , reflect .Uint32 , reflect .Uint64 :
342+ if src .Kind () == reflect .Float64 || src .Kind () == reflect .Float32 {
343+ return reflect .ValueOf (uint64 (src .Float ())), nil
344+ }
345+ }
346+ // Skip
347+ return nilValue , nil
348+ }
0 commit comments