diff --git a/build.gradle b/build.gradle index 42f65f4..3e8cb9b 100644 --- a/build.gradle +++ b/build.gradle @@ -30,6 +30,8 @@ dependencies { compile "io.confluent:kafka-avro-serializer:3.2.0" compile "io.confluent:kafka-schema-registry:3.2.0" compile "io.confluent:common:3.2.0" + + compile group: 'org.apache.commons', name: 'commons-csv', version: '1.5' } mainClassName = 'org.engine.process.performance.Main' diff --git a/src/main/java/org/engine/process/performance/EnginePerformance.java b/src/main/java/org/engine/process/performance/EnginePerformance.java deleted file mode 100644 index 9c6d5c7..0000000 --- a/src/main/java/org/engine/process/performance/EnginePerformance.java +++ /dev/null @@ -1,559 +0,0 @@ -package org.engine.process.performance; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CompletionStage; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroSerializer; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; - -import akka.Done; -import akka.actor.ActorSystem; -import akka.japi.Pair; -import akka.japi.function.Procedure; -import akka.kafka.ConsumerSettings; -import akka.kafka.ProducerSettings; -import akka.kafka.Subscriptions; -import akka.kafka.javadsl.Consumer; -import akka.kafka.javadsl.Producer; -import akka.stream.ActorMaterializer; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.engine.process.performance.utils.InnerService; - -import java.util.Properties; - -/** - * @author assafsh - * Jul 2017 - * - * The class will check the performance of the engine by create messages to topics - * sourceName (source1..) 
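(Aside, not part of the patch: the class comment above, which continues just below, describes the measurement approach used throughout this diff — produce an entity, read it back from the source and update topics, and compare the Kafka record timestamps. A minimal sketch of that timestamp-difference idea, with illustrative names only:)

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Sketch: the reported latency is simply the difference between the broker
    // timestamps of the "same" entity as observed on two topics.
    final class LatencySketch {
        static long latencyMillis(ConsumerRecord<String, Object> source,
                                  ConsumerRecord<String, Object> update) {
            return update.timestamp() - source.timestamp(); // milliseconds
        }
    }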
- * update - * get the message timestamp and compare - * - */ -public class EnginePerformance extends InnerService { - - private StringBuffer output = new StringBuffer(); - private String externalSystemID; - private String kafkaAddress; - private String schemaRegustryUrl; - private String schemaRegustryIdentity; - private String sourceName; - private String endl = "\n"; - private final ActorSystem system = ActorSystem.create(); - private final ActorMaterializer materializer = ActorMaterializer.create(system); - private SchemaRegistryClient schemaRegistry = null; - private List> sourceRecordsList = new ArrayList<>(); - private List> updateRecordsList = new ArrayList<>(); - boolean testing; - - public EnginePerformance(String kafkaAddress, String schemaRegustryUrl, String schemaRegustryIdentity,String sourceName) { - - this.kafkaAddress = kafkaAddress; - this.schemaRegustryUrl = schemaRegustryUrl; - this.schemaRegustryIdentity = schemaRegustryIdentity; - this.sourceName = sourceName; - } - - //ONLY FOR TESTING - public EnginePerformance(String kafkaAddress,SchemaRegistryClient schemaRegistry,String sourceName) { - - this.sourceName = sourceName; - this.schemaRegistry = schemaRegistry; - this.sourceName = sourceName; - testing = true; - this.kafkaAddress = kafkaAddress; - } - - @Override - public String getOutput() { - return output.toString(); - } - - @Override - public void preExecute() throws IOException, RestClientException { - - if(testing) - return; - - if(kafkaAddress == null) { - - this.kafkaAddress = "localhost:9092"; - } - - if(schemaRegustryUrl != null) { - - schemaRegistry = new CachedSchemaRegistryClient(schemaRegustryUrl, Integer.parseInt(schemaRegustryIdentity)); - } - else { - schemaRegistry = new MockSchemaRegistryClient(); - registerSchemas(schemaRegistry); - } - } - - @Override - public void postExecute() { - - } - - @Override - public ServiceStatus execute() throws IOException, RestClientException { - externalSystemID = utils.randomExternalSystemID(); - System.out.println("after random "+externalSystemID); - handleCreateMessage(); - handleUpdateMessage(); - - return ServiceStatus.SUCCESS; - } - - private void handleCreateMessage() throws IOException, RestClientException { - - //Akka Actor - if(testing) { - System.out.println("Create message with Akka Actor"); - - ProducerSettings producerSettings = ProducerSettings - .create(system, new StringSerializer(), new KafkaAvroSerializer(schemaRegistry)) - .withBootstrapServers(kafkaAddress); - - creationTopicProducer(producerSettings); - String lat = "44.9"; - String longX = "95.8"; - sourceTopicProducer(producerSettings,lat,longX); - - sourceRecordsList.clear(); - updateRecordsList.clear(); - - callConsumers(); - - - long diffTime = getTimeDifferences(lat, longX); - output.append("The create took "+diffTime +" millisec").append(endl); - } - //KafkaConsumer - else { - - System.out.println("Create message with KafkaConsumer"); - - Properties props = getProperties(); - - String lat = "44.9"; - String longX = "95.8"; - TopicPartition partitionSource = new TopicPartition(sourceName, 0); - TopicPartition partitionUpdate = new TopicPartition("update", 0); - - sourceRecordsList.clear(); - updateRecordsList.clear(); - long lastOffsetForSource; - long lastOffsetForUpdate; - - KafkaConsumer consumer = new KafkaConsumer(props); - consumer.assign(Arrays.asList(partitionSource)); - consumer.seekToEnd(Arrays.asList(partitionSource)); - lastOffsetForSource = consumer.position(partitionSource); - - System.out.println("Create message with 
KafkaConsumer2 "+lastOffsetForSource); - - KafkaConsumer consumer2 = new KafkaConsumer(props); - consumer2.assign(Arrays.asList(partitionUpdate)); - consumer2.seekToEnd(Arrays.asList(partitionUpdate)); - lastOffsetForUpdate = consumer2.position(partitionUpdate); - - System.out.println("Create message with KafkaConsumer3 "+lastOffsetForUpdate); - - try { - Thread.sleep(1000); - } catch (InterruptedException e) {} - - try(KafkaProducer producer = new KafkaProducer<>(props)) { - - ProducerRecord record = new ProducerRecord<>("creation",getCreationGenericRecord()); - producer.send(record); - - ProducerRecord record2 = new ProducerRecord<>(sourceName,getSourceGenericRecord(lat, longX)); - producer.send(record2); - - - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e) {} - - System.out.println("Create message with KafkaConsumer5 "+lastOffsetForUpdate); - consumer.seek(partitionSource, lastOffsetForSource); - callConsumersWithKafkaConsuemr(consumer,lat,longX); - - System.out.println("Create message with KafkaConsumer6 "+lastOffsetForUpdate); - consumer2.seek(partitionUpdate, lastOffsetForUpdate); - callConsumersWithKafkaConsuemr(consumer2,lat,longX); - - System.out.println("Create message with KafkaConsumer7"); - - long diffTime = getTimeDifferences(lat, longX); - output.append("The create action between topics <"+sourceName+"> and took "+diffTime +" millisec").append(endl); - } - - } - - private Properties getProperties() { - - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - io.confluent.kafka.serializers.KafkaAvroSerializer.class); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - io.confluent.kafka.serializers.KafkaAvroDeserializer.class); - props.put("schema.registry.url", schemaRegustryUrl); - props.put("group.id", "group1"); - - return props; - } - - private GenericRecord getCreationGenericRecord() throws IOException, RestClientException { - - Schema creationSchema = getSchema("detectionEvent"); - GenericRecord creationRecord = new GenericRecordBuilder(creationSchema) - .set("sourceName", sourceName) - .set("externalSystemID",externalSystemID) - .build(); - - return creationRecord; - } - - private GenericRecord getSourceGenericRecord(String lat, String longX) throws IOException, RestClientException { - - Schema basicAttributesSchema = getSchema("basicEntityAttributes"); - Schema coordinateSchema = basicAttributesSchema.getField("coordinate").schema(); - - GenericRecord coordinate = new GenericRecordBuilder(coordinateSchema) - .set("lat", Double.parseDouble(lat)) - .set("long",Double.parseDouble(longX)) - .build(); - - GenericRecord basicAttributes = new GenericRecordBuilder(basicAttributesSchema) - .set("coordinate", coordinate) - .set("isNotTracked", false) - .set("entityOffset", 50l) - .set("sourceName",sourceName) - .build(); - - Schema dataSchema = getSchema("generalEntityAttributes"); - Schema nationalitySchema = dataSchema.getField("nationality").schema(); - Schema categorySchema = dataSchema.getField("category").schema(); - GenericRecord dataRecord = new GenericRecordBuilder(dataSchema) - .set("basicAttributes", basicAttributes) - .set("speed", 4.7) - .set("elevation", 7.8) - .set("course", 8.3) - .set("nationality", new GenericData.EnumSymbol(nationalitySchema, "USA")) - 
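(Aside, not part of the patch: the GenericRecordBuilder chain around this point — it continues on the next line — builds Avro records field by field against a registered schema, with nested records built the same way. A self-contained sketch of that pattern using a stand-in coordinate schema, not one of the schemas registered by this class:)

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;

    final class RecordBuilderSketch {
        // Builds a record for a small stand-in schema; the real code fetches its
        // schemas from the schema registry via getSchema(...) instead.
        static GenericRecord coordinate(double lat, double lon) {
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"coordinate\",\"fields\":["
                + "{\"name\":\"lat\",\"type\":\"double\"},"
                + "{\"name\":\"long\",\"type\":\"double\"}]}");
            return new GenericRecordBuilder(schema)
                .set("lat", lat)
                .set("long", lon)
                .build();
        }
    }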
.set("category", new GenericData.EnumSymbol(categorySchema, "boat")) - .set("pictureURL", "huh?") - .set("height", 6.1) - .set("nickname", "rerere") - .set("externalSystemID", externalSystemID) - .build(); - - - return dataRecord; - } - - private void handleUpdateMessage() throws IOException, RestClientException { - - String lat = "34.66"; - String longX = "48.66"; - //Akka Actor - if(testing) { - - System.out.println("Update message with Akka Actor"); - - ProducerSettings producerSettings = ProducerSettings - .create(system, new StringSerializer(), new KafkaAvroSerializer(schemaRegistry)) - .withBootstrapServers(kafkaAddress); - - sourceTopicProducer(producerSettings,lat,longX); - - sourceRecordsList.clear(); - updateRecordsList.clear(); - - callConsumers(); - } - - //KafkaConsumer - else { - - System.out.println("Update message with KafkaConsumer"); - - Properties props = getProperties(); - - TopicPartition partitionSource = new TopicPartition(sourceName, 0); - TopicPartition partitionUpdate = new TopicPartition("update", 0); - - sourceRecordsList.clear(); - updateRecordsList.clear(); - long lastOffsetForSource; - long lastOffsetForUpdate; - - KafkaConsumer consumer = new KafkaConsumer(props); - consumer.assign(Arrays.asList(partitionSource)); - consumer.seekToEnd(Arrays.asList(partitionSource)); - lastOffsetForSource = consumer.position(partitionSource); - - - - KafkaConsumer consumer2 = new KafkaConsumer(props); - consumer2.assign(Arrays.asList(partitionUpdate)); - consumer2.seekToEnd(Arrays.asList(partitionUpdate)); - lastOffsetForUpdate = consumer2.position(partitionUpdate); - - try { - Thread.sleep(1000); - } catch (InterruptedException e) {} - - try(KafkaProducer producer = new KafkaProducer<>(props)) { - - ProducerRecord record2 = new ProducerRecord<>(sourceName,getSourceGenericRecord(lat, longX)); - producer.send(record2); - } - - try { - Thread.sleep(1000); - } catch (InterruptedException e) {} - - consumer.seek(partitionSource, lastOffsetForSource); - callConsumersWithKafkaConsuemr(consumer,lat,longX); - - consumer2.seek(partitionUpdate, lastOffsetForUpdate); - callConsumersWithKafkaConsuemr(consumer2,lat,longX); - - } - - long diffTime = getTimeDifferences(lat, longX); - output.append("The update action between topics <"+sourceName+"> and took "+diffTime +" millisec").append(endl); - } - - private void callConsumers() { - - KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer(schemaRegistry); - keyDeserializer.configure(Collections.singletonMap("schema.registry.url", "http://fake-url"), true); - - final ConsumerSettings consumerSettings = - ConsumerSettings.create(system, new StringDeserializer(), keyDeserializer) - .withBootstrapServers(kafkaAddress) - .withGroupId("group1") - .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - Procedure> f = new Procedure>() { - - private static final long serialVersionUID = 1L; - - public void apply(ConsumerRecord param) - throws Exception { - - //System.out.println("Topic is: "+param.topic()+" timestamp is: "+param.timestamp() + - // " value is: "+ param.value()); - if( param.topic().equals("update")) { - updateRecordsList.add(new Pair((GenericRecord)param.value(),param.timestamp())); - } - else { - sourceRecordsList.add(new Pair((GenericRecord)param.value(),param.timestamp())); - } - } - }; - - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - System.out.println("\n\n\n====="+sourceName); - - Consumer.committableSource(consumerSettings, 
Subscriptions.topics(sourceName)) - .map(result -> result.record()).runForeach(f, materializer); - - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - System.out.println("=====update"); - Consumer.committableSource(consumerSettings, Subscriptions.topics("update")) - .map(result -> result.record()).runForeach(f, materializer); - } - - - private void callConsumersWithKafkaConsuemr(KafkaConsumer consumer,String lat,String longX) { - - boolean isRunning = true; - while (isRunning) { - ConsumerRecords records = consumer.poll(10000); - - for (ConsumerRecord param : records) { - - GenericRecord record = (GenericRecord)param.value(); - - GenericRecord entityAttributes = ((GenericRecord) record.get("entityAttributes")); - GenericRecord basicAttributes = (entityAttributes != null) ? ((GenericRecord) entityAttributes.get("basicAttributes")) : ((GenericRecord) record.get("basicAttributes")); - String externalSystemIDTmp = (entityAttributes != null) ? entityAttributes.get("externalSystemID").toString() : record.get("externalSystemID").toString(); - GenericRecord coordinate = (GenericRecord)basicAttributes.get("coordinate"); - String latTmp = coordinate.get("lat").toString(); - String longXTmp = coordinate.get("long").toString(); - - if( externalSystemIDTmp.equals(externalSystemID) && lat.equals(latTmp) && longX.equals(longXTmp)) { - - if( param.topic().equals("update")) { - updateRecordsList.add(new Pair((GenericRecord)param.value(),param.timestamp())); - } - else { - sourceRecordsList.add(new Pair((GenericRecord)param.value(),param.timestamp())); - } - isRunning = false; - consumer.close(); - break; - } - - } - } - } - - private void sourceTopicProducer(ProducerSettings producerSettings, String lat, String longX) throws IOException, RestClientException { - - Sink, CompletionStage> sink = Producer.plainSink(producerSettings); - - ProducerRecord producerRecord = new ProducerRecord(sourceName, getSourceGenericRecord(lat, longX)); - - Source.from(Arrays.asList(producerRecord)) - .to(sink) - .run(materializer); - - } - - private void creationTopicProducer(ProducerSettings producerSettings) throws IOException, RestClientException { - Sink, CompletionStage> sink = Producer.plainSink(producerSettings); - - ProducerRecord producerRecord = new ProducerRecord("creation", getCreationGenericRecord()); - - Source.from(Arrays.asList(producerRecord)) - .to(sink) - .run(materializer); - } - - - private long getTimeDifferences(String inputLat,String inputLongX) { - - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - Predicate> predicate = record -> { - - GenericRecord entityAttributes = ((GenericRecord) record.first().get("entityAttributes")); - GenericRecord basicAttributes = (entityAttributes != null) ? ((GenericRecord) entityAttributes.get("basicAttributes")) : ((GenericRecord) record.first().get("basicAttributes")); - String externalSystemIDTmp = (entityAttributes != null) ? 
entityAttributes.get("externalSystemID").toString() : record.first().get("externalSystemID").toString(); - GenericRecord coordinate = (GenericRecord)basicAttributes.get("coordinate"); - String lat = coordinate.get("lat").toString(); - String longX = coordinate.get("long").toString(); - - return externalSystemIDTmp.equals(externalSystemID) && lat.equals(inputLat) && longX.equals(inputLongX); - }; - - Pair update = updateRecordsList.stream().filter(predicate).collect(Collectors.toList()).get(0); - System.out.println("Consumer from topic update: "+update.toString()); - Pair source = sourceRecordsList.stream().filter(predicate).collect(Collectors.toList()).get(0); - System.out.println("Consumer from topic "+sourceName+": "+source.toString()); - - return update.second() - source.second(); - } - - private void registerSchemas(SchemaRegistryClient schemaRegistry) throws IOException, RestClientException { - Schema.Parser parser = new Schema.Parser(); - schemaRegistry.register("detectionEvent", - parser.parse("{\"type\": \"record\", " - + "\"name\": \"detectionEvent\", " - + "\"doc\": \"This is a schema for entity detection report event\", " - + "\"fields\": [" - + "{ \"name\": \"sourceName\", \"type\": \"string\", \"doc\" : \"interface name\" }, " - + "{ \"name\": \"externalSystemID\", \"type\": \"string\", \"doc\":\"external system ID\"}" - + "]}")); - schemaRegistry.register("basicEntityAttributes", - parser.parse("{\"type\": \"record\"," - + "\"name\": \"basicEntityAttributes\"," - + "\"doc\": \"This is a schema for basic entity attributes, this will represent basic entity in all life cycle\"," - + "\"fields\": [" - + "{\"name\": \"coordinate\", \"type\":" - + "{\"type\": \"record\"," - + "\"name\": \"coordinate\"," - + "\"doc\": \"Location attribute in grid format\"," - + "\"fields\": [" - + "{\"name\": \"lat\",\"type\": \"double\"}," - + "{\"name\": \"long\",\"type\": \"double\"}" - + "]}}," - + "{\"name\": \"isNotTracked\",\"type\": \"boolean\"}," - + "{\"name\": \"entityOffset\",\"type\": \"long\"}," - + "{\"name\": \"sourceName\", \"type\": \"string\"}" - + "]}")); - schemaRegistry.register("generalEntityAttributes", - parser.parse("{\"type\": \"record\", " - + "\"name\": \"generalEntityAttributes\"," - + "\"doc\": \"This is a schema for general entity before acquiring by the system\"," - + "\"fields\": [" - + "{\"name\": \"basicAttributes\",\"type\": \"basicEntityAttributes\"}," - + "{\"name\": \"speed\",\"type\": \"double\",\"doc\" : \"This is the magnitude of the entity's velcity vector.\"}," - + "{\"name\": \"elevation\",\"type\": \"double\"}," - + "{\"name\": \"course\",\"type\": \"double\"}," - + "{\"name\": \"nationality\",\"type\": {\"name\": \"nationality\", \"type\": \"enum\",\"symbols\" : [\"ISRAEL\", \"USA\", \"SPAIN\"]}}," - + "{\"name\": \"category\",\"type\": {\"name\": \"category\", \"type\": \"enum\",\"symbols\" : [\"airplane\", \"boat\"]}}," - + "{\"name\": \"pictureURL\",\"type\": \"string\"}," - + "{\"name\": \"height\",\"type\": \"double\"}," - + "{\"name\": \"nickname\",\"type\": \"string\"}," - + "{\"name\": \"externalSystemID\",\"type\": \"string\",\"doc\" : \"This is ID given be external system.\"}" - + "]}")); - - - } - - private Schema getSchema(String name) throws IOException, RestClientException { - if(!testing) { - name = "org.sourcestream.entities."+name; - } - int id = schemaRegistry.getLatestSchemaMetadata(name).getId(); - return schemaRegistry.getByID(id); - } -} - diff --git 
a/src/main/java/org/engine/process/performance/EnginePerformanceFromBeginning.java b/src/main/java/org/engine/process/performance/EnginePerformanceSingleMessage.java similarity index 91% rename from src/main/java/org/engine/process/performance/EnginePerformanceFromBeginning.java rename to src/main/java/org/engine/process/performance/EnginePerformanceSingleMessage.java index 6afa8b5..776077d 100644 --- a/src/main/java/org/engine/process/performance/EnginePerformanceFromBeginning.java +++ b/src/main/java/org/engine/process/performance/EnginePerformanceSingleMessage.java @@ -4,8 +4,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.Map; import java.util.stream.Collectors; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; @@ -27,7 +26,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.engine.process.performance.utils.InnerService; import com.fasterxml.jackson.core.JsonParseException; @@ -42,13 +41,18 @@ * Jul 2017 * * The class will check the performance of the engine by create messages to topics - * sourceName-row-data + * sourceName-raw-data * update * get the message timestamp and compare + * + * Sep 2017 + * + * The class send only one single message and print the time of processing + * We used that class for initial implementation and not using it anymore * */ -public class EnginePerformanceFromBeginning extends InnerService { +public class EnginePerformanceSingleMessage extends InnerService { private StringBuffer output = new StringBuffer(); private String externalSystemID; @@ -60,9 +64,9 @@ public class EnginePerformanceFromBeginning extends InnerService { private SchemaRegistryClient schemaRegistry = null; private List> rawDataRecordsList = new ArrayList<>(); private List> sourceRecordsList = new ArrayList<>(); - private List> updateRecordsList = new ArrayList<>(); + private List> updateRecordsList = new ArrayList<>(); - public EnginePerformanceFromBeginning(String kafkaAddress, String schemaRegustryUrl, String schemaRegustryIdentity,String sourceName) { + public EnginePerformanceSingleMessage(String kafkaAddress, String schemaRegustryUrl, String schemaRegustryIdentity,String sourceName) { this.kafkaAddress = kafkaAddress; this.schemaRegustryUrl = schemaRegustryUrl; @@ -101,23 +105,23 @@ public void postExecute() { @Override public ServiceStatus execute() throws IOException, RestClientException { externalSystemID = utils.randomExternalSystemID(); - System.out.println("After random "+externalSystemID); + logger.debug("After random "+externalSystemID); - System.out.println("Create message with KafkaConsumer"); + logger.debug("Create message with KafkaConsumer"); output.append("Create a new entity").append(endl); output.append("===================").append(endl); handleMessage("44.9","95.8"); Pair diffTime = getTimeDifferences("44.9", "95.8"); - output.append("The create action between topics <"+sourceName+"-row-data> and <"+sourceName+"> took "+diffTime.second() +" millisec").append(endl); + output.append("The create action between topics <"+sourceName+"-raw-data> and <"+sourceName+"> took "+diffTime.second() +" millisec").append(endl); output.append("The create action between topics <"+sourceName+"> and took "+diffTime.first() +" 
millisec").append(endl).append(endl); - System.out.println("Update message with KafkaConsumer"); + logger.debug("Update message with KafkaConsumer"); output.append("Update the entity").append(endl); output.append("=================").append(endl); handleMessage("34.66","48.66"); diffTime = getTimeDifferences("34.66","48.66"); - output.append("The update action between topics <"+sourceName+"-row-data> and <"+sourceName+"> took "+diffTime.second() +" millisec").append(endl); + output.append("The update action between topics <"+sourceName+"-raw-data> and <"+sourceName+"> took "+diffTime.second() +" millisec").append(endl); output.append("The update action between topics <"+sourceName+"> and took "+diffTime.first() +" millisec").append(endl); return ServiceStatus.SUCCESS; @@ -301,13 +305,13 @@ private Pair getTimeDifferences(String inputLat,String inputLongX) { } Pair update = updateRecordsList.stream().collect(Collectors.toList()).get(0); - System.out.println("====Consumer from topic update: "+update.toString()); + logger.debug("====Consumer from topic update: "+update.toString()); Pair source = sourceRecordsList.stream().collect(Collectors.toList()).get(0); - System.out.println("====Consumer from topic source: "+source.toString()); - Pair rowData = rawDataRecordsList.stream().collect(Collectors.toList()).get(0); - System.out.println("====Consumer from topic "+sourceName+"-row-data: "+rowData.toString()); + logger.debug("====Consumer from topic source: "+source.toString()); + Pair rawData = rawDataRecordsList.stream().collect(Collectors.toList()).get(0); + logger.debug("====Consumer from topic "+sourceName+"-raw-data: "+rawData.toString()); - return new Pair(update.second() - source.second(), source.second() - rowData.second()); + return new Pair(update.second() - source.second(), source.second() - rawData.second()); } private void registerSchemas(SchemaRegistryClient schemaRegistry) throws IOException, RestClientException { @@ -352,9 +356,7 @@ private void registerSchemas(SchemaRegistryClient schemaRegistry) throws IOExcep + "{\"name\": \"height\",\"type\": \"double\"}," + "{\"name\": \"nickname\",\"type\": \"string\"}," + "{\"name\": \"externalSystemID\",\"type\": \"string\",\"doc\" : \"This is ID given be external system.\"}" - + "]}")); - - + + "]}")); } private Map jsonToMap(String json) throws JsonParseException, JsonMappingException, IOException { @@ -364,4 +366,11 @@ private Map jsonToMap(String json) throws JsonParseException, Jso return map; } + + @Override + public void printOutputToFile(String fileLocation) { + + logger.error("Ussuported operation - create output file"); + } + } diff --git a/src/main/java/org/engine/process/performance/Main.java b/src/main/java/org/engine/process/performance/Main.java index 1a0479c..958552b 100644 --- a/src/main/java/org/engine/process/performance/Main.java +++ b/src/main/java/org/engine/process/performance/Main.java @@ -1,14 +1,15 @@ package org.engine.process.performance; + +import java.io.IOException; -import java.io.FileWriter; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; +import joptsimple.internal.Strings; -import org.engine.process.performance.multi.EngingPerformanceMultiPeriods; -import org.engine.process.performance.multi.HandlePerformanceMessages; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.engine.process.performance.multi.EngingPerformanceMultiCycles; import org.engine.process.performance.utils.InnerService; + /** * @author assafsh * @@ -16,68 +17,98 @@ public 
class Main {
 
+    final static public Logger logger = Logger.getLogger(Main.class);
+
+    static {
+
+        setDebugLevel(System.getenv("DEBUG_LEVEL"));
+    }
+
     public static void main(String[] args) throws InterruptedException, IOException {
-
+
+        String kafkaAddress = System.getenv("KAFKA_ADDRESS");
         String schemaRegistryUrl = System.getenv("SCHEMA_REGISTRY_ADDRESS");
         String schemaRegistryIdentity = System.getenv("SCHEMA_REGISTRY_IDENTITY");
         String sourceName = System.getenv("SOURCE_NAME");
         String printToFile = System.getenv("PRINT_TO_FILE");
         String fileLocation = System.getenv("FILE_LOCATION");
-        String secToDelay = System.getenv("SEC_TO_DELAY");
-        String startFromBeginning = System.getenv("START_FROM_BEGINNING");
+        String secToDelay = System.getenv("SEC_TO_DELAY");
         String multiMessages = System.getenv("MULTI_MESSAGES");
-
-        System.out.println("KAFKA_ADDRESS::::::::" + kafkaAddress);
-        System.out.println("SCHEMA_REGISTRY_ADDRESS::::::::" + schemaRegistryUrl);
-        System.out.println("SCHEMA_REGISTRY_IDENTITY::::::::" + schemaRegistryIdentity);
-        System.out.println("SOURCE_NAME::::::::" + sourceName);
-        System.out.println("PRINT_TO_FILE::::::::" + printToFile);
-        System.out.println("FILE_LOCATION::::::::" + fileLocation);
-        System.out.println("SEC_TO_DELAY::::::::" + secToDelay);
-        System.out.println("START_FROM_BEGINNING::::::::" + startFromBeginning);
-        System.out.println("MULTI_MESSAGES::::::::" + multiMessages);
-
+        String debugLevel = System.getenv("DEBUG_LEVEL");
+
+        logger.debug("KAFKA_ADDRESS::::::::" + kafkaAddress);
+        logger.debug("SCHEMA_REGISTRY_ADDRESS::::::::" + schemaRegistryUrl);
+        logger.debug("SCHEMA_REGISTRY_IDENTITY::::::::" + schemaRegistryIdentity);
+        logger.debug("SOURCE_NAME::::::::" + sourceName);
+        logger.debug("PRINT_TO_FILE::::::::" + printToFile);
+        logger.debug("FILE_LOCATION::::::::" + fileLocation);
+        logger.debug("SEC_TO_DELAY::::::::" + secToDelay);
+        logger.debug("MULTI_MESSAGES::::::::" + multiMessages);
+        logger.debug("DEBUG_LEVEL::::::::" + debugLevel);
+
         Thread.sleep((secToDelay == null ? 0 : Long.parseLong(secToDelay))*1000);
-
+
+        setDebugLevel(debugLevel);
         InnerService service;
-
+
         if(multiMessages.equalsIgnoreCase("true")) {
-            service = new EngingPerformanceMultiPeriods(kafkaAddress,schemaRegistryUrl,schemaRegistryIdentity,sourceName);
+            service = new EngingPerformanceMultiCycles(kafkaAddress,schemaRegistryUrl,schemaRegistryIdentity,sourceName);
         }
-        else if(startFromBeginning.equalsIgnoreCase("true")) {
-            service = new EnginePerformanceFromBeginning(kafkaAddress,schemaRegistryUrl,schemaRegistryIdentity,sourceName);
-        }
-        else {
-            service = new EnginePerformance(kafkaAddress,schemaRegistryUrl,schemaRegistryIdentity,sourceName);
-        }
-
-
-        ServiceStatus status = service.run();
-        System.out.println(status.getMessage());
-
-        if(status != ServiceStatus.SUCCESS)
+        else {
+            service = new EnginePerformanceSingleMessage(kafkaAddress,schemaRegistryUrl,schemaRegistryIdentity,sourceName);
+        }
+
+        ServiceStatus status = service.run();
+
+        if(status != ServiceStatus.SUCCESS) {
+            logger.error(status.getMessage());
             System.exit(-1);
-
+        }
+
+        logger.info(service.getOutput());
+
         if(printToFile.equalsIgnoreCase("true")) {
-
-            printToFile(service.getOutput(),fileLocation);
-
-        }
-        else {
-            System.out.println(service.getOutput());
-        }
-    }
-
-    public static void printToFile(String output, String fileLocation) throws IOException {
-
-        String dateTime = new SimpleDateFormat("yyyyMMdd_HHmm").format(new Date());
-        try( FileWriter fw = new FileWriter(fileLocation+"/enginePeformanceResult_"+dateTime+".log"))
-        {
-            fw.write(output+"\n");
+
+            service.printOutputToFile(fileLocation);
+        }
+
+        logger.info("END!");
+    }
+
+    /**
+     * The options are:
+     * Trace < Debug < Info < Warn < Error < Fatal.
+     * Trace has the lowest priority and Fatal the highest.
+     * When a logger level is set, messages of that level and of any higher priority are also printed.
+     *
+     * @param debugLevel
+     */
+    private static void setDebugLevel(String debugLevel) {
+
+
+        if( Strings.isNullOrEmpty(debugLevel)) {
+            debugLevel = "ALL";
         }
-    }
-
+        switch (debugLevel) {
+        case "ALL":
+            logger.setLevel(Level.ALL);
+            break;
+        case "DEBUG":
+            logger.setLevel(Level.DEBUG);
+            break;
+        case "ERROR":
+            logger.setLevel(Level.ERROR);
+            break;
+        case "WARNING":
+            logger.setLevel(Level.WARN);
+            break;
+        default:
+            logger.setLevel(Level.ALL);
+        }
+    }
+
+
 }
diff --git a/src/main/java/org/engine/process/performance/csv/CsvFileWriter.java b/src/main/java/org/engine/process/performance/csv/CsvFileWriter.java
new file mode 100644
index 0000000..b89dc16
--- /dev/null
+++ b/src/main/java/org/engine/process/performance/csv/CsvFileWriter.java
@@ -0,0 +1,73 @@
+package org.engine.process.performance.csv;
+
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.log4j.Logger;
+import org.engine.process.performance.Main;
+
+public class CsvFileWriter {
+
+    private String fileName;
+    private Logger logger = Main.logger;
+
+    public CsvFileWriter(String fileLocation) {
+
+        if(fileLocation == null || fileLocation.isEmpty()) {
+            fileLocation = System.getenv("HOME");
+        }
+
+        File dir = new File(fileLocation);
+        if( !dir.exists()) {
+            dir.mkdir();
+        }
+
+        String dateTime = new SimpleDateFormat("yyyyMMdd_HHmm").format(new Date());
+        fileName = fileLocation+"/enginePeformanceResult_"+dateTime+".csv";
+        logger.info("Output file is: "+fileName);
+
+    }
+
+    public void writeCsvFile(List<String> header,Object[] columnsName,List<CsvRecordForCreate> data) {
+
+        CSVFormat csvFileFormat = CSVFormat.EXCEL;
+
+        try(FileWriter fileWriter = new FileWriter(fileName);
+            CSVPrinter csvFilePrinter = new CSVPrinter(fileWriter, csvFileFormat))
+        {
+            csvFilePrinter.printRecords(header);
+            csvFilePrinter.printRecord(columnsName);
+            data.forEach(getConsumer(csvFilePrinter));
+
+        } catch (IOException e) {
+
+            e.printStackTrace();
+        }
+    }
+
+    private Consumer<CsvRecordForCreate> getConsumer(CSVPrinter csvFilePrinter) {
+
+        Consumer<CsvRecordForCreate> consumer = new Consumer<CsvRecordForCreate>() {
+
+            @Override
+            public void accept(CsvRecordForCreate record) {
+
+                try {
+                    csvFilePrinter.printRecord(record.toObjectArray());
+                } catch (IOException e) {
+
+                    e.printStackTrace();
+                }
+            }};
+
+        return consumer;
+    }
+}
diff --git a/src/main/java/org/engine/process/performance/csv/CsvRecordForCreate.java b/src/main/java/org/engine/process/performance/csv/CsvRecordForCreate.java
new file mode 100644
index 0000000..baba6a1
--- /dev/null
+++ b/src/main/java/org/engine/process/performance/csv/CsvRecordForCreate.java
@@ -0,0 +1,83 @@
+package org.engine.process.performance.csv;
+
+/**
+ * @author assafsh
+ *
+ * Arranges the data for one record in the CSV output file
+ * (create/update message)
+ *
+ */
+public class CsvRecordForCreate {
+
+    private TopicTimeData createAction;
+    private TopicTimeData updateAction;
+
+    public CsvRecordForCreate(TopicTimeData createAction, TopicTimeData updateAction) {
+
+        this.createAction = createAction;
+        this.updateAction = updateAction;
+    }
+    public TopicTimeData getCreateAction() {
+        return createAction;
+    }
+    public void setCreateAction(TopicTimeData createAction) {
+        this.createAction = createAction;
+    }
+    public TopicTimeData getUpdateAction() {
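(Aside, not part of the patch: CsvFileWriter above is where the commons-csv dependency added in build.gradle is used — CSVPrinter writes one row per printRecord() call, and printRecords() treats each element of an iterable as a row. A minimal usage sketch under that assumption; CsvRecordForCreate continues below:)

    import java.io.IOException;
    import java.io.StringWriter;

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVPrinter;

    final class CsvUsageSketch {
        static String twoRows() throws IOException {
            StringWriter out = new StringWriter();
            try (CSVPrinter printer = new CSVPrinter(out, CSVFormat.EXCEL)) {
                printer.printRecord("create (ms)", "update (ms)"); // header row
                printer.printRecord(12, 34);                       // one data row
            }
            return out.toString();
        }
    }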
+ return updateAction; + } + public void setUpdateAction(TopicTimeData updateAction) { + this.updateAction = updateAction; + } + + @Override + public String toString() { + return "CsvRecordForCreate [createAction=" + createAction + + ", updateAction=" + updateAction + "]"; + } + + public Object[] toObjectArray() { + + Object[] obj = new Object[createAction.toObjectArray().length+updateAction.toObjectArray().length]; + int i = 0; + for(Object o : createAction.toObjectArray()) { + obj[i++] = o; + } + for(Object o : updateAction.toObjectArray()) { + obj[i++] = o; + } + return obj; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((createAction == null) ? 0 : createAction.hashCode()); + result = prime * result + + ((updateAction == null) ? 0 : updateAction.hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CsvRecordForCreate other = (CsvRecordForCreate) obj; + if (createAction == null) { + if (other.createAction != null) + return false; + } else if (!createAction.equals(other.createAction)) + return false; + if (updateAction == null) { + if (other.updateAction != null) + return false; + } else if (!updateAction.equals(other.updateAction)) + return false; + return true; + } +} diff --git a/src/main/java/org/engine/process/performance/csv/CsvRecordForMerge.java b/src/main/java/org/engine/process/performance/csv/CsvRecordForMerge.java new file mode 100644 index 0000000..abc37be --- /dev/null +++ b/src/main/java/org/engine/process/performance/csv/CsvRecordForMerge.java @@ -0,0 +1,15 @@ +package org.engine.process.performance.csv; + +public class CsvRecordForMerge { + + private long rawDataToSourceCreate; + private long rawDataToSourceUpdate; + private long sourceToUpdateCreate; + private long sourceToUpdate; + + + + + + +} diff --git a/src/main/java/org/engine/process/performance/csv/TopicTimeData.java b/src/main/java/org/engine/process/performance/csv/TopicTimeData.java new file mode 100644 index 0000000..63b1e59 --- /dev/null +++ b/src/main/java/org/engine/process/performance/csv/TopicTimeData.java @@ -0,0 +1,77 @@ +package org.engine.process.performance.csv; + +public class TopicTimeData { + + private T rawDataToSource; + private T sourceToUpdate; + + public TopicTimeData(T rawDataToSource,T sourceToUpdate) { + + this.rawDataToSource = rawDataToSource; + this.sourceToUpdate = sourceToUpdate; + } + + public TopicTimeData() {} + + public T getRawDataToSource() { + return rawDataToSource; + } + + public void setRawDataToSource(T rawDataToSource) { + this.rawDataToSource = rawDataToSource; + } + + public T getSourceToUpdate() { + return sourceToUpdate; + } + + public void setSourceToUpdate(T sourceToUpdate) { + this.sourceToUpdate = sourceToUpdate; + } + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((rawDataToSource == null) ? 0 : rawDataToSource.hashCode()); + result = prime * result + + ((sourceToUpdate == null) ? 
0 : sourceToUpdate.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TopicTimeData other = (TopicTimeData) obj; + if (rawDataToSource == null) { + if (other.rawDataToSource != null) + return false; + } else if (!rawDataToSource.equals(other.rawDataToSource)) + return false; + if (sourceToUpdate == null) { + if (other.sourceToUpdate != null) + return false; + } else if (!sourceToUpdate.equals(other.sourceToUpdate)) + return false; + return true; + } + + @Override + public String toString() { + return "TopicData [rawDataToSource=" + rawDataToSource + + ", sourceToUpdate=" + sourceToUpdate + "]"; + } + + public Object[] toObjectArray() { + + Object[] obj = {rawDataToSource, sourceToUpdate}; + return obj; + } +} diff --git a/src/main/java/org/engine/process/performance/multi/EngingPerformanceMultiCycles.java b/src/main/java/org/engine/process/performance/multi/EngingPerformanceMultiCycles.java new file mode 100644 index 0000000..fe4a94e --- /dev/null +++ b/src/main/java/org/engine/process/performance/multi/EngingPerformanceMultiCycles.java @@ -0,0 +1,322 @@ +package org.engine.process.performance.multi; + +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.engine.process.performance.ServiceStatus; +import org.engine.process.performance.csv.CsvFileWriter; +import org.engine.process.performance.csv.CsvRecordForCreate; +import org.engine.process.performance.csv.TopicTimeData; +import org.engine.process.performance.utils.InnerService; + +import akka.japi.Pair; + +public class EngingPerformanceMultiCycles extends InnerService { + + private List cyclesList; + private StringBuffer output = new StringBuffer(); + private String kafkaAddress; + private String schemaRegustryUrl; + private String sourceName; + private int num_of_cycles; + private int num_of_updates_per_cycle; + private String endl = "\n"; + + private TopicTimeData createAndUpdateArray; + private TopicTimeData createArray; + private TopicTimeData updateArray; + + private int interval; + private int durationInMin; + private int numOfInteraces; + private int numOfUpdates; + private String seperator = ","; + private String emptyString = ""; + + public EngingPerformanceMultiCycles(String kafkaAddress, + String schemaRegistryUrl, String schemaRegistryIdentity,String sourceName) { + + this.kafkaAddress = kafkaAddress; + this.sourceName = sourceName; + this.schemaRegustryUrl = schemaRegistryUrl; + cyclesList = Collections.synchronizedList(new ArrayList()); + + logger.debug("NUM_OF_CYCLES::::::::" + System.getenv("NUM_OF_CYCLES")); + logger.debug("NUM_OF_UPDATES_PER_CYCLE::::::::" + System.getenv("NUM_OF_UPDATES_PER_CYCLE")); + logger.debug("DURATION (in Minute)::::::::" + System.getenv("DURATION")); + logger.debug("INTERVAL::::::::" + System.getenv("INTERVAL")); + logger.debug("NUM_OF_INTERFACES::::::::" + System.getenv("NUM_OF_INTERFACES")); + logger.debug("NUM_OF_UPDATES::::::::" + System.getenv("NUM_OF_UPDATES")); + + num_of_cycles = Integer.parseInt(System.getenv("NUM_OF_CYCLES")); + num_of_updates_per_cycle = Integer.parseInt(System.getenv("NUM_OF_UPDATES_PER_CYCLE")); + interval = 
Integer.parseInt(System.getenv("INTERVAL")); + durationInMin = Integer.parseInt(System.getenv("DURATION")); + numOfInteraces = Integer.parseInt(System.getenv("NUM_OF_INTERFACES")); + numOfUpdates = Integer.parseInt(System.getenv("NUM_OF_UPDATES")); + } + + @Override + protected void preExecute() throws Exception { + + } + + @Override + protected void postExecute() throws Exception { + + logger.debug("===postExecute"); + + createAndUpdateArray = new TopicTimeData(new double[num_of_cycles*num_of_updates_per_cycle],new double[num_of_cycles*num_of_updates_per_cycle]); + createArray = new TopicTimeData(new double[num_of_cycles],new double[num_of_cycles]); + updateArray = new TopicTimeData(new double[num_of_cycles*(num_of_updates_per_cycle-1)],new double[num_of_cycles*(num_of_updates_per_cycle-1)]); + + int i = 0; + int index = 0; + int index1 = 0; + int index2 = 0; + for(SingleCycle cycle : cyclesList ) { + logger.debug("Cycle "+i); + int j = 0; + for( MessageData messageData : cycle.getMessageDataList()) { + + logger.debug("\nUpdate "+j); + Pair diffTime = messageData.getHandlePerformanceMessages().getTimeDifferences(); + messageData.setRawDataToSourceDiffTime(diffTime.second()); + messageData.setSourceToUpdateDiffTime(diffTime.first()); + createAndUpdateArray.getRawDataToSource()[index] = (double) diffTime.second(); + createAndUpdateArray.getSourceToUpdate()[index] = (double)diffTime.first(); + + if( j == 0 ) { + createArray.getRawDataToSource()[index1] = (double) diffTime.second(); + createArray.getSourceToUpdate()[index1] = (double)diffTime.first(); + index1++; + } + else { + + updateArray.getRawDataToSource()[index2] = (double) diffTime.second(); + updateArray.getSourceToUpdate()[index2] = (double) diffTime.first(); + index2++; + } + + messageData.setLastOffsetForRawData(messageData.getHandlePerformanceMessages().getLastOffsetForRawData()); + messageData.setLastOffsetForSource(messageData.getHandlePerformanceMessages().getLastOffsetForSource()); + messageData.setLastOffsetForUpdate(messageData.getHandlePerformanceMessages().getLastOffsetForUpdate()); + j++; + index++; + } + i++; + } + } + + @Override + protected ServiceStatus execute() throws Exception { + + if( durationInMin > 0 && num_of_cycles > 0 ) { + + logger.debug("Error: Both DURATION and NUM_OF_CYCLE have value"); + return ServiceStatus.FAILURE; + } + + ExecutorService executor = Executors.newFixedThreadPool(5); + if( num_of_cycles > 0 ) { + + for( int i = 0; i < num_of_cycles; i++ ) { + + logger.debug("===>CYCLE " + i); + SingleCycle singlePeriod = runSingleCycle(i); + cyclesList.add(singlePeriod); + Runnable worker = new MessageConsumerThread(singlePeriod); + executor.execute(worker); + } + } + else { + + long startTime = System.currentTimeMillis(); + long endTime = startTime + durationInMin * 60000; + num_of_cycles = 0; + while ( System.currentTimeMillis() < endTime) { + + logger.debug("===>CYCLE " + num_of_cycles); + SingleCycle singlePeriod = runSingleCycle(num_of_cycles); + cyclesList.add(singlePeriod); + Runnable worker = new MessageConsumerThread(singlePeriod); + executor.execute(worker); + num_of_cycles++; + } + } + + executor.shutdown(); + while (!executor.isTerminated()) { + } + logger.debug("Finished all threads"); + + logger.debug("END execute"); + return ServiceStatus.SUCCESS; + + } + + @Override + public String getOutput() { + + for(SingleCycle period : cyclesList ) { + + for( MessageData messageData : period.getMessageDataList()) { + + String msg = messageData.toString(); + if(msg == null) + return null; + 
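(Aside, not part of the patch: execute() above submits one MessageConsumerThread per cycle to a fixed thread pool and then waits for the pool to drain before postExecute() reads the results; getOutput() continues below. A minimal sketch of that submit-and-wait pattern, shown here with awaitTermination as an alternative to the busy-wait loop used in the diff:)

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    final class PoolSketch {
        static void runAll(Iterable<Runnable> workers) throws InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(5); // same pool size as above
            workers.forEach(executor::execute);           // one worker per cycle
            executor.shutdown();                          // no new tasks accepted
            executor.awaitTermination(1, TimeUnit.HOURS); // block until all workers finish
        }
    }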
output.append(messageData.toString()); + } + } + + Arrays.sort(createAndUpdateArray.getRawDataToSource()); + Arrays.sort(createAndUpdateArray.getSourceToUpdate()); + + output.append(endl); + output.append("The average between <"+sourceName+"-raw-data> and <"+sourceName+"> is "+utils.mean(createAndUpdateArray.getRawDataToSource()) ).append(endl); + output.append("The average between <"+sourceName+"> and is "+utils.mean(createAndUpdateArray.getSourceToUpdate())).append(endl); + output.append("The median between <"+sourceName+"-raw-data> and <"+sourceName+"> is "+utils.median(createAndUpdateArray.getRawDataToSource())).append(endl); + output.append("The median between <"+sourceName+"> and is "+utils.median(createAndUpdateArray.getSourceToUpdate())).append(endl); + output.append("The standard deviation between <"+sourceName+"-raw-data> and <"+sourceName+"> is "+utils.standardDeviation(createAndUpdateArray.getRawDataToSource())).append(endl); + output.append("The standard deviation between <"+sourceName+"> and is "+utils.standardDeviation(createAndUpdateArray.getSourceToUpdate())).append(endl); + + output.append("Export to CSV ").append(endl); + output.append("NUM_OF_INTERCAES").append(seperator).append(numOfInteraces).append(endl); + output.append("NUM_OF_UPDATES").append(seperator).append(numOfUpdates).append(endl); + output.append("CREATE").append(endl); + output.append(createCsvFileDataInRows(createArray,sourceName)).append(endl); + output.append("UPDATE").append(endl); + output.append(createCsvFileDataInRows(updateArray,sourceName)).append(endl).append(endl); + + return output.toString(); + } + + private SingleCycle runSingleCycle(int numOfCycle) throws IOException, RestClientException, InterruptedException { + + String externalSystemID = utils.randomExternalSystemID(); + SingleCycle singleCycle = new SingleCycle(); + double lat = 4.3; + double longX = 6.4; + + for(int j = 0 ; j < num_of_updates_per_cycle; j++) { + logger.debug("UPDATE " + j); + + Date startTime = new Date(System.currentTimeMillis()); + String latStr = Double.toString(lat); + String longXStr = Double.toString(longX); + + HandlePerformanceMessages handlePerformanceMessages = new HandlePerformanceMessages(kafkaAddress,schemaRegustryUrl, + sourceName,externalSystemID,latStr,longXStr); + + handlePerformanceMessages.handleMessage(); + MessageData messageData = new MessageData(startTime,externalSystemID,latStr, longXStr, sourceName,handlePerformanceMessages); + messageData.setNumOfCycle(numOfCycle); + messageData.setNumOfUpdate(j); + singleCycle.addMessageData(messageData); + lat++; + longX++; + Thread.sleep(interval); + } + + return singleCycle; + } + + @Override + public void printOutputToFile(String fileLocation) { + + List header = new ArrayList(); + header.add("NUM_OF_INTERCAES :"+numOfInteraces); + header.add("NUM_OF_UPDATES :"+numOfUpdates); + + if( durationInMin > 0 ) { + header.add("DURATION(MIN) :"+durationInMin); + } + else { + header.add("NUM_OF_CYCLES :"+num_of_cycles); + } + header.add("INTERVAL :"+interval); + + Object[] columnsName = {"CREATE - DiffTime between <"+sourceName+"-raw-data> and <"+sourceName+">", + "CREATE - DiffTime between <"+sourceName+"> and ", + "UPDATE - DiffTime between between <"+sourceName+"-raw-data> and <"+sourceName+">", + "UPDATE - DiffTime between <"+sourceName+"> and "}; + + List data = getDataInColumns(); + + CsvFileWriter csvFileWriter = new CsvFileWriter(fileLocation); + csvFileWriter.writeCsvFile(header, columnsName, data); + } + + private List getDataInColumns() { + + int[] 
maxArray = new int[]{createArray.getRawDataToSource().length,createArray.getSourceToUpdate().length, + updateArray.getRawDataToSource().length,updateArray.getSourceToUpdate().length}; + + Arrays.sort(maxArray); + int max = maxArray[maxArray.length - 1]; + + List data = new ArrayList<>(); + + for( int i = 0; i < max; i++) { + + TopicTimeData create = new TopicTimeData(emptyString,emptyString); + TopicTimeData update = new TopicTimeData(emptyString,emptyString); + + if( i < createArray.getRawDataToSource().length ) { + create.setRawDataToSource(convertToString(createArray.getRawDataToSource()[i])); + } + + if( i < createArray.getSourceToUpdate().length ) { + create.setSourceToUpdate(convertToString(createArray.getSourceToUpdate()[i])); + } + + if( i < updateArray.getRawDataToSource().length ) { + update.setRawDataToSource(convertToString(updateArray.getRawDataToSource()[i])); + } + + if( i < updateArray.getSourceToUpdate().length ) { + update.setSourceToUpdate(convertToString(updateArray.getSourceToUpdate()[i])); + } + + CsvRecordForCreate record = new CsvRecordForCreate(create,update); + data.add(record); + } + + return data; + } + + private String convertToString (double num) { + + Long d = new Double(num).longValue(); + return String.valueOf(d); + } + + private String createCsvFileDataInRows(TopicTimeData topicTimeData,String sourceName) { + + StringBuffer output = new StringBuffer(); + output.append("DiffTime between between <"+sourceName+"-raw-data> and <"+sourceName+">").append(getLine(topicTimeData.getRawDataToSource())).append("\n"); + output.append("DiffTime between <"+sourceName+"> and ").append(getLine(topicTimeData.getSourceToUpdate())).append("\n"); + + return output.toString(); + } + + private String getLine(double[] array) { + + StringBuffer output = new StringBuffer(); + for(double d : array ) + { + output.append(seperator).append(d); + } + + return output.toString(); + + } +} diff --git a/src/main/java/org/engine/process/performance/multi/EngingPerformanceMultiPeriods.java b/src/main/java/org/engine/process/performance/multi/EngingPerformanceMultiPeriods.java deleted file mode 100644 index 6a6ab4b..0000000 --- a/src/main/java/org/engine/process/performance/multi/EngingPerformanceMultiPeriods.java +++ /dev/null @@ -1,189 +0,0 @@ -package org.engine.process.performance.multi; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.List; - -import org.engine.process.performance.ServiceStatus; -import org.engine.process.performance.utils.InnerService; -import org.engine.process.performance.utils.StdStats; - -import akka.japi.Pair; - -public class EngingPerformanceMultiPeriods extends InnerService { - - private List cyclesList; - - private StringBuffer output = new StringBuffer(); - private String kafkaAddress; - private String schemaRegustryUrl; - private String sourceName; - private int num_of_cycles; - private int num_of_updates; - private String endl = "\n"; - private double[] rowDataToSourceDiffTimeArray; - private double[] sourceToUpdateDiffTimeArray; - private double[] totalDiffTimeArray; - - private double[] rowDataToSourceDiffTimeUpdateArray; - private double[] sourceToUpdateDiffTimeUpdateArray; - private double[] totalDiffTimeUpdateArray; - private double[] rowDataToSourceDiffTimeCreateArray; - private double[] sourceToUpdateDiffTimeCreateArray; - private double[] totalDiffTimeCreateArray; - - public EngingPerformanceMultiPeriods(String kafkaAddress, - String schemaRegistryUrl, String 
schemaRegistryIdentity,String sourceName) { - - this.kafkaAddress = kafkaAddress; - this.sourceName = sourceName; - this.schemaRegustryUrl = schemaRegistryUrl; - System.out.println("NUM_OF_CYCLES::::::::" + System.getenv("NUM_OF_CYCLES")); - System.out.println("NUM_OF_UPDATES::::::::" + System.getenv("NUM_OF_UPDATES")); - - num_of_cycles = Integer.parseInt(System.getenv("NUM_OF_CYCLES")); - num_of_updates = Integer.parseInt(System.getenv("NUM_OF_UPDATES")); - cyclesList = new ArrayList<>(); - rowDataToSourceDiffTimeArray = new double[num_of_cycles*num_of_updates]; - sourceToUpdateDiffTimeArray= new double[num_of_cycles*num_of_updates]; - totalDiffTimeArray= new double[num_of_cycles*num_of_updates]; - - rowDataToSourceDiffTimeCreateArray = new double[num_of_cycles]; - sourceToUpdateDiffTimeCreateArray= new double[num_of_cycles]; - totalDiffTimeCreateArray= new double[num_of_cycles]; - - rowDataToSourceDiffTimeUpdateArray = new double[num_of_cycles*(num_of_updates-1)]; - sourceToUpdateDiffTimeUpdateArray = new double[num_of_cycles*(num_of_updates-1)]; - totalDiffTimeUpdateArray = new double[num_of_cycles*(num_of_updates-1)]; - } - - @Override - protected void preExecute() throws Exception { - - } - - @Override - protected void postExecute() throws Exception { - - System.out.println("===postExecute"); - int i = 0; - int index = 0; - int index1 = 0; - int index2 = 0; - for(SingleCycle period : cyclesList ) { - System.out.println("Cycle "+i); - int j = 0; - for( MessageData messageData : period.getMessageDataList()) { - - System.out.println("\nUpdate "+j); - Pair diffTime = messageData.getHandlePerformanceMessages().getTimeDifferences(); - messageData.setRowDataToSourceDiffTime(diffTime.second()); - messageData.setSourceToUpdateDiffTime(diffTime.first()); - rowDataToSourceDiffTimeArray[index] = (double) diffTime.second(); - sourceToUpdateDiffTimeArray[index] = (double)diffTime.first(); - totalDiffTimeArray[index] = (double) diffTime.first()+diffTime.second(); - - if( j == 0 ) { - rowDataToSourceDiffTimeCreateArray[index1] = (double) diffTime.second(); - sourceToUpdateDiffTimeCreateArray[index1] = (double)diffTime.first(); - totalDiffTimeCreateArray[index1] = (double) diffTime.first()+diffTime.second(); - index1++; - } - else { - rowDataToSourceDiffTimeUpdateArray[index2] = (double) diffTime.second(); - sourceToUpdateDiffTimeUpdateArray[index2] = (double)diffTime.first(); - totalDiffTimeUpdateArray[index2] = (double) diffTime.first()+diffTime.second(); - index2++; - } - - messageData.setLastOffsetForRawData(messageData.getHandlePerformanceMessages().getLastOffsetForRawData()); - messageData.setLastOffsetForSource(messageData.getHandlePerformanceMessages().getLastOffsetForSource()); - messageData.setLastOffsetForUpdate(messageData.getHandlePerformanceMessages().getLastOffsetForUpdate()); - j++; - index++; - } - i++; - } - } - - @Override - protected ServiceStatus execute() throws Exception { - - for( int i = 0; i < num_of_cycles; i++ ) { - - System.out.println("===>CYCLE " + i); - - String externalSystemID = utils.randomExternalSystemID(); - SingleCycle singlePeriod = new SingleCycle(); - double lat = 4.3; - double longX = 6.4; - - for(int j = 0 ; j < num_of_updates; j++) { - System.out.println("UPDATE " + j); - - Date startTime = new Date(System.currentTimeMillis()); - String latStr = Double.toString(lat); - String longXStr = Double.toString(longX); - - HandlePerformanceMessages handlePerformanceMessages = new HandlePerformanceMessages(kafkaAddress,schemaRegustryUrl, - 
sourceName,externalSystemID,latStr,longXStr); - - handlePerformanceMessages.handleMessage(); - MessageData messageData = new MessageData(startTime,externalSystemID,latStr, longXStr, sourceName,handlePerformanceMessages); - messageData.setNumOfCycle(i); - messageData.setNumOfUpdate(j); - singlePeriod.addMessageData(messageData); - lat++; - longX++; - Thread.sleep(5000); - - } - - cyclesList.add(singlePeriod); - } - - System.out.println("END execute " + cyclesList ); - return ServiceStatus.SUCCESS; - - } - - @Override - public String getOutput() { - - for(SingleCycle period : cyclesList ) { - - for( MessageData messageData : period.getMessageDataList()) { - - output.append(messageData.toString()); - } - } - - String csvData = utils.createCsvFile(rowDataToSourceDiffTimeArray,sourceToUpdateDiffTimeArray,totalDiffTimeArray,sourceName); - - Arrays.sort(rowDataToSourceDiffTimeArray); - Arrays.sort(sourceToUpdateDiffTimeArray); - Arrays.sort(totalDiffTimeArray); - - - output.append(endl); - output.append("The average between <"+sourceName+"-row-data> and <"+sourceName+"> is "+utils.mean(rowDataToSourceDiffTimeArray) ).append(endl); - output.append("The average between <"+sourceName+"> and is "+utils.mean(sourceToUpdateDiffTimeArray)).append(endl); - output.append("The average of total is "+utils.mean(totalDiffTimeArray)).append(endl); - output.append("The median between <"+sourceName+"-row-data> and <"+sourceName+"> is "+utils.median(rowDataToSourceDiffTimeArray)).append(endl); - output.append("The median between <"+sourceName+"> and is "+utils.median(sourceToUpdateDiffTimeArray)).append(endl); - output.append("The median of total is "+utils.median(totalDiffTimeArray)).append(endl); - output.append("The standard deviation between <"+sourceName+"-row-data> and <"+sourceName+"> is "+utils.standardDeviation(rowDataToSourceDiffTimeArray)).append(endl); - output.append("The standard deviation between <"+sourceName+"> and is "+utils.standardDeviation(sourceToUpdateDiffTimeArray)).append(endl); - output.append("The standard deviation of total is "+utils.standardDeviation(totalDiffTimeArray)).append(endl); - - output.append("Export to CSV ").append(endl); - output.append("CREATE").append(endl); - output.append(utils.createCsvFile(rowDataToSourceDiffTimeCreateArray,sourceToUpdateDiffTimeCreateArray,totalDiffTimeCreateArray,sourceName)).append(endl); - output.append("UPDATE").append(endl); - output.append(utils.createCsvFile(rowDataToSourceDiffTimeUpdateArray,sourceToUpdateDiffTimeUpdateArray,totalDiffTimeUpdateArray,sourceName)).append(endl); - - return output.toString(); - } -} diff --git a/src/main/java/org/engine/process/performance/multi/HandlePerformanceMessages.java b/src/main/java/org/engine/process/performance/multi/HandlePerformanceMessages.java index 288e13c..c58f2c5 100644 --- a/src/main/java/org/engine/process/performance/multi/HandlePerformanceMessages.java +++ b/src/main/java/org/engine/process/performance/multi/HandlePerformanceMessages.java @@ -6,8 +6,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; - - import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -24,6 +22,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.log4j.Logger; +import org.engine.process.performance.Main; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.type.TypeReference; @@ -37,7 +37,7 @@ * Jul 2017 * * The 
class will check the performance of the engine by create messages to topics - * sourceName-row-data + * sourceName-raw-data * update * get the message timestamp and compare * @@ -56,7 +56,10 @@ public class HandlePerformanceMessages { private List> updateRecordsList = new ArrayList<>(); private long lastOffsetForRawData; private long lastOffsetForUpdate; - private long lastOffsetForSource; + private long lastOffsetForSource; + private Pair timeDifferences; + private Logger logger = Main.logger; + public long getLastOffsetForRawData() { return lastOffsetForRawData; } @@ -246,7 +249,7 @@ else if( param.topic().equals(sourceName)) { } } - public Pair getTimeDifferences() throws Exception { + public void callConsumer() throws Exception { consumer.seek(partitionRawData, lastOffsetForRawData); callConsumersWithKafkaConsuemr(consumer); @@ -258,15 +261,21 @@ public Pair getTimeDifferences() throws Exception { callConsumersWithKafkaConsuemr(consumer3); Pair update = updateRecordsList.stream().collect(Collectors.toList()).get(0); - System.out.println("====Consumer from topic update: "+update.toString()); + logger.debug("====Consumer from topic update: "+update.toString()); Pair source = sourceRecordsList.stream().collect(Collectors.toList()).get(0); - System.out.println("====Consumer from topic source: "+source.toString()); + logger.debug("====Consumer from topic source: "+source.toString()); Pair rowData = rawDataRecordsList.stream().collect(Collectors.toList()).get(0); - System.out.println("====Consumer from topic "+sourceName+"-row-data: "+rowData.toString()); - - return new Pair(update.second() - source.second(), source.second() - rowData.second()); + logger.debug("====Consumer from topic "+sourceName+"-raw-data: "+rowData.toString()); + + timeDifferences = new Pair(update.second() - source.second(), source.second() - rowData.second()); } + + public Pair getTimeDifferences() throws Exception { + + return timeDifferences; + } + private Map jsonToMap(String json) throws JsonParseException, JsonMappingException, IOException { ObjectMapper mapper = new ObjectMapper(); diff --git a/src/main/java/org/engine/process/performance/multi/MessageConsumerThread.java b/src/main/java/org/engine/process/performance/multi/MessageConsumerThread.java new file mode 100644 index 0000000..7576a59 --- /dev/null +++ b/src/main/java/org/engine/process/performance/multi/MessageConsumerThread.java @@ -0,0 +1,31 @@ +package org.engine.process.performance.multi; + +import org.apache.log4j.Logger; +import org.engine.process.performance.Main; + +class MessageConsumerThread implements Runnable { + + private SingleCycle singleCycle; + private Logger logger = Main.logger; + + public MessageConsumerThread(SingleCycle singleCycle) { + + this.singleCycle = singleCycle; + } + + @Override + public void run() { + + for( MessageData messageData : singleCycle.getMessageDataList()) { + + logger.debug("in Thread num_of_cycle "+messageData.getNumOfCycle()+" num_of_update "+messageData.getNumOfUpdate()); + + try { + messageData.getHandlePerformanceMessages().callConsumer(); + } catch (Exception e) { + + e.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/org/engine/process/performance/multi/MessageData.java b/src/main/java/org/engine/process/performance/multi/MessageData.java index 9c61759..fd4a1b5 100644 --- a/src/main/java/org/engine/process/performance/multi/MessageData.java +++ b/src/main/java/org/engine/process/performance/multi/MessageData.java @@ -2,13 +2,16 @@ import java.util.Date; +import 
org.apache.log4j.Logger; +import org.engine.process.performance.Main; + public class MessageData { private final Date startTime; private final String externalSystemID; private final String lat; private final String longX; - private long rowDataToSourceDiffTime; + private long rawDataToSourceDiffTime; private long sourceToUpdateDiffTime; private String endl = "\n"; private String sourceName; @@ -18,6 +21,7 @@ public class MessageData { private long lastOffsetForSource; private int numOfCycle; private int numOfUpdate; + private Logger logger = Main.logger; public MessageData(Date startTime,String externalSystemID, String lat, String longX,String sourceName, HandlePerformanceMessages handlePerformanceMessages) { @@ -29,12 +33,12 @@ public MessageData(Date startTime,String externalSystemID, String lat, String lo this.handlePerformanceMessages = handlePerformanceMessages; } - public long getRowDataToSourceDiffTime() { - return rowDataToSourceDiffTime; + public long getRawDataToSourceDiffTime() { + return rawDataToSourceDiffTime; } - public void setRowDataToSourceDiffTime(long rowDataToSourceDiffTime) { - this.rowDataToSourceDiffTime = rowDataToSourceDiffTime; + public void setRawDataToSourceDiffTime(long rawDataToSourceDiffTime) { + this.rawDataToSourceDiffTime = rawDataToSourceDiffTime; } public long getSourceToUpdateDiffTime() { @@ -56,11 +60,17 @@ public String toString() { stringBuffer.append("externalSystemID: "+externalSystemID).append(endl); stringBuffer.append("lat: "+lat).append(endl); stringBuffer.append("longX: "+longX).append(endl); - if( rowDataToSourceDiffTime > 0 ) { + if(rawDataToSourceDiffTime < 0 || sourceToUpdateDiffTime < 0 ) { + + logger.error("Error - The clock is not syncronized between hosts rawDataToSourceDiffTime "+rawDataToSourceDiffTime+" sourceToUpdateDiffTime "+sourceToUpdateDiffTime); + return null; + + } + if( rawDataToSourceDiffTime >= 0 && sourceToUpdateDiffTime >= 0) { stringBuffer.append("lastOffsetForRawData: "+lastOffsetForRawData).append(endl); stringBuffer.append("lastOffsetForSource: "+lastOffsetForSource).append(endl); stringBuffer.append("lastOffsetForUpdate: "+lastOffsetForUpdate).append(endl); - stringBuffer.append("The action between topics <"+sourceName+"-row-data> and <"+sourceName+"> took "+ rowDataToSourceDiffTime +" millisec").append(endl); + stringBuffer.append("The action between topics <"+sourceName+"-raw-data> and <"+sourceName+"> took "+ rawDataToSourceDiffTime +" millisec").append(endl); stringBuffer.append("The action between topics <"+sourceName+"> and took "+ sourceToUpdateDiffTime +" millisec").append(endl).append(endl); } return stringBuffer.toString(); diff --git a/src/main/java/org/engine/process/performance/multi/SingleCycle.java b/src/main/java/org/engine/process/performance/multi/SingleCycle.java index 7bf5207..6d1d451 100644 --- a/src/main/java/org/engine/process/performance/multi/SingleCycle.java +++ b/src/main/java/org/engine/process/performance/multi/SingleCycle.java @@ -1,24 +1,20 @@ package org.engine.process.performance.multi; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; public class SingleCycle { - - private final Date startTime; + private List messageDataList; public SingleCycle() { - - startTime = new Date(System.currentTimeMillis()); - messageDataList = new ArrayList<>(); + messageDataList = Collections.synchronizedList(new ArrayList()); } public void addMessageData(MessageData messageData) { - - messageDataList.add(messageData); - + 
messageDataList.add(messageData); } public List getMessageDataList() { diff --git a/src/main/java/org/engine/process/performance/multi/SinglePeriod.java b/src/main/java/org/engine/process/performance/multi/SinglePeriod.java deleted file mode 100644 index dc33741..0000000 --- a/src/main/java/org/engine/process/performance/multi/SinglePeriod.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.engine.process.performance.multi; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.engine.process.performance.ServiceStatus; -import org.engine.process.performance.utils.InnerService; - -public class SinglePeriod { - - private final Date startTime; - private List messageDataList; - - public SinglePeriod() { - - startTime = new Date(System.currentTimeMillis()); - messageDataList = new ArrayList<>(); - } - - public void addMessageData(MessageData messageData) { - - messageDataList.add(messageData); - - } - - public List getMessageDataList() { - return messageDataList; - } - - public void setMessageDataList(List messageDataList) { - this.messageDataList = messageDataList; - } - -} diff --git a/src/main/java/org/engine/process/performance/utils/InnerService.java b/src/main/java/org/engine/process/performance/utils/InnerService.java index 5dc5734..3635887 100644 --- a/src/main/java/org/engine/process/performance/utils/InnerService.java +++ b/src/main/java/org/engine/process/performance/utils/InnerService.java @@ -1,6 +1,9 @@ package org.engine.process.performance.utils; +import org.apache.log4j.Logger; +import org.engine.process.performance.Main; import org.engine.process.performance.ServiceStatus; + public abstract class InnerService { @@ -8,6 +11,8 @@ public abstract class InnerService { abstract protected void postExecute() throws Exception; abstract protected ServiceStatus execute() throws Exception; abstract public String getOutput(); + abstract public void printOutputToFile(String fileLocation); + protected Logger logger = Main.logger; protected Utils utils = new Utils(); @@ -21,10 +26,9 @@ public ServiceStatus run() { return ServiceStatus.SUCCESS; } catch(Exception e) { - System.out.println(e.getStackTrace()); - System.out.println(e.getMessage()); + logger.error(e.getStackTrace()); + logger.error(e.getMessage()); return ServiceStatus.FAILURE; } - } - + } } diff --git a/src/main/java/org/engine/process/performance/utils/StdStats.java b/src/main/java/org/engine/process/performance/utils/StdStats.java deleted file mode 100644 index faa64cf..0000000 --- a/src/main/java/org/engine/process/performance/utils/StdStats.java +++ /dev/null @@ -1,471 +0,0 @@ -package org.engine.process.performance.utils; - - -/****************************************************************************** - * Compilation: javac StdStats.java - * Execution: java StdStats < input.txt - * Dependencies: StdOut.java - * - * Library of statistical functions. - * - * The test client reads an array of real numbers from standard - * input, and computes the minimum, mean, maximum, and - * standard deviation. - * - * The functions all throw a java.lang.IllegalArgumentException - * if the array passed in as an argument is null. - * - * The floating-point functions all return NaN if any input is NaN. - * - * Unlike Math.min() and Math.max(), the min() and max() functions - * do not differentiate between -0.0 and 0.0. 
- *
- * % more tiny.txt
- * 5
- * 3.0 1.0 2.0 5.0 4.0
- *
- * % java StdStats < tiny.txt
- *        min   1.000
- *       mean   3.000
- *        max   5.000
- *    std dev   1.581
- *
- * Should these funtions use varargs instead of array arguments?
- *
- ******************************************************************************/
-
-/**
- * The {@code StdStats} class provides static methods for computing
- * statistics such as min, max, mean, sample standard deviation, and
- * sample variance.
- * <p>
- * For additional documentation, see - * Section 2.2 of - * Computer Science: An Interdisciplinary Approach - * by Robert Sedgewick and Kevin Wayne. - * - * @author Robert Sedgewick - * @author Kevin Wayne - */ -public final class StdStats { - - private StdStats() { } - - /** - * Returns the maximum value in the specified array. - * - * @param a the array - * @return the maximum value in the array {@code a[]}; - * {@code Double.NEGATIVE_INFINITY} if no such value - */ - public static double max(double[] a) { - validateNotNull(a); - - double max = Double.NEGATIVE_INFINITY; - for (int i = 0; i < a.length; i++) { - if (Double.isNaN(a[i])) return Double.NaN; - if (a[i] > max) max = a[i]; - } - return max; - } - - /** - * Returns the maximum value in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the maximum value in the subarray {@code a[lo..hi)}; - * {@code Double.NEGATIVE_INFINITY} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double max(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - double max = Double.NEGATIVE_INFINITY; - for (int i = lo; i < hi; i++) { - if (Double.isNaN(a[i])) return Double.NaN; - if (a[i] > max) max = a[i]; - } - return max; - } - - /** - * Returns the maximum value in the specified array. - * - * @param a the array - * @return the maximum value in the array {@code a[]}; - * {@code Integer.MIN_VALUE} if no such value - */ - public static int max(int[] a) { - validateNotNull(a); - - int max = Integer.MIN_VALUE; - for (int i = 0; i < a.length; i++) { - if (a[i] > max) max = a[i]; - } - return max; - } - - /** - * Returns the minimum value in the specified array. - * - * @param a the array - * @return the minimum value in the array {@code a[]}; - * {@code Double.POSITIVE_INFINITY} if no such value - */ - public static double min(double[] a) { - validateNotNull(a); - - double min = Double.POSITIVE_INFINITY; - for (int i = 0; i < a.length; i++) { - if (Double.isNaN(a[i])) return Double.NaN; - if (a[i] < min) min = a[i]; - } - return min; - } - - /** - * Returns the minimum value in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the maximum value in the subarray {@code a[lo..hi)}; - * {@code Double.POSITIVE_INFINITY} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double min(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - double min = Double.POSITIVE_INFINITY; - for (int i = lo; i < hi; i++) { - if (Double.isNaN(a[i])) return Double.NaN; - if (a[i] < min) min = a[i]; - } - return min; - } - - /** - * Returns the minimum value in the specified array. 
- * - * @param a the array - * @return the minimum value in the array {@code a[]}; - * {@code Integer.MAX_VALUE} if no such value - */ - public static int min(int[] a) { - validateNotNull(a); - - int min = Integer.MAX_VALUE; - for (int i = 0; i < a.length; i++) { - if (a[i] < min) min = a[i]; - } - return min; - } - - /** - * Returns the average value in the specified array. - * - * @param a the array - * @return the average value in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double mean(double[] a) { - validateNotNull(a); - - if (a.length == 0) return Double.NaN; - double sum = sum(a); - return sum / a.length; - } - - /** - * Returns the average value in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the average value in the subarray {@code a[lo..hi)}; - * {@code Double.NaN} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double mean(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - int length = hi - lo; - if (length == 0) return Double.NaN; - - double sum = sum(a, lo, hi); - return sum / length; - } - - /** - * Returns the average value in the specified array. - * - * @param a the array - * @return the average value in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double mean(int[] a) { - validateNotNull(a); - - if (a.length == 0) return Double.NaN; - int sum = sum(a); - return 1.0 * sum / a.length; - } - - /** - * Returns the sample variance in the specified array. - * - * @param a the array - * @return the sample variance in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double var(double[] a) { - validateNotNull(a); - - if (a.length == 0) return Double.NaN; - double avg = mean(a); - double sum = 0.0; - for (int i = 0; i < a.length; i++) { - sum += (a[i] - avg) * (a[i] - avg); - } - return sum / (a.length - 1); - } - - /** - * Returns the sample variance in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the sample variance in the subarray {@code a[lo..hi)}; - * {@code Double.NaN} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double var(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - int length = hi - lo; - if (length == 0) return Double.NaN; - - double avg = mean(a, lo, hi); - double sum = 0.0; - for (int i = lo; i < hi; i++) { - sum += (a[i] - avg) * (a[i] - avg); - } - return sum / (length - 1); - } - - /** - * Returns the sample variance in the specified array. 
- * - * @param a the array - * @return the sample variance in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double var(int[] a) { - validateNotNull(a); - if (a.length == 0) return Double.NaN; - double avg = mean(a); - double sum = 0.0; - for (int i = 0; i < a.length; i++) { - sum += (a[i] - avg) * (a[i] - avg); - } - return sum / (a.length - 1); - } - - /** - * Returns the population variance in the specified array. - * - * @param a the array - * @return the population variance in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double varp(double[] a) { - validateNotNull(a); - if (a.length == 0) return Double.NaN; - double avg = mean(a); - double sum = 0.0; - for (int i = 0; i < a.length; i++) { - sum += (a[i] - avg) * (a[i] - avg); - } - return sum / a.length; - } - - /** - * Returns the population variance in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the population variance in the subarray {@code a[lo..hi)}; - * {@code Double.NaN} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double varp(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - int length = hi - lo; - if (length == 0) return Double.NaN; - - double avg = mean(a, lo, hi); - double sum = 0.0; - for (int i = lo; i < hi; i++) { - sum += (a[i] - avg) * (a[i] - avg); - } - return sum / length; - } - - /** - * Returns the sample standard deviation in the specified array. - * - * @param a the array - * @return the sample standard deviation in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double stddev(double[] a) { - validateNotNull(a); - return Math.sqrt(var(a)); - } - - /** - * Returns the sample standard deviation in the specified array. - * - * @param a the array - * @return the sample standard deviation in the array {@code a[]}; - * {@code Double.NaN} if no such value - */ - public static double stddev(int[] a) { - validateNotNull(a); - return Math.sqrt(var(a)); - } - - /** - * Returns the sample standard deviation in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the sample standard deviation in the subarray {@code a[lo..hi)}; - * {@code Double.NaN} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double stddev(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - return Math.sqrt(var(a, lo, hi)); - } - - - /** - * Returns the population standard deviation in the specified array. - * - * @param a the array - * @return the population standard deviation in the array; - * {@code Double.NaN} if no such value - */ - public static double stddevp(double[] a) { - validateNotNull(a); - return Math.sqrt(varp(a)); - } - - /** - * Returns the population standard deviation in the specified subarray. 
- * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the population standard deviation in the subarray {@code a[lo..hi)}; - * {@code Double.NaN} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - public static double stddevp(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - return Math.sqrt(varp(a, lo, hi)); - } - - /** - * Returns the sum of all values in the specified array. - * - * @param a the array - * @return the sum of all values in the array {@code a[]}; - * {@code 0.0} if no such value - */ - private static double sum(double[] a) { - validateNotNull(a); - double sum = 0.0; - for (int i = 0; i < a.length; i++) { - sum += a[i]; - } - return sum; - } - - /** - * Returns the sum of all values in the specified subarray. - * - * @param a the array - * @param lo the left endpoint of the subarray (inclusive) - * @param hi the right endpoint of the subarray (exclusive) - * @return the sum of all values in the subarray {@code a[lo..hi)}; - * {@code 0.0} if no such value - * @throws IllegalArgumentException if {@code a} is {@code null} - * @throws IllegalArgumentException unless {@code (0 <= lo) && (lo < hi) && (hi <= a.length)} - */ - private static double sum(double[] a, int lo, int hi) { - validateNotNull(a); - validateSubarrayIndices(lo, hi, a.length); - - double sum = 0.0; - for (int i = lo; i < hi; i++) { - sum += a[i]; - } - return sum; - } - - /** - * Returns the sum of all values in the specified array. - * - * @param a the array - * @return the sum of all values in the array {@code a[]}; - * {@code 0.0} if no such value - */ - private static int sum(int[] a) { - validateNotNull(a); - int sum = 0; - for (int i = 0; i < a.length; i++) { - sum += a[i]; - } - return sum; - } - - - // throw an IllegalArgumentException if x is null - // (x is either of type double[] or int[]) - private static void validateNotNull(Object x) { - if (x == null) - throw new IllegalArgumentException("argument is null"); - } - - // throw an exception unless 0 <= lo <= hi <= length - private static void validateSubarrayIndices(int lo, int hi, int length) { - if (lo < 0 || hi > length || lo > hi) - throw new IllegalArgumentException("subarray indices out of bounds: [" + lo + ", " + hi + ")"); - } - - -} \ No newline at end of file diff --git a/src/main/java/org/engine/process/performance/utils/Utils.java b/src/main/java/org/engine/process/performance/utils/Utils.java index 03ad2fe..c023f80 100644 --- a/src/main/java/org/engine/process/performance/utils/Utils.java +++ b/src/main/java/org/engine/process/performance/utils/Utils.java @@ -1,20 +1,11 @@ package org.engine.process.performance.utils; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; + import java.util.Random; - -import org.engine.process.performance.multi.SingleCycle; - + public class Utils { private static Random random; - private String seperator = ","; - private String endl = "\n"; + private String seperator = ","; static { @@ -54,16 +45,14 @@ public double standardDeviation(double[] array) { for( double i : array) { finalsum = (sum += i); } - average = finalsum/(array.length); - //System.out.println("Average: "+ average); + average = 
finalsum/(array.length); double sumX=0; double finalsumX=0; double[] x1_average = new double[array.length+1]; for (int i = 0; i and <"+sourceName+">").append(getLine(rowDataToSourceDiffTimeArray)).append("\n"); - output.append("DiffTime between <"+sourceName+"> and ").append(getLine(sourceToUpdateDiffTimeArray)).append("\n"); - output.append("Total DiffTime").append(getLine(totalDiffTimeArray)).append("\n"); - - //printToFile(output.toString(),""); - - return output.toString(); - - - } - - private String getLine(double[] array) { - - StringBuffer output = new StringBuffer(); - for(double d : array ) - { - output.append(seperator).append(d); - } - - return output.toString(); - - } - - public void printToFile(String output, String fileLocation) { - - String dateTime = new SimpleDateFormat("yyyyMMdd_HHmm").format(new Date()); - try( FileWriter fw = new FileWriter(fileLocation+"/enginePeformanceResult_"+dateTime+".log")) - { - fw.write(output+"\n"); - } catch (IOException e) { - - e.printStackTrace(); - } - } - + } }
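
The sketches below isolate, in standalone form, the main mechanisms this patch relies on; any name, value or helper that does not appear in the diff is an illustrative assumption. First, the runner's constructor sizes its measurement arrays from the NUM_OF_CYCLES and NUM_OF_UPDATES environment variables: one slot per cycle/update pair, plus separate "create" and "update" buckets. A minimal sketch, assuming a fail-fast null check that the patch itself does not perform (it calls Integer.parseInt on the raw environment value directly):

import java.util.Objects;

// Hedged sketch of the environment-driven sizing done in the runner's constructor.
public class CycleConfigSketch {

    static int requiredIntEnv(String name) {
        // Illustrative guard: the patch would throw a NumberFormatException/NPE here instead.
        String value = Objects.requireNonNull(System.getenv(name),
                name + " must be set before starting the performance check");
        return Integer.parseInt(value);
    }

    public static void main(String[] args) {
        int numOfCycles = requiredIntEnv("NUM_OF_CYCLES");
        int numOfUpdates = requiredIntEnv("NUM_OF_UPDATES");

        // One slot per (cycle, update) pair, as in rowDataToSourceDiffTimeArray and friends.
        double[] rawDataToSourceDiffTime = new double[numOfCycles * numOfUpdates];
        // The first update of each cycle is the "create", the rest are "updates", hence
        // the cycles-sized and cycles * (updates - 1)-sized aggregation arrays.
        double[] createDiffTime = new double[numOfCycles];
        double[] updateDiffTime = new double[numOfCycles * (numOfUpdates - 1)];

        System.out.println(rawDataToSourceDiffTime.length + " measurements expected ("
                + createDiffTime.length + " creates, " + updateDiffTime.length + " updates)");
    }
}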
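
callConsumer() derives two latency components from the timestamps of the same logical entity on the <sourceName>-raw-data, <sourceName> and update topics, and getTimeDifferences() hands them back as a Pair of (update - source, source - raw-data). A minimal sketch with the timestamps passed in directly rather than read from Kafka records; the TimeDifferences holder stands in for the akka.japi.Pair used in the patch:

// Hedged sketch of the latency split computed in callConsumer(). Timestamps are epoch millis.
public class TimeDifferenceSketch {

    static final class TimeDifferences {
        final long sourceToUpdate;   // first()  of the akka.japi.Pair in the patch
        final long rawDataToSource;  // second() of the akka.japi.Pair in the patch

        TimeDifferences(long sourceToUpdate, long rawDataToSource) {
            this.sourceToUpdate = sourceToUpdate;
            this.rawDataToSource = rawDataToSource;
        }
    }

    static TimeDifferences diff(long rawDataTs, long sourceTs, long updateTs) {
        return new TimeDifferences(updateTs - sourceTs, sourceTs - rawDataTs);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        TimeDifferences d = diff(now, now + 40, now + 110);
        // A negative component indicates unsynchronized clocks between hosts, which
        // MessageData.toString() reports as an error instead of printing the record.
        System.out.println("raw-data -> source: " + d.rawDataToSource + " ms");
        System.out.println("source -> update:   " + d.sourceToUpdate + " ms");
    }
}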
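
SingleCycle now wraps its message list in Collections.synchronizedList, and MessageConsumerThread walks one cycle from a separate thread, triggering callConsumer() per entry. A minimal sketch of that producer/worker split, assuming a simplified Item payload in place of MessageData:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hedged sketch of the pattern behind SingleCycle and MessageConsumerThread.
public class ConsumerThreadSketch {

    static final class Item {
        final int cycle, update;
        Item(int cycle, int update) { this.cycle = cycle; this.update = update; }
    }

    public static void main(String[] args) throws InterruptedException {
        // Collections.synchronizedList guards individual add/get calls, as in SingleCycle.
        List<Item> cycle = Collections.synchronizedList(new ArrayList<Item>());
        for (int j = 0; j < 3; j++) {
            cycle.add(new Item(0, j));
        }

        Thread worker = new Thread(() -> {
            // Iterating a synchronized list still needs coordination; in this sketch all
            // adds happen before start(), so the worker sees a stable list.
            for (Item item : cycle) {
                System.out.println("consuming cycle " + item.cycle + " update " + item.update);
            }
        });
        worker.start();
        worker.join();
    }
}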
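
getOutput() sorts each latency array and reports its mean, median and standard deviation via Utils. A minimal, self-contained sketch of those aggregates under the usual definitions (sample standard deviation with n - 1, median of a pre-sorted array); the input values are illustrative:

import java.util.Arrays;

// Hedged sketch of the aggregates reported in getOutput() over per-message latencies (millis).
public class LatencyStatsSketch {

    static double mean(double[] a) {
        double sum = 0.0;
        for (double v : a) sum += v;
        return a.length == 0 ? Double.NaN : sum / a.length;
    }

    // Assumes the array is already sorted, as getOutput() sorts before reporting.
    static double median(double[] sorted) {
        int n = sorted.length;
        if (n == 0) return Double.NaN;
        return (n % 2 == 1) ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    // Sample standard deviation (divide by n - 1), matching the removed StdStats.stddev.
    static double stddev(double[] a) {
        double avg = mean(a);
        double sum = 0.0;
        for (double v : a) sum += (v - avg) * (v - avg);
        return Math.sqrt(sum / (a.length - 1));
    }

    public static void main(String[] args) {
        double[] totalDiffTime = {120.0, 95.0, 143.0, 110.0, 99.0};
        Arrays.sort(totalDiffTime);
        System.out.println("mean    " + mean(totalDiffTime));
        System.out.println("median  " + median(totalDiffTime));
        System.out.println("std dev " + stddev(totalDiffTime));
    }
}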
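
getOutput() also exports the three latency series through utils.createCsvFile, and the Utils implementation removed here built each row by prefixing every value with the separator. A minimal sketch of that row-building approach; the row labels are illustrative:

// Hedged sketch of the CSV row building that getOutput() relies on: one row per latency
// series, each value preceded by the separator, in the spirit of the removed Utils.getLine().
public class CsvExportSketch {

    private static final String SEPARATOR = ",";

    static String line(String label, double[] values) {
        StringBuilder row = new StringBuilder(label);
        for (double v : values) {
            row.append(SEPARATOR).append(v);
        }
        return row.toString();
    }

    public static void main(String[] args) {
        double[] rawToSource = {12.0, 15.5, 11.2};
        double[] sourceToUpdate = {40.1, 38.7, 41.9};
        double[] total = {52.1, 54.2, 53.1};

        StringBuilder csv = new StringBuilder();
        csv.append(line("raw-data->source", rawToSource)).append('\n');
        csv.append(line("source->update", sourceToUpdate)).append('\n');
        csv.append(line("total", total)).append('\n');
        System.out.print(csv);
    }
}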