Skip to content

Commit

Permalink
refract package gtimer for more stable
Browse files Browse the repository at this point in the history
  • Loading branch information
gqcn committed May 15, 2021
1 parent cc1224e commit d76e4c8
Show file tree
Hide file tree
Showing 21 changed files with 549 additions and 624 deletions.
2 changes: 1 addition & 1 deletion container/gqueue/gqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (q *Queue) Close() {

// Len returns the length of the queue.
// Note that the result might not be accurate as there's a
// asynchronize channel reading the list constantly.
// asynchronous channel reading the list constantly.
func (q *Queue) Len() (length int) {
if q.list != nil {
length += q.list.Len()
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions net/ghttp/ghttp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func serverProcessInit() {

// It's an ugly calling for better initializing the main package path
// in source development environment. It is useful only be used in main goroutine.
// It fails retrieving the main package path in asynchronized goroutines.
// It fails retrieving the main package path in asynchronous goroutines.
gfile.MainPkgPath()
}

Expand Down Expand Up @@ -416,7 +416,7 @@ func (s *Server) startServer(fdMap listenerFdMap) {
s.servers = append(s.servers, s.newGracefulServer(itemFunc))
}
}
// Start listening asynchronizedly.
// Start listening asynchronously.
serverRunning.Add(1)
for _, v := range s.servers {
go func(server *gracefulServer) {
Expand Down
12 changes: 6 additions & 6 deletions os/gcron/gcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,29 @@ func GetLogLevel() int {
// Add adds a timed task to default cron object.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func Add(pattern string, job func(), name ...string) (*Entry, error) {
func Add(pattern string, job func(), name ...string) (*Job, error) {
return defaultCron.Add(pattern, job, name...)
}

// AddSingleton adds a singleton timed task, to default cron object.
// A singleton timed task is that can only be running one single instance at the same time.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func AddSingleton(pattern string, job func(), name ...string) (*Entry, error) {
func AddSingleton(pattern string, job func(), name ...string) (*Job, error) {
return defaultCron.AddSingleton(pattern, job, name...)
}

// AddOnce adds a timed task which can be run only once, to default cron object.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func AddOnce(pattern string, job func(), name ...string) (*Entry, error) {
func AddOnce(pattern string, job func(), name ...string) (*Job, error) {
return defaultCron.AddOnce(pattern, job, name...)
}

// AddTimes adds a timed task which can be run specified times, to default cron object.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func AddTimes(pattern string, times int, job func(), name ...string) (*Entry, error) {
func AddTimes(pattern string, times int, job func(), name ...string) (*Job, error) {
return defaultCron.AddTimes(pattern, times, job, name...)
}

Expand Down Expand Up @@ -100,7 +100,7 @@ func DelayAddTimes(delay time.Duration, pattern string, times int, job func(), n

// Search returns a scheduled task with the specified <name>.
// It returns nil if no found.
func Search(name string) *Entry {
func Search(name string) *Job {
return defaultCron.Search(name)
}

Expand All @@ -115,7 +115,7 @@ func Size() int {
}

// Entries return all timed tasks as slice.
func Entries() []*Entry {
func Entries() []*Job {
return defaultCron.Entries()
}

Expand Down
28 changes: 14 additions & 14 deletions os/gcron/gcron_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ func (c *Cron) GetLogLevel() int {
// Add adds a timed task.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func (c *Cron) Add(pattern string, job func(), name ...string) (*Entry, error) {
func (c *Cron) Add(pattern string, job func(), name ...string) (*Job, error) {
if len(name) > 0 {
if c.Search(name[0]) != nil {
return nil, errors.New(fmt.Sprintf(`cron job "%s" already exists`, name[0]))
}
}
return c.addEntry(pattern, job, false, name...)
return c.addJob(pattern, job, false, name...)
}

// AddSingleton adds a singleton timed task.
// A singleton timed task is that can only be running one single instance at the same time.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func (c *Cron) AddSingleton(pattern string, job func(), name ...string) (*Entry, error) {
func (c *Cron) AddSingleton(pattern string, job func(), name ...string) (*Job, error) {
if entry, err := c.Add(pattern, job, name...); err != nil {
return nil, err
} else {
Expand All @@ -85,7 +85,7 @@ func (c *Cron) AddSingleton(pattern string, job func(), name ...string) (*Entry,
// AddOnce adds a timed task which can be run only once.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func (c *Cron) AddOnce(pattern string, job func(), name ...string) (*Entry, error) {
func (c *Cron) AddOnce(pattern string, job func(), name ...string) (*Job, error) {
if entry, err := c.Add(pattern, job, name...); err != nil {
return nil, err
} else {
Expand All @@ -97,7 +97,7 @@ func (c *Cron) AddOnce(pattern string, job func(), name ...string) (*Entry, erro
// AddTimes adds a timed task which can be run specified times.
// A unique <name> can be bound with the timed task.
// It returns and error if the <name> is already used.
func (c *Cron) AddTimes(pattern string, times int, job func(), name ...string) (*Entry, error) {
func (c *Cron) AddTimes(pattern string, times int, job func(), name ...string) (*Job, error) {
if entry, err := c.Add(pattern, job, name...); err != nil {
return nil, err
} else {
Expand Down Expand Up @@ -146,9 +146,9 @@ func (c *Cron) DelayAddTimes(delay time.Duration, pattern string, times int, job

// Search returns a scheduled task with the specified <name>.
// It returns nil if no found.
func (c *Cron) Search(name string) *Entry {
func (c *Cron) Search(name string) *Job {
if v := c.entries.Get(name); v != nil {
return v.(*Entry)
return v.(*Job)
}
return nil
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (c *Cron) Stop(name ...string) {
// Remove deletes scheduled task which named <name>.
func (c *Cron) Remove(name string) {
if v := c.entries.Get(name); v != nil {
v.(*Entry).Close()
v.(*Job).Close()
}
}

Expand All @@ -197,24 +197,24 @@ func (c *Cron) Size() int {
}

// Entries return all timed tasks as slice(order by registered time asc).
func (c *Cron) Entries() []*Entry {
func (c *Cron) Entries() []*Job {
array := garray.NewSortedArraySize(c.entries.Size(), func(v1, v2 interface{}) int {
entry1 := v1.(*Entry)
entry2 := v2.(*Entry)
entry1 := v1.(*Job)
entry2 := v2.(*Job)
if entry1.Time.Nanosecond() > entry2.Time.Nanosecond() {
return 1
}
return -1
}, true)
c.entries.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
array.Add(v.(*Entry))
array.Add(v.(*Job))
}
})
entries := make([]*Entry, array.Len())
entries := make([]*Job, array.Len())
array.RLockFunc(func(array []interface{}) {
for k, v := range array {
entries[k] = v.(*Entry)
entries[k] = v.(*Job)
}
})
return entries
Expand Down
56 changes: 28 additions & 28 deletions os/gcron/gcron_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ import (
)

// Timed task entry.
type Entry struct {
type Job struct {
cron *Cron // Cron object belonged to.
entry *gtimer.Entry // Associated gtimer.Entry.
job *gtimer.Job // Associated gtimer.Job.
schedule *cronSchedule // Timed schedule object.
jobName string // Callback function name(address info).
times *gtype.Int // Running times limit.
Name string // Entry name.
Name string // Job name.
Job func() `json:"-"` // Callback function.
Time time.Time // Registered time.
}

// addEntry creates and returns a new Entry object.
// addJob creates and returns a new Job object.
// Param <job> is the callback function for timed task execution.
// Param <singleton> specifies whether timed task executing in singleton mode.
// Param <name> names this entry for manual control.
func (c *Cron) addEntry(pattern string, job func(), singleton bool, name ...string) (*Entry, error) {
func (c *Cron) addJob(pattern string, job func(), singleton bool, name ...string) (*Job, error) {
schedule, err := newSchedule(pattern)
if err != nil {
return nil, err
}
// No limit for <times>, for gtimer checking scheduling every second.
entry := &Entry{
entry := &Job{
cron: c,
schedule: schedule,
jobName: runtime.FuncForPC(reflect.ValueOf(job).Pointer()).Name(),
Expand All @@ -57,57 +57,57 @@ func (c *Cron) addEntry(pattern string, job func(), singleton bool, name ...stri
// It should start running after the entry is added to the entries map,
// to avoid the task from running during adding where the entries
// does not have the entry information, which might cause panic.
entry.entry = gtimer.AddEntry(time.Second, entry.check, singleton, -1, gtimer.StatusStopped)
entry.job = gtimer.AddJob(time.Second, entry.check, singleton, -1, gtimer.StatusStopped)
c.entries.Set(entry.Name, entry)
entry.entry.Start()
entry.job.Start()
return entry, nil
}

// IsSingleton return whether this entry is a singleton timed task.
func (entry *Entry) IsSingleton() bool {
return entry.entry.IsSingleton()
func (entry *Job) IsSingleton() bool {
return entry.job.IsSingleton()
}

// SetSingleton sets the entry running in singleton mode.
func (entry *Entry) SetSingleton(enabled bool) {
entry.entry.SetSingleton(true)
func (entry *Job) SetSingleton(enabled bool) {
entry.job.SetSingleton(true)
}

// SetTimes sets the times which the entry can run.
func (entry *Entry) SetTimes(times int) {
func (entry *Job) SetTimes(times int) {
entry.times.Set(times)
}

// Status returns the status of entry.
func (entry *Entry) Status() int {
return entry.entry.Status()
func (entry *Job) Status() int {
return entry.job.Status()
}

// SetStatus sets the status of the entry.
func (entry *Entry) SetStatus(status int) int {
return entry.entry.SetStatus(status)
func (entry *Job) SetStatus(status int) int {
return entry.job.SetStatus(status)
}

// Start starts running the entry.
func (entry *Entry) Start() {
entry.entry.Start()
func (entry *Job) Start() {
entry.job.Start()
}

// Stop stops running the entry.
func (entry *Entry) Stop() {
entry.entry.Stop()
func (entry *Job) Stop() {
entry.job.Stop()
}

// Close stops and removes the entry from cron.
func (entry *Entry) Close() {
func (entry *Job) Close() {
entry.cron.entries.Remove(entry.Name)
entry.entry.Close()
entry.job.Close()
}

// Timed task check execution.
// The running times limits feature is implemented by gcron.Entry and cannot be implemented by gtimer.Entry.
// gcron.Entry relies on gtimer to implement a scheduled task check for gcron.Entry per second.
func (entry *Entry) check() {
// The running times limits feature is implemented by gcron.Job and cannot be implemented by gtimer.Job.
// gcron.Job relies on gtimer to implement a scheduled task check for gcron.Job per second.
func (entry *Job) check() {
if entry.schedule.meet(time.Now()) {
path := entry.cron.GetLogPath()
level := entry.cron.GetLogLevel()
Expand All @@ -125,7 +125,7 @@ func (entry *Entry) check() {
// Running times check.
times := entry.times.Add(-1)
if times <= 0 {
if entry.entry.SetStatus(StatusClosed) == StatusClosed || times < 0 {
if entry.job.SetStatus(StatusClosed) == StatusClosed || times < 0 {
return
}
}
Expand All @@ -139,7 +139,7 @@ func (entry *Entry) check() {
} else {
glog.Path(path).Level(level).Debugf("[gcron] %s(%s) %s end", entry.Name, entry.schedule.pattern, entry.jobName)
}
if entry.entry.Status() == StatusClosed {
if entry.job.Status() == StatusClosed {
entry.Close()
}
}()
Expand Down
2 changes: 1 addition & 1 deletion os/gcron/gcron_unit_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/gogf/gf/test/gtest"
)

func TestCron_Entry_Operations(t *testing.T) {
func TestCron_Job_Operations(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var (
cron = gcron.New()
Expand Down
Loading

0 comments on commit d76e4c8

Please sign in to comment.