Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for ProtobufStructObjectInspector to work correctly with Hive #400

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ public void initialize(Configuration job, Properties tbl) throws SerDeException
.asSubclass(Message.class);
protobufConverter = ProtobufConverter.newInstance(protobufClass);

Message.Builder builder = Protobufs.getMessageBuilder(protobufClass);

Descriptor descriptor = Protobufs.getMessageDescriptor(protobufClass);
objectInspector = new ProtobufStructObjectInspector(descriptor);
objectInspector = new ProtobufStructObjectInspector(descriptor, builder);
} catch (Exception e) {
throw new SerDeException(e);
}
Expand All @@ -54,7 +56,7 @@ public void initialize(Configuration job, Properties tbl) throws SerDeException
@Override
public Object deserialize(Writable blob) throws SerDeException {
BytesWritable bytes = (BytesWritable) blob;
return protobufConverter.fromBytes(bytes.getBytes(), 0, bytes.getLength());
return protobufConverter.fromBytes(bytes.getBytes(), 0, bytes.getLength()).toBuilder();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,167 +20,201 @@

public final class ProtobufStructObjectInspector extends SettableStructObjectInspector {

public static class ProtobufStructField implements StructField {
public static class ProtobufStructField implements StructField {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you completely changed the formatting? I'm guessing it was your IDE?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It was. Fixed


private ObjectInspector oi = null;
private String comment = null;
private FieldDescriptor fieldDescriptor = null;
private Descriptor descriptor = null;
private Message.Builder builder = null;

@SuppressWarnings("unchecked")
public ProtobufStructField(FieldDescriptor fieldDescriptor) {
assert (fieldDescriptor != null);
this.fieldDescriptor = fieldDescriptor;
this.descriptor = null;
oi = this.createOIForField();
}

@SuppressWarnings("unchecked")
public ProtobufStructField(FieldDescriptor fd, Message.Builder builder) {
this.descriptor = fd.getMessageType();
this.fieldDescriptor = fd;
this.builder = builder;
oi = this.createOIForField();
}

@Override
public String getFieldName() {
return fieldDescriptor.getName();
}

@Override
public ObjectInspector getFieldObjectInspector() {
return oi;
}

@Override
public String getFieldComment() {
return comment;
}

public FieldDescriptor getFieldDescriptor() {
return fieldDescriptor;
}

private ObjectInspector oi = null;
private String comment = null;
private FieldDescriptor fieldDescriptor;
private PrimitiveCategory getPrimitiveCategory(JavaType fieldType) {
switch (fieldType) {
case INT:
return PrimitiveCategory.INT;
case LONG:
return PrimitiveCategory.LONG;
case FLOAT:
return PrimitiveCategory.FLOAT;
case DOUBLE:
return PrimitiveCategory.DOUBLE;
case BOOLEAN:
return PrimitiveCategory.BOOLEAN;
case STRING:
return PrimitiveCategory.STRING;
case BYTE_STRING:
return PrimitiveCategory.BINARY;
case ENUM:
return PrimitiveCategory.STRING;
default:
return null;
}
}

@SuppressWarnings("unchecked")
public ProtobufStructField(FieldDescriptor fieldDescriptor) {
this.fieldDescriptor = fieldDescriptor;
oi = this.createOIForField();
private ObjectInspector createOIForField() {
ObjectInspector elementOI = null;
if (descriptor != null) {
// this field is a Message
elementOI = new ProtobufStructObjectInspector(descriptor, builder);
} else {
JavaType fieldType = fieldDescriptor.getJavaType();
PrimitiveCategory category = getPrimitiveCategory(fieldType);
if (category != null) {
elementOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(category);
}
}
if (fieldDescriptor.isRepeated()) {
return ObjectInspectorFactory.getStandardListObjectInspector(elementOI);
} else {
return elementOI;
}
}
}

@Override
public String getFieldName() {
return fieldDescriptor.getName();
private Descriptor descriptor;
private Message.Builder builder;
private List<StructField> structFields = Lists.newArrayList();

ProtobufStructObjectInspector(Descriptor descriptor, Message.Builder builder) {
this.descriptor = descriptor;
this.builder = builder;
for (FieldDescriptor fd : descriptor.getFields()) {
if (fd.getType() != Type.MESSAGE) {
structFields.add(new ProtobufStructField(fd));
} else {
structFields.add(new ProtobufStructField(fd, builder.newBuilderForField(fd)));
}
}
}

@Override
public ObjectInspector getFieldObjectInspector() {
return oi;
public Category getCategory() {
return Category.STRUCT;
}

@Override
public String getFieldComment() {
return comment;
public String getTypeName() {
StringBuilder sb = new StringBuilder("struct<");
boolean first = true;
for (StructField structField : getAllStructFieldRefs()) {
if (first) {
first = false;
} else {
sb.append(",");
}
sb.append(structField.getFieldName()).append(":")
.append(structField.getFieldObjectInspector().getTypeName());
}
sb.append(">");
return sb.toString();
}

public FieldDescriptor getFieldDescriptor() {
return fieldDescriptor;
@Override
public Object create() {
return builder;
}

private PrimitiveCategory getPrimitiveCategory(JavaType fieldType) {
switch (fieldType) {
case INT:
return PrimitiveCategory.INT;
case LONG:
return PrimitiveCategory.LONG;
case FLOAT:
return PrimitiveCategory.FLOAT;
case DOUBLE:
return PrimitiveCategory.DOUBLE;
case BOOLEAN:
return PrimitiveCategory.BOOLEAN;
case STRING:
return PrimitiveCategory.STRING;
case BYTE_STRING:
return PrimitiveCategory.BINARY;
case ENUM:
return PrimitiveCategory.STRING;
default:
return null;
}
@Override
public Object setStructFieldData(Object data, StructField field, Object fieldValue) {
Message.Builder builder = (Message.Builder) data;
ProtobufStructField psf = (ProtobufStructField) field;
FieldDescriptor fieldDescriptor = psf.getFieldDescriptor();
if (fieldDescriptor.getType() != Type.MESSAGE) {
builder.setField(descriptor.findFieldByName(field.getFieldName()), fieldValue);
} else {
Message.Builder subFieldBuilder = (Message.Builder)fieldValue;
builder.setField(descriptor.findFieldByName(field.getFieldName()), subFieldBuilder.build());
}
return builder;

}

private ObjectInspector createOIForField() {
JavaType fieldType = fieldDescriptor.getJavaType();
PrimitiveCategory category = getPrimitiveCategory(fieldType);
ObjectInspector elementOI = null;
if (category != null) {
elementOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(category);
} else {
switch (fieldType) {
case MESSAGE:
elementOI = new ProtobufStructObjectInspector(fieldDescriptor.getMessageType());
break;
default:
throw new RuntimeException("JavaType " + fieldType
+ " from protobuf is not supported.");
}
}
if (fieldDescriptor.isRepeated()) {
return ObjectInspectorFactory.getStandardListObjectInspector(elementOI);
} else {
return elementOI;
}
@Override
public List<? extends StructField> getAllStructFieldRefs() {
return structFields;
}
}

private Descriptor descriptor;
private List<StructField> structFields = Lists.newArrayList();
@Override
public Object getStructFieldData(Object data, StructField structField) {
if (data == null) {
return null;
}
Message.Builder m = ((Message.Builder) data);
ProtobufStructField psf = (ProtobufStructField) structField;
FieldDescriptor fieldDescriptor = psf.getFieldDescriptor();

ProtobufStructObjectInspector(Descriptor descriptor) {
this.descriptor = descriptor;
for (FieldDescriptor fd : descriptor.getFields()) {
structFields.add(new ProtobufStructField(fd));
}
}

@Override
public Category getCategory() {
return Category.STRUCT;
}

@Override
public String getTypeName() {
StringBuilder sb = new StringBuilder("struct<");
boolean first = true;
for (StructField structField : getAllStructFieldRefs()) {
if (first) {
first = false;
} else {
sb.append(",");
}
sb.append(structField.getFieldName()).append(":")
.append(structField.getFieldObjectInspector().getTypeName());
}
sb.append(">");
return sb.toString();
}

@Override
public Object create() {
return descriptor.toProto().toBuilder().build();
}

@Override
public Object setStructFieldData(Object data, StructField field, Object fieldValue) {
return ((Message) data)
.toBuilder()
.setField(descriptor.findFieldByName(field.getFieldName()), fieldValue)
.build();
}

@Override
public List<? extends StructField> getAllStructFieldRefs() {
return structFields;
}

@Override
public Object getStructFieldData(Object data, StructField structField) {
if (data == null) {
return null;
}
Message m = (Message) data;
ProtobufStructField psf = (ProtobufStructField) structField;
FieldDescriptor fieldDescriptor = psf.getFieldDescriptor();
Object result = m.getField(fieldDescriptor);
if (fieldDescriptor.getType() == Type.ENUM) {
return ((EnumValueDescriptor)result).getName();
}
if (fieldDescriptor.getType() == Type.BYTES && (result instanceof ByteString)) {
return ((ByteString)result).toByteArray();
Object result = m.getField(fieldDescriptor);
if (fieldDescriptor.getType() == Type.ENUM) {
return ((EnumValueDescriptor) result).getName();
}
if (fieldDescriptor.getType() == Type.BYTES && (result instanceof ByteString)) {
return ((ByteString) result).toByteArray();
}

if (fieldDescriptor.getType() == Type.MESSAGE && !fieldDescriptor.isRepeated()) {
result = ((Message) result).toBuilder();
}

return result;
}
return result;
}

@Override
public StructField getStructFieldRef(String fieldName) {
return new ProtobufStructField(descriptor.findFieldByName(fieldName));
}

@Override
public List<Object> getStructFieldsDataAsList(Object data) {
if (data == null) {
return null;

@Override
public StructField getStructFieldRef(String fieldName) {
FieldDescriptor fd = descriptor.findFieldByName(fieldName);
ProtobufStructField result = null;
if (fd.getType() != Type.MESSAGE) {
result = new ProtobufStructField(fd);
} else {
result = new ProtobufStructField(fd, builder.newBuilderForField(fd));
}
return result;
}
List<Object> result = Lists.newArrayList();
Message m = (Message) data;
for (FieldDescriptor fd : descriptor.getFields()) {
result.add(m.getField(fd));

@Override
public List<Object> getStructFieldsDataAsList(Object data) {
if (data == null) {
return null;
}
List<Object> result = Lists.newArrayList();
Message.Builder m = (Message.Builder) data;
for (FieldDescriptor fd : descriptor.getFields()) {
result.add(m.getField(fd));
}
return result;
}
return result;
}
}
Loading