diff --git a/internal/jobs/job.go b/internal/jobs/job.go index b67e3a2..5a33e6b 100644 --- a/internal/jobs/job.go +++ b/internal/jobs/job.go @@ -63,7 +63,7 @@ func (job *job) Run() { } } job.runner.logger.Infow(fmt.Sprintf("Job %v (%s) is running or could not get a ticket (%v avail). "+ - "queuing for retry in %v", job.id, job.title, job.runner.raffle.ticketsFull, duration), + "queuing for retry in %v", job.title, job.id, job.runner.raffle.ticketsFull, duration), "job.jobId", job.id, "job.jobTitle", job.title, "job.state", "Running") @@ -75,7 +75,7 @@ func (job *job) Run() { // could not obtain ticket. This indicates a job with the same jobId is running already. Or tickets are empty. // We skip this execution request job.runner.logger.Infow(fmt.Sprintf("Job %v (%s) running or did not get a ticket (%v avail), skipping.", - job.id, job.title, job.runner.raffle.ticketsIncr), "job.jobId", job.id, "job.jobTitle", job.title) + job.title, job.id, job.runner.raffle.ticketsIncr), "job.jobId", job.id, "job.jobTitle", job.title) return } defer job.runner.raffle.returnTicket(ticket) @@ -89,7 +89,7 @@ func (job *job) Run() { jobType = "fullsync" } - job.runner.logger.Infow(fmt.Sprintf("Starting %v %s with id '%s' (%s)", jobType, msg, job.id, job.title), + job.runner.logger.Infow(fmt.Sprintf("Starting %v %s with id '%s' (%s)", jobType, msg, job.title, job.id), "job.jobId", job.id, "job.jobTitle", job.title, "job.state", "Starting", @@ -97,14 +97,14 @@ func (job *job) Run() { tags := []string{ "application:datahub", - fmt.Sprintf("jobs:job-%s", job.id), + fmt.Sprintf("jobs:job-%s", job.title), fmt.Sprintf("jobtype:%v", jobType), } _ = job.runner.statsdClient.Count("jobs.count", 1, tags, 1) sourceType := job.pipeline.spec().source.GetConfig()["Type"] sinkType := job.pipeline.spec().sink.GetConfig()["Type"] - job.runner.logger.Infow(fmt.Sprintf(" > Running task '%s' (%s): %s -> %s", job.id, job.title, sourceType, sinkType), + job.runner.logger.Infow(fmt.Sprintf(" > Running task '%s' (%s): %s -> %s", job.title, job.id, sourceType, sinkType), "job.jobId", job.id, "job.jobTitle", job.title, "job.state", "Running", @@ -114,14 +114,14 @@ func (job *job) Run() { if err != nil { if err.Error() == "got job interrupt" { // if a job gets killed, this will trigger _ = job.runner.statsdClient.Count("jobs.cancelled", timed.Nanoseconds(), tags, 1) - job.runner.logger.Infow(fmt.Sprintf("Job '%s' (%s) was terminated", job.id, job.title), + job.runner.logger.Infow(fmt.Sprintf("Job '%s' (%s) was terminated", job.title, job.id), "job.jobId", job.id, "job.jobTitle", job.title, "job.state", "Terminated", "job.jobType", jobType) } else { _ = job.runner.statsdClient.Count("jobs.error", timed.Nanoseconds(), tags, 1) - job.runner.logger.Warnw(fmt.Sprintf("Failed running task for job '%s' (%s): %s", job.id, job.title, err.Error()), + job.runner.logger.Warnw(fmt.Sprintf("Failed running task for job '%s' (%s): %s", job.title, job.id, err.Error()), "job.jobId", job.id, "job.jobTitle", job.title, "job.state", "Failed", @@ -132,7 +132,7 @@ func (job *job) Run() { _ = job.runner.statsdClient.Count("jobs.success", timed.Nanoseconds(), tags, 1) } - job.runner.logger.Infow(fmt.Sprintf("Finished %s with id '%s' (%s) - duration was %s", msg, job.id, job.title, timed), + job.runner.logger.Infow(fmt.Sprintf("Finished %s with id '%s' (%s) - duration was %s", msg, job.title, job.id, timed), "job.jobId", job.id, "job.jobTitle", job.title, "job.state", "Finished", @@ -157,7 +157,7 @@ var retryJobIds sync.Map func queueRetry(duration time.Duration, j *job) bool { if _, alreadyQueued := retryJobIds.LoadOrStore(j.id, true); alreadyQueued { - j.runner.logger.Infow(fmt.Sprintf("could not queue, job already queued for retry: %v - %v", j.id, j.title), + j.runner.logger.Infow(fmt.Sprintf("could not queue, job already queued for retry: %v - %v", j.title, j.id), "job.jobId", j.id, "job.jobTitle", j.title, "job.jobState", "Running") diff --git a/internal/jobs/pipeline.go b/internal/jobs/pipeline.go index d873f4a..f656782 100644 --- a/internal/jobs/pipeline.go +++ b/internal/jobs/pipeline.go @@ -77,7 +77,7 @@ func (pipeline *FullSyncPipeline) sync(job *job, ctx context.Context) error { } syncJobState.ContinuationToken = "" - tags := []string{"application:datahub", "job:" + job.id} + tags := []string{"application:datahub", "job:" + job.title} for keepReading { processEntities := func(entities []*server.Entity, continuationToken jobSource.DatasetContinuation) error { @@ -95,7 +95,7 @@ func (pipeline *FullSyncPipeline) sync(job *job, ctx context.Context) error { // apply transform if it exists if pipeline.transform != nil { transformTs := time.Now() - entities, err = pipeline.transform.transformEntities(runner, entities, job.id) + entities, err = pipeline.transform.transformEntities(runner, entities, job.title) _ = runner.statsdClient.Timing("pipeline.transform.batch", time.Since(transformTs), tags, 1) if err != nil { return err @@ -187,7 +187,7 @@ func (pipeline *IncrementalPipeline) sync(job *job, ctx context.Context) error { keepReading := true - tags := []string{"application:datahub", "job:" + job.id} + tags := []string{"application:datahub", "job:" + job.title} for keepReading { processEntities := func(entities []*server.Entity, continuationToken jobSource.DatasetContinuation) error { @@ -219,11 +219,11 @@ func (pipeline *IncrementalPipeline) sync(job *job, ctx context.Context) error { if reflect.TypeOf(pipeline.transform) == reflect.TypeOf(&JavascriptTransform{}) { t := pipeline.transform.(*JavascriptTransform) tc, _ := t.Clone() - pe, e := tc.transformEntities(runner, lentities, job.id) + pe, e := tc.transformEntities(runner, lentities, job.title) res.entities = pe res.err = e } else { - pe, e := pipeline.transform.transformEntities(runner, lentities, job.id) + pe, e := pipeline.transform.transformEntities(runner, lentities, job.title) res.entities = pe res.err = e } diff --git a/internal/jobs/scheduler.go b/internal/jobs/scheduler.go index 04aaa6c..eef3ebd 100644 --- a/internal/jobs/scheduler.go +++ b/internal/jobs/scheduler.go @@ -154,7 +154,7 @@ func (s *Scheduler) AddJob(jobConfig *JobConfiguration) error { return err } } else { - s.Logger.Infof("Job '%s' is currently paused, it will not be started automatically", jobConfig.Id) + s.Logger.Infof("Job '%s' is currently paused, it will not be started automatically", jobConfig.Title) } } return nil