Refactor
Include job type in key names. Closes #1
Record task start + end times, processing time and wall-time. Closes #2
Roll up counters and timers into parent (shard -> namespace, namespace -> job). Closes #3
Rename iterator / splitter. Closes #5
Record job data and use it to track overall status when shards / namespaces complete. Closes #6
Use transactional task enqueueing rather than named tasks #7
Provide abort mechanism (no write yet) #9
Output to cloud storage and roll up files (slice -> shard, shard -> namespace). Closes #12
CaptainCodeman committed Jul 11, 2016
1 parent 7eafe10 commit ef6d680
Showing 21 changed files with 1,741 additions and 972 deletions.
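
Of the items above, #7 (transactional task enqueueing) is the one whose motivation is easiest to miss. Named tasks de-duplicate by name, but a task can still be enqueued even when the datastore write it depends on fails; enqueueing inside the transaction that persists the state means the task and the write commit or roll back together. A minimal sketch of the App Engine pattern, using a hypothetical scheduleNext helper rather than the library's actual ScheduleLock:

package mapper

import (
	"golang.org/x/net/context"

	"google.golang.org/appengine/datastore"
	"google.golang.org/appengine/taskqueue"
)

// scheduleNext is an illustrative sketch, not this library's API: the POST
// task is enqueued inside the transaction that saves the entity, so a
// failed commit cannot leave behind an orphaned continuation task.
func scheduleNext(c context.Context, key *datastore.Key, entity interface{}, path, queue string) error {
	return datastore.RunInTransaction(c, func(tc context.Context) error {
		if _, err := datastore.Put(tc, key, entity); err != nil {
			return err
		}
		// taskqueue.Add joins the datastore transaction when given the
		// transactional context tc (transactional tasks must be unnamed).
		task := taskqueue.NewPOSTTask(path, nil)
		_, err := taskqueue.Add(tc, task, queue)
		return err
	}, nil)
}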
api_jobs.go (19 changes: 2 additions & 17 deletions)
@@ -1,11 +1,7 @@
 package mapper
 
 import (
-	"fmt"
 	"net/http"
-	"strconv"
-
-	"google.golang.org/appengine"
 )
 
 type (
@@ -25,20 +21,9 @@ func (a apiJobs) Post(w http.ResponseWriter, r *http.Request, id string) (int, interface{}, error) {
 */
 
 func (a apiJobs) Post(w http.ResponseWriter, r *http.Request, id string) (int, interface{}, error) {
-	values := r.URL.Query()
-	name := values.Get("type")
-	job, _ := CreateJobInstance(name)
-	if job == nil {
-		return http.StatusBadRequest, nil, fmt.Errorf("job type not found")
+	if err := StartJob(r); err != nil {
+		return http.StatusBadRequest, nil, err
 	}
 
-	shards, _ := strconv.Atoi(values.Get("shards"))
-	queue := values.Get("queue")
-	query, _ := job.Query(r)
-
-	c := appengine.NewContext(r)
-
-	StartJob(c, job, query, queue, shards)
 	data := map[string]interface{}{
 		"started": true,
 	}
common.go (93 changes: 93 additions & 0 deletions, new file)
@@ -0,0 +1,93 @@
package mapper

import (
	"strings"
	"time"

	"google.golang.org/appengine/datastore"
)

type (
	// common contains properties that are common across all
	// mapper entities (job, iterator, namespace and shard)
	common struct {
		// Counters holds the task counters map
		Counters Counters `datastore:"-"`

		// Active indicates if this task is still active
		Active bool `datastore:"active,noindex"`

		// Count is the number of records processed
		Count int64 `datastore:"count,noindex"`

		// Started is when the task began
		Started time.Time `datastore:"started"`

		// Updated is when the task was last updated
		Updated time.Time `datastore:"updated"`

		// ProcessTime is the time that the task spent executing
		ProcessTime time.Duration `datastore:"process_time,noindex"`

		// WallTime is the wall-time that the task takes
		WallTime time.Duration `datastore:"wall_time,noindex"`

		// private fields used by local instance
		id        string
		queue     string
		startTime time.Time
	}
)

func (c *common) start() {
	c.Active = true
	c.Counters = NewCounters()
	c.Count = 0
	c.Started = getTime()
	c.Updated = c.Started
	c.startTime = c.Started
}

func (c *common) complete() {
	c.Active = false
	c.Updated = getTime()
	c.WallTime = c.Updated.Sub(c.Started)
}

func (c *common) rollup(r common) {
	c.Count += r.Count
	c.ProcessTime += r.ProcessTime
	c.Counters.Add(r.Counters)
}

/* datastore */
func (c *common) Load(props []datastore.Property) error {
	datastore.LoadStruct(c, props)

	c.Counters = make(map[string]int64)
	for _, prop := range props {
		if strings.HasPrefix(prop.Name, "counters.") {
			key := prop.Name[9:len(prop.Name)]
			c.Counters[key] = prop.Value.(int64)
		}
	}

	c.startTime = getTime()

	return nil
}

func (c *common) Save() ([]datastore.Property, error) {
	c.ProcessTime += getTime().Sub(c.startTime)

	props, err := datastore.SaveStruct(c)
	if err != nil {
		return nil, err
	}

	for key, value := range c.Counters {
		props = append(props, datastore.Property{Name: "counters." + key, Value: value, NoIndex: true, Multiple: false})
	}

	return props, nil
}
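
The Load/Save pair above flattens the Counters map into individual counters.<name> properties, which is what lets counters survive datastore round-trips without a fixed schema, while rollup folds a child's totals into its parent (shard -> namespace -> job, per the commit message). A rough sketch of the expected behaviour, assuming NewCounters returns an empty map and Counters.Add merges by key (both defined elsewhere in the package):

package mapper

import "time"

// exampleRollup is illustrative, not part of the commit: a completed
// shard's count, counters and processing time fold into its parent
// entity via common.rollup.
func exampleRollup() {
	ns := new(common) // parent, e.g. the namespace entity
	ns.start()

	shard := new(common) // child, e.g. a completed shard
	shard.start()
	shard.Count = 1000
	shard.ProcessTime = 2 * time.Second
	shard.Counters["rows-skipped"] = 7
	shard.complete()

	ns.rollup(*shard)
	// ns.Count == 1000, ns.ProcessTime == 2s,
	// ns.Counters["rows-skipped"] == 7
}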
config.go (40 changes: 13 additions & 27 deletions)
@@ -5,7 +5,7 @@ import (
 )
 
 type (
-	// Config stores configuration settings and defaults
+	// Config stores mapper configuration settings and defaults
 	Config struct {
 		// DatastorePrefix is added to the beginning of every mapreduce collection name
 		DatastorePrefix string
@@ -22,27 +22,16 @@ type (
 		// ShardCount is the default number of shards to use
 		ShardCount int
 
-		// MaxShardCount is the maximum number of shards to use
-		MaxShardCount int
-
-		// OverplitFactor helps achieve more event shard distribution with 'clumpy' data
+		// OversamplingFactor helps achieve more even shard distribution with 'clumpy' data
 		// clumpy is definitely a technical term
-		OverplitFactor int
-
-		// MaxNamespacesForKeyShard is the maximum number of namespaces
-		// that will be sharded by datastore key before switching to a
-		// strategy where sharding is done lexographically by namespace.
-		MaxNamespacesForKeyShard int
+		OversamplingFactor int
 
 		// LeaseDuration is how long a worker will hold a lock for
 		LeaseDuration time.Duration
 
 		// MaxLeaseDuration is the time considered to be a timeout
 		MaxLeaseDuration time.Duration
 
-		// Delay between consecutive controller callback invocations.
-		ControllerPeriod time.Duration
-
 		// TaskMaxAttempts is the maximum number of times to retry a failing task
 		TaskMaxAttempts int
 
@@ -57,18 +46,15 @@ var (
 
 func defaultConfig() *Config {
 	return &Config{
-		DatastorePrefix:          "MP_",
-		TaskPrefix:               "MP-",
-		BasePath:                 "/mapper",
-		Queue:                    "default",
-		ShardCount:               8,
-		MaxShardCount:            256,
-		OverplitFactor:           32,
-		MaxNamespacesForKeyShard: 10,
-		LeaseDuration:            time.Duration(30) * time.Second,
-		MaxLeaseDuration:         time.Duration(10)*time.Minute + time.Duration(30)*time.Second,
-		ControllerPeriod:         time.Duration(4) * time.Second,
-		TaskMaxAttempts:          31,
-		LogVerbose:               true,
+		DatastorePrefix:    "MP_",
+		TaskPrefix:         "MP-",
+		BasePath:           "/mapper",
+		Queue:              "default",
+		ShardCount:         8,
+		OversamplingFactor: 32,
+		LeaseDuration:      time.Duration(30) * time.Second,
+		MaxLeaseDuration:   time.Duration(10)*time.Minute + time.Duration(30)*time.Second,
+		TaskMaxAttempts:    31,
+		LogVerbose:         true,
 	}
 }
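
The surviving OversamplingFactor deserves a gloss: rather than picking ShardCount - 1 split points from a bare sample, a splitter can draw ShardCount * OversamplingFactor candidate keys (8 * 32 = 256 with the defaults above) and keep every 32nd, so a clump of nearby keys shifts individual samples rather than a whole shard boundary. The splitKeys sketch below illustrates that selection; it is a stand-in under those assumptions, not the renamed splitter from this commit:

package mapper

import "sort"

// splitKeys is an illustrative sketch: given oversampled candidate keys,
// keep every oversample-th one so the chosen split points are less
// sensitive to clumps in the sampled data.
func splitKeys(samples []string, shards, oversample int) []string {
	sort.Strings(samples) // e.g. 256 sampled keys for the defaults above
	splits := make([]string, 0, shards-1)
	for i := oversample; i < len(samples) && len(splits) < shards-1; i += oversample {
		splits = append(splits, samples[i])
	}
	return splits // shards-1 split points => shards key ranges
}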
docs.go (4 changes: 4 additions & 0 deletions)
@@ -1 +1,5 @@
 package mapper // import "github.com/captaincodeman/datastore-mapper"
+
+/*
+mapper is a library to iterate over appengine Datastore entities
+*/
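
For orientation, a job is started over HTTP: the apiJobs.Post handler earlier in the diff reads the job type (and, before this refactor, shard count and queue) from the request. A hypothetical client call, assuming the jobs API is mounted at /mapper/jobs; neither the path nor the parameter names are documented by this commit:

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// startJob POSTs to the mapper jobs API. The /mapper/jobs path and the
// parameter names are assumptions based on the handler in this commit,
// not a documented interface.
func startJob(host, jobType string) error {
	params := url.Values{}
	params.Set("type", jobType)    // registered job type to run
	params.Set("shards", "8")      // desired shard count
	params.Set("queue", "default") // task queue to use

	resp, err := http.Post(host+"/mapper/jobs?"+params.Encode(), "text/plain", nil)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("start job: %s", resp.Status)
	}
	return nil
}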
iterator_handler.go (163 changes: 163 additions & 0 deletions, new file)
@@ -0,0 +1,163 @@
package mapper

import (
	"net/http"

	"golang.org/x/net/context"
	"google.golang.org/appengine"
	"google.golang.org/appengine/datastore"
	"google.golang.org/appengine/log"
)

const (
	iteratorURL         = "/iterate"
	iteratorCompleteURL = iteratorURL + "/complete"
)

func init() {
	Server.HandleFunc(iteratorURL, iteratorHandler)
	Server.HandleFunc(iteratorCompleteURL, iteratorCompleteHandler)
}

func iteratorHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		return
	}

	c := appengine.NewContext(r)

	id, seq, queue, _ := ParseLock(r)
	log.Infof(c, "iterator %s %d", id, seq)

	k := datastore.NewKey(c, config.DatastorePrefix+iteratorKind, id, 0, nil)
	it := new(iterator)

	if err := GetLock(c, k, it, seq); err != nil {
		if serr, ok := err.(*LockError); ok {
			// for locking errors, the error gives us the response to use
			w.WriteHeader(serr.Response)
			w.Write([]byte(serr.Error()))
		} else {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
		}
		log.Errorf(c, "error %s", err.Error())
		return
	}

	it.id = id
	it.queue = queue

	j, err := getJob(c, id)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		ClearLock(c, k, it, false)
		log.Errorf(c, "error %s", err.Error())
		return
	}

	if j.Abort {
		w.WriteHeader(http.StatusOK)
		return
	}

	it.job = j

	completed, err := it.iterate(c)
	if err != nil {
		// this will cause a task retry
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		ClearLock(c, k, it, false)
		log.Errorf(c, "error %s", err.Error())
		return
	}

	var url string
	if completed {
		// schedule completion
		url = config.BasePath + iteratorCompleteURL
	} else {
		// schedule continuation
		url = config.BasePath + iteratorURL
	}
	_, err = ScheduleLock(c, k, it, url, nil, queue)
	if err != nil {
		// this will cause a task retry
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		ClearLock(c, k, it, false)
	} else {
		w.WriteHeader(http.StatusOK)
	}
}

func iteratorCompleteHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		return
	}

	c := appengine.NewContext(r)

	id, seq, queue, _ := ParseLock(r)
	log.Infof(c, "iterator complete %s %d", id, seq)

	k := datastore.NewKey(c, config.DatastorePrefix+iteratorKind, id, 0, nil)
	it := new(iterator)

	if err := GetLock(c, k, it, seq); err != nil {
		if serr, ok := err.(*LockError); ok {
			// for locking errors, the error gives us the response to use
			w.WriteHeader(serr.Response)
			w.Write([]byte(serr.Error()))
		} else {
			w.WriteHeader(http.StatusInternalServerError)
			w.Write([]byte(err.Error()))
		}
		log.Errorf(c, "error %s", err.Error())
		return
	}

	it.id = id
	it.queue = queue

	// mark iterator as complete
	it.complete()
	it.RequestID = ""

	// update iterator status and job within a transaction
	jk := datastore.NewKey(c, config.DatastorePrefix+jobKind, id, 0, nil)
	j := new(jobState)
	err := storage.RunInTransaction(c, func(tc context.Context) error {
		if err := storage.Get(tc, jk, j); err != nil {
			return err
		}
		if j.Abort {
			return nil
		}

		j.NamespacesTotal += int(it.Count)
		j.Iterating = false
		// only the iterator walltime is rolled up into the job counts
		j.WallTime += it.WallTime

		if _, err := storage.Put(tc, k, it); err != nil {
			return err
		}
		if _, err := storage.Put(tc, jk, j); err != nil {
			return err
		}
		return nil
	}, &datastore.TransactionOptions{XG: true, Attempts: attempts})

	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte(err.Error()))
		ClearLock(c, k, it, false)
		log.Errorf(c, "error %s", err.Error())
		return
	}

	w.WriteHeader(http.StatusOK)
}