VectorAppender.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.util;
import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
import java.util.HashSet;
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BaseLargeVariableWidthVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.ExtensionTypeVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.compare.TypeEqualsVisitor;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.DenseUnionVector;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.LargeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.complex.UnionVector;
/** Utility to append two vectors together. */
public class VectorAppender implements VectorVisitor<ValueVector, Void> {
/** The targetVector to be appended. */
private final ValueVector targetVector;
private final TypeEqualsVisitor typeVisitor;
/**
* Constructs a new targetVector appender, with the given targetVector.
*
* @param targetVector the targetVector to be appended.
*/
public VectorAppender(ValueVector targetVector) {
this.targetVector = targetVector;
typeVisitor = new TypeEqualsVisitor(targetVector, false, true);
}
@Override
public ValueVector visit(BaseFixedWidthVector deltaVector, Void value) {
Preconditions.checkArgument(
targetVector.getField().getType().equals(deltaVector.getField().getType()),
"The targetVector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // optimization, nothing to append, return
}
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
targetVector.reAlloc();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append data buffer
if (targetVector instanceof BitVector) {
// special processing for bit vector, as its type width is 0
BitVectorHelper.concatBits(
targetVector.getDataBuffer(),
targetVector.getValueCount(),
deltaVector.getDataBuffer(),
deltaVector.getValueCount(),
targetVector.getDataBuffer());
} else {
MemoryUtil.copyMemory(
deltaVector.getDataBuffer().memoryAddress(),
targetVector.getDataBuffer().memoryAddress()
+ deltaVector.getTypeWidth() * targetVector.getValueCount(),
deltaVector.getTypeWidth() * deltaVector.getValueCount());
}
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(BaseVariableWidthVector deltaVector, Void value) {
Preconditions.checkArgument(
targetVector.getField().getType().equals(deltaVector.getField().getType()),
"The targetVector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // nothing to append, return
}
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
int targetDataSize =
targetVector
.getOffsetBuffer()
.getInt((long) targetVector.getValueCount() * BaseVariableWidthVector.OFFSET_WIDTH);
int deltaDataSize =
deltaVector
.getOffsetBuffer()
.getInt((long) deltaVector.getValueCount() * BaseVariableWidthVector.OFFSET_WIDTH);
int newValueCapacity = targetDataSize + deltaDataSize;
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
((BaseVariableWidthVector) targetVector).reallocValidityAndOffsetBuffers();
}
while (targetVector.getDataBuffer().capacity() < newValueCapacity) {
((BaseVariableWidthVector) targetVector).reallocDataBuffer();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append data buffer
MemoryUtil.copyMemory(
deltaVector.getDataBuffer().memoryAddress(),
targetVector.getDataBuffer().memoryAddress() + targetDataSize,
deltaDataSize);
// copy offset buffer
MemoryUtil.copyMemory(
deltaVector.getOffsetBuffer().memoryAddress() + BaseVariableWidthVector.OFFSET_WIDTH,
targetVector.getOffsetBuffer().memoryAddress()
+ (targetVector.getValueCount() + 1) * BaseVariableWidthVector.OFFSET_WIDTH,
deltaVector.getValueCount() * BaseVariableWidthVector.OFFSET_WIDTH);
// increase each offset from the second buffer
for (int i = 0; i < deltaVector.getValueCount(); i++) {
int oldOffset =
targetVector
.getOffsetBuffer()
.getInt(
(long) (targetVector.getValueCount() + 1 + i)
* BaseVariableWidthVector.OFFSET_WIDTH);
targetVector
.getOffsetBuffer()
.setInt(
(long) (targetVector.getValueCount() + 1 + i) * BaseVariableWidthVector.OFFSET_WIDTH,
oldOffset + targetDataSize);
}
((BaseVariableWidthVector) targetVector).setLastSet(newValueCount - 1);
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(BaseLargeVariableWidthVector deltaVector, Void value) {
Preconditions.checkArgument(
targetVector.getField().getType().equals(deltaVector.getField().getType()),
"The targetVector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // nothing to append, return
}
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
long targetDataSize =
targetVector
.getOffsetBuffer()
.getLong(
(long) targetVector.getValueCount() * BaseLargeVariableWidthVector.OFFSET_WIDTH);
long deltaDataSize =
deltaVector
.getOffsetBuffer()
.getLong(
(long) deltaVector.getValueCount() * BaseLargeVariableWidthVector.OFFSET_WIDTH);
long newValueCapacity = targetDataSize + deltaDataSize;
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
((BaseLargeVariableWidthVector) targetVector).reallocValidityAndOffsetBuffers();
}
while (targetVector.getDataBuffer().capacity() < newValueCapacity) {
((BaseLargeVariableWidthVector) targetVector).reallocDataBuffer();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append data buffer
MemoryUtil.copyMemory(
deltaVector.getDataBuffer().memoryAddress(),
targetVector.getDataBuffer().memoryAddress() + targetDataSize,
deltaDataSize);
// copy offset buffer
MemoryUtil.copyMemory(
deltaVector.getOffsetBuffer().memoryAddress() + BaseLargeVariableWidthVector.OFFSET_WIDTH,
targetVector.getOffsetBuffer().memoryAddress()
+ (targetVector.getValueCount() + 1) * BaseLargeVariableWidthVector.OFFSET_WIDTH,
deltaVector.getValueCount() * BaseLargeVariableWidthVector.OFFSET_WIDTH);
// increase each offset from the second buffer
for (int i = 0; i < deltaVector.getValueCount(); i++) {
long oldOffset =
targetVector
.getOffsetBuffer()
.getLong(
(long) (targetVector.getValueCount() + 1 + i)
* BaseLargeVariableWidthVector.OFFSET_WIDTH);
targetVector
.getOffsetBuffer()
.setLong(
(long) (targetVector.getValueCount() + 1 + i)
* BaseLargeVariableWidthVector.OFFSET_WIDTH,
oldOffset + targetDataSize);
}
((BaseLargeVariableWidthVector) targetVector).setLastSet(newValueCount - 1);
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(BaseVariableWidthViewVector left, Void value) {
throw new UnsupportedOperationException("View vectors are not supported.");
}
@Override
public ValueVector visit(ListVector deltaVector, Void value) {
Preconditions.checkArgument(
typeVisitor.equals(deltaVector),
"The targetVector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // nothing to append, return
}
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
int targetListSize =
targetVector
.getOffsetBuffer()
.getInt((long) targetVector.getValueCount() * ListVector.OFFSET_WIDTH);
int deltaListSize =
deltaVector
.getOffsetBuffer()
.getInt((long) deltaVector.getValueCount() * ListVector.OFFSET_WIDTH);
ListVector targetListVector = (ListVector) targetVector;
// make sure the underlying vector has value count set
targetListVector.getDataVector().setValueCount(targetListSize);
deltaVector.getDataVector().setValueCount(deltaListSize);
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
targetVector.reAlloc();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append offset buffer
MemoryUtil.copyMemory(
deltaVector.getOffsetBuffer().memoryAddress() + ListVector.OFFSET_WIDTH,
targetVector.getOffsetBuffer().memoryAddress()
+ (targetVector.getValueCount() + 1) * ListVector.OFFSET_WIDTH,
(long) deltaVector.getValueCount() * ListVector.OFFSET_WIDTH);
// increase each offset from the second buffer
for (int i = 0; i < deltaVector.getValueCount(); i++) {
int oldOffset =
targetVector
.getOffsetBuffer()
.getInt((long) (targetVector.getValueCount() + 1 + i) * ListVector.OFFSET_WIDTH);
targetVector
.getOffsetBuffer()
.setInt(
(long) (targetVector.getValueCount() + 1 + i) * ListVector.OFFSET_WIDTH,
oldOffset + targetListSize);
}
targetListVector.setLastSet(newValueCount - 1);
// append underlying vectors
VectorAppender innerAppender = new VectorAppender(targetListVector.getDataVector());
deltaVector.getDataVector().accept(innerAppender, null);
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(LargeListVector deltaVector, Void value) {
Preconditions.checkArgument(
typeVisitor.equals(deltaVector),
"The targetVector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // nothing to append, return
}
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
long targetListSize =
targetVector
.getOffsetBuffer()
.getLong((long) targetVector.getValueCount() * LargeListVector.OFFSET_WIDTH);
long deltaListSize =
deltaVector
.getOffsetBuffer()
.getLong((long) deltaVector.getValueCount() * LargeListVector.OFFSET_WIDTH);
ListVector targetListVector = (ListVector) targetVector;
// make sure the underlying vector has value count set
// todo recheck these casts when int64 vectors are supported
targetListVector.getDataVector().setValueCount(checkedCastToInt(targetListSize));
deltaVector.getDataVector().setValueCount(checkedCastToInt(deltaListSize));
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
targetVector.reAlloc();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append offset buffer
MemoryUtil.copyMemory(
deltaVector.getOffsetBuffer().memoryAddress() + ListVector.OFFSET_WIDTH,
targetVector.getOffsetBuffer().memoryAddress()
+ (targetVector.getValueCount() + 1) * LargeListVector.OFFSET_WIDTH,
(long) deltaVector.getValueCount() * ListVector.OFFSET_WIDTH);
// increase each offset from the second buffer
for (int i = 0; i < deltaVector.getValueCount(); i++) {
long oldOffset =
targetVector
.getOffsetBuffer()
.getLong(
(long) (targetVector.getValueCount() + 1 + i) * LargeListVector.OFFSET_WIDTH);
targetVector
.getOffsetBuffer()
.setLong(
(long) (targetVector.getValueCount() + 1 + i) * LargeListVector.OFFSET_WIDTH,
oldOffset + targetListSize);
}
targetListVector.setLastSet(newValueCount - 1);
// append underlying vectors
VectorAppender innerAppender = new VectorAppender(targetListVector.getDataVector());
deltaVector.getDataVector().accept(innerAppender, null);
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(FixedSizeListVector deltaVector, Void value) {
Preconditions.checkArgument(
typeVisitor.equals(deltaVector),
"The vector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // optimization, nothing to append, return
}
FixedSizeListVector targetListVector = (FixedSizeListVector) targetVector;
Preconditions.checkArgument(
targetListVector.getListSize() == deltaVector.getListSize(),
"FixedSizeListVector must have the same list size to append");
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
int targetListSize = targetListVector.getValueCount() * targetListVector.getListSize();
int deltaListSize = deltaVector.getValueCount() * deltaVector.getListSize();
// make sure the underlying vector has value count set
targetListVector.getDataVector().setValueCount(targetListSize);
deltaVector.getDataVector().setValueCount(deltaListSize);
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
targetVector.reAlloc();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append underlying vectors
VectorAppender innerAppender = new VectorAppender(targetListVector.getDataVector());
deltaVector.getDataVector().accept(innerAppender, null);
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(NonNullableStructVector deltaVector, Void value) {
Preconditions.checkArgument(
typeVisitor.equals(deltaVector),
"The vector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // optimization, nothing to append, return
}
NonNullableStructVector targetStructVector = (NonNullableStructVector) targetVector;
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
// make sure there is enough capacity
while (targetVector.getValueCapacity() < newValueCount) {
targetVector.reAlloc();
}
// append validity buffer
BitVectorHelper.concatBits(
targetVector.getValidityBuffer(),
targetVector.getValueCount(),
deltaVector.getValidityBuffer(),
deltaVector.getValueCount(),
targetVector.getValidityBuffer());
// append child vectors
for (int i = 0; i < targetStructVector.getChildrenFromFields().size(); i++) {
ValueVector targetChild = targetStructVector.getVectorById(i);
ValueVector deltaChild = deltaVector.getVectorById(i);
targetChild.setValueCount(targetStructVector.getValueCount());
deltaChild.setValueCount(deltaVector.getValueCount());
VectorAppender innerAppender = new VectorAppender(targetChild);
deltaChild.accept(innerAppender, null);
}
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(UnionVector deltaVector, Void value) {
// we only make sure that both vectors are union vectors.
Preconditions.checkArgument(
targetVector.getMinorType() == deltaVector.getMinorType(),
"The vector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // optimization, nothing to append, return
}
UnionVector targetUnionVector = (UnionVector) targetVector;
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
// make sure there is enough capacity
while (targetUnionVector.getValueCapacity() < newValueCount) {
targetUnionVector.reAlloc();
}
// append type buffers
MemoryUtil.copyMemory(
deltaVector.getTypeBufferAddress(),
targetUnionVector.getTypeBufferAddress() + targetVector.getValueCount(),
deltaVector.getValueCount());
// build the hash set for all types
HashSet<Integer> targetTypes = new HashSet<>();
for (int i = 0; i < targetUnionVector.getValueCount(); i++) {
targetTypes.add(targetUnionVector.getTypeValue(i));
}
HashSet<Integer> deltaTypes = new HashSet<>();
for (int i = 0; i < deltaVector.getValueCount(); i++) {
deltaTypes.add(deltaVector.getTypeValue(i));
}
// append child vectors
for (int i = 0; i < Byte.MAX_VALUE; i++) {
if (targetTypes.contains(i) || deltaTypes.contains(i)) {
ValueVector targetChild = targetUnionVector.getVectorByType(i);
if (!targetTypes.contains(i)) {
// if the vector type does not exist in the target, it must be newly created
// and we must make sure it has enough capacity.
while (targetChild.getValueCapacity() < newValueCount) {
targetChild.reAlloc();
}
}
if (deltaTypes.contains(i)) {
// append child vectors
ValueVector deltaChild = deltaVector.getVectorByType(i);
targetChild.setValueCount(targetUnionVector.getValueCount());
deltaChild.setValueCount(deltaVector.getValueCount());
VectorAppender innerAppender = new VectorAppender(targetChild);
deltaChild.accept(innerAppender, null);
}
targetChild.setValueCount(newValueCount);
}
}
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(DenseUnionVector deltaVector, Void value) {
// we only make sure that both vectors are union vectors.
Preconditions.checkArgument(
targetVector.getMinorType() == deltaVector.getMinorType(),
"The vector to append must have the same type as the targetVector being appended");
if (deltaVector.getValueCount() == 0) {
return targetVector; // optimization, nothing to append, return
}
DenseUnionVector targetDenseUnionVector = (DenseUnionVector) targetVector;
int newValueCount = targetVector.getValueCount() + deltaVector.getValueCount();
// make sure there is enough capacity
while (targetDenseUnionVector.getValueCapacity() < newValueCount) {
targetDenseUnionVector.reAlloc();
}
// append type buffers
MemoryUtil.copyMemory(
deltaVector.getTypeBuffer().memoryAddress(),
targetDenseUnionVector.getTypeBuffer().memoryAddress() + targetVector.getValueCount(),
deltaVector.getValueCount());
// append offset buffers
for (int i = 0; i < deltaVector.getValueCount(); i++) {
byte typeId = deltaVector.getTypeId(i);
ValueVector targetChildVector = targetDenseUnionVector.getVectorByType(typeId);
int offsetBase = targetChildVector == null ? 0 : targetChildVector.getValueCount();
int deltaOffset = deltaVector.getOffset(i);
long index = (long) (targetVector.getValueCount() + i) * DenseUnionVector.OFFSET_WIDTH;
targetVector.getOffsetBuffer().setInt(index, offsetBase + deltaOffset);
}
// append child vectors
for (int i = 0; i <= Byte.MAX_VALUE; i++) {
ValueVector targetChildVector = targetDenseUnionVector.getVectorByType((byte) i);
ValueVector deltaChildVector = deltaVector.getVectorByType((byte) i);
if (targetChildVector == null && deltaChildVector == null) {
// the type id is not registered in either vector, we are done.
continue;
} else if (targetChildVector == null && deltaChildVector != null) {
// first register a new child in the target vector
targetDenseUnionVector.registerNewTypeId(deltaChildVector.getField());
targetChildVector =
targetDenseUnionVector.addVector(
(byte) i,
deltaChildVector.getField().createVector(targetDenseUnionVector.getAllocator()));
// now we have both child vectors not null, we can append them.
VectorAppender childAppender = new VectorAppender(targetChildVector);
deltaChildVector.accept(childAppender, null);
} else if (targetChildVector != null && deltaChildVector == null) {
// the value only exists in the target vector, so we are done
continue;
} else {
// both child vectors are non-null
// first check vector types
TypeEqualsVisitor childTypeVisitor =
new TypeEqualsVisitor(
targetChildVector, /* check name */ false, /* check meta data*/ false);
if (!childTypeVisitor.equals(deltaChildVector)) {
throw new IllegalArgumentException(
"dense union vectors have different child vector types with type id " + i);
}
// append child vectors
VectorAppender childAppender = new VectorAppender(targetChildVector);
deltaChildVector.accept(childAppender, null);
}
}
targetVector.setValueCount(newValueCount);
return targetVector;
}
@Override
public ValueVector visit(NullVector deltaVector, Void value) {
Preconditions.checkArgument(
targetVector.getField().getType().equals(deltaVector.getField().getType()),
"The targetVector to append must have the same type as the targetVector being appended");
return targetVector;
}
@Override
public ValueVector visit(ExtensionTypeVector<?> deltaVector, Void value) {
ValueVector targetUnderlying = ((ExtensionTypeVector<?>) targetVector).getUnderlyingVector();
VectorAppender underlyingAppender = new VectorAppender(targetUnderlying);
deltaVector.getUnderlyingVector().accept(underlyingAppender, null);
return targetVector;
}
}