Apache Pig UDF: Part 3 – Store Functions

Last updated on Jun 21,2023 4.7K Views
Awanish is a Sr. Research Analyst at Edureka. He has rich expertise... Awanish is a Sr. Research Analyst at Edureka. He has rich expertise in Big Data technologies like Hadoop, Spark, Storm, Kafka, Flink. Awanish also...

Apache Pig UDF: Part 3 – Store Functions

edureka.co

StoreFunc abstract class has the main methods for storing data and for most use cases it should suffice to extend it. There is an optional interface which can be implemented to achieve extended functionality:

StoreMetadata

This interface has methods to interact with metadata systems to store schema and statistics. This interface is optional and should be implemented only if metadata needs to stored.

The methods which need to be overridden in StoreFunc are explained below:

Default Implementations in StoreFunc:

Gain hands-on experience in building and managing data storage, processing, and analytics solutions with the Azure Data Engineer Certification Course.

Example Implementation:

The storer implementation in the example, is a storer for text data with line delimiter as ‘
‘ and ‘ ‘ as default field delimiter (which can be overridden by passing a different field delimiter in the constructor) – this is similar to current PigStorage storer in Pig. The implementation uses an existing Hadoop supported OutputFormat – TextOutputFormat as the underlying OutputFormat.

public class SimpleTextStorer extends StoreFunc {
protected RecordWriter writer = null;
private byte fieldDel = '	';
private static final int BUFFER_SIZE = 1024;
private static final String UTF8 = "UTF-8";
public PigStorage() {
}
public PigStorage(String delimiter) {
this();
if (delimiter.length() == 1) {
this.fieldDel = (byte)delimiter.charAt(0);
} else if (delimiter.length() > 1delimiter.charAt(0) == '') {
switch (delimiter.charAt(1)) {
case 't':
this.fieldDel = (byte)'	';
break;
case 'x':
fieldDel =
Integer.valueOf(delimiter.substring(2), 16).byteValue();
break;
case 'u':
this.fieldDel =
Integer.valueOf(delimiter.substring(2)).byteValue();
break;
default:
throw new RuntimeException("Unknown delimiter " + delimiter);
}
} else {
throw new RuntimeException("PigStorage delimeter must be a single character");
}
}
ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
@Override
public void putNext(Tuple f) throws IOException {
int sz = f.size();
for (int i = 0; i < sz; i++) {
Object field;
try {
field = f.get(i);
} catch (ExecException ee) {
throw ee;
}
putField(field);
if (i != sz - 1) {
mOut.write(fieldDel);
}
}
Text text = new Text(mOut.toByteArray());
try {
writer.write(null, text);
mOut.reset();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
private void putField(Object field) throws IOException {
//string constants for each delimiter
String tupleBeginDelim = "(";
String tupleEndDelim = ")";
String bagBeginDelim = "{";
String bagEndDelim = "}";
String mapBeginDelim = "[";
String mapEndDelim = "]";
String fieldDelim = ",";
String mapKeyValueDelim = "#";
switch (DataType.findType(field)) {
case DataType.NULL:
break; // just leave it empty
case DataType.BOOLEAN:
mOut.write(((Boolean)field).toString().getBytes());
break;
case DataType.INTEGER:
mOut.write(((Integer)field).toString().getBytes());
break;
case DataType.LONG:
mOut.write(((Long)field).toString().getBytes());
break;
case DataType.FLOAT:
mOut.write(((Float)field).toString().getBytes());
break;
case DataType.DOUBLE:
mOut.write(((Double)field).toString().getBytes());
break;
case DataType.BYTEARRAY: {
byte[] b = ((DataByteArray)field).get();
mOut.write(b, 0, b.length);
break;
}
case DataType.CHARARRAY:
// oddly enough, writeBytes writes a string
mOut.write(((String)field).getBytes(UTF8));
break;
case DataType.MAP:
boolean mapHasNext = false;
Map<String, Object> m = (Map<String, Object>)field;
mOut.write(mapBeginDelim.getBytes(UTF8));
for(Map.Entry<String, Object> e: m.entrySet()) {
if(mapHasNext) {
mOut.write(fieldDelim.getBytes(UTF8));
} else {
mapHasNext = true;
}
putField(e.getKey());
mOut.write(mapKeyValueDelim.getBytes(UTF8));
putField(e.getValue());
}
mOut.write(mapEndDelim.getBytes(UTF8));
break;
case DataType.TUPLE:
boolean tupleHasNext = false;
Tuple t = (Tuple)field;
mOut.write(tupleBeginDelim.getBytes(UTF8));
for(int i = 0; i < t.size(); ++i) {
if(tupleHasNext) {
mOut.write(fieldDelim.getBytes(UTF8));
} else {
tupleHasNext = true;
}
try {
putField(t.get(i));
} catch (ExecException ee) {
throw ee;
}
}
mOut.write(tupleEndDelim.getBytes(UTF8));
break;
case DataType.BAG:
boolean bagHasNext = false;
mOut.write(bagBeginDelim.getBytes(UTF8));
Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
while(tupleIter.hasNext()) {
if(bagHasNext) {
mOut.write(fieldDelim.getBytes(UTF8));
} else {
bagHasNext = true;
}
putField((Object)tupleIter.next());
}
mOut.write(bagEndDelim.getBytes(UTF8));
break;
default: {
int errCode = 2108;
String msg = "Could not determine data type of field: " + field;
throw new ExecException(msg, errCode, PigException.BUG);
}
}
}
@Override
public OutputFormat getOutputFormat() {
return new TextOutputFormat<WritableComparable, Text>();
}
@Override
public void prepareToWrite(RecordWriter writer) {
this.writer = writer;
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
job.getConfiguration().set("mapred.textoutputformat.separator", "");
FileOutputFormat.setOutputPath(job, new Path(location));
if (location.endsWith(".bz2")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
}  else if (location.endsWith(".gz")) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
}
}

Got a question for us? Please mention them in the comments section and we will get back to you.

Related Posts:

Apache Pig UDF: Part 2
Apache Pig UDF: Part 1
Big Data and Hadoop Training

BROWSE COURSES