diff --git a/blocking-bootstrap-producers/blocking-bootstrap-mysql-producer/src/main/java/com/flipkart/aesop/bootstrap/mysql/mapper/impl/DefaultBinLogEventMapper.java b/blocking-bootstrap-producers/blocking-bootstrap-mysql-producer/src/main/java/com/flipkart/aesop/bootstrap/mysql/mapper/impl/DefaultBinLogEventMapper.java index b797441..2635620 100644 --- a/blocking-bootstrap-producers/blocking-bootstrap-mysql-producer/src/main/java/com/flipkart/aesop/bootstrap/mysql/mapper/impl/DefaultBinLogEventMapper.java +++ b/blocking-bootstrap-producers/blocking-bootstrap-mysql-producer/src/main/java/com/flipkart/aesop/bootstrap/mysql/mapper/impl/DefaultBinLogEventMapper.java @@ -86,7 +86,7 @@ public int compare(String o1, String o2) } return (T)new SourceEvent(keyValuePairs, getPkListFromSchema(schema), schema.getName(), schema.getNamespace(), - eventType); + eventType, null); } catch (Exception e) { diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEvent.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEvent.java index d18913b..18be55b 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEvent.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEvent.java @@ -39,22 +39,27 @@ public abstract class AbstractEvent implements Event /** Event Type. */ protected final DbusOpcode eventType; + /** OldRowMap : Storing changes. + * This field holds field Change values (old values) */ + protected final Map rowChangeMap; + /** * Constructs the basic event using mandatory fields. - * @param fieldsMap + * @param fieldMap * @param primaryKeysSet * @param entityName * @param namespaceName * @param eventType + * @param rowChangeMap */ - public AbstractEvent(Map fieldsMap, Set primaryKeysSet, String entityName, - String namespaceName, DbusOpcode eventType) - { - this.fieldMap = fieldsMap; + public AbstractEvent(Map fieldMap, Set primaryKeysSet, String entityName, + String namespaceName, DbusOpcode eventType, Map rowChangeMap) { + this.fieldMap = fieldMap; this.primaryKeysSet = primaryKeysSet; this.entityName = entityName; this.namespaceName = namespaceName; this.eventType = eventType; + this.rowChangeMap = rowChangeMap; } public Map getFieldMapPair() @@ -102,10 +107,14 @@ public List getPrimaryKeyValues() return primaryKeyValues; } + public Map getRowChangeMap() { + return rowChangeMap; + } + @Override public String toString() { return "AbstractEvent [fieldsMap=" + fieldMap + ", primaryKeysSet=" + primaryKeysSet + ", entityName=" - + entityName + ", namespaceName=" + namespaceName + ", eventType=" + eventType + "]"; + + entityName + ", namespaceName=" + namespaceName + ", eventType=" + eventType + ", rowChangeMap=" + rowChangeMap + "]"; } } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEventFactory.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEventFactory.java index 7ea9ecc..e64f0fa 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEventFactory.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/AbstractEventFactory.java @@ -10,29 +10,25 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * *******************************************************************************/ package com.flipkart.aesop.event; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.flipkart.aesop.utils.AvroSchemaHelper; +import com.flipkart.aesop.utils.AvroToMysqlConverter; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; - -import com.flipkart.aesop.utils.AvroToMysqlMapper; -import com.flipkart.aesop.utils.MysqlDataTypes; import com.linkedin.databus.client.pub.DbusEventDecoder; -import com.linkedin.databus.core.DbusConstants; import com.linkedin.databus.core.DbusEvent; import com.linkedin.databus.core.DbusOpcode; import com.linkedin.databus2.core.DatabusException; import com.linkedin.databus2.schemas.VersionedSchema; -import com.linkedin.databus2.schemas.utils.SchemaHelper; /** * Abstract Event Factory to be extended by the various types of Event Factory Classes. @@ -41,50 +37,36 @@ */ public abstract class AbstractEventFactory implements EventFactory { - public static String PRIMARY_KEY_FIELD_NAME = "pk"; - public static String META_FIELD_TYPE_NAME = "dbFieldType"; - - /** - * Generates primary key set using the schema. - * @param schema - * @return Primary key set - * @throws DatabusException - */ - private Set getPrimaryKeysSetFromSchema(Schema schema) throws DatabusException - { - Set primaryKeySet = new HashSet(); - String primaryKeyFieldName = SchemaHelper.getMetaField(schema, PRIMARY_KEY_FIELD_NAME); - if (primaryKeyFieldName == null) - { - throw new DatabusException("No primary key specified in the schema"); - } - for (String primaryKey : primaryKeyFieldName.split(DbusConstants.COMPOUND_KEY_SEPARATOR)) - { - primaryKeySet.add(primaryKey.trim()); - } - assert (primaryKeySet.size() >= 1); - return primaryKeySet; - } - public AbstractEvent createEvent(DbusEvent dbusEvent, DbusEventDecoder eventDecoder) throws DatabusException { GenericRecord genericRecord = eventDecoder.getGenericRecord(dbusEvent, null); VersionedSchema writerSchema = eventDecoder.getPayloadSchema(dbusEvent); Schema schema = writerSchema.getSchema(); DbusOpcode eventType = dbusEvent.getOpcode(); - Set primaryKeysSet = getPrimaryKeysSetFromSchema(schema); + Set primaryKeysSet = AvroSchemaHelper.getPrimaryKeysSetFromSchema(schema); String namespaceName = schema.getNamespace(); String entityName = schema.getName(); Map fieldMap = new HashMap(); + Map fieldToMysqlDataType = AvroSchemaHelper.fieldToDataTypeMap(schema); + String rowChangeField = AvroSchemaHelper.getRowChangeField(schema); + Map rowChangeMap = null; + for (Field field : schema.getFields()) { - String mysqlType = SchemaHelper.getMetaField(field, META_FIELD_TYPE_NAME); - fieldMap.put( - field.name(), - AvroToMysqlMapper.avroToMysqlType(genericRecord.get(field.name()), - MysqlDataTypes.valueOf(mysqlType.toUpperCase()))); + Object recordValue = genericRecord.get(field.name()); + if (field.name().equals(rowChangeField)) + { + rowChangeMap = AvroToMysqlConverter.getMysqlTypedObjectForMap((Map) recordValue, + fieldToMysqlDataType); + } + else + { + fieldMap.put(field.name(), + AvroToMysqlConverter.getMysqlTypedObject(fieldToMysqlDataType.get(field.name()), recordValue)); + } } - AbstractEvent event = createEventInstance(fieldMap, primaryKeysSet, entityName, namespaceName, eventType); + AbstractEvent event = createEventInstance(fieldMap, primaryKeysSet, entityName, namespaceName, eventType, + rowChangeMap); return event; } @@ -98,23 +80,25 @@ public AbstractEvent createEvent(DbusEvent dbusEvent, DbusEventDecoder eventDeco * @return Actual Event instance. */ protected abstract AbstractEvent createEventInstance(Map fieldsMap, Set primaryKeysSet, - String entityName, String namespaceName, DbusOpcode eventType); + String entityName, String namespaceName, DbusOpcode eventType, Map rowChangeMap); - public AbstractEvent createEvent(Schema schema, Map keyValuePairs, DbusOpcode eventType) - throws DatabusException + public AbstractEvent createEvent(Schema schema, Map keyValuePairs, DbusOpcode eventType, Map rowChangeMap) + throws DatabusException { String entityName = schema.getName(); String namespaceName = schema.getNamespace(); - Set primaryKeysSet = getPrimaryKeysSetFromSchema(schema); + Set primaryKeysSet = AvroSchemaHelper.getPrimaryKeysSetFromSchema(schema); - AbstractEvent event = createEventInstance(keyValuePairs, primaryKeysSet, entityName, namespaceName, eventType); + AbstractEvent event = + createEventInstance(keyValuePairs, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap); return event; } public AbstractEvent createEvent(Map fieldsMap, Set primaryFieldsSet, String entityName, - String namespaceName, DbusOpcode eventType) + String namespaceName, DbusOpcode eventType, Map rowChangeMap) { - AbstractEvent event = createEventInstance(fieldsMap, primaryFieldsSet, entityName, namespaceName, eventType); + AbstractEvent event = + createEventInstance(fieldsMap, primaryFieldsSet, entityName, namespaceName, eventType, rowChangeMap); return event; } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/Event.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/Event.java index acea68b..2f32fbb 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/Event.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/Event.java @@ -76,4 +76,10 @@ public interface Event * @return {@link List} of Primary Keys */ public List getPrimaryKeyValues(); + + /** + * Gets the change Map which store old values of record. + * @return rowChangeMap + */ + public Map getRowChangeMap(); } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/EventFactory.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/EventFactory.java index 88e3979..f49baa2 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/EventFactory.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/EventFactory.java @@ -51,7 +51,8 @@ public interface EventFactory * @return {@link AbstractEvent} * @throws DatabusException */ - public AbstractEvent createEvent(Schema schema, Map fieldMap, DbusOpcode eventType) + public AbstractEvent createEvent(Schema schema, Map fieldMap, DbusOpcode eventType, + Map rowChangeMap) throws DatabusException; /** @@ -61,8 +62,9 @@ public AbstractEvent createEvent(Schema schema, Map fieldMap, Db * @param entityName * @param namespaceName * @param eventType + * @param rowChangeMap * @return {@link AbstractEvent} */ public AbstractEvent createEvent(Map fieldsMap, Set primaryFieldsSet, String entityName, - String namespaceName, DbusOpcode eventType); + String namespaceName, DbusOpcode eventType, Map rowChangeMap); } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEvent.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEvent.java index e772793..bd26ae9 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEvent.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEvent.java @@ -36,10 +36,11 @@ public class DestinationEvent extends AbstractEvent * @param entityName * @param namespaceName * @param eventType + * @param rowChangeMap */ public DestinationEvent(Map fieldsMap, Set primaryKeysSet, String entityName, - String namespaceName, DbusOpcode eventType) + String namespaceName, DbusOpcode eventType, Map rowChangeMap) { - super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType); + super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap); } } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEventFactory.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEventFactory.java index 9606fb3..eab654b 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEventFactory.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/DestinationEventFactory.java @@ -29,8 +29,8 @@ public class DestinationEventFactory extends AbstractEventFactory fieldsMap, Set primaryKeysSet, - String entityName, String namespaceName, DbusOpcode eventType) + String entityName, String namespaceName, DbusOpcode eventType, Map rowChangeMap) { - return new DestinationEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType); + return new DestinationEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap); } } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEvent.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEvent.java index 71578a9..039876a 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEvent.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEvent.java @@ -36,10 +36,11 @@ public class SourceEvent extends AbstractEvent * @param entityName * @param namespaceName * @param eventType + * @param rowChangeMap */ public SourceEvent(Map fieldsMap, Set primaryKeysSet, String entityName, - String namespaceName, DbusOpcode eventType) + String namespaceName, DbusOpcode eventType, Map rowChangeMap) { - super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType); + super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap); } } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEventFactory.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEventFactory.java index 813b63c..4fb2cd1 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEventFactory.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/event/implementation/SourceEventFactory.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Set; +import com.flipkart.aesop.event.AbstractEvent; import com.flipkart.aesop.event.AbstractEventFactory; import com.linkedin.databus.core.DbusOpcode; @@ -29,8 +30,8 @@ public class SourceEventFactory extends AbstractEventFactory { @Override protected SourceEvent createEventInstance(Map fieldsMap, Set primaryKeysSet, - String entityName, String namespaceName, DbusOpcode eventType) + String entityName, String namespaceName, DbusOpcode eventType, Map rowChangeMap) { - return new SourceEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType); + return new SourceEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap); } } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/mapper/implementation/MapperType.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/mapper/implementation/MapperType.java index ab3b506..ed5687e 100644 --- a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/mapper/implementation/MapperType.java +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/mapper/implementation/MapperType.java @@ -70,7 +70,8 @@ public List map(Config config, String configRoot, AbstractEvent s AbstractEvent destinationEvent = destinationEventFactory.createEvent(sourceEvent.getFieldMapPair(), sourceEvent.getPrimaryKeySet(), - sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType()); + sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType(), + sourceEvent.getRowChangeMap()); return Arrays.asList(destinationEvent); } @@ -106,7 +107,7 @@ public List map(Config config, String configRoot, AbstractEvent s AbstractEvent destinationEvent = destinationEventFactory.createEvent(sourceEvent.getFieldMapPair(), sourceEvent.getPrimaryKeySet(), - sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType()); + sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType(), sourceEvent.getRowChangeMap()); return Arrays.asList(destinationEvent); } @@ -170,7 +171,8 @@ public List map(Config config, String configRoot, AbstractEvent s AbstractEvent destinationEvent = destinationEventFactory.createEvent(destinationEventColumnMap, primaryKeySet, - destinationEntity, destinationNamespace, sourceEvent.getEventType()); + destinationEntity, destinationNamespace, sourceEvent.getEventType(), + sourceEvent.getRowChangeMap()); destinationEventList.add(destinationEvent); } diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroSchemaHelper.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroSchemaHelper.java new file mode 100644 index 0000000..0f3b0e5 --- /dev/null +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroSchemaHelper.java @@ -0,0 +1,83 @@ +/******************************************************************************* + * + * Copyright 2012-2015, the original author or authors. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obta a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *******************************************************************************/ + +package com.flipkart.aesop.utils; + +import com.linkedin.databus.core.DbusConstants; +import com.linkedin.databus2.core.DatabusException; +import com.linkedin.databus2.schemas.utils.SchemaHelper; +import org.apache.avro.Schema; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public abstract class AvroSchemaHelper +{ + private static final String META_ROW_CHANGE_FIELD = "rowChangeField"; + private static final String META_FIELD_TYPE_NAME = "dbFieldType"; + private static final String PRIMARY_KEY_FIELD_NAME = "pk"; + + /** + * Returns the fieldname marked as row change field from schema meta + * @param schema + * @return rowChangeFieldName + */ + public static String getRowChangeField(Schema schema) + { + return SchemaHelper.getMetaField(schema, META_ROW_CHANGE_FIELD); + } + + /** + * Generates a hash storing fieldName to intended MysqlDataType from schema + * @param schema + * @return Map containg FieldName to Mysql Type mapping + */ + public static Map fieldToDataTypeMap(Schema schema) + { + Map map = new HashMap (); + for (Schema.Field field : schema.getFields()) + { + String mysqlType = SchemaHelper.getMetaField(field, META_FIELD_TYPE_NAME); + map.put(field.name(), mysqlType); + } + return map; + } + + /** + * Generates primary key set using the schema. + * @param schema + * @return Primary key set + * @throws DatabusException + */ + public static Set getPrimaryKeysSetFromSchema(Schema schema) throws DatabusException + { + String primaryKeyFieldName = SchemaHelper.getMetaField(schema, PRIMARY_KEY_FIELD_NAME); + if (primaryKeyFieldName == null) + { + throw new DatabusException("No primary key specified in the schema"); + } + + Set primaryKeySet = new HashSet(); + for (String primaryKey : primaryKeyFieldName.split(DbusConstants.COMPOUND_KEY_SEPARATOR)) + { + primaryKeySet.add(primaryKey.trim()); + } + assert (primaryKeySet.size() >= 1); + return primaryKeySet; + } +} + diff --git a/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlConverter.java b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlConverter.java new file mode 100644 index 0000000..486f366 --- /dev/null +++ b/consumers/client-event-consumer/src/main/java/com/flipkart/aesop/utils/AvroToMysqlConverter.java @@ -0,0 +1,56 @@ +/******************************************************************************* + * + * Copyright 2012-2015, the original author or authors. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obta a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + *******************************************************************************/ + +package com.flipkart.aesop.utils; + +import java.util.HashMap; +import java.util.Map; + +public abstract class AvroToMysqlConverter +{ + /** + * This returns mysql object from avro object and intended mysql datatype + * @param mysqlType + * @param fieldValue + * @return MysqlTypedObject using AvroToMysqlMapper + */ + public static Object getMysqlTypedObject(String mysqlType, Object fieldValue) + { + return AvroToMysqlMapper.avroToMysqlType(fieldValue, MysqlDataTypes.valueOf(mysqlType.toUpperCase())); + } + + /** + * This specifically handles rowChangeField which comes in form HashMap and converting each key/value to Mysql type + * @param fieldMap + * @param fieldToMysqlDataType + * @return MysqlTypedObject using AvroToMysqlMapper + */ + public static Map getMysqlTypedObjectForMap(Map fieldMap, + Map fieldToMysqlDataType) + { + Map mysqlTypedObject = null; + if (fieldMap != null) + { + mysqlTypedObject = new HashMap(fieldMap.size()); + for (Object key : fieldMap.keySet()) + { + String fieldName = key.toString(); + String sqlType = fieldToMysqlDataType.get(fieldName); + mysqlTypedObject.put(fieldName, AvroToMysqlConverter.getMysqlTypedObject(sqlType, fieldMap.get(key))); + } + } + return mysqlTypedObject; + } +} diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/MysqlEventProducer.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/MysqlEventProducer.java index 931d5a4..2f71bba 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/MysqlEventProducer.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/MysqlEventProducer.java @@ -75,7 +75,12 @@ public class MysqlEventProducer extends AbstractEventPr protected SchemaChangeEventProcessor schemaChangeEventProcessor; /** The SCN generator implementation, initialized to the default simple implementation*/ protected SCNGenerator scnGenerator = new NaiveSCNGenerator(); + /** Stores the flag value for propogating old/changed row values. Defaulting it to false */ + private boolean oldValueRequired; + public void setOldValueRequired(boolean oldValueRequired) { + this.oldValueRequired = oldValueRequired; + } /** * Interface method implementation. Checks for mandatory dependencies and creates the Open Replicator * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() @@ -181,7 +186,7 @@ public MysqlAvroEventManager buildEventManagers(LogicalSourceStaticConfig sou PhysicalSourceStaticConfig pConfig) throws DatabusException, EventCreationException, UnsupportedKeyException, InvalidConfigException { - MysqlAvroEventManager manager = new MysqlAvroEventManager(sourceConfig.getId(), (short) pConfig.getId()); + MysqlAvroEventManager manager = new MysqlAvroEventManager(sourceConfig.getId(), (short) pConfig.getId(), this.oldValueRequired); return manager; } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java index 7c65a38..052ce46 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/MysqlAvroEventManager.java @@ -14,11 +14,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; +import com.flipkart.aesop.runtime.producer.avro.exception.InvalidAvroSchemaException; +import com.flipkart.aesop.runtime.producer.avro.utils.AvroSchemaHelper; +import com.google.code.or.common.glossary.Pair; +import com.google.common.base.Objects; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -53,7 +54,7 @@ /** * MysqlAvroEventManager deals with avro events and provides avro specific functionalities * such as framing an avro record and appending avro events to event buffer. - * + * *
  * 
    *
  • Provides means to serialize avro events
  • @@ -75,11 +76,15 @@ public class MysqlAvroEventManager /** physical source id */ protected final int pSourceId; + /** flag for getting old values */ + private final boolean oldValueRequired; + /** constructor for this class */ - public MysqlAvroEventManager(int lSourceId, int pSourceId) throws DatabusException + public MysqlAvroEventManager(int lSourceId, int pSourceId, boolean oldValueRequired) throws DatabusException { this.lSourceId = lSourceId; this.pSourceId = pSourceId; + this.oldValueRequired = oldValueRequired; } /** @@ -120,38 +125,58 @@ public int createAndAppendEvent(DbChangeEntry changeEntry, DbusEventBufferAppend * Sample : * [header=BinlogEventV4HeaderImpl[timestamp=1394108600000,eventType=25,serverId=1,eventLength=85 * ,nextPosition=1501,flags=0,timestampOfReceipt=1394108600580] - * @param rowList contains list of all mutated rows + * @param pairs contains list of all mutated rows * @param dbusOpCode code indicating type of operation such as insertion, update or delete - * @param binLogEventMapper mapper corresponding to the source to which event belongs to + * @param binLogEventMappers mapper corresponding to the source to which event belongs to * @param schema schema corresponding to the source to which event belongs to * @param scn system change number * @return List list of change records */ - public List frameAvroRecord(final BinlogEventV4Header eventHeader, final List rowList, + public List frameAvroRecord(final BinlogEventV4Header eventHeader, final List> pairs, final DbusOpcode dbusOpCode, Map> binLogEventMappers, final Schema schema, final long scn) { List entryList = new ArrayList(); - LOGGER.debug("Received frame avro record request for " + eventHeader); + LOGGER.debug("Received frame avro record request for {}", eventHeader); try { final long timestampInNanos = eventHeader.getTimestamp() * 1000000L; final boolean isReplicated = false; - for (Row row : rowList) + for (Pair pair : pairs) { - List columns = row.getColumns(); + Row oldRow = (Row) pair.getBefore(); + Row newRow = (Row) pair.getAfter(); // getting the appropriate bin log mapper for the logicalSource BinLogEventMapper binLogEventMapper = - binLogEventMappers.get(lSourceId) == null ? new DefaultBinLogEventMapper(new ORToAvroMapper()) - : binLogEventMappers.get(lSourceId); + binLogEventMappers.get(lSourceId) == null ? new DefaultBinLogEventMapper(new ORToAvroMapper()) + : binLogEventMappers.get(lSourceId); + GenericRecord newRecord = binLogEventMapper.mapBinLogEvent(eventHeader, newRow, dbusOpCode, schema); + + if (this.oldValueRequired) + { + String rowChangeFieldName = AvroSchemaHelper.getRowChangeField(schema); + if (rowChangeFieldName == null) + { + LOGGER.error("Schema Configuration Mismatch: oldValueRequired flag is set but no field in schema found"); + throw new InvalidAvroSchemaException("oldValueRequired flag is set but no field in schema found"); + } - GenericRecord genericRecord = binLogEventMapper.mapBinLogEvent(eventHeader, row, dbusOpCode, schema); - List keyPairList = generateKeyPair(columns, schema); + Map changedOldValues = null; + if(oldRow != null) + { + GenericRecord oldRecord = binLogEventMapper.mapBinLogEvent(eventHeader, oldRow, dbusOpCode, schema); + changedOldValues = MysqlAvroEventManager.calculateChange(oldRecord, newRecord); + } + newRecord.put(rowChangeFieldName, changedOldValues); + } + + List keyPairList = generateKeyPair(newRow.getColumns(), schema); + LOGGER.debug("Record value in the event: {}", newRecord); DbChangeEntry dbChangeEntry = - new DbChangeEntry(scn, timestampInNanos, genericRecord, dbusOpCode, isReplicated, schema, - keyPairList); + new DbChangeEntry(scn, timestampInNanos, newRecord, dbusOpCode, isReplicated, schema, + keyPairList); entryList.add(dbChangeEntry); - LOGGER.debug("Successfully Processed the Row " + dbChangeEntry); + LOGGER.debug("Successfully Processed the Row {}", dbChangeEntry); } } catch (NoSuchSchemaException ne) @@ -167,6 +192,31 @@ public List frameAvroRecord(final BinlogEventV4Header eventHeader return entryList; } + /** + * This method scans new and old records and calculates changes. + * + * @param oldRecord is a generic record representing old-record before change + * @param newRecord is a generic record representing new-record after change + * @return HashMap this contains changes in form of + */ + private static Map calculateChange(GenericRecord oldRecord, GenericRecord newRecord) + { + Map changedOldValue = new HashMap(); + for (Schema.Field field : newRecord.getSchema().getFields()) + { + String fieldName = field.name(); + Object oldValue = oldRecord.get(fieldName); + Object newValue = newRecord.get(fieldName); + + if (!Objects.equal(oldValue, newValue)) + { + changedOldValue.put(fieldName, oldValue); + } + } + assert (changedOldValue.isEmpty() == false) : "Old and New Record values are same or equals not working as expected"; + return changedOldValue; + } + /** * Serializes avro record into byte array * @param record generic record diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/exception/InvalidAvroSchemaException.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/exception/InvalidAvroSchemaException.java new file mode 100644 index 0000000..2f98356 --- /dev/null +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/exception/InvalidAvroSchemaException.java @@ -0,0 +1,21 @@ +package com.flipkart.aesop.runtime.producer.avro.exception; + +/** + * Created by akshit.agarwal on 19/04/16. + */ + +public class InvalidAvroSchemaException extends RuntimeException +{ + public InvalidAvroSchemaException() { + } + + public InvalidAvroSchemaException(String message, Throwable cause) { + super(message, cause); + } + public InvalidAvroSchemaException(String message) { + super(message); + } + public InvalidAvroSchemaException(Throwable cause) { + super(cause); + } +} diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/utils/AvroSchemaHelper.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/utils/AvroSchemaHelper.java new file mode 100644 index 0000000..ba993f9 --- /dev/null +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/avro/utils/AvroSchemaHelper.java @@ -0,0 +1,35 @@ +/* + * Copyright 2012-2015, the original author or authors. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.flipkart.aesop.runtime.producer.avro.utils; + +import com.linkedin.databus2.schemas.utils.SchemaHelper; +import org.apache.avro.Schema; +import java.util.List; + +/** + * Created by akshit.agarwal on 14/03/16. + */ +public abstract class AvroSchemaHelper +{ + private static final String ROW_CHANGE_META_FIELD = "rowChangeField"; + + /** + * @param schema + * @return Fieldname representing row change + */ + public static String getRowChangeField(Schema schema) + { + return SchemaHelper.getMetaField(schema, ROW_CHANGE_META_FIELD); + } +} diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventProcessor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventProcessor.java index 12d8682..7b0d986 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventProcessor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventProcessor.java @@ -12,6 +12,8 @@ */ package com.flipkart.aesop.runtime.producer.eventprocessor.impl; +import com.google.code.or.common.glossary.Pair; +import com.google.code.or.common.glossary.Row; import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; @@ -21,6 +23,9 @@ import com.google.code.or.binlog.impl.event.DeleteRowsEvent; import com.linkedin.databus.core.DbusOpcode; +import java.util.ArrayList; +import java.util.List; + /** * The DeleteEventProcessor processes DeleteRowsEvent from source. This event gets called when ever few * row/(s) are deleted at the source. @@ -41,14 +46,24 @@ public void process(BinlogEventV4 event, OpenReplicationListener listener) throw { if (!listener.getMysqlTransactionManager().isBeginTxnSeen()) { - LOGGER.warn("Skipping event (" + event + ") as this is before the start of first transaction"); + LOGGER.warn("Skipping event ({}) as this is before the start of first transaction", event); return; } - LOGGER.debug("Delete Event Received : " + event); + LOGGER.debug("Delete Event Received : {}", event); DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event; + + List rowList = deleteRowsEvent.getRows(); + List> listOfPairs = new ArrayList>(rowList.size()); + + for (Row row : rowList) + { + /* null is added in before to maintain consistency between with update and further in code we dont need to + *differentiate update and delete */ + listOfPairs.add(new Pair(null, row)); + } + listener.getMysqlTransactionManager().performChanges(deleteRowsEvent.getTableId(), deleteRowsEvent.getHeader(), - deleteRowsEvent.getRows(), DbusOpcode.DELETE); - LOGGER.debug("Delete Successful for " + event.getHeader().getEventLength() + " . Data deleted : " - + deleteRowsEvent.getRows()); + listOfPairs, DbusOpcode.DELETE); + LOGGER.debug("Delete Successful for {} . Data deleted : {}", event.getHeader().getEventLength(), rowList); } } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventV2Processor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventV2Processor.java index 5d034cb..01de46b 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventV2Processor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/DeleteEventV2Processor.java @@ -1,5 +1,7 @@ package com.flipkart.aesop.runtime.producer.eventprocessor.impl; +import com.google.code.or.common.glossary.Pair; +import com.google.code.or.common.glossary.Row; import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; @@ -9,6 +11,9 @@ import com.google.code.or.binlog.impl.event.DeleteRowsEventV2; import com.linkedin.databus.core.DbusOpcode; +import java.util.ArrayList; +import java.util.List; + /** * The DeleteEventV2Processor processes DeleteRowsEventV2 from source. This event gets called when ever few * row/(s) are deleted at the source. @@ -28,14 +33,23 @@ public void process(BinlogEventV4 event, OpenReplicationListener listener) throw { if (!listener.getMysqlTransactionManager().isBeginTxnSeen()) { - LOGGER.warn("Skipping event (" + event + ") as this is before the start of first transaction"); + LOGGER.warn("Skipping event ({}) as this is before the start of first transaction", event); return; } - LOGGER.debug("Delete Event Received : " + event); + LOGGER.debug("Delete Event Received : {}", event); DeleteRowsEventV2 deleteRowsEvent = (DeleteRowsEventV2) event; + List rowList = deleteRowsEvent.getRows(); + List> listOfPairs = new ArrayList>(rowList.size()); + + for (Row row : rowList) + { + /* null is added in before to maintain consistency between with update and further in code we dont need to + *differentiate update and delete */ + listOfPairs.add(new Pair(null, row)); + } + listener.getMysqlTransactionManager().performChanges(deleteRowsEvent.getTableId(), deleteRowsEvent.getHeader(), - deleteRowsEvent.getRows(), DbusOpcode.DELETE); - LOGGER.debug("Delete Successful for " + event.getHeader().getEventLength() + " . Data deleted : " - + deleteRowsEvent.getRows()); + listOfPairs, DbusOpcode.DELETE); + LOGGER.debug("Delete Successful for {} . Data deleted : {}", event.getHeader().getEventLength(), rowList); } } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventProcessor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventProcessor.java index 0218567..1b49a10 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventProcessor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventProcessor.java @@ -12,6 +12,8 @@ */ package com.flipkart.aesop.runtime.producer.eventprocessor.impl; +import com.google.code.or.common.glossary.Pair; +import com.google.code.or.common.glossary.Row; import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; @@ -21,6 +23,9 @@ import com.google.code.or.binlog.impl.event.WriteRowsEvent; import com.linkedin.databus.core.DbusOpcode; +import java.util.ArrayList; +import java.util.List; + /** * The InsertEventProcessor processes WriteRowsEvent from source. This event is received whenever insertion * operation happens at the source. @@ -41,15 +46,23 @@ public void process(BinlogEventV4 event, OpenReplicationListener listener) throw { if (!listener.getMysqlTransactionManager().isBeginTxnSeen()) { - LOGGER.warn("Skipping event (" + event + ") as this is before the start of first transaction"); + LOGGER.warn("Skipping event ({}) as this is before the start of first transaction", event); return; } - LOGGER.debug("Insert Event Received : " + event); + LOGGER.debug("Insert Event Received : {}", event); WriteRowsEvent wre = (WriteRowsEvent) event; - listener.getMysqlTransactionManager().performChanges(wre.getTableId(), wre.getHeader(), wre.getRows(), - DbusOpcode.UPSERT); - LOGGER.debug("Insertion Successful for " + event.getHeader().getEventLength() + " . Data inserted : " - + wre.getRows()); + List rowList = wre.getRows(); + List> listOfPairs = new ArrayList>(rowList.size()); + + for (Row row : rowList) + { + //Inserting Old Row as null + listOfPairs.add(new Pair(null, row)); + } + + listener.getMysqlTransactionManager().performChanges(wre.getTableId(), wre.getHeader(), listOfPairs, + DbusOpcode.UPSERT); + LOGGER.debug("Insertion Successful for {} . Data inserted : {}", event.getHeader().getEventLength(), rowList); } } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventV2Processor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventV2Processor.java index 5293a72..1c5e311 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventV2Processor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/InsertEventV2Processor.java @@ -1,5 +1,7 @@ package com.flipkart.aesop.runtime.producer.eventprocessor.impl; +import com.google.code.or.common.glossary.Pair; +import com.google.code.or.common.glossary.Row; import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; @@ -9,6 +11,9 @@ import com.google.code.or.binlog.impl.event.WriteRowsEventV2; import com.linkedin.databus.core.DbusOpcode; +import java.util.ArrayList; +import java.util.List; + /** * The InsertEvent2Processor processes WriteRowsEventV2 from source. This event is received if there is any * insert operation on the source. @@ -28,15 +33,22 @@ public void process(BinlogEventV4 event, OpenReplicationListener listener) throw { if (!listener.getMysqlTransactionManager().isBeginTxnSeen()) { - LOGGER.warn("Skipping event (" + event + ") as this is before the start of first transaction"); + LOGGER.warn("Skipping event ({}) as this is before the start of first transaction", event); return; } - LOGGER.debug("Insert Event Received : " + event); + LOGGER.debug("Insert Event Received : {}", event); WriteRowsEventV2 wre = (WriteRowsEventV2) event; - listener.getMysqlTransactionManager().performChanges(wre.getTableId(), wre.getHeader(), wre.getRows(), - DbusOpcode.UPSERT); - LOGGER.debug("Insertion Successful for " + event.getHeader().getEventLength() + " . Data inserted : " - + wre.getRows()); + List rowList = wre.getRows(); + List> listOfPairs = new ArrayList>(rowList.size()); + + for (Row row : rowList) + { + listOfPairs.add(new Pair(null, row)); + } + + listener.getMysqlTransactionManager().performChanges(wre.getTableId(), wre.getHeader(), listOfPairs, + DbusOpcode.UPSERT); + LOGGER.debug("Insertion Successful for {} . Data inserted : {}", event.getHeader().getEventLength(), rowList); } } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventProcessor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventProcessor.java index eca86f3..7e388fb 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventProcessor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventProcessor.java @@ -12,7 +12,6 @@ */ package com.flipkart.aesop.runtime.producer.eventprocessor.impl; -import java.util.ArrayList; import java.util.List; import org.trpr.platform.core.impl.logging.LogFactory; @@ -54,13 +53,8 @@ public void process(BinlogEventV4 event, OpenReplicationListener listener) throw LOGGER.debug("Update Event Received : " + event); UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event; List> listOfPairs = updateRowsEvent.getRows(); - List rowList = new ArrayList(listOfPairs.size()); - for (Pair pair : listOfPairs) - { - Row row = pair.getAfter(); - rowList.add(row); - } - manager.performChanges(updateRowsEvent.getTableId(), updateRowsEvent.getHeader(), rowList, DbusOpcode.UPSERT); - LOGGER.debug("Update Successful for " + event.getHeader().getEventLength() + " . Data updated : " + rowList); + + manager.performChanges(updateRowsEvent.getTableId(), updateRowsEvent.getHeader(), listOfPairs, DbusOpcode.UPSERT); + LOGGER.debug("Update Successful for {} . Data updated : {}", event.getHeader().getEventLength(), listOfPairs); } } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventV2Processor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventV2Processor.java index 8775181..9d28d55 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventV2Processor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/eventprocessor/impl/UpdateEventV2Processor.java @@ -1,6 +1,5 @@ package com.flipkart.aesop.runtime.producer.eventprocessor.impl; -import java.util.ArrayList; import java.util.List; import org.trpr.platform.core.impl.logging.LogFactory; @@ -42,13 +41,8 @@ public void process(BinlogEventV4 event, OpenReplicationListener listener) throw LOGGER.debug("Update Event Received : " + event); UpdateRowsEventV2 updateRowsEvent = (UpdateRowsEventV2) event; List> listOfPairs = updateRowsEvent.getRows(); - List rowList = new ArrayList(listOfPairs.size()); - for (Pair pair : listOfPairs) - { - Row row = pair.getAfter(); - rowList.add(row); - } - manager.performChanges(updateRowsEvent.getTableId(), updateRowsEvent.getHeader(), rowList, DbusOpcode.UPSERT); - LOGGER.debug("Update Successful for " + event.getHeader().getEventLength() + " . Data updated : " + rowList); + + manager.performChanges(updateRowsEvent.getTableId(), updateRowsEvent.getHeader(), listOfPairs, DbusOpcode.UPSERT); + LOGGER.debug("Update Successful for {} . Data updated : {}", event.getHeader().getEventLength(), listOfPairs); } } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java index 52b05da..be1f777 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/mapper/impl/DefaultBinLogEventMapper.java @@ -19,6 +19,7 @@ import java.util.Comparator; import java.util.List; +import com.flipkart.aesop.runtime.producer.avro.utils.AvroSchemaHelper; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -78,6 +79,7 @@ public T mapBinLogEvent(BinlogEventV4Header header, Row row, DbusOpcode databusC GenericRecord record = new GenericData.Record(schema); List columns = row.getColumns(); List orderedFields; + String rowChangeField = AvroSchemaHelper.getRowChangeField(schema); try { @@ -97,7 +99,7 @@ public int compare(String o1, String o2) int cnt = 0; for (Schema.Field field : orderedFields) { - Column column = columns.get(cnt); + Column column = field.name().equals(rowChangeField) ? null : columns.get(cnt); record.put(field.name(), column == null ? null : orToAvroMapper.orToAvroType(column)); cnt++; } diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/schema/eventprocessor/impl/UpdateSchemaChangeEventProcessor.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/schema/eventprocessor/impl/UpdateSchemaChangeEventProcessor.java index a1d844c..0799bb7 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/schema/eventprocessor/impl/UpdateSchemaChangeEventProcessor.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/schema/eventprocessor/impl/UpdateSchemaChangeEventProcessor.java @@ -7,6 +7,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.flipkart.aesop.runtime.producer.avro.utils.AvroSchemaHelper; import org.apache.avro.Schema; import org.trpr.platform.core.impl.logging.LogFactory; import org.trpr.platform.core.spi.logging.Logger; @@ -80,12 +81,20 @@ public void process(QueryEvent queryEvent) throws Exception @Override public void process(String databaseName, String tableName) throws Exception { - String newSchemaJson = schemaGenerator.generateSchema(databaseName, tableName); String tableUri = databaseName.toLowerCase() + "." + tableName.toLowerCase(); - VersionedSchema olderSchema = + VersionedSchema olderVersionedSchema = schemaRegistryService.fetchLatestVersionedSchemaBySourceName(tableUriToSrcNameMap.get(tableUri)); - short olderVersion = (olderSchema != null) ? (short) olderSchema.getVersion() : 0; + short olderVersion = 0; + if (olderVersionedSchema != null) + { + Schema oldSchema = olderVersionedSchema.getSchema(); + schemaGenerator.setRowChangeFieldName(AvroSchemaHelper.getRowChangeField(oldSchema)); + olderVersion = (short) olderVersionedSchema.getVersion(); + } + + String newSchemaJson = schemaGenerator.generateSchema(databaseName, tableName); + /** if the olderVersion is at its Max value then overwrite it */ Short newVersion = olderVersion == Short.MAX_VALUE ? olderVersion : (short) (olderVersion + 1); VersionedSchema newSchema = diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/MysqlTransactionManager.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/MysqlTransactionManager.java index 86b2b1d..ac120a9 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/MysqlTransactionManager.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/MysqlTransactionManager.java @@ -20,6 +20,7 @@ import com.google.code.or.binlog.BinlogEventV4Header; import com.google.code.or.common.glossary.Row; +import com.google.code.or.common.glossary.Pair; import com.linkedin.databus.core.DbusOpcode; /** * MysqlTransactionManager>/code> defines contracts specific for Mysql transactions. Inherits contracts from {@link TransactionProcessor} and {@link SourceProcessor} @@ -28,7 +29,7 @@ */ public interface MysqlTransactionManager extends TransactionProcessor,SourceProcessor{ /** Persists change events in event buffer */ - void performChanges(long tableId,BinlogEventV4Header eventHeader, List rowList, final DbusOpcode doc); + void performChanges(long tableId,BinlogEventV4Header eventHeader, List> rowPairs, final DbusOpcode doc); /** Set the current bin log file number*/ void setCurrFileNum(int currFileNum) ; /** Get the map of mysqlTableId to tableName mapping */ diff --git a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/impl/MysqlTransactionManagerImpl.java b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/impl/MysqlTransactionManagerImpl.java index c942c3e..a6a9ca1 100644 --- a/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/impl/MysqlTransactionManagerImpl.java +++ b/producers/mysql-producer/src/main/java/com/flipkart/aesop/runtime/producer/txnprocessor/impl/MysqlTransactionManagerImpl.java @@ -18,6 +18,7 @@ import com.flipkart.aesop.runtime.producer.spi.SCNGenerator; import com.flipkart.aesop.runtime.producer.txnprocessor.MysqlTransactionManager; import com.google.code.or.binlog.BinlogEventV4Header; +import com.google.code.or.common.glossary.Pair; import com.google.code.or.common.glossary.Row; import com.linkedin.databus.core.DatabusRuntimeException; import com.linkedin.databus.core.DbusEventBufferAppendable; @@ -271,11 +272,11 @@ public Map getMysqlTableIdToTableNameMap() /** * Persists event related data in transaction object * @param eventHeader Binary log event header - * @param rowList list of mutated rows + * @param listOfPairs list of mutated rows * @param databusOpcode operation code indicating nature of change such as insertion,deletion or updation. */ @Override - public void performChanges(long tableId, BinlogEventV4Header eventHeader, List rowList, + public void performChanges(long tableId, BinlogEventV4Header eventHeader, List> listOfPairs, final DbusOpcode databusOpcode) { try @@ -289,7 +290,7 @@ public void performChanges(long tableId, BinlogEventV4Header eventHeader, List entries = eventManagerMap.get(Integer.valueOf(tableUriToSrcIdMap.get(currTableName))).frameAvroRecord( - eventHeader, rowList, databusOpcode, binLogEventMappers, schema.getSchema(), + eventHeader, listOfPairs, databusOpcode, binLogEventMappers, schema.getSchema(), this.scnGenerator.getSCN(frameSCN(currFileNum, (int) eventHeader.getPosition()), this.mySqlEventProducer.getBinLogHost())); for (DbChangeEntry entry : entries) diff --git a/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEvent.java b/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEvent.java index 0cbb642..ec9dd99 100644 --- a/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEvent.java +++ b/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEvent.java @@ -25,4 +25,6 @@ public interface MysqlBinLogEvent public List getPrimaryKeyValues(); + public Map getRowChangeMap(); + } diff --git a/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEventImpl.java b/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEventImpl.java index 10188da..8495044 100644 --- a/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEventImpl.java +++ b/samples/sample-client-common/src/main/java/com/flipkart/aesop/sample/client/common/events/MysqlBinLogEventImpl.java @@ -1,23 +1,18 @@ package com.flipkart.aesop.sample.client.common.events; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import com.flipkart.aesop.utils.AvroToMysqlConverter; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; -import com.flipkart.aesop.sample.client.common.utils.AvroToMysqlMapper; -import com.flipkart.aesop.sample.client.common.utils.MysqlDataTypes; import com.linkedin.databus.client.pub.DbusEventDecoder; -import com.linkedin.databus.core.DbusConstants; import com.linkedin.databus.core.DbusEvent; import com.linkedin.databus.core.DbusOpcode; import com.linkedin.databus2.core.DatabusException; import com.linkedin.databus2.schemas.VersionedSchema; -import com.linkedin.databus2.schemas.utils.SchemaHelper; +import com.flipkart.aesop.utils.AvroSchemaHelper; /** * @author yogesh.dahiya @@ -25,11 +20,10 @@ public class MysqlBinLogEventImpl implements MysqlBinLogEvent { - private static String PK_FIELD_NAME = "pk"; - private static String META_FIELD_TYPE_NAME = "dbFieldType"; private Schema schema; private List pKeyList = new ArrayList(3); private Map keyValuePairs = new HashMap(); + private Map rowChangeMap = new HashMap(); private DbusOpcode eventType; public MysqlBinLogEventImpl(DbusEvent event, DbusEventDecoder eventDecoder) throws DatabusException @@ -39,41 +33,47 @@ public MysqlBinLogEventImpl(DbusEvent event, DbusEventDecoder eventDecoder) thro this.schema = writerSchema.getSchema(); this.eventType = event.getOpcode(); this.pKeyList = getPkListFromSchema(schema); + this.rowChangeMap = null; + Map fieldToMysqlDataType = AvroSchemaHelper.fieldToDataTypeMap(schema); + String rowChangeField = AvroSchemaHelper.getRowChangeField(schema); + for (Field field : schema.getFields()) { - String mysqlType = SchemaHelper.getMetaField(field, META_FIELD_TYPE_NAME); - this.keyValuePairs.put( - field.name(), - AvroToMysqlMapper.avroToMysqlType(genericRecord.get(field.name()), - MysqlDataTypes.valueOf(mysqlType.toUpperCase()))); + Object recordValue = genericRecord.get(field.name()); + if (field.name().equals(rowChangeField)) + { + this.rowChangeMap = AvroToMysqlConverter.getMysqlTypedObjectForMap((Map) recordValue, + fieldToMysqlDataType); + } + else + { + this.keyValuePairs.put(field.name(), + AvroToMysqlConverter.getMysqlTypedObject(fieldToMysqlDataType.get(field.name()), recordValue)); + } } } - public MysqlBinLogEventImpl(Schema schema, Map keyValuePairs, DbusOpcode eventType) + public MysqlBinLogEventImpl(Schema schema, Map keyValuePairs, DbusOpcode eventType, Map rowChangeMap) throws DatabusException { this.eventType = eventType; this.keyValuePairs = keyValuePairs; this.pKeyList = getPkListFromSchema(schema); this.schema = schema; + this.rowChangeMap = rowChangeMap; } private List getPkListFromSchema(Schema schema) throws DatabusException { - List pKeyList = new ArrayList(3); - String pkFieldName = SchemaHelper.getMetaField(schema, PK_FIELD_NAME); - if (pkFieldName == null) - { - throw new DatabusException("No primary key specified in the schema"); - } - for (String s : pkFieldName.split(DbusConstants.COMPOUND_KEY_SEPARATOR)) + Set primaryKeySet = AvroSchemaHelper.getPrimaryKeysSetFromSchema(schema); + List primaryKeyList = new ArrayList(3); + for (String s : primaryKeySet) { - pKeyList.add(s.trim()); + primaryKeyList.add(s); } - assert (pKeyList.size() >= 1); - return pKeyList; + return primaryKeyList; } public Map getKeyValuePair() @@ -116,6 +116,8 @@ public DbusOpcode getEventType() return this.eventType; } + public Map getRowChangeMap() { return rowChangeMap; } + public String toString() { StringBuilder builder = new StringBuilder(); @@ -123,8 +125,8 @@ public String toString() builder.append("entityName : " + getEntityName() + ", "); builder.append("eventType : " + getEventType().toString() + ", "); builder.append("pKeyList : " + getPrimaryKeyList().toString() + ", "); - builder.append("keyValuePairs : " + getKeyValuePair().toString()); + builder.append("keyValuePairs : " + getKeyValuePair().toString() + ", "); + builder.append("rowChangeMap : " + getRowChangeMap()); return builder.toString(); } - } diff --git a/samples/sample-mysql-relay/src/main/resources/external/spring-relay-config.xml b/samples/sample-mysql-relay/src/main/resources/external/spring-relay-config.xml index f21fefc..a6329c4 100644 --- a/samples/sample-mysql-relay/src/main/resources/external/spring-relay-config.xml +++ b/samples/sample-mysql-relay/src/main/resources/external/spring-relay-config.xml @@ -35,6 +35,7 @@ + diff --git a/samples/sample-mysql-relay/src/main/resources/schemas_registry/com.flipkart.aesop.events.ortest.Person.0.avsc b/samples/sample-mysql-relay/src/main/resources/schemas_registry/com.flipkart.aesop.events.ortest.Person.0.avsc index 9a42610..e259073 100644 --- a/samples/sample-mysql-relay/src/main/resources/schemas_registry/com.flipkart.aesop.events.ortest.Person.0.avsc +++ b/samples/sample-mysql-relay/src/main/resources/schemas_registry/com.flipkart.aesop.events.ortest.Person.0.avsc @@ -2,27 +2,35 @@ "name" : "Person", "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST", "type" : "record", - "meta" : "dbFieldName=person;pk=id;", + "meta" : "dbFieldName=person;pk=id;rowChangeField=_oldValue", "namespace" : "or_test", "fields" : [ { "name" : "id", "type" : [ "long", "null" ], - "meta" : "dbFieldName=ID;dbFieldPosition=0;dbFieldType=bigint" + "meta" : "dbFieldName=ID;dbFieldPosition=1;dbFieldType=bigint" }, { "name" : "firstName", "type" : [ "string", "null" ], - "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;dbFieldType=varchar" + "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=2;dbFieldType=varchar" }, { "name" : "lastName", "type" : [ "string", "null" ], - "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;dbFieldType=varchar" + "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=3;dbFieldType=varchar" }, { "name" : "birthDate", "type" : [ "long", "null" ], - "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;dbFieldType=timestamp" + "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=4;dbFieldType=timestamp" }, { "name" : "deleted", "type" : [ "string", "null" ], - "meta" : "dbFieldName=DELETED;dbFieldPosition=4;dbFieldType=varchar" - } ] + "meta" : "dbFieldName=DELETED;dbFieldPosition=5;dbFieldType=varchar" + }, { + "name" : "_oldValue", + "type" : [ { + "values" : [ "int", "long", "float", "double", "bytes", "string", "null" ], + "type" : "map" + }, "null" ], + "meta" : "dbFieldName=_oldValue;dbFieldPosition=6;dbFieldType=MAP" + } + ] } diff --git a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/MysqlToAvroMapper.java b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/MysqlToAvroMapper.java index 8c6dd40..71efc3b 100644 --- a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/MysqlToAvroMapper.java +++ b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/MysqlToAvroMapper.java @@ -1,5 +1,9 @@ package com.flipkart.aesop.avro.schemagenerator.data; +import java.lang.Object; +import java.util.Arrays; +import java.util.HashMap; + /** * MysqlToAvroMapper maps Mysql data type to Avro data types. * @author chandan.bansal @@ -94,16 +98,29 @@ public enum MysqlToAvroMapper TIME("long"), /** The year. */ - YEAR("long"); + YEAR("long"), + + /** For HashMap. + * Though this Mysql Datatype does not exist, this required for + * passing record changes in form of Avro Map datatype. Avro MAP datatype + */ + MAP(new HashMap(){{ + put("type", "map"); + put("values", Arrays.asList("int", "long", "float", "double", "bytes", "string", "null")); + }}); - /** The avro type. */ - private final String avroType; + + /** The avro type. + * The reason its returning Object type because along with String datatypes required for defining "long","int" etc + * We need to support Map (avro map datatype) which of the type HashMap + */ + private final Object avroType; /** * enum constructor. * @param avroType the avro data type */ - private MysqlToAvroMapper(String avroType) + private MysqlToAvroMapper(Object avroType) { this.avroType = avroType; } @@ -112,7 +129,7 @@ private MysqlToAvroMapper(String avroType) * Gets the avro type. * @return the avro type */ - public String getAvroType() + public Object getAvroType() { return avroType; } diff --git a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/TableRecord.java b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/TableRecord.java index a0aa724..5c4c92f 100644 --- a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/TableRecord.java +++ b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/data/TableRecord.java @@ -32,10 +32,22 @@ public TableRecord(String name, String type, String doc, String namespace, List< this.type = type; this.doc = doc; this.namespace = namespace; - this.meta = "pk=" + StringUtils.join(primaryKeys, ","); + this.meta = generateMeta(primaryKeys); this.fields = fields; } + public TableRecord(String name, String type, String doc, String namespace, List primaryKeys, + List fields, String rowChangeFieldName) throws IllegalArgumentException + { + this(name, type, doc, namespace, primaryKeys, fields); + if(rowChangeFieldName == null) { throw new IllegalArgumentException("rowChangeFieldName can't be NULL"); } + this.meta = generateMeta(primaryKeys) + "; rowChangeField=" + rowChangeFieldName; + } + + private String generateMeta(List primaryKeys) { + return "pk=" + StringUtils.join(primaryKeys, ","); + } + /** * Field encapsulate the field schema for the table * @author yogesh.dahiya @@ -45,7 +57,7 @@ public static class Field /** schema field name */ private String name; /** array of field types */ - private String[] type; + private Object[] type; /** meta for the field */ private String meta; @@ -53,7 +65,7 @@ public Field(String dbFieldName, String dbFieldType, int dbFieldPosition) { /** this can be different from dbFieldName */ this.name = dbFieldName; - this.type = new String[]{MysqlToAvroMapper.valueOf(dbFieldType.toUpperCase()).getAvroType(), "null"}; + this.type = new Object[]{MysqlToAvroMapper.valueOf(dbFieldType.toUpperCase()).getAvroType(), "null"}; this.meta = "dbFieldName=" + dbFieldName + ";dbFieldPosition=" + dbFieldPosition + ";dbFieldType=" + dbFieldType; @@ -69,7 +81,7 @@ public void setName(String name) this.name = name; } - public String[] getType() + public Object[] getType() { return type; } diff --git a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/InteractiveSchemaGenerator.java b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/InteractiveSchemaGenerator.java index 1e31f9f..8c1dcfc 100644 --- a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/InteractiveSchemaGenerator.java +++ b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/InteractiveSchemaGenerator.java @@ -189,8 +189,7 @@ private void processInput() throws Exception * Run schema gen tool. * @return true, if successful */ - public boolean runSchemaGenTool() - { + public boolean runSchemaGenTool() throws IllegalArgumentException { for (String table : _tableNames) { diff --git a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGenerator.java b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGenerator.java index 0e55990..4839c0b 100644 --- a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGenerator.java +++ b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGenerator.java @@ -41,11 +41,13 @@ public class SchemaGenerator implements InitializingBean /** date formatter. */ private SimpleDateFormat df = new SimpleDateFormat("MMM dd, yyyy hh:mm:ss a zzz"); + private String rowChangeFieldName = null; + /** * Instantiates a new schema generator. - * @param dataSourceConfig - configuration to initialize datasource - * @param inclusionList - * @param exclusionList + * @param dataSourceConfigs - configuration to initialize datasource + * @param tablesInclusionListMap + * @param tablesExclusionListMap * @throws PropertyVetoException the property veto exception */ public SchemaGenerator(List dataSourceConfigs, Map> tablesInclusionListMap, @@ -60,6 +62,26 @@ public SchemaGenerator(List dataSourceConfigs, Map dataSourceConfigs, Map> tablesInclusionListMap, + Map> tablesExclusionListMap, String rowChangeFieldName) throws PropertyVetoException + { + for (DataSourceConfig dataSourceConfig : dataSourceConfigs) + { + MysqlConnectionProvider.getInstance().addDataSource(dataSourceConfig); + } + this.tablesInclusionListMap = tablesInclusionListMap; + this.tablesExclusionListMap = tablesExclusionListMap; + this.rowChangeFieldName = rowChangeFieldName; + } + /** * Constructor * @param dataSourceConfigs @@ -84,7 +106,7 @@ public SchemaGenerator() * @return table to schema mapping * @throws IOException Signals that an I/O exception has occurred. */ - public Map generateSchemaForAllTables(String dbName) throws IOException + public Map generateSchemaForAllTables(String dbName) throws IOException, IllegalArgumentException { Map tableNameToSchemaMap = new HashMap(); List tableNameList = MysqlUtils.getTablesInDB(dbName); @@ -125,7 +147,18 @@ public String generateSchema(String dbName, String tableName) throws IOException String doc = "Auto-generated Avro schema for " + tableName + ". Generated at " + df.format(new Date(System.currentTimeMillis())); - TableRecord tableRecord = new TableRecord(tableName, "record", doc, namespace, primaryKeys, fields); + + TableRecord tableRecord; + if (rowChangeFieldName != null) + { + SchemaGenerator.validateRowChangeField(tableName, namespace, fields, rowChangeFieldName); + fields.add(new TableRecord.Field(rowChangeFieldName, "MAP", fields.size() + 1)); + tableRecord = new TableRecord(tableName, "record", doc, namespace, primaryKeys, fields, rowChangeFieldName); + } + else + { + tableRecord = new TableRecord(tableName, "record", doc, namespace, primaryKeys, fields); + } /* mapping tableRecord to json */ StringWriter writer = new StringWriter(); @@ -179,4 +212,32 @@ public void setTablesExclusionListMap(Map> tablesExclusionL this.tablesExclusionListMap = tablesExclusionListMap; } + + public String getRowChangeFieldName() { + return rowChangeFieldName; + } + + public void setRowChangeFieldName(String rowChangeFieldName) { + this.rowChangeFieldName = rowChangeFieldName; + } + + /** + * @param fields + * @param rowChangeFieldName + */ + private static void validateRowChangeField(String tableName, String namespace, List fields, + String rowChangeFieldName) throws IllegalArgumentException + { + for (TableRecord.Field field : fields) + { + if (field.getName().equals(rowChangeFieldName)) + { + throw new IllegalArgumentException("FAILED: rowChangeFieldName: " + rowChangeFieldName + " clashes with orignal field: " + + field.getName() + " (TableName:" + tableName + ",Namespace:" + namespace + ")"); + } + + } + + } + } diff --git a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGeneratorCli.java b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGeneratorCli.java index e234005..fc93de8 100644 --- a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGeneratorCli.java +++ b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/main/SchemaGeneratorCli.java @@ -85,16 +85,16 @@ public static void main(String[] commandLineArguments) String outputFolder=commandLine.getOptionValue("f"); + String oldValueFieldName = commandLine.hasOption("q") ? commandLine.getOptionValue("q") : null; + //check if outputFolder exists, if not create one. File f = new File(outputFolder); if (!f.exists() || !f.isDirectory()) { f.mkdir(); } - - - + SchemaGenerator schemaGenerator = - new SchemaGenerator(dataSourceConfigs, tablesInclusionListMap, tablesExclusionListMap); + new SchemaGenerator(dataSourceConfigs, tablesInclusionListMap, tablesExclusionListMap, oldValueFieldName); System.out.println("Generating Schema ...\n"); if (commandLine.hasOption("t")) { @@ -150,16 +150,17 @@ public static Options constructOptions() .addOption("f", "output-folder", true, "path to the folder for storing output") .addOption("v", "version", true, "version number of schema") .addOption("h", "host", true, "host name for connection ; default localhost") - .addOption("o", "port", true, "port for connection ; default 3306") + .addOption("o", "port", true, "port for connection ; default 3306") .addOption("u", "user", true, "user name for connection ; default root") .addOption("p", "password", true, "password for the connection ; default empty string") .addOption("t", "table", true, "table name for schema generation ; default all ") .addOption("e", "exclusion-list", true, "exclusion list ; default none") .addOption("i", "inclusion-list", true, "inclusion list ; default all") - .addOption("?", "help", false, "help") - .addOption( - OptionBuilder.withArgName("dbName").withLongOpt("db").withDescription("db name for connection") - .hasArg().create('d')) + .addOption("?", "help", false, "help") + .addOption("q", "row-change-field", true, "fieldname to represents old-row-values") + .addOption( + OptionBuilder.withArgName("dbName").withLongOpt("db").withDescription("db name for connection") + .hasArg().create('d')) .addOption( OptionBuilder.withArgName("args").withLongOpt("exclusion-list") diff --git a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/mysql/MysqlUtils.java b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/mysql/MysqlUtils.java index 16ab9da..9d82904 100644 --- a/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/mysql/MysqlUtils.java +++ b/utilities/avro-schema-generator/src/main/java/com/flipkart/aesop/avro/schemagenerator/mysql/MysqlUtils.java @@ -105,7 +105,7 @@ public static List getTablesInDB(String dbName) /** * Gets the primary keys. - * @param dataSourceId the dataSourceId + * @param dbName the dataSourceId * @param tableName the table name * @return the primary keys */ @@ -140,7 +140,6 @@ public static List getPrimaryKeys(String dbName, String tableName) /** * Gets the fields in table. - * @param dataSourceId the dataSourceId * @param db the database * @param table the table * @return the fields in table @@ -174,7 +173,7 @@ public static List getFieldsInTable(String db, String table) /** * checks if the current table is a valid table in the given schema. - * @param dataSourceId the dataSourceId + * @param dataBase the dataSourceId * @param table : table name * @return true if valid table, false otherwise */ @@ -210,7 +209,6 @@ public static boolean isValidTable(String dataBase, String table) /** * Checks if the field is present in the table. - * @param dataSourceId the dataSourceId * @param database the database * @param field The field to check if it's valid * @param table the table @@ -247,7 +245,6 @@ public static boolean isValidField(String database, String field, String table) /** * Gets the field details. - * @param dataSourceId the dataSourceId * @param db the db name * @param table the table name * @return the field details @@ -270,7 +267,6 @@ public static List getFieldDetails(String db, String table) fieldInfoList.add(new TableRecord.Field(resultSet.getString("COLUMN_NAME"), resultSet .getString("DATA_TYPE"), resultSet.getInt("ORDINAL_POSITION"))); } - } catch (SQLException e) {