diff --git a/README.md b/README.md index 6521172b..ff5813e7 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } dependencies { - implementation group: 'org.radarbase', name: 'radar-commons', version: '0.12.3' + implementation group: 'org.radarbase', name: 'radar-commons', version: '0.13.0' } ``` @@ -69,7 +69,7 @@ repositories { } dependencies { - implementation group: 'org.radarbase', name: 'radar-commons-server', version: '0.12.3' + implementation group: 'org.radarbase', name: 'radar-commons-server', version: '0.13.0' } ``` @@ -83,7 +83,7 @@ repositories { } dependencies { - testImplementation group: 'org.radarbase', name: 'radar-commons-testing', version: '0.12.3' + testImplementation group: 'org.radarbase', name: 'radar-commons-testing', version: '0.13.0' } ``` @@ -96,7 +96,7 @@ repositories { } dependencies { - runtimeOnly group: 'org.radarbase', name: 'radar-commons-unsafe', version: '0.12.3' + runtimeOnly group: 'org.radarbase', name: 'radar-commons-unsafe', version: '0.13.0' } ``` @@ -121,7 +121,7 @@ configurations.all { } dependencies { - compile group: 'org.radarbase', name: 'radar-commons', version: '0.12.4-SNAPSHOT' + compile group: 'org.radarbase', name: 'radar-commons', version: '0.13.1-SNAPSHOT' } ``` diff --git a/build.gradle b/build.gradle index 05c480b0..58b8a60b 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ subprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.12.3' + version = '0.13.0' group = 'org.radarbase' ext.githubRepoName = 'RADAR-base/radar-commons' diff --git a/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java b/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java index 6f847a60..4a74187c 100644 --- a/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java +++ b/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java @@ -188,7 +188,7 @@ protected DatumReader getDatumReader(Schema writerSchema, Schema readerSchema } /** - * Normalizes the reader schema, puts the resolved schema into the cache. + * Normalizes the reader schema, puts the resolved schema into the cache. *
  • * * diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java index e7a84623..51b68772 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java @@ -3,7 +3,6 @@ import static org.apache.avro.JsonProperties.NULL_VALUE; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -103,19 +102,9 @@ public AvroDataMapper createMapper(Schema from, Schema to, final Object defaultV } catch (SchemaValidationException ex) { if (defaultVal != null) { if (defaultVal == NULL_VALUE) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return null; - } - }; + return obj -> null; } else { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return defaultVal; - } - }; + return obj -> defaultVal; } } else { throw ex; @@ -142,12 +131,7 @@ private static AvroDataMapper mapEnum(Schema from, final Schema to, Object defau "Cannot map enum from non-string or enum type")); } if (containsAll) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return new GenericData.EnumSymbol(to, obj.toString()); - } - }; + return obj -> new GenericData.EnumSymbol(to, obj.toString()); } else { String defaultString = (String) defaultVal; if (defaultString == null && to.hasEnumSymbol("UNKNOWN")) { @@ -157,27 +141,19 @@ public Object convertAvro(Object obj) { throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map enum symbols without default value")); } else { - final GenericEnumSymbol symbol = new GenericData.EnumSymbol(to, defaultString); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - String value = obj.toString(); - if (to.hasEnumSymbol(value)) { - return new GenericData.EnumSymbol(to, value); - } else { - return symbol; - } + GenericEnumSymbol symbol = new GenericData.EnumSymbol(to, defaultString); + return obj -> { + String value = obj.toString(); + if (to.hasEnumSymbol(value)) { + return new GenericData.EnumSymbol(to, value); + } else { + return symbol; } }; } } } else if (from.getType() == Schema.Type.ENUM && to.getType() == Schema.Type.STRING) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return obj.toString(); - } - }; + return Object::toString; } else { throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map unknown type with enum.")); @@ -244,40 +220,15 @@ public Number stringToNumber(String obj) { } else { switch (to.getType()) { case INT: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return ((Number) obj).intValue(); - } - }; + return obj -> ((Number) obj).intValue(); case LONG: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return ((Number) obj).longValue(); - } - }; + return obj -> ((Number) obj).longValue(); case DOUBLE: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return Double.valueOf(obj.toString()); - } - }; + return obj -> Double.valueOf(obj.toString()); case FLOAT: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return ((Number) obj).floatValue(); - } - }; + return obj -> ((Number) obj).floatValue(); case STRING: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return obj.toString(); - } - }; + return Object::toString; default: throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map numeric type with non-numeric type")); @@ -318,14 +269,11 @@ private AvroDataMapper mapUnion(Schema from, Schema to, Object defaultVal) if (defaultVal != null) { final Object actualDefault = getDefaultValue(defaultVal, to); final AvroDataMapper subMapper = createMapper(resolvedFrom, to, defaultVal); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - if (obj == null) { - return actualDefault; - } else { - return subMapper.convertAvro(obj); - } + return obj -> { + if (obj == null) { + return actualDefault; + } else { + return subMapper.convertAvro(obj); } }; } else { @@ -335,14 +283,11 @@ public Object convertAvro(Object obj) { } else { Schema toNonNull = nonNullUnionSchema(to); final AvroDataMapper unionMapper = createMapper(resolvedFrom, toNonNull, defaultVal); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - if (obj == null) { - return null; - } else { - return unionMapper.convertAvro(obj); - } + return obj -> { + if (obj == null) { + return null; + } else { + return unionMapper.convertAvro(obj); } }; } @@ -357,16 +302,13 @@ private AvroDataMapper mapArray(Schema from, Schema to) } final AvroDataMapper subMapper = createMapper(from.getElementType(), to.getElementType(), null); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - List array = (List) obj; - List toArray = new ArrayList<>(array.size()); - for (Object val : array) { - toArray.add(subMapper.convertAvro(val)); - } - return toArray; + return obj -> { + List array = (List) obj; + List toArray = new ArrayList<>(array.size()); + for (Object val : array) { + toArray.add(subMapper.convertAvro(val)); } + return toArray; }; } @@ -378,17 +320,14 @@ private AvroDataMapper mapMap(Schema from, Schema to) throws SchemaValidationExc } final AvroDataMapper subMapper = createMapper(from.getValueType(), to.getValueType(), null); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - @SuppressWarnings("unchecked") - Map map = (Map) obj; - Map toMap = new HashMap<>(map.size() * 4 / 3 + 1); - for (Map.Entry entry : map.entrySet()) { - toMap.put(entry.getKey().toString(), subMapper.convertAvro(entry.getValue())); - } - return toMap; + return obj -> { + @SuppressWarnings("unchecked") + Map map = (Map) obj; + Map toMap = new HashMap<>(map.size() * 4 / 3 + 1); + for (Map.Entry entry : map.entrySet()) { + toMap.put(entry.getKey().toString(), subMapper.convertAvro(entry.getValue())); } + return toMap; }; } @@ -399,46 +338,26 @@ private AvroDataMapper mapBytes(Schema from, final Schema to, final Object defau || (from.getType() == Type.FIXED && from.getFixedSize() == to.getFixedSize()))) { return IDENTITY_MAPPER; } else if (from.getType() == Type.FIXED && to.getType() == Schema.Type.BYTES) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - return ByteBuffer.wrap(((Fixed)object).bytes()); - } - }; + return object -> ByteBuffer.wrap(((Fixed)object).bytes()); } else if (from.getType() == Type.BYTES && to.getType() == Type.FIXED) { if (defaultVal == null) { throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map bytes to fixed without default value")); } - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - byte[] bytes = ((ByteBuffer) object).array(); - if (bytes.length == to.getFixedSize()) { - return GenericData.get().createFixed(null, bytes, to); - } else { - return GenericData.get().createFixed(null, (byte[]) defaultVal, to); - } + return object -> { + byte[] bytes = ((ByteBuffer) object).array(); + if (bytes.length == to.getFixedSize()) { + return GenericData.get().createFixed(null, bytes, to); + } else { + return GenericData.get().createFixed(null, (byte[]) defaultVal, to); } }; } else if (to.getType() == Type.STRING) { final Encoder encoder = Base64.getEncoder(); if (from.getType() == Type.FIXED) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - return new String(encoder.encode(((Fixed) object).bytes()), - StandardCharsets.UTF_8); - } - }; + return object -> encoder.encode(((Fixed) object).bytes()); } else { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - return new String(encoder.encode(((ByteBuffer) object).array()), - StandardCharsets.UTF_8); - } - }; + return object -> encoder.encode(((ByteBuffer) object).array()); } } else { throw new SchemaValidationException(to, from, diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java index 89b53ccc..4fe2a5c1 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java @@ -41,7 +41,7 @@ public class BinaryRecordRequest implements RecordRequest { private RecordData records; private BinaryEncoder binaryEncoder; private final AvroWriter valueEncoder; - private int sourceIdPos; + private final int sourceIdPos; /** * Binary record request for given topic. diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java new file mode 100644 index 00000000..ae1a3343 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java @@ -0,0 +1,117 @@ +package org.radarbase.producer.rest; + +import java.io.IOException; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import okio.BufferedSink; +import org.apache.avro.Schema; +import org.json.JSONException; +import org.json.JSONObject; +import org.radarbase.util.Strings; + +/** REST client for Confluent schema registry. */ +public class SchemaRestClient { + private final RestClient client; + + public SchemaRestClient(RestClient client) { + this.client = client; + } + + /** Retrieve schema metadata from server. */ + public ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int version) + throws JSONException, IOException { + boolean isLatest = version <= 0; + + StringBuilder pathBuilder = new StringBuilder(50) + .append("/subjects/") + .append(subject) + .append("/versions/"); + + if (isLatest) { + pathBuilder.append("latest"); + } else { + pathBuilder.append(version); + } + + JSONObject node = requestJson(pathBuilder.toString()); + int newVersion = isLatest ? node.getInt("version") : version; + int schemaId = node.getInt("id"); + Schema schema = parseSchema(node.getString("schema")); + return new ParsedSchemaMetadata(schemaId, newVersion, schema); + } + + private JSONObject requestJson(String path) throws IOException { + Request request = client.requestBuilder(path) + .addHeader("Accept", "application/json") + .build(); + + String response = client.requestString(request); + return new JSONObject(response); + } + + + /** Parse a schema from string. */ + public Schema parseSchema(String schemaString) { + Schema.Parser parser = new Schema.Parser(); + return parser.parse(schemaString); + } + + /** Add a schema to a subject. */ + public int addSchema(String subject, Schema schema) throws IOException { + Request request = client.requestBuilder("/subjects/" + subject + "/versions") + .addHeader("Accept", "application/json") + .post(new SchemaRequestBody(schema)) + .build(); + + String response = client.requestString(request); + JSONObject node = new JSONObject(response); + return node.getInt("id"); + } + + /** Request metadata for a schema on a subject. */ + public ParsedSchemaMetadata requestMetadata(String subject, Schema schema) + throws IOException { + Request request = client.requestBuilder("/subjects/" + subject) + .addHeader("Accept", "application/json") + .post(new SchemaRequestBody(schema)) + .build(); + + String response = client.requestString(request); + JSONObject node = new JSONObject(response); + + return new ParsedSchemaMetadata(node.getInt("id"), + node.getInt("version"), schema); + } + + /** Retrieve schema metadata from server. */ + public Schema retrieveSchemaById(int id) + throws JSONException, IOException { + JSONObject node = requestJson("/schemas/ids/" + id); + return parseSchema(node.getString("schema")); + } + + private static class SchemaRequestBody extends RequestBody { + private static final byte[] SCHEMA = Strings.utf8("{\"schema\":"); + private static final MediaType CONTENT_TYPE = MediaType.parse( + "application/vnd.schemaregistry.v1+json; charset=utf-8"); + + private final Schema schema; + + private SchemaRequestBody(Schema schema) { + this.schema = schema; + } + + @Override + public MediaType contentType() { + return CONTENT_TYPE; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + sink.write(SCHEMA); + sink.writeUtf8(JSONObject.quote(schema.toString())); + sink.writeByte('}'); + } + } +} diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index ef5e5ce7..8b609397 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -17,23 +17,19 @@ package org.radarbase.producer.rest; import java.io.IOException; -import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import okhttp3.MediaType; -import okhttp3.Request; -import okhttp3.RequestBody; -import okio.BufferedSink; import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericContainer; import org.json.JSONException; import org.json.JSONObject; import org.radarbase.config.ServerConfig; -import org.radarbase.util.Strings; +import org.radarbase.util.TimedInt; +import org.radarbase.util.TimedValue; +import org.radarbase.util.TimedVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,28 +39,24 @@ */ public class SchemaRetriever { private static final Logger logger = LoggerFactory.getLogger(SchemaRetriever.class); - private static final MediaType CONTENT_TYPE = MediaType.parse( - "application/vnd.schemaregistry.v1+json; charset=utf-8"); - private static final Schema NULL_SCHEMA = Schema.create(Type.NULL); - private static final Map, Schema> PRIMITIVE_SCHEMAS = new HashMap<>(); - private static final byte[] SCHEMA = Strings.utf8("{\"schema\":"); private static final long MAX_VALIDITY = 86400L; - static { - PRIMITIVE_SCHEMAS.put(Long.class, Schema.create(Type.LONG)); - PRIMITIVE_SCHEMAS.put(Integer.class, Schema.create(Type.INT)); - PRIMITIVE_SCHEMAS.put(Float.class, Schema.create(Type.FLOAT)); - PRIMITIVE_SCHEMAS.put(Double.class, Schema.create(Type.DOUBLE)); - PRIMITIVE_SCHEMAS.put(String.class, Schema.create(Type.STRING)); - PRIMITIVE_SCHEMAS.put(Boolean.class, Schema.create(Type.BOOLEAN)); - PRIMITIVE_SCHEMAS.put(byte[].class, Schema.create(Type.BYTES)); - } + private final ConcurrentMap> idCache = + new ConcurrentHashMap<>(); + private final ConcurrentMap schemaCache = new ConcurrentHashMap<>(); + private final ConcurrentMap> subjectVersionCache = + new ConcurrentHashMap<>(); + + private final SchemaRestClient restClient; + private final long cacheValidity; - private final ConcurrentMap cache = new ConcurrentHashMap<>(); - private final RestClient restClient; + public SchemaRetriever(RestClient client, long cacheValidity) { + restClient = new SchemaRestClient(client); + this.cacheValidity = cacheValidity; + } - private SchemaRetriever(RestClient client) { - restClient = client; + public SchemaRetriever(RestClient client) { + this(client, MAX_VALIDITY); } /** @@ -79,72 +71,29 @@ public SchemaRetriever(ServerConfig config, long connectionTimeout) { .build()); } - /** The subject in the Avro Schema Registry, given a Kafka topic. */ - protected static String subject(String topic, boolean ofValue) { - return topic + (ofValue ? "-value" : "-key"); - } - - /** Retrieve schema metadata from server. */ - protected ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int version) - throws JSONException, IOException { - StringBuilder pathBuilder = new StringBuilder(50) - .append("/subjects/") - .append(subject) - .append("/versions/"); - if (version > 0) { - pathBuilder.append(version); - } else { - pathBuilder.append("latest"); - } - Request request = restClient.requestBuilder(pathBuilder.toString()) - .addHeader("Accept", "application/json") - .build(); - - String response = restClient.requestString(request); - JSONObject node = new JSONObject(response); - int newVersion = version < 1 ? node.getInt("version") : version; - int schemaId = node.getInt("id"); - Schema schema = parseSchema(node.getString("schema")); - return new ParsedSchemaMetadata(schemaId, newVersion, schema); - } - - /** Get schema metadata. Cached schema metadata will be used if present. */ - public ParsedSchemaMetadata getSchemaMetadata(String topic, boolean ofValue, int version) - throws JSONException, IOException { - String subject = subject(topic, ofValue); - TimedSchemaMetadata value = cache.get(subject); - if (value == null || value.isExpired()) { - value = new TimedSchemaMetadata(retrieveSchemaMetadata(subject, version)); - cache.put(subject, value); - } - return value.metadata; - } - - /** Parse a schema from string. */ - protected Schema parseSchema(String schemaString) { - Schema.Parser parser = new Schema.Parser(); - return parser.parse(schemaString); + /** + * Schema retriever for a Confluent Schema Registry. + * @param config schema registry configuration. + * @param connectionTimeout timeout in seconds. + * @param cacheValidity timeout in seconds for considering a schema stale. + */ + public SchemaRetriever(ServerConfig config, long connectionTimeout, long cacheValidity) { + this(RestClient.global() + .server(Objects.requireNonNull(config)) + .timeout(connectionTimeout, TimeUnit.SECONDS) + .build(), cacheValidity); } /** * Add schema metadata to the retriever. This implementation only adds it to the cache. + * @return schema ID */ - public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadata metadata) + public int addSchema(String topic, boolean ofValue, Schema schema) throws JSONException, IOException { String subject = subject(topic, ofValue); - if (metadata.getId() == null) { - - Request request = restClient.requestBuilder("/subjects/" + subject + "/versions") - .addHeader("Accept", "application/json") - .post(new SchemaRequestBody(metadata.getSchema())) - .build(); - - String response = restClient.requestString(request); - JSONObject node = new JSONObject(response); - int schemaId = node.getInt("id"); - metadata.setId(schemaId); - } - cache.put(subject, new TimedSchemaMetadata(metadata)); + int id = restClient.addSchema(subject, schema); + cache(new ParsedSchemaMetadata(id, null, schema), subject, false); + return id; } /** @@ -155,84 +104,163 @@ public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadat public ParsedSchemaMetadata getOrSetSchemaMetadata(String topic, boolean ofValue, Schema schema, int version) throws JSONException, IOException { try { - return getSchemaMetadata(topic, ofValue, version); - } catch (IOException ex) { - logger.warn("Schema for {} value was not yet added to the schema registry.", topic); - ParsedSchemaMetadata metadata = new ParsedSchemaMetadata(null, null, schema); - addSchemaMetadata(topic, ofValue, metadata); - return metadata; + return getBySubjectAndVersion(topic, ofValue, version); + } catch (RestException ex) { + if (ex.getStatusCode() == 404) { + logger.warn("Schema for {} value was not yet added to the schema registry.", topic); + addSchema(topic, ofValue, schema); + return getMetadata(topic, ofValue, schema, version <= 0); + } else { + throw ex; + } } } - private static class SchemaRequestBody extends RequestBody { - private final Schema schema; - - private SchemaRequestBody(Schema schema) { - this.schema = schema; + /** Get a schema by its ID. */ + public Schema getById(int id) throws IOException { + TimedValue value = idCache.get(id); + if (value == null || value.isExpired()) { + value = new TimedValue<>(restClient.retrieveSchemaById(id), cacheValidity); + idCache.put(id, value); + schemaCache.put(value.value, new TimedInt(id, cacheValidity)); } + return value.value; + } - @Override - public MediaType contentType() { - return CONTENT_TYPE; + /** Gets a schema by ID and check that it is present in the given topic. */ + public ParsedSchemaMetadata getBySubjectAndId(String topic, boolean ofValue, int id) + throws IOException { + Schema schema = getById(id); + String subject = subject(topic, ofValue); + ParsedSchemaMetadata metadata = getCachedVersion(subject, id, null, schema); + return metadata != null ? metadata : getMetadata(topic, ofValue, schema); + } + + /** Get schema metadata. Cached schema metadata will be used if present. */ + public ParsedSchemaMetadata getBySubjectAndVersion(String topic, boolean ofValue, int version) + throws JSONException, IOException { + String subject = subject(topic, ofValue); + ConcurrentMap versionMap = computeIfAbsent(subjectVersionCache, subject, + new ConcurrentHashMap<>()); + TimedInt id = versionMap.get(Math.max(version, 0)); + if (id == null || id.isExpired()) { + ParsedSchemaMetadata metadata = restClient.retrieveSchemaMetadata(subject, version); + cache(metadata, subject, version <= 0); + return metadata; + } else { + Schema schema = getById(id.value); + ParsedSchemaMetadata metadata = getCachedVersion(subject, id.value, version, schema); + return metadata != null ? metadata : getMetadata(topic, ofValue, schema, version <= 0); } + } + + /** Get all schema versions in a subject. */ + public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema schema) + throws IOException { + return getMetadata(topic, ofValue, schema, false); + } - @Override - public void writeTo(BufferedSink sink) throws IOException { - sink.write(SCHEMA); - sink.writeUtf8(JSONObject.quote(schema.toString())); - sink.writeByte('}'); + + /** Get the metadata of a specific schema in a topic. */ + public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema schema, + boolean ofLatestVersion) throws IOException { + TimedInt id = schemaCache.get(schema); + String subject = subject(topic, ofValue); + + if (id != null && !id.isExpired()) { + ParsedSchemaMetadata metadata = getCachedVersion(subject, id.value, null, schema); + if (metadata != null) { + return metadata; + } } + + ParsedSchemaMetadata metadata = restClient.requestMetadata(subject, schema); + cache(metadata, subject, ofLatestVersion); + return metadata; } + /** - * Get the schema of a generic object. This supports null, primitive types, String, and - * {@link org.apache.avro.generic.GenericContainer}. - * @param object object of recognized CONTENT_TYPE - * @throws IllegalArgumentException if passed object is not a recognized CONTENT_TYPE + * Get cached metadata. + * @param subject schema registry subject + * @param id schema ID. + * @param reportedVersion version requested by the client. Null if no version was requested. + * This version will be used if the actual version was not cached. + * @param schema schema to use. + * @return metadata if present. Returns null if no metadata is cached or if no version is cached + * and the reportedVersion is null. */ - public static Schema getSchema(Object object) { - if (object == null) { - return NULL_SCHEMA; - } - Schema schema = PRIMITIVE_SCHEMAS.get(object.getClass()); - if (schema != null) { - return schema; - } - if (object instanceof GenericContainer) { - return ((GenericContainer)object).getSchema(); + protected ParsedSchemaMetadata getCachedVersion(String subject, int id, + Integer reportedVersion, Schema schema) { + Integer version = reportedVersion; + if (version == null || version <= 0) { + ConcurrentMap versions = subjectVersionCache.get(subject); + if (versions != null) { + for (Map.Entry entry : versions.entrySet()) { + if (!entry.getValue().isExpired() && entry.getKey() != 0 + && entry.getValue().value == id) { + version = entry.getKey(); + break; + } + } + } + if (version == null || version <= 0) { + return null; + } } - throw new IllegalArgumentException("Passed object " + object + " of class " - + object.getClass() + " can not be schematized. " - + "Pass null, a primitive CONTENT_TYPE or a GenericContainer."); + return new ParsedSchemaMetadata(id, version, schema); } - private static class TimedSchemaMetadata { - private final ParsedSchemaMetadata metadata; - private final long expiry; + protected void cache(ParsedSchemaMetadata metadata, String subject, boolean latest) { + TimedInt id = new TimedInt(metadata.getId(), cacheValidity); + schemaCache.put(metadata.getSchema(), id); + if (metadata.getVersion() != null) { + ConcurrentMap versionCache = computeIfAbsent(subjectVersionCache, + subject, new ConcurrentHashMap<>()); - TimedSchemaMetadata(ParsedSchemaMetadata metadata) { - expiry = System.currentTimeMillis() + MAX_VALIDITY * 1000L; - this.metadata = Objects.requireNonNull(metadata); + versionCache.put(metadata.getVersion(), id); + if (latest) { + versionCache.put(0, id); + } } + idCache.put(metadata.getId(), new TimedValue<>(metadata.getSchema(), cacheValidity)); + } - boolean isExpired() { - return expiry < System.currentTimeMillis(); + /** + * Remove expired entries from cache. + */ + public void pruneCache() { + prune(schemaCache); + prune(idCache); + for (ConcurrentMap versionMap : subjectVersionCache.values()) { + prune(versionMap); } + } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; + /** + * Remove all entries from cache. + */ + public void clearCache() { + subjectVersionCache.clear(); + idCache.clear(); + schemaCache.clear(); + } + + /** The subject in the Avro Schema Registry, given a Kafka topic. */ + protected static String subject(String topic, boolean ofValue) { + return topic + (ofValue ? "-value" : "-key"); + } + + private static void prune(Map map) { + for (Entry entry : map.entrySet()) { + if (entry.getValue().isExpired()) { + map.remove(entry.getKey(), entry.getValue()); } - return metadata.equals(((TimedSchemaMetadata)o).metadata); } + } - @Override - public int hashCode() { - return metadata.hashCode(); - } + private static V computeIfAbsent(ConcurrentMap original, K key, V newValue) { + V existingValue = original.putIfAbsent(key, newValue); + return existingValue != null ? existingValue : newValue; } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java b/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java index 43ed4fcd..353afb9c 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java @@ -26,10 +26,10 @@ * TopicRequestData in a RequestBody. */ class TopicRequestBody extends RequestBody { - protected final RecordRequest data; + protected final RecordRequest data; private final MediaType mediaType; - TopicRequestBody(RecordRequest requestData, MediaType mediaType) { + TopicRequestBody(RecordRequest requestData, MediaType mediaType) { this.data = requestData; this.mediaType = mediaType; } diff --git a/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java b/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java index 351098fa..2b63875e 100644 --- a/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java +++ b/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java @@ -145,7 +145,7 @@ public boolean equals(Object o) { return false; } - AvroTopic topic = (AvroTopic) o; + AvroTopic topic = (AvroTopic) o; return keyClass == topic.getKeyClass() && valueClass == topic.getValueClass(); } diff --git a/radar-commons/src/main/java/org/radarbase/util/Base64.java b/radar-commons/src/main/java/org/radarbase/util/Base64.java index 60e6df86..62d630b1 100644 --- a/radar-commons/src/main/java/org/radarbase/util/Base64.java +++ b/radar-commons/src/main/java/org/radarbase/util/Base64.java @@ -25,7 +25,7 @@ package org.radarbase.util; -import java.util.Arrays; +import static java.nio.charset.StandardCharsets.UTF_8; // Since Android API 19 /** * This class consists exclusively of static methods for obtaining @@ -84,7 +84,7 @@ public static class Encoder { * index values into their "Base64 Alphabet" equivalents as specified * in "Table 1: The Base64 Alphabet" of RFC 2045 (and RFC 4648). */ - private static final byte[] BASE_64_BYTE = { + private static final byte[] BASE_64_CHAR = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', @@ -97,10 +97,6 @@ public static class Encoder { private Encoder() { } - private int outLength(int srclen) { - return 4 * ((srclen + 2) / 3); - } - /** * Encodes all bytes from the specified byte array into a newly-allocated * byte array using the {@link Base64} encoding scheme. The returned byte @@ -111,51 +107,36 @@ private int outLength(int srclen) { * @return A newly-allocated byte array containing the resulting * encoded bytes. */ - public byte[] encode(byte[] src) { - int len = outLength(src.length); // dst array size - byte[] dst = new byte[len]; - int ret = encode0(src, src.length, dst); - if (ret != dst.length) { - return Arrays.copyOf(dst, ret); + public String encode(byte[] src) { + int srcLen = src.length; + byte[] dst = new byte[4 * ((srcLen + 2) / 3)]; + int fullDataLen = srcLen / 3 * 3; + int dstP = 0; + int srcP = 0; + for (; srcP < fullDataLen; srcP += 3) { + int bits = (src[srcP] & 0xff) << 16 + | (src[srcP + 1] & 0xff) << 8 + | (src[srcP + 2] & 0xff); + dst[dstP++] = BASE_64_CHAR[(bits >>> 18) & 0x3f]; + dst[dstP++] = BASE_64_CHAR[(bits >>> 12) & 0x3f]; + dst[dstP++] = BASE_64_CHAR[(bits >>> 6) & 0x3f]; + dst[dstP++] = BASE_64_CHAR[bits & 0x3f]; } - return dst; - } - - private int encode0(byte[] src, int end, byte[] dst) { - int sp = 0; - int slen = end / 3 * 3; - int dp = 0; - while (sp < slen) { - int sl0 = Math.min(sp + slen, slen); - int dp0 = dp; - for (int sp0 = sp; sp0 < sl0; sp0 += 3) { - int bits = (src[sp0] & 0xff) << 16 - | (src[sp0 + 1] & 0xff) << 8 - | (src[sp0 + 2] & 0xff); - dst[dp0++] = BASE_64_BYTE[(bits >>> 18) & 0x3f]; - dst[dp0++] = BASE_64_BYTE[(bits >>> 12) & 0x3f]; - dst[dp0++] = BASE_64_BYTE[(bits >>> 6) & 0x3f]; - dst[dp0++] = BASE_64_BYTE[bits & 0x3f]; - } - int dlen = (sl0 - sp) / 3 * 4; - dp += dlen; - sp = sl0; - } - if (sp < end) { // 1 or 2 leftover bytes - int b0 = src[sp++] & 0xff; - dst[dp++] = BASE_64_BYTE[b0 >> 2]; - if (sp == end) { - dst[dp++] = BASE_64_BYTE[(b0 << 4) & 0x3f]; - dst[dp++] = '='; - dst[dp++] = '='; + if (srcP < srcLen) { // 1 or 2 leftover bytes + int b0 = src[srcP++] & 0xff; + dst[dstP++] = BASE_64_CHAR[b0 >> 2]; + if (srcP == srcLen) { + dst[dstP++] = BASE_64_CHAR[(b0 << 4) & 0x3f]; + dst[dstP++] = '='; } else { - int b1 = src[sp] & 0xff; - dst[dp++] = BASE_64_BYTE[(b0 << 4) & 0x3f | (b1 >> 4)]; - dst[dp++] = BASE_64_BYTE[(b1 << 2) & 0x3f]; - dst[dp++] = '='; + int b1 = src[srcP] & 0xff; + dst[dstP++] = BASE_64_CHAR[(b0 << 4) & 0x3f | (b1 >> 4)]; + dst[dstP++] = BASE_64_CHAR[(b1 << 2) & 0x3f]; } + dst[dstP] = '='; } - return dp; + + return new String(dst, UTF_8); } } } diff --git a/radar-commons/src/main/java/org/radarbase/util/RestUtils.java b/radar-commons/src/main/java/org/radarbase/util/RestUtils.java index 71b63702..6b90181a 100644 --- a/radar-commons/src/main/java/org/radarbase/util/RestUtils.java +++ b/radar-commons/src/main/java/org/radarbase/util/RestUtils.java @@ -23,7 +23,6 @@ import java.util.Arrays; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; @@ -40,12 +39,7 @@ public final class RestUtils { /** OkHttp3 default hostname verifier. */ public static final HostnameVerifier DEFAULT_HOSTNAME_VERIFIER = OkHostnameVerifier.INSTANCE; /** OkHttp3 hostname verifier for unsafe connections. */ - public static final HostnameVerifier UNSAFE_HOSTNAME_VERIFIER = new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession session) { - return true; - } - }; + public static final HostnameVerifier UNSAFE_HOSTNAME_VERIFIER = (hostname, session) -> true; /** Unsafe OkHttp3 trust manager that trusts all certificates. */ public static final TrustManager[] UNSAFE_TRUST_MANAGER = { diff --git a/radar-commons/src/main/java/org/radarbase/util/Strings.java b/radar-commons/src/main/java/org/radarbase/util/Strings.java index 72229cfe..e613eb8f 100644 --- a/radar-commons/src/main/java/org/radarbase/util/Strings.java +++ b/radar-commons/src/main/java/org/radarbase/util/Strings.java @@ -17,6 +17,7 @@ package org.radarbase.util; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Iterator; import java.util.regex.Pattern; @@ -26,7 +27,7 @@ */ @SuppressWarnings("PMD.ClassNamingConventions") public final class Strings { - private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final Charset UTF_8 = StandardCharsets.UTF_8; private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); private Strings() { diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedInt.java b/radar-commons/src/main/java/org/radarbase/util/TimedInt.java new file mode 100644 index 00000000..108bbc54 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/util/TimedInt.java @@ -0,0 +1,34 @@ +package org.radarbase.util; + +public class TimedInt implements TimedVariable { + public final int value; + private final long expiry; + + public TimedInt(int value, long validity) { + expiry = System.currentTimeMillis() + validity * 1000L; + this.value = value; + } + + @Override + public boolean isExpired() { + return expiry < System.currentTimeMillis(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TimedInt other = (TimedInt)o; + return value == other.value + && expiry == other.expiry; + } + + @Override + public int hashCode() { + return value; + } +} diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedValue.java b/radar-commons/src/main/java/org/radarbase/util/TimedValue.java new file mode 100644 index 00000000..cf186055 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/util/TimedValue.java @@ -0,0 +1,36 @@ +package org.radarbase.util; + +import java.util.Objects; + +public class TimedValue implements TimedVariable { + public final T value; + private final long expiry; + + public TimedValue(T value, long validity) { + expiry = System.currentTimeMillis() + validity * 1000L; + this.value = Objects.requireNonNull(value); + } + + @Override + public boolean isExpired() { + return expiry < System.currentTimeMillis(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TimedValue other = (TimedValue)o; + return value.equals(other.value) + && expiry == other.expiry; + } + + @Override + public int hashCode() { + return value.hashCode(); + } +} diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedVariable.java b/radar-commons/src/main/java/org/radarbase/util/TimedVariable.java new file mode 100644 index 00000000..c4affa99 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/util/TimedVariable.java @@ -0,0 +1,5 @@ +package org.radarbase.util; + +public interface TimedVariable { + boolean isExpired(); +} diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java new file mode 100644 index 00000000..fd10e838 --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarbase.producer.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.radarbase.config.ServerConfig; + +public class SchemaRestClientTest { + private MockWebServer server; + private SchemaRestClient retriever; + + @Before + public void setUp() { + server = new MockWebServer(); + ServerConfig config = new ServerConfig(); + config.setProtocol("http"); + config.setHost(server.getHostName()); + config.setPort(server.getPort()); + config.setPath("base"); + retriever = new SchemaRestClient(RestClient.global() + .server(Objects.requireNonNull(config)) + .timeout(1L, TimeUnit.SECONDS) + .build()); + } + + @After + public void tearDown() throws IOException { + server.close(); + } + + @Test + public void retrieveSchemaMetadata() throws Exception { + server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); + ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", -1); + assertEquals(Integer.valueOf(10), metadata.getId()); + assertEquals(Integer.valueOf(2), metadata.getVersion()); + assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); + assertEquals("/base/subjects/bla-value/versions/latest", server.takeRequest().getPath()); + } + + + @Test + public void retrieveSchemaMetadataVersion() throws Exception { + server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); + ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", 2); + assertEquals(Integer.valueOf(10), metadata.getId()); + assertEquals(Integer.valueOf(2), metadata.getVersion()); + assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); + assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); + } +} diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java index a56abae0..d78f9211 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java @@ -58,38 +58,17 @@ public void subject() { assertEquals("bla-key", SchemaRetriever.subject("bla", false)); } - @Test - public void retrieveSchemaMetadata() throws Exception { - server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", -1); - assertEquals(Integer.valueOf(10), metadata.getId()); - assertEquals(Integer.valueOf(2), metadata.getVersion()); - assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); - assertEquals("/base/subjects/bla-value/versions/latest", server.takeRequest().getPath()); - } - - - @Test - public void retrieveSchemaMetadataVersion() throws Exception { - server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", 2); - assertEquals(Integer.valueOf(10), metadata.getId()); - assertEquals(Integer.valueOf(2), metadata.getVersion()); - assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); - assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); - } - @Test public void getSchemaMetadata() throws Exception { server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.getSchemaMetadata("bla", true, 2); + ParsedSchemaMetadata metadata = retriever.getBySubjectAndVersion("bla", true, 2); assertEquals(Integer.valueOf(10), metadata.getId()); assertEquals(Integer.valueOf(2), metadata.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); // Already queried schema is cached and does not need another request - ParsedSchemaMetadata metadata2 = retriever.getSchemaMetadata("bla", true, -1); + ParsedSchemaMetadata metadata2 = retriever.getBySubjectAndVersion("bla", true, 2); assertEquals(Integer.valueOf(10), metadata2.getId()); assertEquals(Integer.valueOf(2), metadata2.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata2.getSchema()); @@ -98,16 +77,14 @@ public void getSchemaMetadata() throws Exception { // Not yet queried schema needs a new request, so if the server does not respond, an // IOException is thrown. server.enqueue(new MockResponse().setResponseCode(500)); - assertThrows(IOException.class, () -> retriever.getSchemaMetadata("bla", false, 2)); + assertThrows(IOException.class, () -> retriever.getBySubjectAndVersion("bla", false, 2)); } @Test public void addSchemaMetadata() throws Exception { - ParsedSchemaMetadata metadata = new ParsedSchemaMetadata(null, null, Schema.create(Schema.Type.STRING)); server.enqueue(new MockResponse().setBody("{\"id\":10}")); - retriever.addSchemaMetadata("bla", true, metadata); - assertEquals(Integer.valueOf(10), metadata.getId()); - assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); + int id = retriever.addSchema("bla", true, Schema.create(Schema.Type.STRING)); + assertEquals(10, id); assertEquals(1, server.getRequestCount()); RecordedRequest request = server.takeRequest(); @@ -117,10 +94,9 @@ public void addSchemaMetadata() throws Exception { new Field("a", Schema.create(Schema.Type.INT), "that a", 10)); Schema record = Schema.createRecord("C", "that C", "org.radarcns", false, schemaFields); - metadata = new ParsedSchemaMetadata(null, null, record); server.enqueue(new MockResponse().setBody("{\"id\":11}")); - retriever.addSchemaMetadata("bla", true, metadata); - assertEquals(Integer.valueOf(11), metadata.getId()); + id = retriever.addSchema("bla", true, record); + assertEquals(11, id); request = server.takeRequest(); assertEquals("{\"schema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"C\\\",\\\"namespace\\\":\\\"org.radarcns\\\",\\\"doc\\\":\\\"that C\\\",\\\"fields\\\":[{\\\"name\\\":\\\"a\\\",\\\"type\\\":\\\"int\\\",\\\"doc\\\":\\\"that a\\\",\\\"default\\\":10}]}\"}", request.getBody().readUtf8()); } @@ -129,11 +105,12 @@ public void addSchemaMetadata() throws Exception { public void getOrSetSchemaMetadataSet() throws Exception { server.enqueue(new MockResponse().setResponseCode(404)); server.enqueue(new MockResponse().setBody("{\"id\":10}")); + server.enqueue(new MockResponse().setBody("{\"id\":10, \"version\": 2}")); ParsedSchemaMetadata metadata = retriever.getOrSetSchemaMetadata("bla", true, Schema.create(Schema.Type.STRING), -1); assertEquals(Integer.valueOf(10), metadata.getId()); assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); - assertEquals(2, server.getRequestCount()); + assertEquals(3, server.getRequestCount()); server.takeRequest(); RecordedRequest request = server.takeRequest(); assertEquals("{\"schema\":\"\\\"string\\\"\"}", request.getBody().readUtf8()); diff --git a/radar-commons/src/test/java/org/radarbase/util/Base64Test.java b/radar-commons/src/test/java/org/radarbase/util/Base64Test.java new file mode 100644 index 00000000..a22f6bf9 --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/util/Base64Test.java @@ -0,0 +1,25 @@ +package org.radarbase.util; + +import static org.junit.Assert.*; + +import java.util.concurrent.ThreadLocalRandom; +import kotlin.text.Charsets; +import org.junit.Test; +import org.radarbase.util.Base64.Encoder; + +public class Base64Test { + @Test + public void encoderTest() { + Encoder encoder = Base64.getEncoder(); + java.util.Base64.Encoder javaEncoder = java.util.Base64.getEncoder(); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < 2_000; i += 7) { + byte[] src = new byte[i]; + random.nextBytes(src); + String actual = encoder.encode(src); + String expected = new String(javaEncoder.encode(src), Charsets.UTF_8); + assertEquals(expected, actual); + } + } +}