Skip to content

Commit

Permalink
set batch state to processing, missing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
terrywbrady committed May 16, 2024
1 parent dfbd06e commit 19e059d
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/main/java/org/cdlib/mrt/zk/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static Batch acquirePendingBatch(ZooKeeper client) throws MerrittZKNodeIn
if (b.lock(client)) {
b.load(client);
b.createData(client, ZKKey.STATES, null);
b.setStatus(client, BatchState.Processing);
return b;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/main/ruby/lib/merritt_zk_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def self.acquire_pending_batch(zk)
begin
if b.lock(zk)
b.set_data(zk, ZkKeys::STATES, nil)
b.set_status(zk, BatchState::Processing)
return b
end
rescue ZK::Exceptions::NodeExists
Expand Down
12 changes: 9 additions & 3 deletions src/main/ruby/lib/merritt_zk_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ class Job < QueueItem
DIR = '/jobs'
PREFIX = 'jid'

def initialize(id, bid: nil, data: nil)
def initialize(id, bid: nil, data: nil, identifiers: {}, metadata: {})
super(id, data: data)
@bid = bid
@priority = 5
@space_needed = 0
@job_state_path = nil
@batch_state_path = nil
@retry_count = 0
@identifiers = identifiers
@metadata = metadata
end

def load_status(zk, js)
Expand All @@ -31,6 +33,8 @@ def load_properties(zk)
@bid = string_property(zk, ZkKeys::BID)
@priority = int_property(zk, ZkKeys::PRIORITY)
@space_needed = int_property(zk, ZkKeys::SPACE_NEEEDED)
@identifiers = json_property(zk, ZkKeys::IDENTIFIERS) if zk.exists?("#{path}/#{ZkKeys::IDENTIFIERS}")
@metadata = json_property(zk, ZkKeys::METADATA) if zk.exists?("#{path}/#{ZkKeys::METADATA}")
set_job_state_path(zk)
set_batch_state_path(zk)
end
Expand Down Expand Up @@ -106,13 +110,15 @@ def path
"#{DIR}/#{@id}"
end

def self.create_job(zk, bid, data)
def self.create_job(zk, bid, data, identifiers: {}, metadata: {})
id = QueueItem.create_id(zk, prefix_path)
job = Job.new(id, bid: bid, data: data)
job = Job.new(id, bid: bid, data: data, identifiers: identifiers, metadata: metadata)
job.set_data(zk, ZkKeys::BID, bid)
job.set_data(zk, ZkKeys::PRIORITY, job.priority)
job.set_data(zk, ZkKeys::SPACE_NEEEDED, job.space_needed)
job.set_data(zk, ZkKeys::CONFIGURATION, data)
job.set_data(zk, ZkKeys::IDENTIFIERS, identifiers) unless identifiers.empty?
job.set_data(zk, ZkKeys::METADATA, metadata) unless metadata.empty?
job.set_status(zk, JobState.init)
job.set_job_state_path(zk)
job.set_batch_state_path(zk)
Expand Down
2 changes: 2 additions & 0 deletions src/main/ruby/lib/merritt_zk_queue_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class ZkKeys
CONFIGURATION = 'configuration'
STATUS = 'status'
LOCK = 'lock'
IDENTIFIERS = 'identifiers'
METADATA = 'metadata'
end

##
Expand Down
49 changes: 46 additions & 3 deletions src/main/ruby/spec/zk_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@ def make_batch_json(s = 'bar', u = 'bid-uuid')
b2 = MerrittZK::Batch.create_batch(@zk, make_batch_json('bar2', 'bid-uuid2'))
@remap['bid0'] = b.id
@remap['bid1'] = b2.id
MerrittZK::Batch.acquire_pending_batch(@zk)
MerrittZK::Batch.acquire_pending_batch(@zk)
expect(MerrittZK::Batch.acquire_pending_batch(@zk)).to be_nil
bb = MerrittZK::Batch.acquire_pending_batch(@zk)
expect(bb).to_not be_nil
expect(bb.status.status).to eq(:Processing)
bb = MerrittZK::Batch.acquire_pending_batch(@zk)
expect(bb).to_not be_nil
bb = MerrittZK::Batch.acquire_pending_batch(@zk)
expect(bb).to be_nil
end
end

Expand Down Expand Up @@ -767,6 +771,45 @@ def make_batch_json(s = 'bar', u = 'bid-uuid')
ids = MerrittZK::Batch.delete_completed_batches(@zk)
expect(ids.include?(bbbb.id)).to be(true)
end

it :job_create_config do |_x|
b = MerrittZK::Batch.create_batch(@zk, make_batch_json)
@remap['bid0'] = b.id
bb = MerrittZK::Batch.acquire_pending_batch(@zk)
expect(b.id).to eq(bb.id)
jc = { profile_name: 'a', submitter: 'b', payload_url: 'c', payload_type: 'd', response_type: 'e' }
j = MerrittZK::Job.create_job(@zk, bb.id, jc)
@remap['jid0'] = j.id
expect(j.bid).to eq(bb.id)
bb.unlock(@zk)
end

it :job_create_config_ident do |_x|
b = MerrittZK::Batch.create_batch(@zk, make_batch_json)
@remap['bid0'] = b.id
bb = MerrittZK::Batch.acquire_pending_batch(@zk)
expect(b.id).to eq(bb.id)
jc = { profile_name: 'a', submitter: 'b', payload_url: 'c', payload_type: 'd', response_type: 'e' }
ji = { primary_id: 'f', local_id: %w[g h] }
j = MerrittZK::Job.create_job(@zk, bb.id, jc, identifiers: ji)
@remap['jid0'] = j.id
expect(j.bid).to eq(bb.id)
bb.unlock(@zk)
end

it :job_create_config_ident_metadata do |_x|
b = MerrittZK::Batch.create_batch(@zk, make_batch_json)
@remap['bid0'] = b.id
bb = MerrittZK::Batch.acquire_pending_batch(@zk)
expect(b.id).to eq(bb.id)
jc = { profile_name: 'a', submitter: 'b', payload_url: 'c', payload_type: 'd', response_type: 'e' }
ji = { primary_id: 'f', local_id: %w[g h] }
jm = { erc_who: 'i', erc_what: 'j', erc_when: 'k', erc_where: 'l' }
j = MerrittZK::Job.create_job(@zk, bb.id, jc, identifiers: ji, metadata: jm)
@remap['jid0'] = j.id
expect(j.bid).to eq(bb.id)
bb.unlock(@zk)
end
end

describe 'Lock tests' do
Expand Down
10 changes: 7 additions & 3 deletions src/test/java/org/cdlib/mrt/zk/ZKTestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,13 @@ public void batchAcquire() throws KeeperException, InterruptedException, Merritt
remap.put("bid0", b.id());
Batch b2 = Batch.createBatch(zk, fooBar("2", "bid-uuid2"));
remap.put("bid1", b2.id());
assertNotNull(Batch.acquirePendingBatch(zk));
assertNotNull(Batch.acquirePendingBatch(zk));
assertNull(Batch.acquirePendingBatch(zk));
Batch bb = Batch.acquirePendingBatch(zk);
assertNotNull(bb);
assertEquals(bb.status(), BatchState.Processing);
bb = Batch.acquirePendingBatch(zk);
assertNotNull(bb);
bb = Batch.acquirePendingBatch(zk);
assertNull(bb);
}

@Test
Expand Down
24 changes: 12 additions & 12 deletions test-cases.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ batch_acquire:
batchID: bid-uuid
/batches/bid0/states: null
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/batches/bid0/lock: null
/batches/bid1/submission:
foo: bar2
batchID: bid-uuid2
/batches/bid1/states: null
/batches/bid1/status:
status: Pending
status: Processing
last_modified: now
/batches/bid1/lock: null
create_job:
Expand All @@ -120,7 +120,7 @@ create_job:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand All @@ -143,7 +143,7 @@ create_job_state_change:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand All @@ -166,7 +166,7 @@ load_job_state_change:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand All @@ -188,7 +188,7 @@ acquire_pending_job:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/lock: null
Expand All @@ -213,7 +213,7 @@ acquire_lowest_priority_job:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand Down Expand Up @@ -260,7 +260,7 @@ job_happy_path:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand Down Expand Up @@ -377,7 +377,7 @@ job_happy_path_with_delete:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand Down Expand Up @@ -416,7 +416,7 @@ job_create_config:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand All @@ -442,7 +442,7 @@ job_create_config_ident:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand Down Expand Up @@ -473,7 +473,7 @@ job_create_config_ident_metadata:
foo: bar
batchID: bid-uuid
/batches/bid0/status:
status: Pending
status: Processing
last_modified: now
/jobs/jid0/bid: bid0
/jobs/jid0/configuration:
Expand Down

0 comments on commit 19e059d

Please sign in to comment.