LargeListVector.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.complex;
import static java.util.Collections.singletonList;
import static org.apache.arrow.memory.util.LargeMemoryUtil.capAtMaxInt;
import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
import static org.apache.arrow.util.Preconditions.checkArgument;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.util.ArrowBufPointer;
import org.apache.arrow.memory.util.ByteFunctionHelpers;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueIterableVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.complex.impl.UnionLargeListReader;
import org.apache.arrow.vector.complex.impl.UnionLargeListWriter;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.OversizedAllocationException;
import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
import org.apache.arrow.vector.util.TransferPair;
/**
* A list vector contains lists of a specific type of elements. Its structure contains 3 elements.
*
* <ol>
* <li>A validity buffer.
* <li>An offset buffer, that denotes lists boundaries.
* <li>A child data vector that contains the elements of lists.
* </ol>
*
* This is the LargeList variant of list, it has a 64-bit wide offset
*
* <p>WARNING: Currently Arrow in Java doesn't support 64-bit vectors. This class follows the
* expected behaviour of a LargeList but doesn't actually support allocating a 64-bit vector. It has
* little use until 64-bit vectors are supported and should be used with caution. todo review
* checkedCastToInt usage in this class. Once int64 indexed vectors are supported these checks
* aren't needed.
*/
public class LargeListVector extends BaseValueVector
implements RepeatedValueVector, FieldVector, PromotableVector, ValueIterableVector<List<?>> {
public static LargeListVector empty(String name, BufferAllocator allocator) {
return new LargeListVector(
name, allocator, FieldType.nullable(ArrowType.LargeList.INSTANCE), null);
}
public static final FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
public static final String DATA_VECTOR_NAME = "$data$";
public static final byte OFFSET_WIDTH = 8;
protected ArrowBuf offsetBuffer;
protected FieldVector vector;
protected final CallBack callBack;
protected int valueCount;
protected long offsetAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * OFFSET_WIDTH;
protected String defaultDataVectorName = DATA_VECTOR_NAME;
protected ArrowBuf validityBuffer;
protected UnionLargeListReader reader;
private Field field;
private int validityAllocationSizeInBytes;
/** The maximum index that is actually set. */
private int lastSet;
/**
* Constructs a new instance.
*
* @param name The name of the instance.
* @param allocator The allocator to use for allocating/reallocating buffers.
* @param fieldType The type of this list.
* @param callBack A schema change callback.
*/
public LargeListVector(
String name, BufferAllocator allocator, FieldType fieldType, CallBack callBack) {
this(new Field(name, fieldType, null), allocator, callBack);
}
/**
* Creates a new instance.
*
* @param field The field materialized by this vector.
* @param allocator The allocator to use for creating/reallocating buffers for the vector.
* @param callBack A schema change callback.
*/
public LargeListVector(Field field, BufferAllocator allocator, CallBack callBack) {
super(allocator);
this.field = field;
this.validityBuffer = allocator.getEmpty();
this.callBack = callBack;
this.validityAllocationSizeInBytes = getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION);
this.lastSet = -1;
this.offsetBuffer = allocator.getEmpty();
this.vector = vector == null ? DEFAULT_DATA_VECTOR : vector;
this.valueCount = 0;
}
@Override
public void initializeChildrenFromFields(List<Field> children) {
checkArgument(
children.size() == 1,
"Lists have one child Field. Found: %s",
children.isEmpty() ? "none" : children);
Field field = children.get(0);
AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(field.getFieldType());
checkArgument(
addOrGetVector.isCreated(), "Child vector already existed: %s", addOrGetVector.getVector());
addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren());
this.field = new Field(this.field.getName(), this.field.getFieldType(), children);
}
@Override
public void setInitialCapacity(int numRecords) {
validityAllocationSizeInBytes = getValidityBufferSizeFromCount(numRecords);
offsetAllocationSizeInBytes = (long) (numRecords + 1) * OFFSET_WIDTH;
if (vector instanceof BaseFixedWidthVector || vector instanceof BaseVariableWidthVector) {
vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
} else {
vector.setInitialCapacity(numRecords);
}
}
/**
* Specialized version of setInitialCapacity() for ListVector. This is used by some callers when
* they want to explicitly control and be conservative about memory allocated for inner data
* vector. This is very useful when we are working with memory constraints for a query and have a
* fixed amount of memory reserved for the record batch. In such cases, we are likely to face OOM
* or related problems when we reserve memory for a record batch with value count x and do
* setInitialCapacity(x) such that each vector allocates only what is necessary and not the
* default amount but the multiplier forces the memory requirement to go beyond what was needed.
*
* @param numRecords value count
* @param density density of ListVector. Density is the average size of list per position in the
* List vector. For example, a density value of 10 implies each position in the list vector
* has a list of 10 values. A density value of 0.1 implies out of 10 positions in the list
* vector, 1 position has a list of size 1 and remaining positions are null (no lists) or
* empty lists. This helps in tightly controlling the memory we provision for inner data
* vector.
*/
@Override
public void setInitialCapacity(int numRecords, double density) {
validityAllocationSizeInBytes = getValidityBufferSizeFromCount(numRecords);
if ((numRecords * density) >= Integer.MAX_VALUE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
}
offsetAllocationSizeInBytes = (numRecords + 1L) * OFFSET_WIDTH;
int innerValueCapacity = Math.max((int) (numRecords * density), 1);
if (vector instanceof DensityAwareVector) {
((DensityAwareVector) vector).setInitialCapacity(innerValueCapacity, density);
} else {
vector.setInitialCapacity(innerValueCapacity);
}
}
/**
* Specialized version of setInitialTotalCapacity() for ListVector. This is used by some callers
* when they want to explicitly control and be conservative about memory allocated for inner data
* vector. This is very useful when we are working with memory constraints for a query and have a
* fixed amount of memory reserved for the record batch. In such cases, we are likely to face OOM
* or related problems when we reserve memory for a record batch with value count x and do
* setInitialCapacity(x) such that each vector allocates only what is necessary and not the
* default amount but the multiplier forces the memory requirement to go beyond what was needed.
*
* @param numRecords value count
* @param totalNumberOfElements the total number of elements to to allow for in this vector across
* all records.
*/
public void setInitialTotalCapacity(int numRecords, int totalNumberOfElements) {
offsetAllocationSizeInBytes = (numRecords + 1L) * OFFSET_WIDTH;
vector.setInitialCapacity(totalNumberOfElements);
}
/**
* Get the density of this ListVector.
*
* @return density
*/
public double getDensity() {
if (valueCount == 0) {
return 0.0D;
}
final long startOffset = offsetBuffer.getLong(0L);
final long endOffset = offsetBuffer.getLong((long) valueCount * OFFSET_WIDTH);
final double totalListSize = endOffset - startOffset;
return totalListSize / valueCount;
}
@Override
public List<FieldVector> getChildrenFromFields() {
return singletonList(getDataVector());
}
/**
* Load the buffers of this vector with provided source buffers. The caller manages the source
* buffers and populates them before invoking this method.
*
* @param fieldNode the fieldNode indicating the value count
* @param ownBuffers the buffers for this Field (own buffers only, children not included)
*/
@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
if (ownBuffers.size() != 2) {
throw new IllegalArgumentException(
"Illegal buffer count, expected " + 2 + ", got: " + ownBuffers.size());
}
ArrowBuf bitBuffer = ownBuffers.get(0);
ArrowBuf offBuffer = ownBuffers.get(1);
validityBuffer.getReferenceManager().release();
validityBuffer = BitVectorHelper.loadValidityBuffer(fieldNode, bitBuffer, allocator);
offsetBuffer.getReferenceManager().release();
offsetBuffer = offBuffer.getReferenceManager().retain(offBuffer, allocator);
validityAllocationSizeInBytes = checkedCastToInt(validityBuffer.capacity());
offsetAllocationSizeInBytes = offsetBuffer.capacity();
lastSet = fieldNode.getLength() - 1;
valueCount = fieldNode.getLength();
}
/**
* Get the buffers belonging to this vector.
*
* @return the inner buffers.
*/
@Override
public List<ArrowBuf> getFieldBuffers() {
List<ArrowBuf> result = new ArrayList<>(2);
setReaderAndWriterIndex();
result.add(validityBuffer);
result.add(offsetBuffer);
return result;
}
/**
* Export the buffers of the fields for C Data Interface. This method traverse the buffers and
* export buffer and buffer's memory address into a list of buffers and a pointer to the list of
* buffers.
*/
@Override
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
if (offsetBuffer.capacity() == 0) {
// Empty offset buffer is allowed for historical reason.
// To export it through C Data interface, we need to allocate a buffer with one offset.
// We set `retain = false` to explicitly not increase the ref count for the exported buffer.
// The ref count of the newly created buffer (i.e., 1) already represents the usage
// at imported side.
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
} else {
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
}
}
/** Set the reader and writer indexes for the inner buffers. */
private void setReaderAndWriterIndex() {
validityBuffer.readerIndex(0);
offsetBuffer.readerIndex(0);
if (valueCount == 0) {
validityBuffer.writerIndex(0);
offsetBuffer.writerIndex(0);
} else {
validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount));
offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH);
}
}
/**
* Get the inner vectors.
*
* @return the inner vectors for this field as defined by the TypeLayout
* @deprecated This API will be removed as the current implementations no longer support inner
* vectors.
*/
@Deprecated
@Override
public List<BufferBacked> getFieldInnerVectors() {
throw new UnsupportedOperationException("There are no inner vectors. Use getFieldBuffers");
}
/** Same as {@link #allocateNewSafe()}. */
@Override
public void allocateNew() throws OutOfMemoryException {
if (!allocateNewSafe()) {
throw new OutOfMemoryException("Failure while allocating memory");
}
}
/**
* Allocate memory for the vector. We internally use a default value count of 4096 to allocate
* memory for at least these many elements in the vector.
*
* @return false if memory allocation fails, true otherwise.
*/
@Override
public boolean allocateNewSafe() {
boolean success = false;
try {
/* we are doing a new allocation -- release the current buffers */
clear();
/* allocate validity buffer */
allocateValidityBuffer(validityAllocationSizeInBytes);
/* allocate offset and data buffer */
boolean dataAlloc = false;
try {
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
dataAlloc = vector.allocateNewSafe();
} catch (Exception e) {
e.printStackTrace();
clear();
success = false;
} finally {
if (!dataAlloc) {
clear();
}
}
success = dataAlloc;
} finally {
if (!success) {
clear();
}
}
return success;
}
private void allocateValidityBuffer(final long size) {
final int curSize = (int) size;
validityBuffer = allocator.buffer(curSize);
validityBuffer.readerIndex(0);
validityAllocationSizeInBytes = curSize;
validityBuffer.setZero(0, validityBuffer.capacity());
}
protected ArrowBuf allocateOffsetBuffer(final long size) {
ArrowBuf offsetBuffer = allocator.buffer(size);
offsetBuffer.readerIndex(0);
offsetAllocationSizeInBytes = size;
offsetBuffer.setZero(0, offsetBuffer.capacity());
return offsetBuffer;
}
/**
* Resize the vector to increase the capacity. The internal behavior is to double the current
* value capacity.
*/
@Override
public void reAlloc() {
/* reallocate the validity buffer */
reallocValidityBuffer();
/* reallocate the offset and data */
reallocOffsetBuffer();
vector.reAlloc();
}
private void reallocValidityAndOffsetBuffers() {
reallocOffsetBuffer();
reallocValidityBuffer();
}
protected void reallocOffsetBuffer() {
final long currentBufferCapacity = offsetBuffer.capacity();
long newAllocationSize = currentBufferCapacity * 2;
if (newAllocationSize == 0) {
if (offsetAllocationSizeInBytes > 0) {
newAllocationSize = offsetAllocationSizeInBytes;
} else {
newAllocationSize = INITIAL_VALUE_ALLOCATION * OFFSET_WIDTH * 2;
}
}
newAllocationSize = CommonUtil.nextPowerOfTwo(newAllocationSize);
newAllocationSize = Math.min(newAllocationSize, (long) OFFSET_WIDTH * Integer.MAX_VALUE);
assert newAllocationSize >= 1;
if (newAllocationSize > MAX_ALLOCATION_SIZE || newAllocationSize <= offsetBuffer.capacity()) {
throw new OversizedAllocationException("Unable to expand the buffer");
}
final ArrowBuf newBuf = allocator.buffer(newAllocationSize);
newBuf.setBytes(0, offsetBuffer, 0, currentBufferCapacity);
newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
offsetBuffer.getReferenceManager().release(1);
offsetBuffer = newBuf;
offsetAllocationSizeInBytes = newAllocationSize;
}
private void reallocValidityBuffer() {
final int currentBufferCapacity = checkedCastToInt(validityBuffer.capacity());
long newAllocationSize = currentBufferCapacity * 2L;
if (newAllocationSize == 0) {
if (validityAllocationSizeInBytes > 0) {
newAllocationSize = validityAllocationSizeInBytes;
} else {
newAllocationSize = getValidityBufferSizeFromCount(INITIAL_VALUE_ALLOCATION) * 2L;
}
}
newAllocationSize = CommonUtil.nextPowerOfTwo(newAllocationSize);
assert newAllocationSize >= 1;
if (newAllocationSize > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Unable to expand the buffer");
}
final ArrowBuf newBuf = allocator.buffer(newAllocationSize);
newBuf.setBytes(0, validityBuffer, 0, currentBufferCapacity);
newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
validityBuffer.getReferenceManager().release(1);
validityBuffer = newBuf;
validityAllocationSizeInBytes = (int) newAllocationSize;
}
/**
* Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
* capacity of the vector needs to be expanded before copy.
*
* @param inIndex position to copy from in source vector
* @param outIndex position to copy to in this vector
* @param from source vector
*/
@Override
public void copyFromSafe(int inIndex, int outIndex, ValueVector from) {
copyFrom(inIndex, outIndex, from);
}
/**
* Copy a cell value from a particular index in source vector to a particular position in this
* vector.
*
* @param inIndex position to copy from in source vector
* @param outIndex position to copy to in this vector
* @param from source vector
*/
@Override
public void copyFrom(int inIndex, int outIndex, ValueVector from) {
Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
FieldReader in = from.getReader();
in.setPosition(inIndex);
UnionLargeListWriter out = getWriter();
out.setPosition(outIndex);
ComplexCopier.copy(in, out);
}
/**
* Get the offset vector.
*
* @return the underlying offset vector or null if none exists.
* @deprecated This API will be removed, as the current implementations no longer hold inner
* offset vectors.
*/
@Override
@Deprecated
public UInt4Vector getOffsetVector() {
throw new UnsupportedOperationException("There is no inner offset vector");
}
/**
* Get the inner data vector for this list vector.
*
* @return data vector
*/
@Override
public FieldVector getDataVector() {
return vector;
}
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return getTransferPair(ref, allocator, null);
}
@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator) {
return getTransferPair(field, allocator, null);
}
@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
return new TransferImpl(ref, allocator, callBack);
}
@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
return new TransferImpl(field, allocator, callBack);
}
@Override
public TransferPair makeTransferPair(ValueVector target) {
return new TransferImpl((LargeListVector) target);
}
@Override
public long getValidityBufferAddress() {
return validityBuffer.memoryAddress();
}
@Override
public long getDataBufferAddress() {
throw new UnsupportedOperationException();
}
@Override
public long getOffsetBufferAddress() {
return offsetBuffer.memoryAddress();
}
@Override
public ArrowBuf getValidityBuffer() {
return validityBuffer;
}
@Override
public ArrowBuf getDataBuffer() {
throw new UnsupportedOperationException();
}
@Override
public ArrowBuf getOffsetBuffer() {
return offsetBuffer;
}
@Override
public int getValueCount() {
return valueCount;
}
@Override
public int hashCode(int index) {
return hashCode(index, null);
}
@Override
public int hashCode(int index, ArrowBufHasher hasher) {
if (isSet(index) == 0) {
return ArrowBufPointer.NULL_HASH_CODE;
}
int hash = 0;
final long start = offsetBuffer.getLong((long) index * OFFSET_WIDTH);
final long end = offsetBuffer.getLong(((long) index + 1L) * OFFSET_WIDTH);
for (long i = start; i < end; i++) {
hash = ByteFunctionHelpers.combineHash(hash, vector.hashCode(checkedCastToInt(i), hasher));
}
return hash;
}
@Override
public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
return visitor.visit(this, value);
}
public UnionLargeListWriter getWriter() {
return new UnionLargeListWriter(this);
}
protected void replaceDataVector(FieldVector v) {
vector.clear();
vector = v;
}
@Override
public UnionVector promoteToUnion() {
UnionVector vector = new UnionVector("$data$", allocator, /* field type */ null, callBack);
replaceDataVector(vector);
invalidateReader();
if (callBack != null) {
callBack.doWork();
}
return vector;
}
private class TransferImpl implements TransferPair {
LargeListVector to;
TransferPair dataTransferPair;
public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new LargeListVector(name, allocator, field.getFieldType(), callBack));
}
public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
this(new LargeListVector(field, allocator, callBack));
}
public TransferImpl(LargeListVector to) {
this.to = to;
to.addOrGetVector(vector.getField().getFieldType());
if (to.getDataVector() instanceof ZeroVector) {
to.addOrGetVector(vector.getField().getFieldType());
}
dataTransferPair = getDataVector().makeTransferPair(to.getDataVector());
}
/**
* Transfer this vector'data to another vector. The memory associated with this vector is
* transferred to the allocator of target vector for accounting and management purposes.
*/
@Override
public void transfer() {
to.clear();
dataTransferPair.transfer();
to.validityBuffer = transferBuffer(validityBuffer, to.allocator);
to.offsetBuffer = transferBuffer(offsetBuffer, to.allocator);
to.lastSet = lastSet;
if (valueCount > 0) {
to.setValueCount(valueCount);
}
clear();
}
/**
* Slice this vector at desired index and length and transfer the corresponding data to the
* target vector.
*
* @param startIndex start position of the split in source vector.
* @param length length of the split.
*/
@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(
startIndex >= 0 && length >= 0 && startIndex + length <= valueCount,
"Invalid parameters startIndex: %s, length: %s for valueCount: %s",
startIndex,
length,
valueCount);
final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
final long sliceLength =
offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
to.clear();
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
/* splitAndTransfer offset buffer */
for (int i = 0; i < length + 1; i++) {
final long relativeOffset =
offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;
to.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeOffset);
}
/* splitAndTransfer validity buffer */
splitAndTransferValidityBuffer(startIndex, length, to);
/* splitAndTransfer data buffer */
dataTransferPair.splitAndTransfer(
checkedCastToInt(startPoint), checkedCastToInt(sliceLength));
to.lastSet = length - 1;
to.setValueCount(length);
}
/*
* transfer the validity.
*/
private void splitAndTransferValidityBuffer(
int startIndex, int length, LargeListVector target) {
int firstByteSource = BitVectorHelper.byteIndex(startIndex);
int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
int byteSizeTarget = getValidityBufferSizeFromCount(length);
int offset = startIndex % 8;
if (length > 0) {
if (offset == 0) {
// slice
if (target.validityBuffer != null) {
target.validityBuffer.getReferenceManager().release();
}
target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
target.validityBuffer.getReferenceManager().retain(1);
} else {
/* Copy data
* When the first bit starts from the middle of a byte (offset != 0),
* copy data from src BitVector.
* Each byte in the target is composed by a part in i-th byte,
* another part in (i+1)-th byte.
*/
target.allocateValidityBuffer(byteSizeTarget);
for (int i = 0; i < byteSizeTarget - 1; i++) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(validityBuffer, firstByteSource + i, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
validityBuffer, firstByteSource + i + 1, offset);
target.validityBuffer.setByte(i, (b1 + b2));
}
/* Copying the last piece is done in the following manner:
* if the source vector has 1 or more bytes remaining, we copy
* the last piece as a byte formed by shifting data
* from the current byte and the next byte.
*
* if the source vector has no more bytes remaining
* (we are at the last byte), we copy the last piece as a byte
* by shifting data from the current byte.
*/
if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
validityBuffer, firstByteSource + byteSizeTarget, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
} else {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1);
}
}
}
}
@Override
public ValueVector getTo() {
return to;
}
@Override
public void copyValueSafe(int from, int to) {
this.to.copyFrom(from, to, LargeListVector.this);
}
}
@Override
protected FieldReader getReaderImpl() {
return new UnionLargeListReader(this);
}
@Override
public UnionLargeListReader getReader() {
reader = (UnionLargeListReader) super.getReader();
return reader;
}
/**
* Initialize the data vector (and execute callback) if it hasn't already been done, returns the
* data vector.
*/
@Override
public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(FieldType fieldType) {
boolean created = false;
if (vector instanceof NullVector) {
vector = fieldType.createNewSingleVector(defaultDataVectorName, allocator, callBack);
// returned vector must have the same field
created = true;
if (callBack != null
&&
// not a schema change if changing from ZeroVector to ZeroVector
(fieldType.getType().getTypeID() != ArrowType.ArrowTypeID.Null)) {
callBack.doWork();
}
}
if (vector.getField().getType().getTypeID() != fieldType.getType().getTypeID()) {
final String msg =
String.format(
"Inner vector type mismatch. Requested type: [%s], actual type: [%s]",
fieldType.getType().getTypeID(), vector.getField().getType().getTypeID());
throw new SchemaChangeRuntimeException(msg);
}
invalidateReader();
return new AddOrGetResult<>((T) vector, created);
}
/**
* Get the size (number of bytes) of underlying buffers used by this vector.
*
* @return size of underlying buffers.
*/
@Override
public int getBufferSize() {
if (valueCount == 0) {
return 0;
}
final int offsetBufferSize = (valueCount + 1) * OFFSET_WIDTH;
final int validityBufferSize = getValidityBufferSizeFromCount(valueCount);
return offsetBufferSize + validityBufferSize + vector.getBufferSize();
}
@Override
public int getBufferSizeFor(int valueCount) {
if (valueCount == 0) {
return 0;
}
final int validityBufferSize = getValidityBufferSizeFromCount(valueCount);
long innerVectorValueCount = offsetBuffer.getLong((long) valueCount * OFFSET_WIDTH);
return ((valueCount + 1) * OFFSET_WIDTH)
+ vector.getBufferSizeFor(checkedCastToInt(innerVectorValueCount))
+ validityBufferSize;
}
@Override
public Field getField() {
if (field.getChildren().contains(getDataVector().getField())) {
return field;
}
field =
new Field(
field.getName(),
field.getFieldType(),
Collections.singletonList(getDataVector().getField()));
return field;
}
@Override
public MinorType getMinorType() {
return MinorType.LARGELIST;
}
@Override
public String getName() {
return field.getName();
}
@Override
public void clear() {
offsetBuffer = releaseBuffer(offsetBuffer);
vector.clear();
valueCount = 0;
super.clear();
validityBuffer = releaseBuffer(validityBuffer);
lastSet = -1;
}
@Override
public void reset() {
offsetBuffer.setZero(0, offsetBuffer.capacity());
vector.reset();
valueCount = 0;
validityBuffer.setZero(0, validityBuffer.capacity());
lastSet = -1;
}
/**
* Return the underlying buffers associated with this vector. Note that this doesn't impact the
* reference counts for this buffer so it only should be used for in-context access. Also note
* that this buffer changes regularly thus external classes shouldn't hold a reference to it
* (unless they change it).
*
* @param clear Whether to clear vector before returning; the buffers will still be refcounted but
* the returned array will be the only reference to them
* @return The underlying {@link ArrowBuf buffers} that is used by this vector instance.
*/
@Override
public ArrowBuf[] getBuffers(boolean clear) {
setReaderAndWriterIndex();
final ArrowBuf[] buffers;
if (getBufferSize() == 0) {
buffers = new ArrowBuf[0];
} else {
List<ArrowBuf> list = new ArrayList<>();
list.add(offsetBuffer);
list.add(validityBuffer);
list.addAll(Arrays.asList(vector.getBuffers(false)));
buffers = list.toArray(new ArrowBuf[list.size()]);
}
if (clear) {
for (ArrowBuf buffer : buffers) {
buffer.getReferenceManager().retain();
}
clear();
}
return buffers;
}
protected void invalidateReader() {
reader = null;
}
/**
* Get the element in the list vector at a particular index.
*
* @param index position of the element
* @return Object at given position
*/
@Override
public List<?> getObject(int index) {
if (isSet(index) == 0) {
return null;
}
final List<Object> vals = new JsonStringArrayList<>();
final long start = offsetBuffer.getLong((long) index * OFFSET_WIDTH);
final long end = offsetBuffer.getLong(((long) index + 1L) * OFFSET_WIDTH);
final ValueVector vv = getDataVector();
for (long i = start; i < end; i++) {
vals.add(vv.getObject(checkedCastToInt(i)));
}
return vals;
}
/**
* Check if element at given index is null.
*
* @param index position of element
* @return true if element at given index is null, false otherwise
*/
@Override
public boolean isNull(int index) {
return (isSet(index) == 0);
}
/**
* Check if element at given index is empty list.
*
* @param index position of element
* @return true if element at given index is empty list or NULL, false otherwise
*/
public boolean isEmpty(int index) {
if (isNull(index)) {
return true;
} else {
final long start = offsetBuffer.getLong((long) index * OFFSET_WIDTH);
final long end = offsetBuffer.getLong(((long) index + 1L) * OFFSET_WIDTH);
return start == end;
}
}
/**
* Same as {@link #isNull(int)}.
*
* @param index position of element
* @return 1 if element at given index is not null, 0 otherwise
*/
public int isSet(int index) {
final int byteIndex = index >> 3;
final byte b = validityBuffer.getByte(byteIndex);
final int bitIndex = index & 7;
return (b >> bitIndex) & 0x01;
}
/**
* Get the number of elements that are null in the vector.
*
* @return the number of null elements.
*/
@Override
public int getNullCount() {
return BitVectorHelper.getNullCount(validityBuffer, valueCount);
}
/**
* Get the current value capacity for the vector.
*
* @return number of elements that vector can hold.
*/
@Override
public int getValueCapacity() {
return getValidityAndOffsetValueCapacity();
}
protected int getOffsetBufferValueCapacity() {
return checkedCastToInt(offsetBuffer.capacity() / OFFSET_WIDTH);
}
private int getValidityAndOffsetValueCapacity() {
final int offsetValueCapacity = Math.max(getOffsetBufferValueCapacity() - 1, 0);
return Math.min(offsetValueCapacity, getValidityBufferValueCapacity());
}
private int getValidityBufferValueCapacity() {
return capAtMaxInt(validityBuffer.capacity() * 8);
}
/**
* Sets the list at index to be not-null. Reallocates validity buffer if index is larger than
* current capacity.
*/
public void setNotNull(int index) {
while (index >= getValidityAndOffsetValueCapacity()) {
reallocValidityAndOffsetBuffers();
}
BitVectorHelper.setBit(validityBuffer, index);
lastSet = index;
}
/**
* Sets list at index to be null.
*
* @param index position in vector
*/
@Override
public void setNull(int index) {
while (index >= getValidityAndOffsetValueCapacity()) {
reallocValidityAndOffsetBuffers();
}
if (lastSet >= index) {
lastSet = index - 1;
}
for (int i = lastSet + 1; i <= index; i++) {
final int currentOffset = offsetBuffer.getInt(i * OFFSET_WIDTH);
offsetBuffer.setInt((i + 1) * OFFSET_WIDTH, currentOffset);
}
BitVectorHelper.unsetBit(validityBuffer, index);
}
/**
* Start a new value in the list vector.
*
* @param index index of the value to start
*/
public long startNewValue(long index) {
while (index >= getValidityAndOffsetValueCapacity()) {
reallocValidityAndOffsetBuffers();
}
for (int i = lastSet + 1; i <= index; i++) {
final long currentOffset = offsetBuffer.getLong((long) i * OFFSET_WIDTH);
offsetBuffer.setLong(((long) i + 1L) * OFFSET_WIDTH, currentOffset);
}
BitVectorHelper.setBit(validityBuffer, index);
lastSet = checkedCastToInt(index);
return offsetBuffer.getLong(((long) lastSet + 1L) * OFFSET_WIDTH);
}
/**
* End the current value.
*
* @param index index of the value to end
* @param size number of elements in the list that was written
*/
public void endValue(int index, long size) {
final long currentOffset = offsetBuffer.getLong(((long) index + 1L) * OFFSET_WIDTH);
offsetBuffer.setLong(((long) index + 1L) * OFFSET_WIDTH, currentOffset + size);
}
/**
* Sets the value count for the vector.
*
* <p>Important note: The underlying vector does not support 64-bit allocations yet. This may
* throw if attempting to hold larger than what a 32-bit vector can store.
*
* @param valueCount value count
*/
@Override
public void setValueCount(int valueCount) {
this.valueCount = valueCount;
if (valueCount > 0) {
while (valueCount > getValidityAndOffsetValueCapacity()) {
/* check if validity and offset buffers need to be re-allocated */
reallocValidityAndOffsetBuffers();
}
for (int i = lastSet + 1; i < valueCount; i++) {
/* fill the holes with offsets */
final long currentOffset = offsetBuffer.getLong((long) i * OFFSET_WIDTH);
offsetBuffer.setLong(((long) i + 1L) * OFFSET_WIDTH, currentOffset);
}
}
/* valueCount for the data vector is the current end offset */
final long childValueCount =
(valueCount == 0) ? 0 : offsetBuffer.getLong(((long) lastSet + 1L) * OFFSET_WIDTH);
/* set the value count of data vector and this will take care of
* checking whether data buffer needs to be reallocated.
* TODO: revisit when 64-bit vectors are supported
*/
Preconditions.checkArgument(
childValueCount <= Integer.MAX_VALUE || childValueCount >= Integer.MIN_VALUE,
"LargeListVector doesn't yet support 64-bit allocations: %s",
childValueCount);
vector.setValueCount((int) childValueCount);
}
public void setLastSet(int value) {
lastSet = value;
}
public int getLastSet() {
return lastSet;
}
public long getElementStartIndex(int index) {
return offsetBuffer.getLong((long) index * OFFSET_WIDTH);
}
public long getElementEndIndex(int index) {
return offsetBuffer.getLong(((long) index + 1L) * OFFSET_WIDTH);
}
}