Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aesop Patch to send old values from mysql producer #65

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public int compare(String o1, String o2)
}

return (T)new SourceEvent(keyValuePairs, getPkListFromSchema(schema), schema.getName(), schema.getNamespace(),
eventType);
eventType, null);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,27 @@ public abstract class AbstractEvent implements Event
/** Event Type. */
protected final DbusOpcode eventType;

/** OldRowMap : Storing changes.
* This field holds field Change values (old values) */
protected final Map<String, Object> rowChangeMap;

/**
* Constructs the basic event using mandatory fields.
* @param fieldsMap
* @param fieldMap
* @param primaryKeysSet
* @param entityName
* @param namespaceName
* @param eventType
* @param rowChangeMap
*/
public AbstractEvent(Map<String, Object> fieldsMap, Set<String> primaryKeysSet, String entityName,
String namespaceName, DbusOpcode eventType)
{
this.fieldMap = fieldsMap;
public AbstractEvent(Map<String, Object> fieldMap, Set<String> primaryKeysSet, String entityName,
String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap) {
this.fieldMap = fieldMap;
this.primaryKeysSet = primaryKeysSet;
this.entityName = entityName;
this.namespaceName = namespaceName;
this.eventType = eventType;
this.rowChangeMap = rowChangeMap;
}

public Map<String, Object> getFieldMapPair()
Expand Down Expand Up @@ -102,10 +107,14 @@ public List<Object> getPrimaryKeyValues()
return primaryKeyValues;
}

public Map<String, Object> getRowChangeMap() {
return rowChangeMap;
}

@Override
public String toString()
{
return "AbstractEvent [fieldsMap=" + fieldMap + ", primaryKeysSet=" + primaryKeysSet + ", entityName="
+ entityName + ", namespaceName=" + namespaceName + ", eventType=" + eventType + "]";
+ entityName + ", namespaceName=" + namespaceName + ", eventType=" + eventType + ", rowChangeMap=" + rowChangeMap + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,25 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*******************************************************************************/

package com.flipkart.aesop.event;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import com.flipkart.aesop.utils.AvroSchemaHelper;
import com.flipkart.aesop.utils.AvroToMysqlConverter;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;

import com.flipkart.aesop.utils.AvroToMysqlMapper;
import com.flipkart.aesop.utils.MysqlDataTypes;
import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.core.DbusConstants;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.schemas.VersionedSchema;
import com.linkedin.databus2.schemas.utils.SchemaHelper;

/**
* Abstract Event Factory to be extended by the various types of Event Factory Classes.
Expand All @@ -41,50 +37,36 @@
*/
public abstract class AbstractEventFactory<T extends AbstractEvent> implements EventFactory
{
public static String PRIMARY_KEY_FIELD_NAME = "pk";
public static String META_FIELD_TYPE_NAME = "dbFieldType";

/**
* Generates primary key set using the schema.
* @param schema
* @return Primary key set
* @throws DatabusException
*/
private Set<String> getPrimaryKeysSetFromSchema(Schema schema) throws DatabusException
{
Set<String> primaryKeySet = new HashSet<String>();
String primaryKeyFieldName = SchemaHelper.getMetaField(schema, PRIMARY_KEY_FIELD_NAME);
if (primaryKeyFieldName == null)
{
throw new DatabusException("No primary key specified in the schema");
}
for (String primaryKey : primaryKeyFieldName.split(DbusConstants.COMPOUND_KEY_SEPARATOR))
{
primaryKeySet.add(primaryKey.trim());
}
assert (primaryKeySet.size() >= 1);
return primaryKeySet;
}

public AbstractEvent createEvent(DbusEvent dbusEvent, DbusEventDecoder eventDecoder) throws DatabusException
{
GenericRecord genericRecord = eventDecoder.getGenericRecord(dbusEvent, null);
VersionedSchema writerSchema = eventDecoder.getPayloadSchema(dbusEvent);
Schema schema = writerSchema.getSchema();
DbusOpcode eventType = dbusEvent.getOpcode();
Set<String> primaryKeysSet = getPrimaryKeysSetFromSchema(schema);
Set<String> primaryKeysSet = AvroSchemaHelper.getPrimaryKeysSetFromSchema(schema);
String namespaceName = schema.getNamespace();
String entityName = schema.getName();
Map<String, Object> fieldMap = new HashMap<String, Object>();
Map <String, String> fieldToMysqlDataType = AvroSchemaHelper.fieldToDataTypeMap(schema);
String rowChangeField = AvroSchemaHelper.getRowChangeField(schema);
Map<String, Object> rowChangeMap = null;

for (Field field : schema.getFields())
{
String mysqlType = SchemaHelper.getMetaField(field, META_FIELD_TYPE_NAME);
fieldMap.put(
field.name(),
AvroToMysqlMapper.avroToMysqlType(genericRecord.get(field.name()),
MysqlDataTypes.valueOf(mysqlType.toUpperCase())));
Object recordValue = genericRecord.get(field.name());
if (field.name().equals(rowChangeField))
{
rowChangeMap = AvroToMysqlConverter.getMysqlTypedObjectForMap((Map<Object, Object>) recordValue,
fieldToMysqlDataType);
}
else
{
fieldMap.put(field.name(),
AvroToMysqlConverter.getMysqlTypedObject(fieldToMysqlDataType.get(field.name()), recordValue));
}
}
AbstractEvent event = createEventInstance(fieldMap, primaryKeysSet, entityName, namespaceName, eventType);
AbstractEvent event = createEventInstance(fieldMap, primaryKeysSet, entityName, namespaceName, eventType,
rowChangeMap);
return event;
}

Expand All @@ -98,23 +80,25 @@ public AbstractEvent createEvent(DbusEvent dbusEvent, DbusEventDecoder eventDeco
* @return Actual Event instance.
*/
protected abstract AbstractEvent createEventInstance(Map<String, Object> fieldsMap, Set<String> primaryKeysSet,
String entityName, String namespaceName, DbusOpcode eventType);
String entityName, String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap);

public AbstractEvent createEvent(Schema schema, Map<String, Object> keyValuePairs, DbusOpcode eventType)
throws DatabusException
public AbstractEvent createEvent(Schema schema, Map<String, Object> keyValuePairs, DbusOpcode eventType, Map<String, Object> rowChangeMap)
throws DatabusException
{
String entityName = schema.getName();
String namespaceName = schema.getNamespace();
Set<String> primaryKeysSet = getPrimaryKeysSetFromSchema(schema);
Set<String> primaryKeysSet = AvroSchemaHelper.getPrimaryKeysSetFromSchema(schema);

AbstractEvent event = createEventInstance(keyValuePairs, primaryKeysSet, entityName, namespaceName, eventType);
AbstractEvent event =
createEventInstance(keyValuePairs, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap);
return event;
}

public AbstractEvent createEvent(Map<String, Object> fieldsMap, Set<String> primaryFieldsSet, String entityName,
String namespaceName, DbusOpcode eventType)
String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap)
{
AbstractEvent event = createEventInstance(fieldsMap, primaryFieldsSet, entityName, namespaceName, eventType);
AbstractEvent event =
createEventInstance(fieldsMap, primaryFieldsSet, entityName, namespaceName, eventType, rowChangeMap);

return event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public interface Event
* @return {@link List} of Primary Keys
*/
public List<Object> getPrimaryKeyValues();

/**
* Gets the change Map which store old values of record.
* @return rowChangeMap
*/
public Map<String, Object> getRowChangeMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public interface EventFactory
* @return {@link AbstractEvent}
* @throws DatabusException
*/
public AbstractEvent createEvent(Schema schema, Map<String, Object> fieldMap, DbusOpcode eventType)
public AbstractEvent createEvent(Schema schema, Map<String, Object> fieldMap, DbusOpcode eventType,
Map<String, Object> rowChangeMap)
throws DatabusException;

/**
Expand All @@ -61,8 +62,9 @@ public AbstractEvent createEvent(Schema schema, Map<String, Object> fieldMap, Db
* @param entityName
* @param namespaceName
* @param eventType
* @param rowChangeMap
* @return {@link AbstractEvent}
*/
public AbstractEvent createEvent(Map<String, Object> fieldsMap, Set<String> primaryFieldsSet, String entityName,
String namespaceName, DbusOpcode eventType);
String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public class DestinationEvent extends AbstractEvent
* @param entityName
* @param namespaceName
* @param eventType
* @param rowChangeMap
*/
public DestinationEvent(Map<String, Object> fieldsMap, Set<String> primaryKeysSet, String entityName,
String namespaceName, DbusOpcode eventType)
String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap)
{
super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType);
super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class DestinationEventFactory extends AbstractEventFactory<DestinationEve
{
@Override
protected DestinationEvent createEventInstance(Map<String, Object> fieldsMap, Set<String> primaryKeysSet,
String entityName, String namespaceName, DbusOpcode eventType)
String entityName, String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap)
{
return new DestinationEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType);
return new DestinationEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public class SourceEvent extends AbstractEvent
* @param entityName
* @param namespaceName
* @param eventType
* @param rowChangeMap
*/
public SourceEvent(Map<String, Object> fieldsMap, Set<String> primaryKeysSet, String entityName,
String namespaceName, DbusOpcode eventType)
String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap)
{
super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType);
super(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Set;

import com.flipkart.aesop.event.AbstractEvent;
import com.flipkart.aesop.event.AbstractEventFactory;
import com.linkedin.databus.core.DbusOpcode;

Expand All @@ -29,8 +30,8 @@ public class SourceEventFactory extends AbstractEventFactory<SourceEvent>
{
@Override
protected SourceEvent createEventInstance(Map<String, Object> fieldsMap, Set<String> primaryKeysSet,
String entityName, String namespaceName, DbusOpcode eventType)
String entityName, String namespaceName, DbusOpcode eventType, Map<String, Object> rowChangeMap)
{
return new SourceEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType);
return new SourceEvent(fieldsMap, primaryKeysSet, entityName, namespaceName, eventType, rowChangeMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public List<AbstractEvent> map(Config config, String configRoot, AbstractEvent s

AbstractEvent destinationEvent =
destinationEventFactory.createEvent(sourceEvent.getFieldMapPair(), sourceEvent.getPrimaryKeySet(),
sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType());
sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType(),
sourceEvent.getRowChangeMap());

return Arrays.asList(destinationEvent);
}
Expand Down Expand Up @@ -106,7 +107,7 @@ public List<AbstractEvent> map(Config config, String configRoot, AbstractEvent s

AbstractEvent destinationEvent =
destinationEventFactory.createEvent(sourceEvent.getFieldMapPair(), sourceEvent.getPrimaryKeySet(),
sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType());
sourceEvent.getEntityName(), sourceEvent.getNamespaceName(), sourceEvent.getEventType(), sourceEvent.getRowChangeMap());

return Arrays.asList(destinationEvent);
}
Expand Down Expand Up @@ -170,7 +171,8 @@ public List<AbstractEvent> map(Config config, String configRoot, AbstractEvent s

AbstractEvent destinationEvent =
destinationEventFactory.createEvent(destinationEventColumnMap, primaryKeySet,
destinationEntity, destinationNamespace, sourceEvent.getEventType());
destinationEntity, destinationNamespace, sourceEvent.getEventType(),
sourceEvent.getRowChangeMap());

destinationEventList.add(destinationEvent);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*******************************************************************************
*
* Copyright 2012-2015, the original author or authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obta a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*******************************************************************************/

package com.flipkart.aesop.utils;

import com.linkedin.databus.core.DbusConstants;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import org.apache.avro.Schema;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public abstract class AvroSchemaHelper
{
private static final String META_ROW_CHANGE_FIELD = "rowChangeField";
private static final String META_FIELD_TYPE_NAME = "dbFieldType";
private static final String PRIMARY_KEY_FIELD_NAME = "pk";

/**
* Returns the fieldname marked as row change field from schema meta
* @param schema
* @return rowChangeFieldName
*/
public static String getRowChangeField(Schema schema)
{
return SchemaHelper.getMetaField(schema, META_ROW_CHANGE_FIELD);
}

/**
* Generates a hash storing fieldName to intended MysqlDataType from schema
* @param schema
* @return Map containg FieldName to Mysql Type mapping
*/
public static Map<String, String> fieldToDataTypeMap(Schema schema)
{
Map<String, String> map = new HashMap <String, String>();
for (Schema.Field field : schema.getFields())
{
String mysqlType = SchemaHelper.getMetaField(field, META_FIELD_TYPE_NAME);
map.put(field.name(), mysqlType);
}
return map;
}

/**
* Generates primary key set using the schema.
* @param schema
* @return Primary key set
* @throws DatabusException
*/
public static Set<String> getPrimaryKeysSetFromSchema(Schema schema) throws DatabusException
{
String primaryKeyFieldName = SchemaHelper.getMetaField(schema, PRIMARY_KEY_FIELD_NAME);
if (primaryKeyFieldName == null)
{
throw new DatabusException("No primary key specified in the schema");
}

Set<String> primaryKeySet = new HashSet<String>();
for (String primaryKey : primaryKeyFieldName.split(DbusConstants.COMPOUND_KEY_SEPARATOR))
{
primaryKeySet.add(primaryKey.trim());
}
assert (primaryKeySet.size() >= 1);
return primaryKeySet;
}
}

Loading