JsonFileWriter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.ipc;
import static org.apache.arrow.vector.BufferLayout.BufferType.*;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter.NopIndenter;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseLargeVariableWidthVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.BufferLayout.BufferType;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.DurationVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.IntervalDayVector;
import org.apache.arrow.vector.IntervalMonthDayNanoVector;
import org.apache.arrow.vector.IntervalYearVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.BaseRepeatedValueViewVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.DecimalUtility;
import org.apache.arrow.vector.util.DictionaryUtility;
import org.apache.commons.codec.binary.Hex;
/**
* A writer that converts binary Vectors into an <em>internal, unstable</em> JSON format suitable
* for integration testing.
*
* <p>This writer does NOT implement a JSON dataset format like JSONL.
*/
public class JsonFileWriter implements AutoCloseable {
/**
 * Immutable options controlling how the JSON output is rendered.
 *
 * <p>The no-argument form leaves pretty printing off; {@link #pretty(boolean)} yields a new
 * instance carrying the requested setting instead of modifying the receiver.
 */
public static final class JSONWriteConfig {
  private final boolean pretty;

  private JSONWriteConfig() {
    this(false);
  }

  private JSONWriteConfig(boolean pretty) {
    this.pretty = pretty;
  }

  /**
   * Derives a configuration with pretty printing switched on or off.
   *
   * @param pretty whether the output should be pretty printed
   * @return a new configuration holding the given flag
   */
  public JSONWriteConfig pretty(boolean pretty) {
    return new JSONWriteConfig(pretty);
  }
}
/**
 * Creates a write configuration with default settings.
 *
 * @return a fresh {@link JSONWriteConfig} built by its no-argument constructor
 */
public static JSONWriteConfig config() {
  final JSONWriteConfig defaults = new JSONWriteConfig();
  return defaults;
}
private final JsonGenerator generator;
private Schema schema;
/**
 * Creates a writer targeting {@code outputFile} using the default configuration.
 *
 * @param outputFile file the JSON is written to
 * @throws IOException if the two-argument constructor fails
 */
public JsonFileWriter(File outputFile) throws IOException {
  this(outputFile, JsonFileWriter.config());
}
/** Constructs a new writer that will output to <code>outputFile</code> with the given options. */
public JsonFileWriter(File outputFile, JSONWriteConfig config) throws IOException {
MappingJsonFactory jsonFactory = new MappingJsonFactory();
this.generator = jsonFactory.createGenerator(outputFile, JsonEncoding.UTF8);
if (config.pretty) {
DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
prettyPrinter.indentArraysWith(NopIndenter.instance);
this.generator.setPrettyPrinter(prettyPrinter);
}
// Allow writing of floating point NaN values not as strings
this.generator.configure(JsonGenerator.Feature.QUOTE_NON_NUMERIC_NUMBERS, false);
}
/**
 * Writes the file "header": the schema, any dictionaries it references, and the opening of the
 * "batches" array.
 *
 * @param schema schema of the batches that will be written; kept for later validation
 * @param provider source of the dictionaries referenced by the schema's fields
 * @throws IOException if the generator fails to write
 */
public void start(Schema schema, DictionaryProvider provider) throws IOException {
  final List<Field> originalFields = schema.getFields();
  final List<Field> messageFields = new ArrayList<>(originalFields.size());
  final Set<Long> usedDictionaryIds = new HashSet<>();
  // Keep the caller's schema so that write(...) can compare incoming batches against it.
  this.schema = schema;

  // Dictionary-encoded fields are rewritten to their message-format (dictionary) type; the ids
  // of the dictionaries encountered are collected along the way.
  for (Field original : originalFields) {
    messageFields.add(DictionaryUtility.toMessageFormat(original, provider, usedDictionaryIds));
  }
  final Schema messageSchema = new Schema(messageFields, schema.getCustomMetadata());

  generator.writeStartObject();
  generator.writeObjectField("schema", messageSchema);

  // Only emit the "dictionaries" section when at least one dictionary was referenced.
  if (!usedDictionaryIds.isEmpty()) {
    writeDictionaryBatches(generator, usedDictionaryIds, provider);
  }

  // Record batches are appended to this array by write(...).
  generator.writeArrayFieldStart("batches");
}
/**
 * Writes the "dictionaries" array: one object per dictionary id holding the id and, under
 * "data", the dictionary's vector serialized as a single-column batch.
 *
 * @param generator generator receiving the array structure
 * @param dictionaryIdsUsed ids of the dictionaries to write
 * @param provider lookup used to resolve each id to its dictionary
 * @throws IOException if the generator fails to write
 */
private void writeDictionaryBatches(
    JsonGenerator generator, Set<Long> dictionaryIdsUsed, DictionaryProvider provider)
    throws IOException {
  generator.writeArrayFieldStart("dictionaries");
  for (Long dictionaryId : dictionaryIdsUsed) {
    generator.writeStartObject();
    generator.writeObjectField("id", dictionaryId);
    generator.writeFieldName("data");

    final FieldVector dictionaryVector = provider.lookup(dictionaryId).getVector();
    final List<Field> singleField = Collections.singletonList(dictionaryVector.getField());
    final List<FieldVector> singleVector = Collections.singletonList(dictionaryVector);
    // Wrap the dictionary vector in a one-column root so it can reuse the batch writer.
    final VectorSchemaRoot dictionaryRoot =
        new VectorSchemaRoot(singleField, singleVector, dictionaryVector.getValueCount());
    writeBatch(dictionaryRoot);

    generator.writeEndObject();
  }
  generator.writeEndArray();
}
/**
 * Appends a record batch to the JSON output.
 *
 * @param recordBatch batch to write; its schema must equal the schema stored by {@code start}
 * @throws IOException if the generator fails to write
 * @throws IllegalArgumentException if the batch's schema differs from the stored schema
 */
public void write(VectorSchemaRoot recordBatch) throws IOException {
  final boolean schemaMatches = recordBatch.getSchema().equals(schema);
  if (schemaMatches) {
    writeBatch(recordBatch);
    return;
  }
  throw new IllegalArgumentException("record batches must have the same schema: " + schema);
}
/**
 * Serializes a batch as a JSON object with its row count and a "columns" array containing one
 * entry per field of the batch's schema.
 *
 * @param recordBatch batch to serialize
 * @throws IOException if the generator fails to write
 */
private void writeBatch(VectorSchemaRoot recordBatch) throws IOException {
  generator.writeStartObject();
  generator.writeObjectField("count", recordBatch.getRowCount());

  generator.writeArrayFieldStart("columns");
  for (Field column : recordBatch.getSchema().getFields()) {
    writeFromVectorIntoJson(column, recordBatch.getVector(column));
  }
  generator.writeEndArray();

  generator.writeEndObject();
}
/**
 * Writes one column (a vector together with its nested children) as a JSON object.
 *
 * <p>The object holds the field name, the value count, one JSON array per buffer type of the
 * field's type layout and, when the field has children, a "children" array filled recursively.
 * Layouts without a fixed buffer count get one additional VARIADIC_DATA_BUFFERS array that
 * receives all trailing buffers of the vector at once.
 *
 * @param field field describing the vector
 * @param vector vector whose buffers are serialized
 * @throws IOException if the generator fails to write
 * @throws IllegalArgumentException if a fixed-count layout and the vector disagree on the number
 *     of buffers, or if the field and the vector disagree on the number of children
 */
private void writeFromVectorIntoJson(Field field, FieldVector vector) throws IOException {
  TypeLayout typeLayout = TypeLayout.getTypeLayout(field.getType());
  // Copy before appending below so the list handed out by the layout is never modified.
  List<BufferType> vectorTypes = new ArrayList<>(typeLayout.getBufferTypes());
  List<ArrowBuf> vectorBuffers = vector.getFieldBuffers();
  if (typeLayout.isFixedBufferCount()) {
    if (vectorTypes.size() != vectorBuffers.size()) {
      throw new IllegalArgumentException(
          "vector types and inner vector buffers are not the same size: "
              + vectorTypes.size()
              + " != "
              + vectorBuffers.size());
    }
  } else {
    vectorTypes.add(VARIADIC_DATA_BUFFERS);
  }

  final MinorType minorType = vector.getMinorType();
  final boolean isViewType =
      minorType == MinorType.VIEWVARCHAR || minorType == MinorType.VIEWVARBINARY;

  generator.writeStartObject();
  {
    generator.writeObjectField("name", field.getName());
    int valueCount = vector.getValueCount();
    generator.writeObjectField("count", valueCount);
    for (int v = 0; v < vectorTypes.size(); v++) {
      BufferType bufferType = vectorTypes.get(v);
      // The appended VARIADIC_DATA_BUFFERS entry has no buffer at position v when the vector
      // carries no variadic buffers (presumably when every view value is inlined), so the
      // lookup must not assume one buffer per type.
      ArrowBuf vectorBuffer = v < vectorBuffers.size() ? vectorBuffers.get(v) : null;
      // Note that in JSON format we cannot have VARIADIC_DATA_BUFFERS repeated,
      // thus the values are only written to a single entity.
      generator.writeArrayFieldStart(bufferType.getName());
      // Offset buffers hold one more entry than there are values, except for dense unions and
      // list views.
      final int bufferValueCount =
          (bufferType.equals(OFFSET)
                  && minorType != MinorType.DENSEUNION
                  && minorType != MinorType.LISTVIEW)
              ? valueCount + 1
              : valueCount;
      for (int i = 0; i < bufferValueCount; i++) {
        if (bufferType.equals(DATA)
            && (minorType == MinorType.VARCHAR || minorType == MinorType.VARBINARY)) {
          // Variable-width data needs the preceding offsets buffer to locate each value.
          writeValueToGenerator(bufferType, vectorBuffer, vectorBuffers.get(v - 1), vector, i);
        } else if (bufferType.equals(VIEWS) && isViewType) {
          // Writing views: the view buffer sits at position 1, the data buffers follow this one.
          ArrowBuf viewBuffer = vectorBuffers.get(1);
          List<ArrowBuf> dataBuffers = vectorBuffers.subList(v + 1, vectorBuffers.size());
          writeValueToViewGenerator(bufferType, viewBuffer, dataBuffers, vector, i);
        } else if (bufferType.equals(VARIADIC_DATA_BUFFERS) && isViewType) {
          ArrowBuf viewBuffer = vectorBuffers.get(1);
          List<ArrowBuf> dataBuffers =
              vectorBuffers.subList(
                  Math.min(v, vectorBuffers.size()), vectorBuffers.size());
          if (!dataBuffers.isEmpty()) {
            writeValueToDataBufferGenerator(bufferType, viewBuffer, dataBuffers, vector);
          }
          // The variadic buffers are written all at once (or there are none), so nothing has
          // to be done per index: leave the loop.
          break;
        } else if (bufferType.equals(OFFSET)
            && valueCount == 0
            && (minorType == MinorType.LIST
                || minorType == MinorType.LISTVIEW
                || minorType == MinorType.MAP
                || minorType == MinorType.VARBINARY
                || minorType == MinorType.VARCHAR)) {
          // Empty vectors may not have allocated an offsets buffer; write a single 32-bit zero.
          try (ArrowBuf vectorBufferTmp = vector.getAllocator().buffer(4)) {
            vectorBufferTmp.setInt(0, 0);
            writeValueToGenerator(bufferType, vectorBufferTmp, null, vector, i);
          }
        } else if (bufferType.equals(OFFSET)
            && valueCount == 0
            && (minorType == MinorType.LARGELIST
                || minorType == MinorType.LARGEVARBINARY
                || minorType == MinorType.LARGEVARCHAR)) {
          // Empty vectors may not have allocated an offsets buffer; write a single 64-bit zero.
          try (ArrowBuf vectorBufferTmp = vector.getAllocator().buffer(8)) {
            vectorBufferTmp.setLong(0, 0);
            writeValueToGenerator(bufferType, vectorBufferTmp, null, vector, i);
          }
        } else {
          writeValueToGenerator(bufferType, vectorBuffer, null, vector, i);
        }
      }
      generator.writeEndArray();
    }

    List<Field> fields = field.getChildren();
    List<FieldVector> children = vector.getChildrenFromFields();
    if (fields.size() != children.size()) {
      throw new IllegalArgumentException(
          "fields and children are not the same size: "
              + fields.size()
              + " != "
              + children.size());
    }
    if (fields.size() > 0) {
      generator.writeArrayFieldStart("children");
      for (int i = 0; i < fields.size(); i++) {
        Field childField = fields.get(i);
        FieldVector childVector = children.get(i);
        writeFromVectorIntoJson(childField, childVector);
      }
      generator.writeEndArray();
    }
  }
  generator.writeEndObject();
}
/**
 * Reads the bytes of the value described by the view at {@code index}.
 *
 * <p>The view's first int is the value length. Values no longer than the inline size are copied
 * from the view itself (right after the length); longer values are copied from the data buffer
 * and offset recorded in the view.
 *
 * @param viewBuffer view buffer
 * @param dataBuffers data buffers
 * @param index index of the view
 * @return byte array of the view
 */
private byte[] getView(final ArrowBuf viewBuffer, final List<ArrowBuf> dataBuffers, int index) {
  final long viewStart = (long) index * BaseVariableWidthViewVector.ELEMENT_SIZE;
  final int length = viewBuffer.getInt(viewStart);
  final byte[] bytes = new byte[length];

  if (length <= BaseVariableWidthViewVector.INLINE_SIZE) {
    // Short value: the bytes live in the view, directly behind the length.
    viewBuffer.getBytes(viewStart + BaseVariableWidthViewVector.LENGTH_WIDTH, bytes, 0, length);
    return bytes;
  }

  // Long value: the view stores (length, prefix, buffer index, offset).
  final long bufferIndexPosition =
      viewStart
          + BaseVariableWidthViewVector.LENGTH_WIDTH
          + BaseVariableWidthViewVector.PREFIX_WIDTH;
  final int bufferIndex = viewBuffer.getInt(bufferIndexPosition);
  final int dataOffset =
      viewBuffer.getInt(bufferIndexPosition + BaseVariableWidthViewVector.BUF_INDEX_WIDTH);
  dataBuffers.get(bufferIndex).getBytes(dataOffset, bytes, 0, length);
  return bytes;
}
/**
 * Writes the view at {@code index} as a JSON object.
 *
 * <p>Every view gets a "SIZE" field. Values longer than the inline size additionally get
 * "PREFIX_HEX", "BUFFER_INDEX" and "OFFSET"; shorter values get "INLINED", written as text for
 * VIEWVARCHAR vectors and as a hex string otherwise.
 *
 * @param bufferType buffer type being written (not consulted by this method)
 * @param viewBuffer view buffer; must not be null
 * @param dataBuffers data buffers referenced by long views
 * @param vector owning vector; its minor type selects the inline encoding
 * @param index index of the view to write
 * @throws IOException if the generator fails to write
 */
private void writeValueToViewGenerator(
    BufferType bufferType,
    ArrowBuf viewBuffer,
    List<ArrowBuf> dataBuffers,
    FieldVector vector,
    final int index)
    throws IOException {
  Preconditions.checkNotNull(viewBuffer);
  byte[] b = getView(viewBuffer, dataBuffers, index);
  final int elementSize = BaseVariableWidthViewVector.ELEMENT_SIZE;
  final int lengthWidth = BaseVariableWidthViewVector.LENGTH_WIDTH;
  final int prefixWidth = BaseVariableWidthViewVector.PREFIX_WIDTH;
  final int bufIndexWidth = BaseVariableWidthViewVector.BUF_INDEX_WIDTH;
  final long viewStart = (long) index * elementSize;
  final int length = viewBuffer.getInt(viewStart);
  generator.writeStartObject();
  generator.writeFieldName("SIZE");
  generator.writeObject(length);
  // Use the same threshold as getView so both agree on where a value's bytes are stored.
  if (length > BaseVariableWidthViewVector.INLINE_SIZE) {
    byte[] prefix = Arrays.copyOfRange(b, 0, prefixWidth);
    final int bufferIndex = viewBuffer.getInt(viewStart + lengthWidth + prefixWidth);
    final int dataOffset =
        viewBuffer.getInt(viewStart + lengthWidth + prefixWidth + bufIndexWidth);
    generator.writeFieldName("PREFIX_HEX");
    generator.writeString(Hex.encodeHexString(prefix));
    generator.writeFieldName("BUFFER_INDEX");
    generator.writeObject(bufferIndex);
    generator.writeFieldName("OFFSET");
    generator.writeObject(dataOffset);
  } else {
    generator.writeFieldName("INLINED");
    if (vector.getMinorType() == MinorType.VIEWVARCHAR) {
      generator.writeString(new String(b, StandardCharsets.UTF_8));
    } else {
      generator.writeString(Hex.encodeHexString(b));
    }
  }
  generator.writeEndObject();
}
/**
 * Writes each data buffer as one hex string covering the bytes from position 0 up to the
 * buffer's writer index.
 *
 * <p>Does nothing unless {@code bufferType} is VARIADIC_DATA_BUFFERS.
 *
 * @param bufferType buffer type being written
 * @param viewBuffer view buffer; must not be null
 * @param dataBuffers data buffers to dump; must not be empty
 * @param vector owning vector (not consulted by this method)
 * @throws IOException if the generator fails to write
 * @throws ArithmeticException if a buffer's writer index does not fit in an {@code int}
 */
private void writeValueToDataBufferGenerator(
    BufferType bufferType, ArrowBuf viewBuffer, List<ArrowBuf> dataBuffers, FieldVector vector)
    throws IOException {
  if (!bufferType.equals(VARIADIC_DATA_BUFFERS)) {
    return;
  }
  Preconditions.checkNotNull(viewBuffer);
  Preconditions.checkArgument(!dataBuffers.isEmpty());
  for (ArrowBuf dataBuf : dataBuffers) {
    // Fail loudly instead of silently truncating a writer index above Integer.MAX_VALUE.
    byte[] contents = new byte[Math.toIntExact(dataBuf.writerIndex())];
    dataBuf.getBytes(0, contents);
    generator.writeString(Hex.encodeHexString(contents));
  }
}
/**
 * Writes the JSON representation of one slot of one buffer.
 *
 * <p>Dispatches on the buffer type: TYPE entries are written as a byte, OFFSET entries as a
 * 32-bit or 64-bit number depending on the minor type, VALIDITY as 0 (null) or 1, DATA according
 * to the minor type, and SIZE entries as an int. Any other buffer type writes nothing.
 *
 * @param bufferType kind of buffer being serialized
 * @param buffer buffer to read from
 * @param offsetBuffer offsets buffer; only read (and required) for VARCHAR and VARBINARY data
 * @param vector vector owning the buffer; supplies minor type, validity and type parameters
 * @param index slot to write
 * @throws IOException if the generator fails to write
 * @throws IllegalArgumentException if an OFFSET entry is requested for a type without offsets
 * @throws UnsupportedOperationException if a DATA entry is requested for an unhandled minor type
 */
private void writeValueToGenerator(
    BufferType bufferType,
    ArrowBuf buffer,
    ArrowBuf offsetBuffer,
    FieldVector vector,
    final int index)
    throws IOException {
  if (bufferType.equals(TYPE)) {
    // Widen before multiplying, consistent with the other position computations here.
    generator.writeNumber(buffer.getByte((long) index * TinyIntVector.TYPE_WIDTH));
  } else if (bufferType.equals(OFFSET)) {
    writeOffsetValue(buffer, vector, index);
  } else if (bufferType.equals(VALIDITY)) {
    generator.writeNumber(vector.isNull(index) ? 0 : 1);
  } else if (bufferType.equals(DATA)) {
    writeDataValue(buffer, offsetBuffer, vector, index);
  } else if (bufferType.equals(SIZE)) {
    generator.writeNumber(buffer.getInt((long) index * BaseRepeatedValueViewVector.SIZE_WIDTH));
  }
  // Other buffer types are intentionally left without output by this method.
}

/** Writes the offset stored at {@code index}, using the offset width of the vector's type. */
private void writeOffsetValue(ArrowBuf buffer, FieldVector vector, final int index)
    throws IOException {
  switch (vector.getMinorType()) {
    case VARCHAR:
    case VARBINARY:
    case LIST:
    case MAP:
      generator.writeNumber(buffer.getInt((long) index * BaseVariableWidthVector.OFFSET_WIDTH));
      break;
    case LISTVIEW:
      generator.writeNumber(
          buffer.getInt((long) index * BaseRepeatedValueViewVector.OFFSET_WIDTH));
      break;
    case LARGELIST:
    case LARGEVARBINARY:
    case LARGEVARCHAR:
      generator.writeNumber(
          buffer.getLong((long) index * BaseLargeVariableWidthVector.OFFSET_WIDTH));
      break;
    default:
      throw new IllegalArgumentException("Type has no offset buffer: " + vector.getField());
  }
}

/** Writes the data value stored at {@code index}, encoded according to the vector's type. */
private void writeDataValue(
    ArrowBuf buffer, ArrowBuf offsetBuffer, FieldVector vector, final int index)
    throws IOException {
  switch (vector.getMinorType()) {
    case TINYINT:
      generator.writeNumber(TinyIntVector.get(buffer, index));
      break;
    case SMALLINT:
      generator.writeNumber(SmallIntVector.get(buffer, index));
      break;
    case INT:
      generator.writeNumber(IntVector.get(buffer, index));
      break;
    case BIGINT:
      // 64-bit integers are emitted as JSON strings.
      generator.writeString(String.valueOf(BigIntVector.get(buffer, index)));
      break;
    case UINT1:
      generator.writeNumber(UInt1Vector.getNoOverflow(buffer, index));
      break;
    case UINT2:
      generator.writeNumber(UInt2Vector.get(buffer, index));
      break;
    case UINT4:
      generator.writeNumber(UInt4Vector.getNoOverflow(buffer, index));
      break;
    case UINT8:
      // Unsigned 64-bit integers are emitted as JSON strings.
      generator.writeString(UInt8Vector.getNoOverflow(buffer, index).toString());
      break;
    case FLOAT4:
      generator.writeNumber(Float4Vector.get(buffer, index));
      break;
    case FLOAT8:
      generator.writeNumber(Float8Vector.get(buffer, index));
      break;
    case DATEDAY:
      generator.writeNumber(DateDayVector.get(buffer, index));
      break;
    case DATEMILLI:
      generator.writeNumber(DateMilliVector.get(buffer, index));
      break;
    case TIMESEC:
      generator.writeNumber(TimeSecVector.get(buffer, index));
      break;
    case TIMEMILLI:
      generator.writeNumber(TimeMilliVector.get(buffer, index));
      break;
    case TIMEMICRO:
      generator.writeNumber(TimeMicroVector.get(buffer, index));
      break;
    case TIMENANO:
      generator.writeNumber(TimeNanoVector.get(buffer, index));
      break;
    case TIMESTAMPSEC:
      generator.writeNumber(TimeStampSecVector.get(buffer, index));
      break;
    case TIMESTAMPMILLI:
      generator.writeNumber(TimeStampMilliVector.get(buffer, index));
      break;
    case TIMESTAMPMICRO:
      generator.writeNumber(TimeStampMicroVector.get(buffer, index));
      break;
    case TIMESTAMPNANO:
      generator.writeNumber(TimeStampNanoVector.get(buffer, index));
      break;
    case TIMESTAMPSECTZ:
      generator.writeNumber(TimeStampSecTZVector.get(buffer, index));
      break;
    case TIMESTAMPMILLITZ:
      generator.writeNumber(TimeStampMilliTZVector.get(buffer, index));
      break;
    case TIMESTAMPMICROTZ:
      generator.writeNumber(TimeStampMicroTZVector.get(buffer, index));
      break;
    case TIMESTAMPNANOTZ:
      generator.writeNumber(TimeStampNanoTZVector.get(buffer, index));
      break;
    case DURATION:
      generator.writeNumber(DurationVector.get(buffer, index));
      break;
    case INTERVALYEAR:
      generator.writeNumber(IntervalYearVector.getTotalMonths(buffer, index));
      break;
    case INTERVALDAY:
      generator.writeStartObject();
      generator.writeObjectField("days", IntervalDayVector.getDays(buffer, index));
      generator.writeObjectField(
          "milliseconds", IntervalDayVector.getMilliseconds(buffer, index));
      generator.writeEndObject();
      break;
    case INTERVALMONTHDAYNANO:
      generator.writeStartObject();
      generator.writeObjectField("months", IntervalMonthDayNanoVector.getMonths(buffer, index));
      generator.writeObjectField("days", IntervalMonthDayNanoVector.getDays(buffer, index));
      generator.writeObjectField(
          "nanoseconds", IntervalMonthDayNanoVector.getNanoseconds(buffer, index));
      generator.writeEndObject();
      break;
    case BIT:
      generator.writeNumber(BitVectorHelper.get(buffer, index));
      break;
    case VARBINARY:
      {
        Preconditions.checkNotNull(offsetBuffer);
        String hexString =
            Hex.encodeHexString(BaseVariableWidthVector.get(buffer, offsetBuffer, index));
        generator.writeObject(hexString);
        break;
      }
    case FIXEDSIZEBINARY:
      {
        int byteWidth = ((FixedSizeBinaryVector) vector).getByteWidth();
        String fixedSizeHexString =
            Hex.encodeHexString(FixedSizeBinaryVector.get(buffer, index, byteWidth));
        generator.writeObject(fixedSizeHexString);
        break;
      }
    case VARCHAR:
      {
        Preconditions.checkNotNull(offsetBuffer);
        byte[] b = BaseVariableWidthVector.get(buffer, offsetBuffer, index);
        generator.writeString(new String(b, StandardCharsets.UTF_8));
        break;
      }
    case DECIMAL:
      {
        int scale = ((DecimalVector) vector).getScale();
        BigDecimal decimalValue =
            DecimalUtility.getBigDecimalFromArrowBuf(
                buffer, index, scale, DecimalVector.TYPE_WIDTH);
        // We write the unscaled value, because the scale is stored in the type metadata.
        generator.writeString(decimalValue.unscaledValue().toString());
        break;
      }
    case DECIMAL256:
      {
        int scale = ((Decimal256Vector) vector).getScale();
        BigDecimal decimalValue =
            DecimalUtility.getBigDecimalFromArrowBuf(
                buffer, index, scale, Decimal256Vector.TYPE_WIDTH);
        // We write the unscaled value, because the scale is stored in the type metadata.
        generator.writeString(decimalValue.unscaledValue().toString());
        break;
      }
    default:
      throw new UnsupportedOperationException("minor type: " + vector.getMinorType());
  }
}
/**
 * Ends the batches array and the enclosing top-level object, then closes the generator.
 *
 * <p>The generator is closed even when writing the closing tokens fails (for instance because
 * the header was never started), so the underlying output is not leaked.
 *
 * @throws IOException if writing the closing tokens or closing the generator fails
 */
@Override
public void close() throws IOException {
  try {
    generator.writeEndArray();
    generator.writeEndObject();
  } finally {
    generator.close();
  }
}
}