JsonFileReader.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.vector.ipc;

import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
import static org.apache.arrow.vector.BufferLayout.BufferType.DATA;
import static org.apache.arrow.vector.BufferLayout.BufferType.OFFSET;
import static org.apache.arrow.vector.BufferLayout.BufferType.SIZE;
import static org.apache.arrow.vector.BufferLayout.BufferType.TYPE;
import static org.apache.arrow.vector.BufferLayout.BufferType.VALIDITY;
import static org.apache.arrow.vector.BufferLayout.BufferType.VARIADIC_DATA_BUFFERS;
import static org.apache.arrow.vector.BufferLayout.BufferType.VIEWS;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.BufferLayout.BufferType;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.IntervalDayVector;
import org.apache.arrow.vector.IntervalMonthDayNanoVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.LargeListView;
import org.apache.arrow.vector.types.pojo.ArrowType.ListView;
import org.apache.arrow.vector.types.pojo.ArrowType.Union;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.DecimalUtility;
import org.apache.arrow.vector.util.DictionaryUtility;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

/**
 * A reader for JSON files that translates them into vectors. This reader is used for integration
 * tests.
 *
 * <p>This class uses a streaming parser API, method naming tends to reflect this implementation
 * detail.
 */
public class JsonFileReader implements AutoCloseable, DictionaryProvider {
  private final JsonParser parser;
  private final BufferAllocator allocator;
  private Schema schema;
  private Map<Long, Dictionary> dictionaries;
  private Boolean started = false;

  /**
   * Constructs a new instance.
   *
   * @param inputFile The file to read.
   * @param allocator The allocator to use for allocating buffers.
   */
  public JsonFileReader(File inputFile, BufferAllocator allocator)
      throws JsonParseException, IOException {
    super();
    this.allocator = allocator;
    MappingJsonFactory jsonFactory =
        new MappingJsonFactory(
            new ObjectMapper()
                // ignore case for enums
                .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS, true));
    this.parser = jsonFactory.createParser(inputFile);
    // Allow reading NaN for floating point values
    this.parser.configure(Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
  }

  @Override
  public Dictionary lookup(long id) {
    if (!started) {
      throw new IllegalStateException("Unable to lookup until after read() has started");
    }

    return dictionaries.get(id);
  }

  @Override
  public Set<Long> getDictionaryIds() {
    return dictionaries.keySet();
  }

  /** Reads the beginning (schema section) of the json file and returns it. */
  public Schema start() throws JsonParseException, IOException {
    readToken(START_OBJECT);
    {
      Schema originalSchema = readNextField("schema", Schema.class);
      List<Field> fields = new ArrayList<>();
      dictionaries = new HashMap<>();

      // Convert fields with dictionaries to have the index type
      for (Field field : originalSchema.getFields()) {
        fields.add(DictionaryUtility.toMemoryFormat(field, allocator, dictionaries));
      }
      this.schema = new Schema(fields, originalSchema.getCustomMetadata());

      if (!dictionaries.isEmpty()) {
        nextFieldIs("dictionaries");
        readDictionaryBatches();
      }

      nextFieldIs("batches");
      readToken(START_ARRAY);
      started = true;
      return this.schema;
    }
  }

  private void readDictionaryBatches() throws JsonParseException, IOException {
    readToken(START_ARRAY);
    JsonToken token = parser.nextToken();
    boolean haveDictionaryBatch = token == START_OBJECT;
    while (haveDictionaryBatch) {

      // Lookup what dictionary for the batch about to be read
      long id = readNextField("id", Long.class);
      Dictionary dict = dictionaries.get(id);
      if (dict == null) {
        throw new IllegalArgumentException(
            "Dictionary with id: " + id + " missing encoding from schema Field");
      }

      // Read the dictionary record batch
      nextFieldIs("data");
      FieldVector vector = dict.getVector();
      List<Field> fields = Collections.singletonList(vector.getField());
      List<FieldVector> vectors = Collections.singletonList(vector);
      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, vector.getValueCount());
      read(root);

      readToken(END_OBJECT);
      token = parser.nextToken();
      haveDictionaryBatch = token == START_OBJECT;
    }

    if (token != END_ARRAY) {
      throw new IllegalArgumentException(
          "Invalid token: " + token + " expected end of array at " + parser.getTokenLocation());
    }
  }

  /** Reads the next record batch from the file into <code>root</code>. */
  public boolean read(VectorSchemaRoot root) throws IOException {
    JsonToken t = parser.nextToken();
    if (t == START_OBJECT) {
      {
        int count = readNextField("count", Integer.class);
        nextFieldIs("columns");
        readToken(START_ARRAY);
        {
          for (Field field : root.getSchema().getFields()) {
            FieldVector vector = root.getVector(field);
            readFromJsonIntoVector(field, vector);
          }
        }
        readToken(END_ARRAY);
        root.setRowCount(count);
      }
      readToken(END_OBJECT);
      return true;
    } else if (t == END_ARRAY) {
      root.setRowCount(0);
      return false;
    } else {
      throw new IllegalArgumentException("Invalid token: " + t);
    }
  }

  /** Returns the next record batch from the file. */
  public VectorSchemaRoot read() throws IOException {
    JsonToken t = parser.nextToken();
    if (t == START_OBJECT) {
      VectorSchemaRoot recordBatch = VectorSchemaRoot.create(schema, allocator);
      {
        int count = readNextField("count", Integer.class);
        recordBatch.setRowCount(count);
        nextFieldIs("columns");
        readToken(START_ARRAY);
        {
          for (Field field : schema.getFields()) {
            FieldVector vector = recordBatch.getVector(field);
            readFromJsonIntoVector(field, vector);
          }
        }
        readToken(END_ARRAY);
      }
      readToken(END_OBJECT);
      return recordBatch;
    } else if (t == END_ARRAY) {
      return null;
    } else {
      throw new IllegalArgumentException("Invalid token: " + t);
    }
  }

  /**
   * Skips a number of record batches in the file.
   *
   * @param numBatches the number of batches to skip
   * @return the actual number of skipped batches.
   */
  // This is currently called using JPype by the integration tests.
  public int skip(int numBatches) throws IOException {
    for (int i = 0; i < numBatches; ++i) {
      JsonToken t = parser.nextToken();
      if (t == START_OBJECT) {
        parser.skipChildren();
        assert parser.getCurrentToken() == END_OBJECT;
      } else if (t == END_ARRAY) {
        return i;
      } else {
        throw new IllegalArgumentException("Invalid token: " + t);
      }
    }
    return numBatches;
  }

  private abstract class BufferReader {
    protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException;

    ArrowBuf readBuffer(BufferAllocator allocator, int count) throws IOException {
      readToken(START_ARRAY);
      ArrowBuf buf = read(allocator, count);
      readToken(END_ARRAY);
      return buf;
    }
  }

  /**
   * Read all the variadic data buffers from the parser.
   *
   * @param allocator BufferAllocator
   * @param variadicBuffersCount Number of variadic buffers
   * @return List of ArrowBuf
   * @throws IOException throws IOException in a failure
   */
  List<ArrowBuf> readVariadicBuffers(BufferAllocator allocator, int variadicBuffersCount)
      throws IOException {
    readToken(START_ARRAY);
    ArrayList<ArrowBuf> dataBuffers = new ArrayList<>(variadicBuffersCount);
    for (int i = 0; i < variadicBuffersCount; i++) {
      parser.nextToken();
      final byte[] value;

      String variadicStr = parser.readValueAs(String.class);
      if (variadicStr == null) {
        value = new byte[0];
      } else {
        value = decodeHexSafe(variadicStr);
      }

      ArrowBuf buf = allocator.buffer(value.length);
      buf.writeBytes(value);
      dataBuffers.add(buf);
    }
    readToken(END_ARRAY);
    return dataBuffers;
  }

  private ArrowBuf readViewBuffers(
      BufferAllocator allocator, int count, List<Integer> variadicBufferIndices, MinorType type)
      throws IOException {
    readToken(START_ARRAY);
    ArrayList<byte[]> values = new ArrayList<>(count);
    long bufferSize = 0L;
    for (int i = 0; i < count; i++) {
      readToken(START_OBJECT);
      final int length = readNextField("SIZE", Integer.class);
      byte[] value;
      if (length > BaseVariableWidthViewVector.INLINE_SIZE) {
        // PREFIX_HEX
        final byte[] prefix = decodeHexSafe(readNextField("PREFIX_HEX", String.class));
        // BUFFER_INDEX
        final int bufferIndex = readNextField("BUFFER_INDEX", Integer.class);
        if (variadicBufferIndices.isEmpty()) {
          variadicBufferIndices.add(bufferIndex);
        } else {
          int lastBufferIndex = variadicBufferIndices.get(variadicBufferIndices.size() - 1);
          if (lastBufferIndex != bufferIndex) {
            variadicBufferIndices.add(bufferIndex);
          }
        }

        // OFFSET
        final int offset = readNextField("OFFSET", Integer.class);
        ByteBuffer buffer =
            ByteBuffer.allocate(BaseVariableWidthViewVector.ELEMENT_SIZE)
                .order(ByteOrder.LITTLE_ENDIAN); // Allocate a ByteBuffer of size 16 bytes
        buffer.putInt(length); // Write 'length' to bytes 0-3
        buffer.put(prefix); // Write 'prefix' to bytes 4-7
        buffer.putInt(bufferIndex); // Write 'bufferIndex' to bytes 8-11
        buffer.putInt(offset); // Write 'offset' to bytes 12-15
        value = buffer.array(); // Convert the ByteBuffer to a byte array
      } else {
        // in-line
        ByteBuffer buffer =
            ByteBuffer.allocate(BaseVariableWidthViewVector.ELEMENT_SIZE)
                .order(ByteOrder.LITTLE_ENDIAN); // Allocate a ByteBuffer of size 16 bytes
        buffer.putInt(length); // Write 'length' to bytes 0-3
        // INLINE
        if (type == MinorType.VIEWVARCHAR) {
          buffer.put(readNextField("INLINED", String.class).getBytes(StandardCharsets.UTF_8));
        } else {
          String inlined = readNextField("INLINED", String.class);
          if (inlined == null) {
            buffer.put(new byte[length]);
          } else {
            buffer.put(decodeHexSafe(inlined));
          }
        }
        value = buffer.array(); // Convert the ByteBuffer to a byte array
      }
      values.add(value);
      bufferSize += value.length;
      readToken(END_OBJECT);
    }

    ArrowBuf buf = allocator.buffer(bufferSize);

    for (byte[] value : values) {
      buf.writeBytes(value);
    }
    readToken(END_ARRAY);
    return buf;
  }

  private class BufferHelper {

    BufferReader BIT =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final int bufferSize = BitVectorHelper.getValidityBufferSize(count);
            ArrowBuf buf = allocator.buffer(bufferSize);

            // C++ integration test fails without this.
            buf.setZero(0, bufferSize);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              BitVectorHelper.setValidityBit(buf, i, parser.readValueAs(Boolean.class) ? 1 : 0);
            }

            buf.writerIndex(bufferSize);
            return buf;
          }
        };

    BufferReader DAY_MILLIS =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * IntervalDayVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              readToken(START_OBJECT);
              buf.writeInt(readNextField("days", Integer.class));
              buf.writeInt(readNextField("milliseconds", Integer.class));
              readToken(END_OBJECT);
            }

            return buf;
          }
        };

    BufferReader MONTH_DAY_NANOS =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * IntervalMonthDayNanoVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              readToken(START_OBJECT);
              buf.writeInt(readNextField("months", Integer.class));
              buf.writeInt(readNextField("days", Integer.class));
              buf.writeLong(readNextField("nanoseconds", Long.class));
              readToken(END_OBJECT);
            }

            return buf;
          }
        };

    BufferReader INT1 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * TinyIntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeByte(parser.getByteValue());
            }

            return buf;
          }
        };

    BufferReader INT2 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * SmallIntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeShort(parser.getShortValue());
            }

            return buf;
          }
        };

    BufferReader INT4 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * IntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeInt(parser.getIntValue());
            }

            return buf;
          }
        };

    BufferReader INT8 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * BigIntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              String value = parser.getValueAsString();
              buf.writeLong(Long.valueOf(value));
            }

            return buf;
          }
        };

    BufferReader UINT1 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * TinyIntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeByte(parser.getShortValue() & 0xFF);
            }

            return buf;
          }
        };

    BufferReader UINT2 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * SmallIntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeShort(parser.getIntValue() & 0xFFFF);
            }

            return buf;
          }
        };

    BufferReader UINT4 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * IntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeInt((int) parser.getLongValue());
            }

            return buf;
          }
        };

    BufferReader UINT8 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * BigIntVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              BigInteger value = new BigInteger(parser.getValueAsString());
              buf.writeLong(value.longValue());
            }

            return buf;
          }
        };

    BufferReader FLOAT4 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * Float4Vector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeFloat(parser.getFloatValue());
            }

            return buf;
          }
        };

    BufferReader FLOAT8 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * Float8Vector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              buf.writeDouble(parser.getDoubleValue());
            }

            return buf;
          }
        };

    BufferReader DECIMAL =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * DecimalVector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              BigDecimal decimalValue = new BigDecimal(parser.readValueAs(String.class));
              DecimalUtility.writeBigDecimalToArrowBuf(
                  decimalValue, buf, i, DecimalVector.TYPE_WIDTH);
            }

            buf.writerIndex(size);
            return buf;
          }
        };

    BufferReader DECIMAL256 =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            final long size = (long) count * Decimal256Vector.TYPE_WIDTH;
            ArrowBuf buf = allocator.buffer(size);

            for (int i = 0; i < count; i++) {
              parser.nextToken();
              BigDecimal decimalValue = new BigDecimal(parser.readValueAs(String.class));
              DecimalUtility.writeBigDecimalToArrowBuf(
                  decimalValue, buf, i, Decimal256Vector.TYPE_WIDTH);
            }

            buf.writerIndex(size);
            return buf;
          }
        };

    ArrowBuf readBinaryValues(BufferAllocator allocator, int count) throws IOException {
      ArrayList<byte[]> values = new ArrayList<>(count);
      long bufferSize = 0L;
      for (int i = 0; i < count; i++) {
        parser.nextToken();
        final byte[] value = decodeHexSafe(parser.readValueAs(String.class));
        values.add(value);
        bufferSize += value.length;
      }

      ArrowBuf buf = allocator.buffer(bufferSize);

      for (byte[] value : values) {
        buf.writeBytes(value);
      }

      return buf;
    }

    ArrowBuf readStringValues(BufferAllocator allocator, int count) throws IOException {
      ArrayList<byte[]> values = new ArrayList<>(count);
      long bufferSize = 0L;
      for (int i = 0; i < count; i++) {
        parser.nextToken();
        final byte[] value = parser.getValueAsString().getBytes(StandardCharsets.UTF_8);
        values.add(value);
        bufferSize += value.length;
      }

      ArrowBuf buf = allocator.buffer(bufferSize);

      for (byte[] value : values) {
        buf.writeBytes(value);
      }

      return buf;
    }

    BufferReader FIXEDSIZEBINARY =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            return readBinaryValues(allocator, count);
          }
        };

    BufferReader VARCHAR =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            return readStringValues(allocator, count);
          }
        };

    BufferReader LARGEVARCHAR =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            return readStringValues(allocator, count);
          }
        };

    BufferReader VARBINARY =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            return readBinaryValues(allocator, count);
          }
        };

    BufferReader LARGEVARBINARY =
        new BufferReader() {
          @Override
          protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException {
            return readBinaryValues(allocator, count);
          }
        };
  }

  private List<ArrowBuf> readIntoBuffer(
      BufferAllocator allocator,
      BufferType bufferType,
      MinorType type,
      int count,
      List<Integer> variadicBufferIndices)
      throws IOException {
    ArrowBuf buf;

    BufferHelper helper = new BufferHelper();
    BufferReader reader;

    if (bufferType.equals(VALIDITY)) {
      reader = helper.BIT;
    } else if (bufferType.equals(OFFSET) || bufferType.equals(SIZE)) {
      if (type == MinorType.LARGELIST
          || type == MinorType.LARGEVARCHAR
          || type == MinorType.LARGEVARBINARY
          || type == MinorType.LARGELISTVIEW) {
        reader = helper.INT8;
      } else {
        reader = helper.INT4;
      }
    } else if (bufferType.equals(TYPE)) {
      reader = helper.INT1;
    } else if (bufferType.equals(DATA)) {
      switch (type) {
        case BIT:
          reader = helper.BIT;
          break;
        case TINYINT:
          reader = helper.INT1;
          break;
        case SMALLINT:
          reader = helper.INT2;
          break;
        case INT:
          reader = helper.INT4;
          break;
        case BIGINT:
          reader = helper.INT8;
          break;
        case UINT1:
          reader = helper.UINT1;
          break;
        case UINT2:
          reader = helper.UINT2;
          break;
        case UINT4:
          reader = helper.UINT4;
          break;
        case UINT8:
          reader = helper.UINT8;
          break;
        case FLOAT4:
          reader = helper.FLOAT4;
          break;
        case FLOAT8:
          reader = helper.FLOAT8;
          break;
        case DECIMAL:
          reader = helper.DECIMAL;
          break;
        case DECIMAL256:
          reader = helper.DECIMAL256;
          break;
        case FIXEDSIZEBINARY:
          reader = helper.FIXEDSIZEBINARY;
          break;
        case VARCHAR:
          reader = helper.VARCHAR;
          break;
        case LARGEVARCHAR:
          reader = helper.LARGEVARCHAR;
          break;
        case VARBINARY:
          reader = helper.VARBINARY;
          break;
        case LARGEVARBINARY:
          reader = helper.LARGEVARBINARY;
          break;
        case DATEDAY:
          reader = helper.INT4;
          break;
        case DATEMILLI:
          reader = helper.INT8;
          break;
        case TIMESEC:
        case TIMEMILLI:
          reader = helper.INT4;
          break;
        case TIMEMICRO:
        case TIMENANO:
          reader = helper.INT8;
          break;
        case TIMESTAMPNANO:
        case TIMESTAMPMICRO:
        case TIMESTAMPMILLI:
        case TIMESTAMPSEC:
        case TIMESTAMPNANOTZ:
        case TIMESTAMPMICROTZ:
        case TIMESTAMPMILLITZ:
        case TIMESTAMPSECTZ:
          reader = helper.INT8;
          break;
        case INTERVALYEAR:
          reader = helper.INT4;
          break;
        case INTERVALDAY:
          reader = helper.DAY_MILLIS;
          break;
        case INTERVALMONTHDAYNANO:
          reader = helper.MONTH_DAY_NANOS;
          break;
        case DURATION:
          reader = helper.INT8;
          break;
        default:
          throw new UnsupportedOperationException("Cannot read array of type " + type);
      }
    } else if (bufferType.equals(VIEWS)) {
      return Collections.singletonList(
          readViewBuffers(allocator, count, variadicBufferIndices, type));
    } else if (bufferType.equals(VARIADIC_DATA_BUFFERS)) {
      return readVariadicBuffers(allocator, variadicBufferIndices.size());
    } else {
      throw new InvalidArrowFileException("Unrecognized buffer type " + bufferType);
    }

    buf = reader.readBuffer(allocator, count);
    Preconditions.checkNotNull(buf);
    return Collections.singletonList(buf);
  }

  private void readFromJsonIntoVector(Field field, FieldVector vector) throws IOException {
    ArrowType type = field.getType();
    TypeLayout typeLayout = TypeLayout.getTypeLayout(type);
    List<BufferType> vectorTypes = typeLayout.getBufferTypes();
    List<ArrowBuf> vectorBuffers = new ArrayList<>(vectorTypes.size());
    List<Integer> variadicBufferIndices = new ArrayList<>();

    if (!typeLayout.isFixedBufferCount()) {
      vectorTypes.add(VARIADIC_DATA_BUFFERS);
    }
    /*
     * The order of inner buffers is:
     * Fixed width vector:
     *    -- validity buffer
     *    -- data buffer
     * Variable width vector:
     *    -- validity buffer
     *    -- offset buffer
     *    -- data buffer
     *
     * This is similar to what getFieldInnerVectors() used to give but now that we don't have
     * inner vectors anymore, we will work directly at the buffer level -- populate buffers
     * locally as we read from Json parser and do loadFieldBuffers on the vector followed by
     * releasing the local buffers.
     */
    readToken(START_OBJECT);
    {
      // If currently reading dictionaries, field name is not important so don't check
      String name = readNextField("name", String.class);
      if (started && !Objects.equals(field.getName(), name)) {
        throw new IllegalArgumentException(
            "Expected field " + field.getName() + " but got " + name);
      }

      /* Initialize the vector with required capacity but don't allocateNew since we would
       * be doing loadFieldBuffers.
       */
      int valueCount = readNextField("count", Integer.class);

      vector.setInitialCapacity(valueCount);

      for (int v = 0; v < vectorTypes.size(); v++) {
        BufferType bufferType = vectorTypes.get(v);
        nextFieldIs(bufferType.getName());
        int innerBufferValueCount = valueCount;
        if (bufferType.equals(OFFSET)
            && !(type instanceof Union)
            && !(type instanceof ListView)
            && !(type instanceof LargeListView)) {
          /* offset buffer has 1 additional value capacity except for dense unions and ListView */
          innerBufferValueCount = valueCount + 1;
        }

        vectorBuffers.addAll(
            readIntoBuffer(
                allocator,
                bufferType,
                vector.getMinorType(),
                innerBufferValueCount,
                variadicBufferIndices));
      }

      int nullCount = 0;
      if (type instanceof ArrowType.Null) {
        nullCount = valueCount;
      } else if (!(type instanceof Union)) {
        nullCount = BitVectorHelper.getNullCount(vectorBuffers.get(0), valueCount);
      }
      final ArrowFieldNode fieldNode = new ArrowFieldNode(valueCount, nullCount);
      vector.loadFieldBuffers(fieldNode, vectorBuffers);

      /* read child vectors (if any) */
      List<Field> fields = field.getChildren();
      if (!fields.isEmpty()) {
        List<FieldVector> vectorChildren = vector.getChildrenFromFields();
        if (fields.size() != vectorChildren.size()) {
          throw new IllegalArgumentException(
              "fields and children are not the same size: "
                  + fields.size()
                  + " != "
                  + vectorChildren.size());
        }
        nextFieldIs("children");
        readToken(START_ARRAY);
        for (int i = 0; i < fields.size(); i++) {
          Field childField = fields.get(i);
          FieldVector childVector = vectorChildren.get(i);
          readFromJsonIntoVector(childField, childVector);
        }
        readToken(END_ARRAY);
      }
    }

    readToken(END_OBJECT);

    for (ArrowBuf buffer : vectorBuffers) {
      buffer.getReferenceManager().release();
    }
  }

  private byte[] decodeHexSafe(String hexString) throws IOException {
    try {
      return Hex.decodeHex(hexString.toCharArray());
    } catch (DecoderException e) {
      throw new IOException("Unable to decode hex string: " + hexString, e);
    }
  }

  @Override
  public void close() throws IOException {
    parser.close();
    if (dictionaries != null) {
      for (Dictionary dictionary : dictionaries.values()) {
        dictionary.getVector().close();
      }
    }
  }

  private <T> T readNextField(String expectedFieldName, Class<T> c)
      throws IOException, JsonParseException {
    nextFieldIs(expectedFieldName);
    parser.nextToken();
    return parser.readValueAs(c);
  }

  private void nextFieldIs(String expectedFieldName) throws IOException, JsonParseException {
    String name = parser.nextFieldName();
    if (name == null || !name.equals(expectedFieldName)) {
      throw new IllegalStateException("Expected " + expectedFieldName + " but got " + name);
    }
  }

  private void readToken(JsonToken expected) throws JsonParseException, IOException {
    JsonToken t = parser.nextToken();
    if (t != expected) {
      throw new IllegalStateException("Expected " + expected + " but got " + t);
    }
  }
}