BaseRepeatedValueViewVector.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.vector.complex;

import static org.apache.arrow.memory.util.LargeMemoryUtil.capAtMaxInt;

import java.util.Collections;
import java.util.Iterator;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.OversizedAllocationException;
import org.apache.arrow.vector.util.SchemaChangeRuntimeException;

/**
 * Base class for list-view style vectors. Each value is described by an entry in an offset buffer
 * and an entry in a size buffer, which together select a range of elements in a single child data
 * vector.
 */
public abstract class BaseRepeatedValueViewVector extends BaseValueVector
    implements RepeatedValueVector, BaseListVector {

  /** Placeholder child vector used until a concrete child type is requested. */
  public static final FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;

  /** Default name given to the child data vector. */
  public static final String DATA_VECTOR_NAME = "$data$";

  /** Width in bytes of a single entry in the offset buffer. */
  public static final byte OFFSET_WIDTH = 4;

  /** Width in bytes of a single entry in the size buffer. */
  public static final byte SIZE_WIDTH = 4;

  protected ArrowBuf offsetBuffer;
  protected ArrowBuf sizeBuffer;
  protected FieldVector vector;
  protected final CallBack repeatedCallBack;
  protected int valueCount;
  protected long offsetAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * OFFSET_WIDTH;
  protected long sizeAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * SIZE_WIDTH;
  private final String name;

  protected String defaultDataVectorName = DATA_VECTOR_NAME;

  protected BaseRepeatedValueViewVector(String name, BufferAllocator allocator, CallBack callBack) {
    this(name, allocator, DEFAULT_DATA_VECTOR, callBack);
  }

  /**
   * Creates the vector with empty offset/size buffers around the given child vector.
   *
   * @param name vector name
   * @param allocator allocator used for the offset and size buffers
   * @param vector child data vector; must not be null
   * @param callBack callback invoked when the child vector type is first established (may be null)
   */
  protected BaseRepeatedValueViewVector(
      String name, BufferAllocator allocator, FieldVector vector, CallBack callBack) {
    super(allocator);
    this.name = name;
    this.offsetBuffer = allocator.getEmpty();
    this.sizeBuffer = allocator.getEmpty();
    this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null");
    this.repeatedCallBack = callBack;
    this.valueCount = 0;
  }

  @Override
  public String getName() {
    return name;
  }

  /**
   * Allocates the offset and size buffers and then the child data vector.
   *
   * @return {@code true} on success; on any failure every buffer is released via {@link #clear()}
   *     and {@code false} is returned instead of propagating the exception
   */
  @Override
  public boolean allocateNewSafe() {
    boolean success = false;
    try {
      allocateBuffers();
      success = vector.allocateNewSafe();
    } catch (Exception e) {
      // "Safe" allocation reports failure via the return value; the finally block cleans up.
      return false;
    } finally {
      if (!success) {
        clear();
      }
    }
    return success;
  }

  /** Releases any currently held offset/size buffers and allocates fresh, zeroed ones. */
  private void allocateBuffers() {
    // Release first so allocating an already-allocated vector does not leak the old buffers.
    offsetBuffer = releaseBuffer(offsetBuffer);
    sizeBuffer = releaseBuffer(sizeBuffer);
    offsetBuffer = allocateBuffers(offsetAllocationSizeInBytes);
    sizeBuffer = allocateBuffers(sizeAllocationSizeInBytes);
  }

  /**
   * Allocates a buffer of at least {@code size} bytes with its reader index at 0 and its whole
   * capacity zero-filled.
   *
   * @param size requested size in bytes
   * @return the newly allocated buffer
   */
  protected ArrowBuf allocateBuffers(final long size) {
    // Pass the long through unchanged; an (int) cast would silently truncate large requests.
    final ArrowBuf buffer = allocator.buffer(size);
    buffer.readerIndex(0);
    buffer.setZero(0, buffer.capacity());
    return buffer;
  }

  @Override
  public void reAlloc() {
    reallocateBuffers();
    vector.reAlloc();
  }

  /** Grows both the offset and the size buffer. */
  protected void reallocateBuffers() {
    reallocOffsetBuffer();
    reallocSizeBuffer();
  }

  /** Grows the offset buffer, preserving its contents and zero-filling the new tail. */
  private void reallocOffsetBuffer() {
    final long newAllocationSize =
        nextAllocationSize(offsetBuffer.capacity(), offsetAllocationSizeInBytes, OFFSET_WIDTH);
    offsetBuffer = copyIntoNewBuffer(offsetBuffer, newAllocationSize);
    offsetAllocationSizeInBytes = newAllocationSize;
  }

  /** Grows the size buffer, preserving its contents and zero-filling the new tail. */
  private void reallocSizeBuffer() {
    final long newAllocationSize =
        nextAllocationSize(sizeBuffer.capacity(), sizeAllocationSizeInBytes, SIZE_WIDTH);
    sizeBuffer = copyIntoNewBuffer(sizeBuffer, newAllocationSize);
    sizeAllocationSizeInBytes = newAllocationSize;
  }

  /**
   * Computes the next allocation size for a buffer being grown: double the current capacity (or
   * the configured/initial size when empty), rounded up to a power of two and capped at {@code
   * width * Integer.MAX_VALUE}.
   *
   * @throws OversizedAllocationException if the result exceeds {@code MAX_ALLOCATION_SIZE} or
   *     would not actually grow the buffer
   */
  private long nextAllocationSize(long currentCapacity, long configuredSize, int width) {
    long newAllocationSize = currentCapacity * 2;
    if (newAllocationSize == 0) {
      newAllocationSize =
          configuredSize > 0 ? configuredSize : (long) INITIAL_VALUE_ALLOCATION * width * 2;
    }

    newAllocationSize = CommonUtil.nextPowerOfTwo(newAllocationSize);
    newAllocationSize = Math.min(newAllocationSize, (long) width * Integer.MAX_VALUE);
    assert newAllocationSize >= 1;

    if (newAllocationSize > MAX_ALLOCATION_SIZE || newAllocationSize <= currentCapacity) {
      throw new OversizedAllocationException("Unable to expand the buffer");
    }
    return newAllocationSize;
  }

  /**
   * Allocates a buffer of {@code newAllocationSize} bytes, copies {@code oldBuffer} into its start,
   * zero-fills the remainder and releases one reference on {@code oldBuffer}.
   */
  private ArrowBuf copyIntoNewBuffer(ArrowBuf oldBuffer, long newAllocationSize) {
    final long oldCapacity = oldBuffer.capacity();
    final ArrowBuf newBuf = allocator.buffer(newAllocationSize);
    newBuf.setBytes(0, oldBuffer, 0, oldCapacity);
    newBuf.setZero(oldCapacity, newBuf.capacity() - oldCapacity);
    oldBuffer.getReferenceManager().release(1);
    return newBuf;
  }

  @Override
  public FieldVector getDataVector() {
    return vector;
  }

  /**
   * Sets the initial allocation sizes of the offset/size buffers for {@code numRecords} values and
   * forwards a capacity hint to the child vector (multiplied by {@code DEFAULT_REPEAT_PER_RECORD}
   * for fixed- and variable-width children).
   */
  @Override
  public void setInitialCapacity(int numRecords) {
    // Widen before multiplying so large record counts do not overflow int arithmetic.
    offsetAllocationSizeInBytes = (long) numRecords * OFFSET_WIDTH;
    sizeAllocationSizeInBytes = (long) numRecords * SIZE_WIDTH;
    if (vector instanceof BaseFixedWidthVector || vector instanceof BaseVariableWidthVector) {
      vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
    } else {
      vector.setInitialCapacity(numRecords);
    }
  }

  /**
   * Sets the initial allocation sizes for {@code numRecords} values, sizing the child vector for
   * {@code numRecords * density} elements (at least 1).
   *
   * @throws OversizedAllocationException if {@code numRecords * density} reaches {@code
   *     Integer.MAX_VALUE}
   */
  @Override
  public void setInitialCapacity(int numRecords, double density) {
    if ((numRecords * density) >= Integer.MAX_VALUE) {
      throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
    }

    offsetAllocationSizeInBytes = (long) numRecords * OFFSET_WIDTH;
    sizeAllocationSizeInBytes = (long) numRecords * SIZE_WIDTH;

    int innerValueCapacity = Math.max((int) (numRecords * density), 1);

    if (vector instanceof DensityAwareVector) {
      ((DensityAwareVector) vector).setInitialCapacity(innerValueCapacity, density);
    } else {
      vector.setInitialCapacity(innerValueCapacity);
    }
  }

  /**
   * Specialized version of setInitialCapacity() for ListViewVector. This is used by some callers
   * when they want to explicitly control and be conservative about memory allocated for the inner
   * data vector. This is very useful when we are working with memory constraints for a query and
   * have a fixed amount of memory reserved for the record batch. In such cases, we are likely to
   * face OOM or related problems when we reserve memory for a record batch with value count x and
   * do setInitialCapacity(x) such that each vector allocates only what is necessary and not the
   * default amount, but the multiplier forces the memory requirement to go beyond what was needed.
   *
   * @param numRecords value count
   * @param totalNumberOfElements the total number of elements to allow for in this vector across
   *     all records.
   */
  public void setInitialTotalCapacity(int numRecords, int totalNumberOfElements) {
    offsetAllocationSizeInBytes = (long) numRecords * OFFSET_WIDTH;
    sizeAllocationSizeInBytes = (long) numRecords * SIZE_WIDTH;
    vector.setInitialCapacity(totalNumberOfElements);
  }

  @Override
  public int getValueCapacity() {
    throw new UnsupportedOperationException(
        "Get value capacity is not supported in RepeatedValueVector");
  }

  /** Number of offset entries the current offset buffer can hold, capped at Integer.MAX_VALUE. */
  protected int getOffsetBufferValueCapacity() {
    return capAtMaxInt(offsetBuffer.capacity() / OFFSET_WIDTH);
  }

  /** Number of size entries the current size buffer can hold, capped at Integer.MAX_VALUE. */
  protected int getSizeBufferValueCapacity() {
    return capAtMaxInt(sizeBuffer.capacity() / SIZE_WIDTH);
  }

  @Override
  public int getBufferSize() {
    if (valueCount == 0) {
      return 0;
    }
    return (valueCount * OFFSET_WIDTH) + (valueCount * SIZE_WIDTH) + vector.getBufferSize();
  }

  /**
   * Returns the bytes needed by the offset/size buffers for {@code valueCount} values plus the
   * child bytes needed to back those values' views.
   */
  @Override
  public int getBufferSizeFor(int valueCount) {
    if (valueCount == 0) {
      return 0;
    }

    // Views may overlap or leave gaps, so the child must hold up to the furthest view end rather
    // than the sum of the view sizes (which over-counts shared elements).
    final int innerVectorValueCount = getLengthOfChildVectorByIndex(valueCount);

    return (valueCount * OFFSET_WIDTH)
        + (valueCount * SIZE_WIDTH)
        + vector.getBufferSizeFor(innerVectorValueCount);
  }

  @Override
  public Iterator<ValueVector> iterator() {
    return Collections.<ValueVector>singleton(getDataVector()).iterator();
  }

  /** Releases the offset/size buffers, clears the child vector and resets the value count. */
  @Override
  public void clear() {
    offsetBuffer = releaseBuffer(offsetBuffer);
    sizeBuffer = releaseBuffer(sizeBuffer);
    vector.clear();
    valueCount = 0;
    super.clear();
  }

  /** Zeroes the offset/size buffers (keeping their memory), resets the child and value count. */
  @Override
  public void reset() {
    offsetBuffer.setZero(0, offsetBuffer.capacity());
    sizeBuffer.setZero(0, sizeBuffer.capacity());
    vector.reset();
    valueCount = 0;
  }

  /** Returns an empty array at this level; the {@code clear} flag is not used here. */
  @Override
  public ArrowBuf[] getBuffers(boolean clear) {
    return new ArrowBuf[0];
  }

  @Override
  public int getValueCount() {
    return valueCount;
  }

  /**
   * Sets the value count, growing the offset and size buffers as needed, and sets the child
   * vector's value count to the furthest view end among the first {@code valueCount} views.
   */
  @Override
  public void setValueCount(int valueCount) {
    this.valueCount = valueCount;
    while (valueCount > getOffsetBufferValueCapacity()) {
      reallocateBuffers();
    }
    // The offset and size buffers can grow independently (see startNewValue), so make sure the
    // size buffer is large enough before getLengthOfChildVector() reads it.
    while (valueCount > getSizeBufferValueCapacity()) {
      reallocSizeBuffer();
    }
    final int childValueCount = valueCount == 0 ? 0 : getLengthOfChildVector();
    vector.setValueCount(childValueCount);
  }

  /**
   * Returns the number of child elements needed to back the first {@code valueCount} views, i.e.
   * the largest {@code offset + size} among them (0 when there are no values).
   */
  protected int getLengthOfChildVector() {
    return getLengthOfChildVectorByIndex(valueCount);
  }

  /**
   * Returns the largest {@code offset + size} over views {@code [0, index)}, or 0 when {@code
   * index} is 0. Element {@code j} of view {@code i} lives at child index {@code offset[i] + j},
   * so the child must hold at least this many elements; the minimum offset must NOT be subtracted.
   *
   * @param index exclusive upper bound of the views to inspect
   * @return the furthest view end among the inspected views
   */
  protected int getLengthOfChildVectorByIndex(int index) {
    int maxViewEnd = 0;
    for (int i = 0; i < index; i++) {
      final int viewEnd = offsetBuffer.getInt(i * OFFSET_WIDTH) + sizeBuffer.getInt(i * SIZE_WIDTH);
      maxViewEnd = Math.max(maxViewEnd, viewEnd);
    }
    return maxViewEnd;
  }

  /**
   * Initialize the data vector (and execute callback) if it hasn't already been done, returns the
   * data vector.
   *
   * @throws SchemaChangeRuntimeException if the existing data vector's type id differs from the
   *     requested one
   */
  @SuppressWarnings("unchecked") // Callers choose T to match the requested field type.
  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(FieldType fieldType) {
    boolean created = false;
    if (vector instanceof NullVector) {
      vector = fieldType.createNewSingleVector(defaultDataVectorName, allocator, repeatedCallBack);
      // returned vector must have the same field
      created = true;
      if (repeatedCallBack != null
          &&
          // not a schema change if changing from ZeroVector to ZeroVector
          (fieldType.getType().getTypeID() != ArrowType.ArrowTypeID.Null)) {
        repeatedCallBack.doWork();
      }
    }

    if (vector.getField().getType().getTypeID() != fieldType.getType().getTypeID()) {
      final String msg =
          String.format(
              "Inner vector type mismatch. Requested type: [%s], actual type: [%s]",
              fieldType.getType().getTypeID(), vector.getField().getType().getTypeID());
      throw new SchemaChangeRuntimeException(msg);
    }

    return new AddOrGetResult<>((T) vector, created);
  }

  /** Clears the current child vector and replaces it with {@code v}. */
  protected void replaceDataVector(FieldVector v) {
    vector.clear();
    vector = v;
  }

  public abstract boolean isEmpty(int index);

  /**
   * Start a new value at the given index.
   *
   * @param index the index to start the new value at
   * @return the offset in the data vector where the new value starts
   */
  public int startNewValue(int index) {
    while (index >= getOffsetBufferValueCapacity()) {
      reallocOffsetBuffer();
    }
    while (index >= getSizeBufferValueCapacity()) {
      reallocSizeBuffer();
    }

    if (index > 0) {
      // Start the new view right after the furthest end of all earlier views.
      final int prevOffset = getLengthOfChildVectorByIndex(index);
      offsetBuffer.setInt(index * OFFSET_WIDTH, prevOffset);
    }

    setValueCount(index + 1);
    return offsetBuffer.getInt(index * OFFSET_WIDTH);
  }

  @Override
  @Deprecated
  public UInt4Vector getOffsetVector() {
    throw new UnsupportedOperationException("There is no inner offset vector");
  }
}