Skip to content

Commit

Permalink
[INLONG-9125][Agent] Add offset manager
Browse files Browse the repository at this point in the history
  • Loading branch information
justinwwhuang committed Oct 26, 2023
1 parent 3ee9921 commit d31c10b
Showing 1 changed file with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.core.task;

import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.db.OffsetDb;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* used to store task position to db, task position is stored as properties in JobProfile.
* where key is task read file name and value is task sink position
* note that this class is generated
*/
public class OffsetManager {

private static final Logger LOGGER = LoggerFactory.getLogger(OffsetManager.class);
private static volatile OffsetManager offsetManager = null;
private final OffsetDb offsetDb;

private OffsetManager() {
this.offsetDb = new OffsetDb();
}

/**
* task position manager singleton, can only generated by agent manager
*/
public static OffsetManager init() {
if (offsetManager == null) {
synchronized (OffsetManager.class) {
if (offsetManager == null) {
offsetManager = new OffsetManager();
}
}
}
return offsetManager;
}

/**
* get taskPositionManager singleton
*/
public static OffsetManager getInstance() {
if (offsetManager == null) {
throw new RuntimeException("task position manager has not been initialized by agentManager");
}
return offsetManager;
}

public void setOffset(OffsetProfile profile) {
offsetDb.setOffset(profile);
}

public void deleteOffset(String taskId, String instanceId) {
offsetDb.deleteOffset(taskId, instanceId);
}

public OffsetProfile getOffset(String taskId, String instanceId) {
return offsetDb.getOffset(taskId, instanceId);
}
}

0 comments on commit d31c10b

Please sign in to comment.