Skip to content

Commit

Permalink
Merge pull request #164 from mimiro-io/jobTitleForMetrics
Browse files Browse the repository at this point in the history
Switch metrics to use job title instead of id
  • Loading branch information
rompetroll authored Nov 3, 2022
2 parents a907f18 + e8d774c commit d00e2b1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
18 changes: 9 additions & 9 deletions internal/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (job *job) Run() {
}
}
job.runner.logger.Infow(fmt.Sprintf("Job %v (%s) is running or could not get a ticket (%v avail). "+
"queuing for retry in %v", job.id, job.title, job.runner.raffle.ticketsFull, duration),
"queuing for retry in %v", job.title, job.id, job.runner.raffle.ticketsFull, duration),
"job.jobId", job.id,
"job.jobTitle", job.title,
"job.state", "Running")
Expand All @@ -75,7 +75,7 @@ func (job *job) Run() {
// could not obtain ticket. This indicates a job with the same jobId is running already. Or tickets are empty.
// We skip this execution request
job.runner.logger.Infow(fmt.Sprintf("Job %v (%s) running or did not get a ticket (%v avail), skipping.",
job.id, job.title, job.runner.raffle.ticketsIncr), "job.jobId", job.id, "job.jobTitle", job.title)
job.title, job.id, job.runner.raffle.ticketsIncr), "job.jobId", job.id, "job.jobTitle", job.title)
return
}
defer job.runner.raffle.returnTicket(ticket)
Expand All @@ -89,22 +89,22 @@ func (job *job) Run() {
jobType = "fullsync"
}

job.runner.logger.Infow(fmt.Sprintf("Starting %v %s with id '%s' (%s)", jobType, msg, job.id, job.title),
job.runner.logger.Infow(fmt.Sprintf("Starting %v %s with id '%s' (%s)", jobType, msg, job.title, job.id),
"job.jobId", job.id,
"job.jobTitle", job.title,
"job.state", "Starting",
"job.jobType", jobType)

tags := []string{
"application:datahub",
fmt.Sprintf("jobs:job-%s", job.id),
fmt.Sprintf("jobs:job-%s", job.title),
fmt.Sprintf("jobtype:%v", jobType),
}
_ = job.runner.statsdClient.Count("jobs.count", 1, tags, 1)

sourceType := job.pipeline.spec().source.GetConfig()["Type"]
sinkType := job.pipeline.spec().sink.GetConfig()["Type"]
job.runner.logger.Infow(fmt.Sprintf(" > Running task '%s' (%s): %s -> %s", job.id, job.title, sourceType, sinkType),
job.runner.logger.Infow(fmt.Sprintf(" > Running task '%s' (%s): %s -> %s", job.title, job.id, sourceType, sinkType),
"job.jobId", job.id,
"job.jobTitle", job.title,
"job.state", "Running",
Expand All @@ -114,14 +114,14 @@ func (job *job) Run() {
if err != nil {
if err.Error() == "got job interrupt" { // if a job gets killed, this will trigger
_ = job.runner.statsdClient.Count("jobs.cancelled", timed.Nanoseconds(), tags, 1)
job.runner.logger.Infow(fmt.Sprintf("Job '%s' (%s) was terminated", job.id, job.title),
job.runner.logger.Infow(fmt.Sprintf("Job '%s' (%s) was terminated", job.title, job.id),
"job.jobId", job.id,
"job.jobTitle", job.title,
"job.state", "Terminated",
"job.jobType", jobType)
} else {
_ = job.runner.statsdClient.Count("jobs.error", timed.Nanoseconds(), tags, 1)
job.runner.logger.Warnw(fmt.Sprintf("Failed running task for job '%s' (%s): %s", job.id, job.title, err.Error()),
job.runner.logger.Warnw(fmt.Sprintf("Failed running task for job '%s' (%s): %s", job.title, job.id, err.Error()),
"job.jobId", job.id,
"job.jobTitle", job.title,
"job.state", "Failed",
Expand All @@ -132,7 +132,7 @@ func (job *job) Run() {
_ = job.runner.statsdClient.Count("jobs.success", timed.Nanoseconds(), tags, 1)
}

job.runner.logger.Infow(fmt.Sprintf("Finished %s with id '%s' (%s) - duration was %s", msg, job.id, job.title, timed),
job.runner.logger.Infow(fmt.Sprintf("Finished %s with id '%s' (%s) - duration was %s", msg, job.title, job.id, timed),
"job.jobId", job.id,
"job.jobTitle", job.title,
"job.state", "Finished",
Expand All @@ -157,7 +157,7 @@ var retryJobIds sync.Map

func queueRetry(duration time.Duration, j *job) bool {
if _, alreadyQueued := retryJobIds.LoadOrStore(j.id, true); alreadyQueued {
j.runner.logger.Infow(fmt.Sprintf("could not queue, job already queued for retry: %v - %v", j.id, j.title),
j.runner.logger.Infow(fmt.Sprintf("could not queue, job already queued for retry: %v - %v", j.title, j.id),
"job.jobId", j.id,
"job.jobTitle", j.title,
"job.jobState", "Running")
Expand Down
10 changes: 5 additions & 5 deletions internal/jobs/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (pipeline *FullSyncPipeline) sync(job *job, ctx context.Context) error {
}
syncJobState.ContinuationToken = ""

tags := []string{"application:datahub", "job:" + job.id}
tags := []string{"application:datahub", "job:" + job.title}
for keepReading {

processEntities := func(entities []*server.Entity, continuationToken jobSource.DatasetContinuation) error {
Expand All @@ -95,7 +95,7 @@ func (pipeline *FullSyncPipeline) sync(job *job, ctx context.Context) error {
// apply transform if it exists
if pipeline.transform != nil {
transformTs := time.Now()
entities, err = pipeline.transform.transformEntities(runner, entities, job.id)
entities, err = pipeline.transform.transformEntities(runner, entities, job.title)
_ = runner.statsdClient.Timing("pipeline.transform.batch", time.Since(transformTs), tags, 1)
if err != nil {
return err
Expand Down Expand Up @@ -187,7 +187,7 @@ func (pipeline *IncrementalPipeline) sync(job *job, ctx context.Context) error {

keepReading := true

tags := []string{"application:datahub", "job:" + job.id}
tags := []string{"application:datahub", "job:" + job.title}
for keepReading {

processEntities := func(entities []*server.Entity, continuationToken jobSource.DatasetContinuation) error {
Expand Down Expand Up @@ -219,11 +219,11 @@ func (pipeline *IncrementalPipeline) sync(job *job, ctx context.Context) error {
if reflect.TypeOf(pipeline.transform) == reflect.TypeOf(&JavascriptTransform{}) {
t := pipeline.transform.(*JavascriptTransform)
tc, _ := t.Clone()
pe, e := tc.transformEntities(runner, lentities, job.id)
pe, e := tc.transformEntities(runner, lentities, job.title)
res.entities = pe
res.err = e
} else {
pe, e := pipeline.transform.transformEntities(runner, lentities, job.id)
pe, e := pipeline.transform.transformEntities(runner, lentities, job.title)
res.entities = pe
res.err = e
}
Expand Down
2 changes: 1 addition & 1 deletion internal/jobs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *Scheduler) AddJob(jobConfig *JobConfiguration) error {
return err
}
} else {
s.Logger.Infof("Job '%s' is currently paused, it will not be started automatically", jobConfig.Id)
s.Logger.Infof("Job '%s' is currently paused, it will not be started automatically", jobConfig.Title)
}
}
return nil
Expand Down

0 comments on commit d00e2b1

Please sign in to comment.