Skip to content

Commit

Permalink
[CLIENT-1984] Support scan-show and query-show info commands
Browse files Browse the repository at this point in the history
The execute task now uses the new INFO commands for job progress if supported.
  • Loading branch information
khaf committed Nov 28, 2022
1 parent 2c718a5 commit c43efee
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
13 changes: 13 additions & 0 deletions lib/aerospike/features.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,18 @@ module Features

# Server supports the 'blob-bits' command
BLOB_BITS = :'blob-bits'

# Server supports resumable partition scans
PARTITION_SCAN = :'pscans'

# Server supports the 'query-show' command to check for the
# progress of the scans and queries
QUERY_SHOW = :'query-show'

# Server supports the batch command for all types of operations, including wrties
BATCH_ANY = :'batch-any'

# Server supports resumable partition queries
PARTITION_QUERY = :'pquery'
end
end
31 changes: 17 additions & 14 deletions lib/aerospike/task/execute_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

module Aerospike

private

# ExecuteTask is used to poll for long running server execute job completion.
Expand All @@ -29,19 +28,24 @@ def initialize(cluster, statement)
self
end

# IsDone queries all nodes for task completion status.
# queries all nodes for task completion status.
def all_nodes_done?

if @scan
command = 'scan-list'
else
command = 'query-list'
end
modul = @scan ? "scan" : "query"
cmd1 = "query-show:trid=#{@task_id}"
cmd2 = modul + "-show:trid=#{@task_id}"
cmd3 = "jobs:module=" + modul + ";cmd=get-job;trid=#{@task_id}"

nodes = @cluster.nodes
done = false

nodes.each do |node|
command = cmd3
if node.supports_feature?(Aerospike::Features::PARTITION_QUERY)
command = cmd1
elsif node.supports_feature?(Aerospike::Features::QUERY_SHOW)
command = cmd2
end

conn = node.get_connection(0)
responseMap, _ = Info.request(conn, command)
node.put_connection(conn)
Expand All @@ -58,28 +62,27 @@ def all_nodes_done?

b = index + find.length
response = response[b, response.length]
find = 'job_status='
find = "job_status="
index = response.index(find)

next unless index

b = index + find.length
response = response[b, response.length]
e = response.index(':')
e = response.index(":")
status = response[0, e]

case status
when 'ABORTED'
when "ABORTED"
raise Aerospike::Exceptions::QueryTerminated
when 'IN PROGRESS'
when "IN PROGRESS"
return false
when 'DONE'
when "DONE"
done = true
end
end

done
end

end
end

0 comments on commit c43efee

Please sign in to comment.