Skip to content

Commit

Permalink
A method bucket.query now sent requests until get last record from se… (
Browse files Browse the repository at this point in the history
#50)

A method bucket.query now sent requests until get last record from server

Co-authored-by: Rustam Galikhanov <[email protected]>
  • Loading branch information
Rumpelshtinskiy and Rumpelshtinskiy authored Sep 26, 2024
1 parent 582fbd3 commit 7cd8a30
Show file tree
Hide file tree
Showing 37 changed files with 195 additions and 185 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- ServerClient.isAlive() method to check if the storage engine is
working [PR-17](https://github.com/reductstore/reduct-java/pull/17)
- TokenClient.getTokens() method to get a list of tokens [PR-20](https://github.com/reductstore/reduct-java/pull/20)
- EntryClient.writeRecord(Bucket bucket, Entry<?> body) method to add an record to a
- EntryClient.writeRecord(Bucket bucket, Entry<?> body) method to add a record to a
bucket [PR-25](https://github.com/reductstore/reduct-java/issues/25)
- EntryClient.getRecord(Bucket bucket, Entry<?> body) method to get an record from a
- EntryClient.getRecord(Bucket bucket, Entry<?> body) method to get a record from a
bucket [PR-25](https://github.com/reductstore/reduct-java/issues/27)
- Resolve bug: Bucket.query handle only one HTTP GET request. The query now returns the records until gets last
bucket [PR-50](https://github.com/reductstore/reduct-java/issues/47)

### Infrastructure:

Expand Down
68 changes: 35 additions & 33 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,46 +39,48 @@ jar {

tasks.named('test') {
useJUnitPlatform()
dependsOn(tasks.assemble)
}

import com.vanniktech.maven.publish.SonatypeHost
tasks.register('publishingToCentralMaven') {
println 'Publishing to Central Maven is started.'
mavenPublishing {
publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL)
signAllPublications()

coordinates(group, archivesBaseName, version)
pom {
name = archivesBaseName
description = "ReductStore Client SDK for Java"
url = "https://www.reduct.store/docs"

mavenPublishing {
publishToMavenCentral(SonatypeHost.CENTRAL_PORTAL)
signAllPublications()

coordinates(group, archivesBaseName, version)
pom {
name = archivesBaseName
description = "ReductStore Client SDK for Java"
url = "https://www.reduct.store/docs"

licenses {
license {
name = "MIT License"
url = "https://opensource.org/licenses/MIT"
}
}
scm {
url = "https://github.com/reductstore/reduct-java.git"
connection = "scm:git:git://github.com/reductstore/reduct-java.git"
developerConnection = "scm:git:ssh://github.com/reductstore/reduct-java.git"
}
developers {
developer {
id = "mtp"
name = "matepocz"
licenses {
license {
name = "MIT License"
url = "https://opensource.org/licenses/MIT"
}
}
developer {
id = "Rumpelshtinskiy"
name = "Rustam Galikhanov"
scm {
url = "https://github.com/reductstore/reduct-java.git"
connection = "scm:git:git://github.com/reductstore/reduct-java.git"
developerConnection = "scm:git:ssh://github.com/reductstore/reduct-java.git"
}
developer {
id = "reductsoftware"
name = "ReductSoftware UG"
email = "[email protected]"
developers {
developer {
id = "mtp"
name = "matepocz"
}
developer {
id = "Rumpelshtinskiy"
name = "Rustam Galikhanov"
}
developer {
id = "reductsoftware"
name = "ReductSoftware UG"
email = "[email protected]"
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static store.reduct.utils.http.HttpHeaders.*;

Expand Down Expand Up @@ -260,7 +261,7 @@ public Record getMetaInfo(String entryName, Long timestamp) throws ReductExcepti
* @param ttl
* @return
*/
public Iterator<Record> query(String entryName, Integer start, Integer stop, Integer ttl) {
public Iterator<Record> query(String entryName, Long start, Long stop, Long ttl) throws ReductException, IllegalArgumentException {
if(Strings.isBlank(name) || Strings.isBlank(entryName) || Objects.isNull(start) || Objects.isNull(stop) || Objects.isNull(ttl))
{
throw new ReductException("Validation error");
Expand All @@ -272,102 +273,176 @@ public Iterator<Record> query(String entryName, Integer start, Integer stop, Int
.GET();
HttpResponse<String> response = reductClient.send(builder, HttpResponse.BodyHandlers.ofString());
QueryId queryId = JsonUtils.parseObject(response.body(), QueryId.class);
return getRecords(entryName, queryId.getId()).iterator();

return new RecordIterator(name, entryName, queryId.getId(), reductClient.getServerProperties().getBaseUrl());
}

/**
* Get a bulk of records from an entry
* @param entryName
* @param queryId
* @return
* @throws ReductException
* @throws IllegalArgumentException
*/
Stream<Record> getRecords(@NonNull String entryName, Long queryId) throws ReductException, IllegalArgumentException {
if(Objects.isNull(queryId))
public Iterator<Record> getMetaInfos(String entryName, Long start, Long stop, Long ttl) throws ReductException, IllegalArgumentException {
if(Strings.isBlank(name) || Strings.isBlank(entryName) || Objects.isNull(start) || Objects.isNull(stop) || Objects.isNull(ttl))
{
throw new ReductException("Validation error");
}
URI uri = URI.create(reductClient.getServerProperties().getBaseUrl() + String.format(RecordURL.GET_ENTRIES.getUrl(), name, entryName) + new Queries("q", queryId));
URI uri = URI.create(reductClient.getServerProperties().getBaseUrl() +
String.format(RecordURL.QUERY.getUrl(), name, entryName) + new Queries("start", start).add("stop", stop).add("ttl", ttl));
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uri)
.GET();
HttpResponse<byte[]> httpResponse = reductClient.send(builder, HttpResponse.BodyHandlers.ofByteArray());
ByteBuffer byteBuffer = ByteBuffer.wrap(httpResponse.body());
return httpResponse.headers()
.map()
.entrySet()
.stream()
.filter(ent -> ent.getKey().contains(getXReductTimeWithUnderscoreHeader()))
.map(ent -> {
String ts = ent.getKey().substring(getXReductTimeWithUnderscoreHeader().length());
String[] split = ent.getValue().get(0).split(",");
if(split.length < 2) {
throw new ReductException(String.format("Headers has a wrong format for timestamp: %s", ts));
}
try {
int length = Integer.parseInt(split[0]);
String type = split[1];
byte[] tempBuf = new byte[length];
byteBuffer.get(tempBuf);
return Record.builder()
.body(tempBuf)
.entryName(entryName)
.timestamp(Optional.of(ts).map(Long::parseLong).orElseThrow(() -> new ReductException(X_REDUCT_TIME_IS_NOT_SUCH_LONG_FORMAT)))
.type(type)
.length(length)
.build();
HttpResponse<String> response = reductClient.send(builder, HttpResponse.BodyHandlers.ofString());
QueryId queryId = JsonUtils.parseObject(response.body(), QueryId.class);

return new MetaInfoIterator(name, entryName, queryId.getId(), reductClient.getServerProperties().getBaseUrl());
}

private boolean isNotValidRecord(Record val) {
return Strings.isBlank(val.getEntryName())
|| val.getTimestamp() <= 0
|| Objects.isNull(val.getBody());
}

private class RecordIterator implements Iterator<Record> {
@Getter(AccessLevel.PACKAGE)
private final HttpRequest.Builder builder;
private final String recordEntryName;
@Getter(AccessLevel.PACKAGE)
private final PriorityBlockingQueue<HeaderInstance> headerInstances = new PriorityBlockingQueue<>(8, Comparator.comparingLong(instance -> instance.ts));
private byte[] body;
@Setter(AccessLevel.PACKAGE)
@Getter(AccessLevel.PACKAGE)
private boolean last = false;
boolean hasNextRecord() {
return !headerInstances.isEmpty();
}
private RecordIterator(String bucketName, String recordEntryName, Long queryId, String baseUrl) {
this(recordEntryName, queryId, HttpRequest.newBuilder()
.uri(URI.create(baseUrl + String.format(RecordURL.GET_ENTRIES.getUrl(), bucketName, recordEntryName) + new Queries("q", queryId)))
.GET());
}

private RecordIterator(String recordEntryName, Long queryId, HttpRequest.Builder builder) {
if(Objects.isNull(queryId))
{
throw new ReductException("Validation error: queryId is null");
}
this.recordEntryName = recordEntryName;
this.builder = builder;
}

@Override
public boolean hasNext() {
if(hasNextRecord()) {
return true;
}
if(!isLast()) {
HttpResponse<byte[]> httpResponse = reductClient.send(builder, HttpResponse.BodyHandlers.ofByteArray());
if (httpResponse.statusCode() != 204) {
body = httpResponse.body();
int offset = 0;
for (Map.Entry<String, List<String>> ent : httpResponse.headers().map().entrySet()) {
if (ent.getKey().contains(getXReductTimeWithUnderscoreHeader())) {
String ts = ent.getKey().substring(getXReductTimeWithUnderscoreHeader().length());
String[] split = ent.getValue().get(0).split(",");
if (split.length < 2) {
throw new ReductException(String.format("Headers has a wrong format for timestamp: %s", ts));
}
try {
int length = Integer.parseInt(split[0]);
String type = split[1];
headerInstances.put(HeaderInstance.builder()
.ts(Optional.of(ts).map(Long::parseLong).orElseThrow(() -> new ReductException(X_REDUCT_TIME_IS_NOT_SUCH_LONG_FORMAT)))
.type(type)
.length(length)
.offset(offset)
.build());
offset += length;
} catch (NumberFormatException ex) {
throw new ReductException(CONTENT_LENGTH_IS_NOT_SET_IN_THE_RECORD);
}
}
}
}
catch (NumberFormatException ex) {
throw new ReductException(CONTENT_LENGTH_IS_NOT_SET_IN_THE_RECORD);
else {
setLast(true);
}
});
}
return hasNextRecord();
}

@Override
public Record next() {
if(last) {
throw new NoSuchElementException();
}
else if(Objects.isNull(body) || !hasNextRecord()) {
throw new ReductException("Invoke hasNext() method first");
}
HeaderInstance instance = headerInstances.poll();
ByteBuffer byteBuffer = ByteBuffer.wrap(body);
byte[] nextBody = new byte[instance.length];
byteBuffer.get(nextBody, instance.getOffset(), instance.getLength());

return Record.builder()
.body(nextBody)
.entryName(recordEntryName)
.timestamp(instance.getTs())
.type(instance.getType())
.length(instance.getLength())
.build();
}
}

private class MetaInfoIterator extends RecordIterator {

private MetaInfoIterator(String bucketName, String recordEntryName, Long queryId, String baseUrl) {
super(bucketName, recordEntryName, queryId, baseUrl);

Collection<Record> getMetaInfos(Record record, Long queryId) throws ReductException, IllegalArgumentException {
if(Strings.isBlank(name) || Strings.isBlank(record.getEntryName()) || Objects.isNull(queryId))
{
throw new ReductException("Validation error");
}
URI uri = URI.create(reductClient.getServerProperties().getBaseUrl() + String.format(RecordURL.GET_ENTRIES.getUrl(), name, record.getEntryName()) + new Queries("q", queryId));
HttpRequest.Builder builder = HttpRequest.newBuilder()
.uri(uri)
.method("HEAD", HttpRequest.BodyPublishers.noBody());
HttpResponse<byte[]> httpResponse = reductClient.send(builder, HttpResponse.BodyHandlers.ofByteArray());
return httpResponse.headers()
.map()
.entrySet()
.stream()
.filter(ent -> ent.getKey().contains(getXReductTimeWithUnderscoreHeader()))
.map(ent -> {
String ts = ent.getKey().substring(getXReductTimeWithUnderscoreHeader().length());
String[] split = ent.getValue().get(0).split(",");
if(split.length < 2) {
throw new ReductException(String.format("Headers has a wrong format for timestamp: %s", ts));
}
try {
int length = Integer.parseInt(split[0]);
String type = split[1];
byte[] tempBuf = new byte[length];
return Record.builder()
.body(tempBuf)
.entryName(record.getEntryName())
.timestamp(Optional.of(ts).map(Long::parseLong).orElseThrow(() -> new ReductException(X_REDUCT_TIME_IS_NOT_SUCH_LONG_FORMAT)))
.type(type)
.length(length)
.build();
private MetaInfoIterator(String recordEntryName, Long queryId, HttpRequest.Builder builder) {
super(recordEntryName, queryId, builder.method("HEAD", HttpRequest.BodyPublishers.noBody()));
}

@Override
public boolean hasNext() {
if(hasNextRecord()) {
return true;
}
if(!isLast()) {
HttpResponse<byte[]> httpResponse = reductClient.send(getBuilder(), HttpResponse.BodyHandlers.ofByteArray());
if (httpResponse.statusCode() != 204) {
for (Map.Entry<String, List<String>> ent : httpResponse.headers().map().entrySet()) {
if (ent.getKey().contains(getXReductTimeWithUnderscoreHeader())) {
String ts = ent.getKey().substring(getXReductTimeWithUnderscoreHeader().length());
String[] split = ent.getValue().get(0).split(",");
if (split.length < 2) {
throw new ReductException(String.format("Headers has a wrong format for timestamp: %s", ts));
}
try {
int length = Integer.parseInt(split[0]);
String type = split[1];
getHeaderInstances().put(HeaderInstance.builder()
.ts(Optional.of(ts).map(Long::parseLong).orElseThrow(() -> new ReductException(X_REDUCT_TIME_IS_NOT_SUCH_LONG_FORMAT)))
.type(type)
.length(length)
.build());
} catch (NumberFormatException ex) {
throw new ReductException(CONTENT_LENGTH_IS_NOT_SET_IN_THE_RECORD);
}
}
}
}
catch (NumberFormatException ex) {
throw new ReductException(CONTENT_LENGTH_IS_NOT_SET_IN_THE_RECORD);
else {
setLast(true);
}
})
.collect(Collectors.toCollection(LinkedList::new));
}
return hasNextRecord();
}
}

private boolean isNotValidRecord(Record record) {
return Strings.isBlank(record.getEntryName())
|| record.getTimestamp() <= 0
|| Objects.isNull(record.getBody());
@Data
@Builder
private static class HeaderInstance {
Long ts;
String type;
int length;
int offset;
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
import org.junit.jupiter.api.Test;
import store.reduct.utils.http.Queries;

import javax.xml.transform.stream.StreamResult;

import java.util.Iterator;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

public class QueriesTest {
Expand Down
Loading

0 comments on commit 7cd8a30

Please sign in to comment.