Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ [Job Engine] Added JobDataCleanupJobEndCallback to delete JobInstanceData after JobExecution has ended #4150

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*******************************************************************************
* Copyright (c) 2024, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.job.engine.jbatch.overrides.callback;

import com.ibm.jbatch.container.callback.JobEndCallback;
import com.ibm.jbatch.container.servicesmanager.ServicesManager;
import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
import org.eclipse.kapua.job.engine.jbatch.driver.JbatchDriver;
import org.eclipse.kapua.job.engine.jbatch.persistence.JPAPersistenceManagerImpl;
import org.eclipse.kapua.job.engine.jbatch.persistence.jpa.JpaJobInstanceData;
import org.eclipse.kapua.model.id.KapuaId;
import org.eclipse.kapua.service.job.Job;
import org.eclipse.kapua.service.job.execution.JobExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link JpaJobInstanceData} cleanup {@link JobEndCallback}.
* <p>
* This {@link JobEndCallback} removes {@link JpaJobInstanceData} after the {@link JobExecution} ends it execution.
* This is done to avoid that {@link Job}s that run a lot of times (thousands or more) fill up the {@link JpaJobInstanceData}
* table making {@link JbatchDriver#isRunningJob(KapuaId, KapuaId)} tanking a lot of time to execute.
* <p>
* {@link JpaJobInstanceData} aren't required nor useful once the {@link JobExecution} ends its processing so they can be safely deleted.
*
* @since 2.1.0
*/
public class JobDataCleanupJobEndCallback implements JobEndCallback {

private static final Logger LOG = LoggerFactory.getLogger(JobDataCleanupJobEndCallback.class);

private final JPAPersistenceManagerImpl persistenceService;
private long jobExecutionId;

/**
* Constructor.
*
* @since 2.1.0
*/
public JobDataCleanupJobEndCallback() {
ServicesManager servicesManager = ServicesManagerImpl.getInstance();
persistenceService = (JPAPersistenceManagerImpl) servicesManager.getPersistenceManagerService();
}

@Override
public void setExecutionId(long jobExecutionId) {
this.jobExecutionId = jobExecutionId;
}

@Override
public void done(long jobExecutionId) {
try {
long jobInstanceId = persistenceService.getJobInstanceIdByExecutionId(jobExecutionId);

LOG.info("Deleting JobInstanceData {} after JobExecution {} has completed...", jobInstanceId, jobExecutionId);
persistenceService.deleteJobInstanceData(jobInstanceId);
LOG.info("Deleting JobInstanceData {} after JobExecution {} has completed... DONE!", jobInstanceId, jobExecutionId);
}
catch (Exception e) {
LOG.error("Deleting JobInstanceData for JobExecution {} has completed... ERROR! This will leave JobInstanceData in the DB until the Job gets deleted", jobExecutionId, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*******************************************************************************
* Copyright (c) 2024, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.job.engine.jbatch.overrides.callback;

import com.ibm.jbatch.container.callback.IJobEndCallbackService;
import com.ibm.jbatch.container.callback.JobEndCallback;
import com.ibm.jbatch.container.callback.JobEndCallbackManagerImpl;

/**
* Kapua {@link IJobEndCallbackService} which extends default {@link JobEndCallbackManagerImpl} to register custom {@link JobEndCallback}s
*
* @since 2.1.0
*/
public class KapuaJobEndCallbackManagerImpl extends JobEndCallbackManagerImpl implements IJobEndCallbackService {

/**
* Constructor
*
* @since 2.1.0
*/
public KapuaJobEndCallbackManagerImpl() {
super();

// Required to delete JobInstanceData after JobExecution ends
registerJobEndCallback(new JobDataCleanupJobEndCallback());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@
* <pre>
* JobInstanceData (aka: JobInstace)
* |
* |-- as one --- JobStatus
* |-- as many -- ExecutionInstanceData (aka: JobExecution)
* |-- has one --- JobStatus
* |-- has many -- ExecutionInstanceData (aka: JobExecution)
* |
* |-- as many -- StepExecutionInstanceData (aka: JobStepExecution)
* |-- has many -- StepExecutionInstanceData (aka: JobStepExecution)
* | |
* | |-- as one --- StepStatus
* | |-- has one --- StepStatus
* |
* |-- as many -- CheckpointData
* |-- has many -- CheckpointData
* </pre>
*
* @since 1.2.0
Expand Down Expand Up @@ -220,6 +220,20 @@ public String getJobCurrentTag(long jobInstanceId) {
return "NOTSET";
}

/**
* Deletes {@link JpaJobInstanceData} by its {@link JpaJobInstanceData#getId()}.
*
* @param jobInstanceId The {@link JpaJobInstanceData#getId()} to delete.
* @since 2.1.0
*/
public void deleteJobInstanceData(long jobInstanceId) {
try {
txManager.execute(tx -> jobInstanceDataRepository.deleteById(tx, jobInstanceId));
} catch (Exception e) {
throw new PersistenceException(e);
}
}

/**
* Deletes {@link JobInstance}s by name.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
@NamedQuery(name = "JobInstanceData.countByNameTagApp",
query = "SELECT COUNT(jid) FROM JobInstanceData jid WHERE jid.name = :name AND jid.appTag = :appTag"),
@NamedQuery(name = "JobInstanceData.deleteByName",
query = "DELETE FROM JobInstanceData jid WHERE jid.name = :name")
query = "DELETE FROM JobInstanceData jid WHERE jid.name = :name"),
@NamedQuery(name = "JobInstanceData.deleteById",
query = "DELETE FROM JobInstanceData jid WHERE jid.id = :id")

})
public class JpaJobInstanceData implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@
public interface JpaJobInstanceDataRepository {
JpaJobInstanceData create(TxContext tx, String name, String appTag, String jobXml);

int deleteByName(TxContext tx, String jobName);

JpaJobInstanceData find(TxContext tx, long id);

Integer getJobInstanceCount(TxContext tx, String jobName, String appTag);

List<Long> getJobInstanceIds(TxContext tx, String jobName, String appTag, Integer offset, Integer limit);

List<JpaJobInstanceData> getExternalJobInstanceData(TxContext tx);

int deleteByName(TxContext tx, String jobName);

/**
* Deletes {@link JpaJobInstanceData} by its {@link JpaJobInstanceData#getId()}.
*
* @param tx The {@link TxContext}
* @param jobInstanceId The {@link JpaJobInstanceData#getId()} to delete.
* @since 2.1.0
*/
int deleteById(TxContext tx, long jobInstanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ public JpaJobInstanceData create(TxContext tx, String name, String appTag, Strin
return jpaJobInstanceData;
}

@Override
public int deleteByName(TxContext tx, String jobName) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);

TypedQuery<Integer> deleteByNameQuery = em.createNamedQuery("JobInstanceData.deleteByName", Integer.class);
deleteByNameQuery.setParameter("name", jobName);
return deleteByNameQuery.executeUpdate();
}

@Override
public JpaJobInstanceData find(TxContext tx, long id) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);
Expand Down Expand Up @@ -102,4 +93,22 @@ public List<JpaJobInstanceData> getExternalJobInstanceData(TxContext tx) {
final List<JpaJobInstanceData> queryResult = selectQuery.getResultList();
return queryResult;
}

@Override
public int deleteByName(TxContext tx, String jobName) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);

TypedQuery<Integer> deleteByNameQuery = em.createNamedQuery("JobInstanceData.deleteByName", Integer.class);
deleteByNameQuery.setParameter("name", jobName);
return deleteByNameQuery.executeUpdate();
}

@Override
public int deleteById(TxContext tx, long jobInstanceId) {
final EntityManager em = JpaAwareTxContext.extractEntityManager(tx);

TypedQuery<Integer> deleteByNameQuery = em.createNamedQuery("JobInstanceData.deleteById", Integer.class);
deleteByNameQuery.setParameter("id", jobInstanceId);
return deleteByNameQuery.executeUpdate();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
J2SE_MODE=true
JOBXML_LOADER_SERVICE=com.ibm.jbatch.container.services.impl.DirectoryJobXMLLoaderServiceImpl
CONTAINER_ARTIFACT_FACTORY_SERVICE=org.eclipse.kapua.job.engine.jbatch.KapuaDelegatingBatchArtifactFactoryImpl
# com.ibm.jbatch.container.services.impl.DelegatingBatchArtifactFactoryImpl
CALLBACK_SERVICE=org.eclipse.kapua.job.engine.jbatch.overrides.callback.KapuaJobEndCallbackManagerImpl
BATCH_THREADPOOL_SERVICE=com.ibm.jbatch.container.services.impl.GrowableThreadPoolServiceImpl
PERSISTENCE_MANAGEMENT_SERVICE=org.eclipse.kapua.job.engine.jbatch.persistence.JPAPersistenceManagerImpl
Loading
Loading