ArrowBuf.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.memory;
import static org.apache.arrow.memory.util.LargeMemoryUtil.checkedCastToInt;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.util.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* ArrowBuf serves as a facade over underlying memory by providing several access APIs to read/write
* data into a chunk of direct memory. All the accounting, ownership and reference management is
* done by {@link ReferenceManager} and ArrowBuf can work with a custom user provided implementation
* of ReferenceManager
*
* <p>Two important instance variables of an ArrowBuf: (1) address - starting virtual address in the
* underlying memory chunk that this ArrowBuf has access to (2) length - length (in bytes) in the
* underlying memory chunk that this ArrowBuf has access to
*
* <p>The management (allocation, deallocation, reference counting etc) for the memory chunk is not
* done by ArrowBuf. Default implementation of ReferenceManager, allocation is in {@link
* BaseAllocator}, {@link BufferLedger} and {@link AllocationManager}
*/
public final class ArrowBuf implements AutoCloseable {
// SLF4J logger for this class; it is handed to the historical log when a bounds check fails.
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);
// Widths, in bytes, of the primitive types. They serve as field lengths for bounds checks and
// as the amount by which the typed write methods advance writerIndex.
private static final int SHORT_SIZE = Short.BYTES;
private static final int INT_SIZE = Integer.BYTES;
private static final int FLOAT_SIZE = Float.BYTES;
private static final int DOUBLE_SIZE = Double.BYTES;
private static final int LONG_SIZE = Long.BYTES;
// Source of per-instance ids; incremented once for every ArrowBuf that is created.
private static final AtomicLong idGenerator = new AtomicLong(0);
// NOTE(review): not referenced in the part of the file reviewed here; presumably the row width
// used when dumping buffer contents -- confirm against its usages.
private static final int LOG_BYTES_PER_ROW = 10;
// Unique id of this buffer; shown by toString() and used in the historical log's name.
private final long id = idGenerator.incrementAndGet();
// Delegate for reference counting, release, derived buffers and size accounting.
private final ReferenceManager referenceManager;
// Optional buffer manager supplied at construction time; may be null.
private final @Nullable BufferManager bufferManager;
// Absolute starting address of the memory this buffer can access; relative indices are added
// to it by addr(long).
private final long addr;
// Cursor advanced by readByte(); readableBytes() is writerIndex - readerIndex.
private long readerIndex;
// Cursor advanced by the write* methods; writableBytes() is capacity - writerIndex.
private long writerIndex;
// Event log that exists only when BaseAllocator.DEBUG is set; null otherwise.
private final @Nullable HistoricalLog historicalLog =
BaseAllocator.DEBUG
? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id)
: null;
// Number of bytes accessible through this buffer; it can only be reduced, via capacity(long).
private volatile long capacity;
/**
 * Creates a buffer over {@code capacity} bytes of memory beginning at {@code memoryAddress}.
 *
 * <p>The reader and writer indices both start at zero. When a historical log exists for this
 * buffer, a {@code create()} event is recorded in it.
 *
 * @param referenceManager The memory manager to track memory usage and reference count of this
 *     buffer
 * @param bufferManager optional buffer manager to associate with this buffer; may be {@code
 *     null}
 * @param capacity The capacity in bytes of this buffer
 * @param memoryAddress starting address of the memory this buffer has access to
 */
public ArrowBuf(
    final ReferenceManager referenceManager,
    final @Nullable BufferManager bufferManager,
    final long capacity,
    final long memoryAddress) {
  // cursors start at the beginning of the buffer
  this.readerIndex = 0L;
  this.writerIndex = 0L;
  // memory window
  this.addr = memoryAddress;
  this.capacity = capacity;
  // collaborators
  this.bufferManager = bufferManager;
  this.referenceManager = referenceManager;
  if (historicalLog != null) {
    historicalLog.recordEvent("create()");
  }
}
/**
 * Returns the reference count reported by this buffer's {@link ReferenceManager}.
 *
 * @return the current reference count
 */
public int refCnt() {
  final int count = referenceManager.getRefCount();
  return count;
}
/**
 * Verifies that the bytes in the range {@code [start, end)} may be read from this buffer.
 *
 * <p>The verification only happens when {@code BoundsChecking.BOUNDS_CHECKING_ENABLED} is true;
 * otherwise this method does nothing. When it runs, an exception is thrown if the buffer is no
 * longer accessible or the range does not fit inside the buffer.
 *
 * @param start The starting position of the bytes to be read.
 * @param end The exclusive endpoint of the bytes to be read.
 */
public void checkBytes(long start, long end) {
  if (!BoundsChecking.BOUNDS_CHECKING_ENABLED) {
    return;
  }
  final long span = end - start;
  checkIndexD(start, span);
}
/**
 * Guards get/set operations: the reference count must be at least one.
 *
 * @throws IllegalStateException if the reference count is zero
 */
private void ensureAccessible() {
  final int references = this.refCnt();
  if (references != 0) {
    return;
  }
  throw new IllegalStateException("Ref count should be >= 1 for accessing the ArrowBuf");
}
/**
 * Returns the reference manager this ArrowBuf was constructed with.
 *
 * @return user provided implementation of {@link ReferenceManager}
 */
public ReferenceManager getReferenceManager() {
  return this.referenceManager;
}
/**
 * Returns the number of bytes this buffer currently gives access to.
 *
 * @return the capacity in bytes
 */
public long capacity() {
  return this.capacity;
}
/**
 * Adjusts the capacity of this buffer. Size increases are NOT supported.
 *
 * <p>Passing the current capacity is a no-op.
 *
 * @param newCapacity the requested capacity; must be in the range {@code [0, capacity()]}
 * @return this buffer
 * @throws IllegalArgumentException if {@code newCapacity} is negative
 * @throws UnsupportedOperationException if {@code newCapacity} exceeds the current capacity
 */
public synchronized ArrowBuf capacity(long newCapacity) {
  // nothing to do when the size is unchanged
  if (newCapacity == capacity) {
    return this;
  }
  Preconditions.checkArgument(newCapacity >= 0);
  // growing is rejected; only shrinking reaches the assignment below
  if (newCapacity >= capacity) {
    throw new UnsupportedOperationException(
        "Buffers don't support resizing that increases the size.");
  }
  capacity = newCapacity;
  return this;
}
/**
 * Returns the byte order of elements in this buffer, which is always the platform's native order.
 *
 * @return {@link ByteOrder#nativeOrder()}
 */
public ByteOrder order() {
  final ByteOrder platformOrder = ByteOrder.nativeOrder();
  return platformOrder;
}
/**
 * Returns the number of bytes still available to read in this buffer, i.e. the distance between
 * the reader index and the writer index.
 *
 * @return {@code writerIndex - readerIndex}
 * @throws IllegalStateException if the writer index is behind the reader index
 */
public long readableBytes() {
  final boolean writerNotBehindReader = writerIndex >= readerIndex;
  Preconditions.checkState(
      writerNotBehindReader, "Writer index cannot be less than reader index");
  final long unread = writerIndex - readerIndex;
  return unread;
}
/**
 * Returns the number of bytes that can still be written before the writer index reaches the
 * capacity of this buffer.
 *
 * @return {@code capacity() - writerIndex}
 */
public long writableBytes() {
  final long limit = capacity();
  return limit - writerIndex;
}
/**
 * Returns a slice of only the readable bytes in the buffer, i.e. the range that starts at the
 * reader index and spans {@link #readableBytes()} bytes.
 *
 * @return the slice
 */
public ArrowBuf slice() {
  final long start = readerIndex;
  final long size = readableBytes();
  return slice(start, size);
}
/**
 * Returns a slice (view) starting at <code>index</code> with the given <code>length</code>.
 *
 * <p>The slice is obtained from the reference manager via {@code deriveBuffer} and its writer
 * index is set to {@code length}.
 *
 * @param index start of the slice, 0 based relative to this ArrowBuf; must lie in {@code [0,
 *     capacity]}
 * @param length number of bytes in the slice; must be non-negative and {@code index + length}
 *     must lie in {@code [0, capacity]}
 * @return the derived buffer
 * @throws IndexOutOfBoundsException if {@code index} or {@code index + length} falls outside
 *     {@code [0, capacity]}
 * @throws IllegalArgumentException if {@code length} is negative
 */
public ArrowBuf slice(long index, long length) {
  Preconditions.checkPositionIndex(index, this.capacity);
  Preconditions.checkPositionIndex(index + length, this.capacity);
  // The two checks above only look at the end points, so a negative length whose end point still
  // lands inside [0, capacity] would slip through and yield a view with a negative size. It is
  // rejected here, after the existing checks, so previously reported errors keep their type.
  Preconditions.checkArgument(length >= 0, "expecting non-negative slice length");
  /*
   * Re the behavior of reference counting, see http://netty.io/wiki/reference-counted-objects
   * .html#wiki-h3-5, which
   * explains that derived buffers share their reference count with their parent
   */
  final ArrowBuf newBuf = referenceManager.deriveBuffer(this, index, length);
  newBuf.writerIndex(length);
  return newBuf;
}
/**
 * Makes a nio byte buffer covering the readable bytes of this ArrowBuf.
 *
 * <p>The readable byte count is narrowed to an {@code int} with {@code checkedCastToInt}.
 *
 * @return a ByteBuffer over the readable range
 */
public ByteBuffer nioBuffer() {
  final long start = readerIndex;
  final int size = checkedCastToInt(readableBytes());
  return nioBuffer(start, size);
}
/**
 * Makes a nio byte buffer over {@code length} bytes of this ArrowBuf starting at {@code index}.
 *
 * @param index start of the range, 0 based relative to this ArrowBuf
 * @param length number of bytes the ByteBuffer should cover
 * @return a ByteBuffer over the requested range
 */
public ByteBuffer nioBuffer(long index, int length) {
  chk(index, length);
  final ByteBuffer view = getDirectBuffer(index, length);
  return view;
}
/**
 * Wraps {@code length} bytes at the absolute address of {@code index} using {@code
 * MemoryUtil.directBuffer}. No bounds checking is done here.
 *
 * @param index start of the range, 0 based relative to this ArrowBuf
 * @param length number of bytes to wrap
 * @return the ByteBuffer produced by {@code MemoryUtil.directBuffer}
 */
private ByteBuffer getDirectBuffer(long index, int length) {
  return MemoryUtil.directBuffer(addr(index), length);
}
/**
 * Returns the starting address of the memory this buffer has access to.
 *
 * @return the address this buffer was constructed with
 */
public long memoryAddress() {
  final long start = this.addr;
  return start;
}
/** Describes this buffer by its id, starting memory address and capacity. */
@Override
public String toString() {
  final long address = memoryAddress();
  return String.format("ArrowBuf[%d], address:%d, capacity:%d", id, address, capacity);
}
/** Identity based hash code, consistent with the identity based {@link #equals(Object)}. */
@Override
public int hashCode() {
  final int identityHash = System.identityHashCode(this);
  return identityHash;
}
/**
 * Two ArrowBufs are equal only when they are the very same object; contents are not compared.
 *
 * @param obj the object to compare against, may be {@code null}
 * @return {@code true} only if {@code obj} is this instance
 */
@Override
public boolean equals(@Nullable Object obj) {
  final boolean sameInstance = (obj == this);
  return sameInstance;
}
/*
 * IMPORTANT NOTE
 * All data getters and setters take a caller supplied index that is
 * 0 based and relative to this ArrowBuf. Because an ArrowBuf only
 * exposes a portion of an underlying chunk of memory that begins at
 * some address, the relative index has to be turned into an absolute
 * address: memory address + index.
 *
 * Example:
 *
 * Take an underlying chunk of memory that is 64 bytes long and an
 * ArrowBuf that exposes 16 bytes of it, starting at offset 4.
 *
 * If the chunk starts at virtual address MAR, the memory address of
 * the ArrowBuf is MAR + offset -- that value is what the field addr
 * holds. See the BufferLedger and AllocationManager code for the
 * ReferenceManager implementation that manages a chunk of memory and
 * creates ArrowBufs with access to a range of bytes within the chunk
 * (or to the entire chunk).
 *
 * Reading or writing data therefore happens at addr + index. That
 * computation lives in this method and is used throughout the
 * get/set methods to obtain the absolute byte address inside the
 * underlying chunk.
 *
 * @param index the index at which the user wants to read/write
 * @return the absolute address within the memory
 */
private long addr(long index) {
  final long absolute = this.addr + index;
  return absolute;
}
/*-------------------------------------------------*
| Following are a set of fast path data set and |
| get APIs to write/read data from ArrowBuf |
| at a given index (0 based relative to this |
| ArrowBuf and not relative to the underlying |
| memory chunk). |
| |
*-------------------------------------------------*/
/**
 * Bounds checks an access of {@code length} bytes at {@code index}, but only when {@code
 * BoundsChecking.BOUNDS_CHECKING_ENABLED} is true; otherwise nothing is checked.
 *
 * @param index index (0 based relative to this ArrowBuf)
 * @param length provided length of data for get/set
 */
private void chk(long index, long length) {
  if (!BoundsChecking.BOUNDS_CHECKING_ENABLED) {
    return;
  }
  checkIndexD(index, length);
}
/**
 * Unconditionally verifies that this buffer is accessible and that {@code fieldLength} bytes
 * starting at {@code index} lie within the capacity. When the range is out of bounds and a
 * historical log exists, the history is logged before the exception is thrown.
 *
 * @param index index (0 based relative to this ArrowBuf)
 * @param fieldLength number of bytes that will be accessed
 * @throws IllegalStateException if the reference count is zero
 * @throws IllegalArgumentException if {@code fieldLength} is negative
 * @throws IndexOutOfBoundsException if the range does not fit within the capacity
 */
private void checkIndexD(long index, long fieldLength) {
  // the buffer must still be referenced
  ensureAccessible();
  // the requested length must make sense
  Preconditions.checkArgument(fieldLength >= 0, "expecting non-negative data length");
  final boolean withinBounds = index >= 0 && index <= capacity() - fieldLength;
  if (withinBounds) {
    return;
  }
  if (historicalLog != null) {
    historicalLog.logHistory(logger);
  }
  final String message =
      String.format(
          "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity());
  throw new IndexOutOfBoundsException(message);
}
/**
 * Reads the 8 byte long stored at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return 8 byte long value
 */
public long getLong(long index) {
  chk(index, LONG_SIZE);
  final long address = addr(index);
  return MemoryUtil.getLong(address);
}
/**
 * Writes an 8 byte long at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setLong(long index, long value) {
  chk(index, LONG_SIZE);
  final long address = addr(index);
  MemoryUtil.putLong(address, value);
}
/**
 * Reads the 4 byte float stored at the given index. The value is read as an int via {@link
 * #getInt(long)} and its bits are reinterpreted as a float.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return 4 byte float value
 */
public float getFloat(long index) {
  final int bits = getInt(index);
  return Float.intBitsToFloat(bits);
}
/**
 * Writes a 4 byte float at the given index. The raw int bits of the float are stored.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setFloat(long index, float value) {
  chk(index, FLOAT_SIZE);
  final int bits = Float.floatToRawIntBits(value);
  MemoryUtil.putInt(addr(index), bits);
}
/**
 * Reads the 8 byte double stored at the given index. The value is read as a long via {@link
 * #getLong(long)} and its bits are reinterpreted as a double.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return 8 byte double value
 */
public double getDouble(long index) {
  final long bits = getLong(index);
  return Double.longBitsToDouble(bits);
}
/**
 * Writes an 8 byte double at the given index. The raw long bits of the double are stored.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setDouble(long index, double value) {
  chk(index, DOUBLE_SIZE);
  final long bits = Double.doubleToRawLongBits(value);
  MemoryUtil.putLong(addr(index), bits);
}
/**
 * Reads the 2 byte char stored at the given index. The value is read via {@link
 * #getShort(long)} and cast to {@code char}.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return 2 byte char value
 */
public char getChar(long index) {
  final short raw = getShort(index);
  return (char) raw;
}
/**
 * Writes a 2 byte char at the given index. Only the low 16 bits of {@code value} are stored.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setChar(long index, int value) {
  chk(index, SHORT_SIZE);
  final short narrowed = (short) value;
  MemoryUtil.putShort(addr(index), narrowed);
}
/**
 * Reads the 4 byte int stored at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return 4 byte int value
 */
public int getInt(long index) {
  chk(index, INT_SIZE);
  final long address = addr(index);
  return MemoryUtil.getInt(address);
}
/**
 * Writes a 4 byte int at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setInt(long index, int value) {
  chk(index, INT_SIZE);
  final long address = addr(index);
  MemoryUtil.putInt(address, value);
}
/**
 * Reads the 2 byte short stored at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return 2 byte short value
 */
public short getShort(long index) {
  chk(index, SHORT_SIZE);
  final long address = addr(index);
  return MemoryUtil.getShort(address);
}
/**
 * Writes a 2 byte short at the given index. The {@code int} argument is narrowed to a {@code
 * short} and handed to {@link #setShort(long, short)}.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setShort(long index, int value) {
  final short narrowed = (short) value;
  setShort(index, narrowed);
}
/**
 * Writes a 2 byte short at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setShort(long index, short value) {
  chk(index, SHORT_SIZE);
  final long address = addr(index);
  MemoryUtil.putShort(address, value);
}
/**
 * Writes a single byte at the given index. Only the low 8 bits of {@code value} are stored.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setByte(long index, int value) {
  chk(index, 1);
  final byte narrowed = (byte) value;
  MemoryUtil.putByte(addr(index), narrowed);
}
/**
 * Writes a single byte at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be written
 * @param value value to write
 */
public void setByte(long index, byte value) {
  chk(index, 1);
  final long address = addr(index);
  MemoryUtil.putByte(address, value);
}
/**
 * Reads the single byte stored at the given index of the memory this ArrowBuf has access to.
 *
 * @param index index (0 based relative to this ArrowBuf) where the value will be read from
 * @return byte value
 */
public byte getByte(long index) {
  chk(index, 1);
  final long address = addr(index);
  return MemoryUtil.getByte(address);
}
/*--------------------------------------------------*
| Following are another set of data set APIs |
| that directly work with writerIndex |
| |
*--------------------------------------------------*/
/**
 * Bounds check relative to the writer index: verifies that {@code length} bytes can be written
 * at the writer index without exceeding the capacity. Nothing is checked unless {@code
 * BoundsChecking.BOUNDS_CHECKING_ENABLED} is true.
 *
 * @param length provided length of data for set
 * @throws IllegalArgumentException if {@code length} is negative
 * @throws IllegalStateException if the reference count is zero
 * @throws IndexOutOfBoundsException if fewer than {@code length} bytes are writable
 */
private void ensureWritable(final int length) {
  if (!BoundsChecking.BOUNDS_CHECKING_ENABLED) {
    return;
  }
  Preconditions.checkArgument(length >= 0, "expecting non-negative length");
  // the buffer must still be referenced
  this.ensureAccessible();
  // enough room left after the writer index?
  if (length <= writableBytes()) {
    return;
  }
  final String message =
      String.format(
          "writerIndex(%d) + length(%d) exceeds capacity(%d)", writerIndex, length, capacity());
  throw new IndexOutOfBoundsException(message);
}
/**
 * Bounds check relative to the reader index: verifies that {@code length} bytes can be read at
 * the reader index without passing the writer index. Nothing is checked unless {@code
 * BoundsChecking.BOUNDS_CHECKING_ENABLED} is true.
 *
 * @param length provided length of data for get
 * @throws IllegalArgumentException if {@code length} is negative
 * @throws IllegalStateException if the reference count is zero
 * @throws IndexOutOfBoundsException if fewer than {@code length} bytes are readable
 */
private void ensureReadable(final int length) {
  if (!BoundsChecking.BOUNDS_CHECKING_ENABLED) {
    return;
  }
  Preconditions.checkArgument(length >= 0, "expecting non-negative length");
  // the buffer must still be referenced
  this.ensureAccessible();
  // enough unread data between the reader and the writer index?
  if (length <= readableBytes()) {
    return;
  }
  final String message =
      String.format(
          "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
          readerIndex, length, writerIndex);
  throw new IndexOutOfBoundsException(message);
}
/**
 * Reads the byte at the reader index and then advances the reader index by one.
 *
 * @return byte value
 */
public byte readByte() {
  ensureReadable(1);
  final byte value = getByte(readerIndex);
  readerIndex += 1;
  return value;
}
/**
 * Copies {@code dst.length} bytes, starting at the reader index, into the given byte array.
 *
 * <p>NOTE(review): unlike {@link #readByte()}, this method leaves the reader index where it was,
 * so repeated calls return the same bytes -- confirm whether callers rely on that.
 *
 * @param dst byte array where the data will be written
 * @throws IllegalArgumentException if {@code dst} is {@code null}
 */
public void readBytes(byte[] dst) {
  Preconditions.checkArgument(dst != null, "expecting valid dst bytearray");
  ensureReadable(dst.length);
  final int count = checkedCastToInt(dst.length);
  getBytes(readerIndex, dst, 0, count);
}
/**
 * Stores the given byte at the writer index and advances the writer index by one.
 *
 * @param value value to set
 */
public void writeByte(byte value) {
  ensureWritable(1);
  final long target = addr(writerIndex);
  MemoryUtil.putByte(target, value);
  writerIndex += 1;
}
/**
 * Stores the lowest order byte of the given value at the writer index and advances the writer
 * index by one.
 *
 * @param value value to be set
 */
public void writeByte(int value) {
  ensureWritable(1);
  final byte lowByte = (byte) value;
  MemoryUtil.putByte(addr(writerIndex), lowByte);
  writerIndex += 1;
}
/**
 * Writes the whole byte array into this ArrowBuf starting at the writer index.
 *
 * @param src src byte array
 * @throws IllegalArgumentException if {@code src} is {@code null}
 */
public void writeBytes(byte[] src) {
  Preconditions.checkArgument(src != null, "expecting valid src array");
  final int count = src.length;
  writeBytes(src, 0, count);
}
/**
 * Writes {@code length} bytes of the given byte array, starting at {@code srcIndex}, into this
 * ArrowBuf at the writer index, then advances the writer index by {@code length}.
 *
 * @param src src byte array
 * @param srcIndex index in the byte array where the copy will begin from
 * @param length length of data to copy
 */
public void writeBytes(byte[] src, int srcIndex, int length) {
  ensureWritable(length);
  final long target = writerIndex;
  setBytes(target, src, srcIndex, length);
  writerIndex = target + length;
}
/**
 * Stores the low 16 bits of the given int as a short at the writer index and advances the
 * writer index by two bytes.
 *
 * @param value value to set
 */
public void writeShort(int value) {
  ensureWritable(SHORT_SIZE);
  final short narrowed = (short) value;
  MemoryUtil.putShort(addr(writerIndex), narrowed);
  writerIndex = writerIndex + SHORT_SIZE;
}
/**
 * Stores the given int at the writer index and advances the writer index by four bytes.
 *
 * @param value value to set
 */
public void writeInt(int value) {
  ensureWritable(INT_SIZE);
  final long target = addr(writerIndex);
  MemoryUtil.putInt(target, value);
  writerIndex = writerIndex + INT_SIZE;
}
/**
 * Stores the given long at the writer index and advances the writer index by eight bytes.
 *
 * @param value value to set
 */
public void writeLong(long value) {
  ensureWritable(LONG_SIZE);
  final long target = addr(writerIndex);
  MemoryUtil.putLong(target, value);
  writerIndex = writerIndex + LONG_SIZE;
}
/**
 * Stores the raw int bits of the given float at the writer index and advances the writer index
 * by four bytes.
 *
 * @param value value to set
 */
public void writeFloat(float value) {
  ensureWritable(FLOAT_SIZE);
  final int bits = Float.floatToRawIntBits(value);
  MemoryUtil.putInt(addr(writerIndex), bits);
  writerIndex = writerIndex + FLOAT_SIZE;
}
/**
 * Stores the raw long bits of the given double at the writer index and advances the writer index
 * by eight bytes.
 *
 * @param value value to set
 */
public void writeDouble(double value) {
  ensureWritable(DOUBLE_SIZE);
  final long bits = Double.doubleToRawLongBits(value);
  MemoryUtil.putLong(addr(writerIndex), bits);
  writerIndex = writerIndex + DOUBLE_SIZE;
}
/*--------------------------------------------------*
| Following are another set of data set/get APIs |
| that read and write stream of bytes from/to byte |
| arrays, ByteBuffer, ArrowBuf etc |
| |
*--------------------------------------------------*/
/**
 * Determines whether the range described by {@code index} and {@code length} falls outside
 * {@code capacity}.
 *
 * <p>The test is done without branching: if any of {@code index}, {@code length}, {@code index +
 * length} or {@code capacity - (index + length)} is negative, the sign bit of their bitwise OR
 * is set.
 *
 * @param index The starting index.
 * @param length The length which will be utilized (starting from {@code index}).
 * @param capacity The capacity that {@code index + length} is allowed to be within.
 * @return {@code true} if the requested range does NOT fit within {@code capacity} (using it
 *     would be an out of bounds access), {@code false} if it fits.
 */
private static boolean isOutOfBounds(long index, long length, long capacity) {
  final long end = index + length;
  final long headroom = capacity - end;
  return (index | length | end | headroom) < 0;
}
/**
 * Bounds check used by the bulk copy methods: verifies that this buffer is accessible and that
 * {@code fieldLength} bytes starting at {@code index} lie within the capacity. Nothing is
 * checked unless {@code BoundsChecking.BOUNDS_CHECKING_ENABLED} is true.
 *
 * @param index index (0 based relative to this ArrowBuf)
 * @param fieldLength number of bytes that will be accessed
 * @throws IllegalStateException if the reference count is zero
 * @throws IndexOutOfBoundsException if the range does not fit within the capacity
 */
private void checkIndex(long index, long fieldLength) {
  if (!BoundsChecking.BOUNDS_CHECKING_ENABLED) {
    return;
  }
  // the buffer must still be referenced
  this.ensureAccessible();
  // the range must fit
  if (!isOutOfBounds(index, fieldLength, this.capacity())) {
    return;
  }
  final String message =
      String.format(
          "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, this.capacity());
  throw new IndexOutOfBoundsException(message);
}
/**
 * Fills the whole destination byte array with data from this ArrowBuf, starting at the given
 * index.
 *
 * @param index starting index (0 based relative to the portion of memory) this ArrowBuf has
 *     access to
 * @param dst byte array to copy the data into
 */
public void getBytes(long index, byte[] dst) {
  final int count = dst.length;
  getBytes(index, dst, 0, count);
}
/**
 * Copies {@code length} bytes from this ArrowBuf, starting at {@code index}, into the
 * destination byte array at {@code dstIndex}.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param dst byte array to copy the data into
 * @param dstIndex starting index in dst byte array to copy into
 * @param length length of data to copy from this ArrowBuf
 * @throws IllegalArgumentException if {@code dst} is {@code null}
 * @throws IndexOutOfBoundsException if the range does not fit in {@code dst}, or (with bounds
 *     checking enabled) in this ArrowBuf
 */
public void getBytes(long index, byte[] dst, int dstIndex, int length) {
  // source side: the range must lie inside this ArrowBuf
  checkIndex(index, length);
  // destination side: the array must exist ...
  Preconditions.checkArgument(dst != null, "expecting a valid dst byte array");
  // ... and have room for "length" bytes from dstIndex onwards
  final boolean overflowsDst = isOutOfBounds(dstIndex, length, dst.length);
  if (overflowsDst) {
    throw new IndexOutOfBoundsException(
        "Not enough space to copy data into destination" + dstIndex);
  }
  if (length == 0) {
    return;
  }
  // move the bytes from address addr(index) into dst[dstIndex ...]
  final long source = addr(index);
  MemoryUtil.copyFromMemory(source, dst, dstIndex, length);
}
/**
 * Copies the whole source byte array into this ArrowBuf, starting at the given index.
 *
 * @param index starting index (0 based relative to the portion of memory) this ArrowBuf has
 *     access to
 * @param src byte array to copy the data from
 */
public void setBytes(long index, byte[] src) {
  final int count = src.length;
  setBytes(index, src, 0, count);
}
/**
 * Copies {@code length} bytes of the source byte array, starting at {@code srcIndex}, into this
 * ArrowBuf at {@code index}.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param src src byte array to copy the data from
 * @param srcIndex index in the byte array where the copy will start from
 * @param length length of data to copy from byte array
 * @throws IllegalArgumentException if {@code src} is {@code null}
 * @throws IndexOutOfBoundsException if the range does not fit in {@code src}, or (with bounds
 *     checking enabled) in this ArrowBuf
 */
public void setBytes(long index, byte[] src, int srcIndex, long length) {
  // destination side: the range must lie inside this ArrowBuf
  checkIndex(index, length);
  // source side: the array must exist ...
  Preconditions.checkArgument(src != null, "expecting a valid src byte array");
  // ... and hold "length" bytes from srcIndex onwards
  final boolean overrunsSrc = isOutOfBounds(srcIndex, length, src.length);
  if (overrunsSrc) {
    throw new IndexOutOfBoundsException(
        "Not enough space to copy data from byte array" + srcIndex);
  }
  if (length <= 0) {
    return;
  }
  // move the bytes from src[srcIndex ...] to address addr(index)
  final long target = addr(index);
  MemoryUtil.copyToMemory(src, srcIndex, target, length);
}
/**
 * Copies {@code dst.remaining()} bytes from this ArrowBuf, starting at {@code index}, into the
 * destination ByteBuffer at its current position. After the copy the position of {@code dst} has
 * been advanced by the number of bytes copied. Nothing happens when {@code dst} has no bytes
 * remaining.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param dst dst ByteBuffer where the data will be copied into
 * @throws ReadOnlyBufferException if {@code dst} is a read-only direct buffer
 * @throws UnsupportedOperationException if {@code dst} is neither direct nor backed by an
 *     accessible array
 */
public void getBytes(long index, ByteBuffer dst) {
  final int count = dst.remaining();
  // the range to copy from must lie inside this ArrowBuf
  checkIndex(index, count);
  if (count == 0) {
    return;
  }
  // address in this ArrowBuf where the copy begins
  final long source = addr(index);
  if (dst.isDirect()) {
    if (dst.isReadOnly()) {
      throw new ReadOnlyBufferException();
    }
    // memory to memory copy into the direct buffer at its current position
    final long target = MemoryUtil.getByteBufferAddress(dst) + dst.position();
    MemoryUtil.copyMemory(source, target, count);
    // move dst's position past the bytes just written
    dst.position(dst.position() + count);
  } else if (dst.hasArray()) {
    // memory to array copy into the backing array at the current position
    final int offset = dst.arrayOffset() + dst.position();
    MemoryUtil.copyFromMemory(source, dst.array(), offset, count);
    // move dst's position past the bytes just written
    dst.position(dst.position() + count);
  } else {
    throw new UnsupportedOperationException(
        "Copy from this ArrowBuf to ByteBuffer is not supported");
  }
}
/**
 * Copies {@code src.remaining()} bytes from the source ByteBuffer, starting at its current
 * position, into this ArrowBuf at {@code index}. After the copy the position of {@code src} has
 * been advanced by the number of bytes copied. Nothing happens when {@code src} has no bytes
 * remaining.
 *
 * <p>Direct buffers and array backed buffers are copied in bulk. Any other buffer is drained
 * through its relative getters; its byte order is temporarily switched to this buffer's order
 * and restored afterwards.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param src src ByteBuffer where the data will be copied from
 */
public void setBytes(long index, ByteBuffer src) {
  final int total = src.remaining();
  // the range to copy into must lie inside this ArrowBuf
  checkIndex(index, total);
  // address in this ArrowBuf where the data will be copied to
  long target = addr(index);
  if (total == 0) {
    return;
  }
  if (src.isDirect()) {
    // memory to memory copy from the direct buffer's current position
    final long source = MemoryUtil.getByteBufferAddress(src) + src.position();
    MemoryUtil.copyMemory(source, target, total);
    // move src's position past the bytes just consumed
    src.position(src.position() + total);
    return;
  }
  if (src.hasArray()) {
    // array to memory copy from the backing array at the current position
    final int offset = src.arrayOffset() + src.position();
    MemoryUtil.copyToMemory(src.array(), offset, target, total);
    // move src's position past the bytes just consumed
    src.position(src.position() + total);
    return;
  }
  // neither direct nor array backed: pull the data out through the relative getters
  int left = total;
  final ByteOrder savedOrder = src.order();
  src.order(order());
  try {
    // batches of 16 longs (128 bytes) while at least one more long would follow the batch
    while (left - 128 >= LONG_SIZE) {
      for (int i = 0; i < 16; i++) {
        MemoryUtil.putLong(target, src.getLong());
        left -= LONG_SIZE;
        target += LONG_SIZE;
      }
    }
    // remaining whole longs
    while (left >= LONG_SIZE) {
      MemoryUtil.putLong(target, src.getLong());
      left -= LONG_SIZE;
      target += LONG_SIZE;
    }
    // trailing bytes
    while (left > 0) {
      MemoryUtil.putByte(target, src.get());
      --left;
      ++target;
    }
  } finally {
    src.order(savedOrder);
  }
}
/**
 * Copy data into this ArrowBuf at a given index onwards from a source ByteBuffer starting at a
 * given srcIndex for a certain length.
 *
 * <p>{@code srcIndex} is an absolute index into {@code src}: the bytes {@code [srcIndex,
 * srcIndex + length)} are copied regardless of the position and limit of {@code src}, and
 * neither the position nor the limit of {@code src} is modified.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param src src ByteBuffer where the data will be copied from
 * @param srcIndex starting index in the src ByteBuffer where the data copy will start from
 * @param length length of data to copy from src ByteBuffer
 * @throws IndexOutOfBoundsException if {@code src} is direct and the requested range does not
 *     lie within its capacity, or (with bounds checking enabled) the range does not fit in this
 *     ArrowBuf
 * @throws IllegalArgumentException if {@code src} is not direct and the requested range does not
 *     lie within its capacity
 */
public void setBytes(long index, ByteBuffer src, int srcIndex, int length) {
  // bound check for this ArrowBuf where the data will be copied into
  checkIndex(index, length);
  if (src.isDirect()) {
    // The copy below works on raw addresses, so a range outside the source buffer would read
    // memory that does not belong to it; reject such a range up front.
    if (isOutOfBounds(srcIndex, length, src.capacity())) {
      throw new IndexOutOfBoundsException(
          String.format(
              "index: %d, length: %d (expected: range(0, %d))",
              srcIndex, length, src.capacity()));
    }
    // copy length bytes of data from src ByteBuffer starting at address
    // srcAddress into this ArrowBuf at address dstAddress
    final long srcAddress = MemoryUtil.getByteBufferAddress(src) + srcIndex;
    final long dstAddress = addr(index);
    MemoryUtil.copyMemory(srcAddress, dstAddress, length);
  } else {
    // Always go through a duplicate: the relative overload copies [position, limit) and advances
    // the position, so handing it src itself would copy the wrong range whenever src's position
    // or limit is not at 0 / capacity, and would move the caller's position as a side effect.
    final ByteBuffer window = src.duplicate();
    // limit first, then position: the requested range may lie beyond src's current limit, and
    // setting the position first would be rejected in that case.
    window.limit(srcIndex + length);
    window.position(srcIndex);
    setBytes(index, window);
  }
}
/**
 * Copies {@code length} bytes from this ArrowBuf, starting at {@code index}, into the
 * destination ArrowBuf at {@code dstIndex}.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param dst dst ArrowBuf where the data will be copied into
 * @param dstIndex index (0 based relative to the portion of memory dst ArrowBuf has access to)
 * @param length length of data to copy
 * @throws IllegalArgumentException if {@code dst} is {@code null}
 * @throws IndexOutOfBoundsException if the range does not fit in {@code dst}, or (with bounds
 *     checking enabled) in this ArrowBuf
 */
public void getBytes(long index, ArrowBuf dst, long dstIndex, int length) {
  // source side: the range must lie inside this ArrowBuf
  checkIndex(index, length);
  // null check
  Preconditions.checkArgument(dst != null, "expecting a valid ArrowBuf");
  // destination side: the range must lie inside dst
  final boolean overflowsDst = isOutOfBounds(dstIndex, length, dst.capacity());
  if (overflowsDst) {
    throw new IndexOutOfBoundsException(
        String.format(
            "index: %d, length: %d (expected: range(0, %d))", dstIndex, length, dst.capacity()));
  }
  if (length == 0) {
    return;
  }
  // memory to memory copy between the two absolute addresses
  final long source = addr(index);
  final long target = dst.memoryAddress() + dstIndex;
  MemoryUtil.copyMemory(source, target, length);
}
/**
 * Copies {@code length} bytes from the source ArrowBuf, starting at {@code srcIndex}, into this
 * ArrowBuf at {@code index}.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param src src ArrowBuf where the data will be copied from
 * @param srcIndex starting index in the src ArrowBuf where the copy will begin from
 * @param length length of data to copy from src ArrowBuf
 * @throws IllegalArgumentException if {@code src} is {@code null}
 * @throws IndexOutOfBoundsException if the range does not fit in {@code src}, or (with bounds
 *     checking enabled) in this ArrowBuf
 */
public void setBytes(long index, ArrowBuf src, long srcIndex, long length) {
  // destination side: the range must lie inside this ArrowBuf
  checkIndex(index, length);
  // null check
  Preconditions.checkArgument(src != null, "expecting a valid ArrowBuf");
  // source side: the range must lie inside src
  final boolean overrunsSrc = isOutOfBounds(srcIndex, length, src.capacity());
  if (overrunsSrc) {
    throw new IndexOutOfBoundsException(
        String.format(
            "index: %d, length: %d (expected: range(0, %d))", srcIndex, length, src.capacity()));
  }
  if (length == 0) {
    return;
  }
  // memory to memory copy between the two absolute addresses
  final long source = src.memoryAddress() + srcIndex;
  final long target = addr(index);
  MemoryUtil.copyMemory(source, target, length);
}
/**
 * Copies all readable bytes of the source ArrowBuf, starting at its reader index, into this
 * ArrowBuf at the given index. The reader index of {@code src} is then advanced past the copied
 * bytes.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param src src ArrowBuf where the data will be copied from
 * @throws IllegalArgumentException if {@code src} is {@code null}
 */
public void setBytes(long index, ArrowBuf src) {
  // null check
  Preconditions.checkArgument(src != null, "expecting valid ArrowBuf");
  final long count = src.readableBytes();
  // destination side: the range must lie inside this ArrowBuf
  checkIndex(index, count);
  final long source = src.memoryAddress() + src.readerIndex;
  final long target = addr(index);
  MemoryUtil.copyMemory(source, target, count);
  // consume the copied bytes on the source side
  final long advanced = src.readerIndex + count;
  src.readerIndex(advanced);
}
/**
 * Reads up to {@code length} bytes from the given InputStream with a single {@code read} call
 * and copies what was read into this ArrowBuf at the provided index.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param in src stream to copy from
 * @param length length of data to copy
 * @return the value returned by {@code in.read}: the number of bytes copied into this ArrowBuf,
 *     or a non-positive value when nothing was copied; 0 if {@code length} is not positive
 * @throws IOException on failing to read from stream
 * @throws IllegalArgumentException if {@code in} is {@code null}
 */
public int setBytes(long index, InputStream in, int length) throws IOException {
  Preconditions.checkArgument(in != null, "expecting valid input stream");
  checkIndex(index, length);
  if (length <= 0) {
    return 0;
  }
  // stage the stream data in a temporary array
  final byte[] staging = new byte[length];
  final int received = in.read(staging);
  if (received > 0) {
    // move the received bytes from staging[0 ...] to address addr(index)
    MemoryUtil.copyToMemory(staging, 0, addr(index), received);
  }
  return received;
}
/**
 * Writes {@code length} bytes of this ArrowBuf, starting at the given index, to the given
 * OutputStream. Nothing is written when {@code length} is not positive.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param out destination stream to write the data to; must not be null
 * @param length number of bytes to copy
 * @throws IOException on failing to write to stream
 */
public void getBytes(long index, OutputStream out, int length) throws IOException {
  Preconditions.checkArgument(out != null, "expecting valid output stream");
  checkIndex(index, length);
  if (length <= 0) {
    return;
  }
  // stage the requested bytes in a heap array, then hand the array to the stream
  final byte[] staging = new byte[length];
  MemoryUtil.copyFromMemory(addr(index), staging, 0, length);
  out.write(staging);
}
/** Closes this ArrowBuf by calling {@code release()} on its reference manager. */
@Override
public void close() {
  this.referenceManager.release();
}
/**
 * Returns the possible memory consumed by this ArrowBuf in the worst case scenario (not shared,
 * connected to larger underlying buffer of allocated memory).
 *
 * @return size in bytes, as reported by the reference manager's {@code getSize()}
 */
public long getPossibleMemoryConsumed() {
  final long worstCaseSize = referenceManager.getSize();
  return worstCaseSize;
}
/**
 * Returns the size that is accounted for by this buffer (and its potentially shared siblings
 * within the context of the associated allocator).
 *
 * @return size in bytes, as reported by the reference manager's {@code getAccountedSize()}
 */
public long getActualMemoryConsumed() {
  final long accountedSize = referenceManager.getAccountedSize();
  return accountedSize;
}
/**
 * Returns the buffer's byte contents in the form of a hex dump.
 *
 * <p>The dump is organized in rows of {@code LOG_BYTES_PER_ROW} bytes. The first row begins at
 * {@code start} rounded toward zero to a multiple of {@code LOG_BYTES_PER_ROW}, and whole rows
 * are emitted until at least {@code length} bytes have been covered. Positions for which
 * {@code getByte} throws an {@link IndexOutOfBoundsException} are rendered as {@code <ioob>}.
 *
 * @param start the starting byte index
 * @param length how many bytes to log
 * @return A hex dump in a String.
 */
public String toHexString(final long start, final int length) {
  final StringBuilder dump = new StringBuilder("buffer byte dump\n");
  // align the first row to a row boundary
  long cursor = (start / LOG_BYTES_PER_ROW) * LOG_BYTES_PER_ROW;
  long covered = 0;
  while (covered < length) {
    final long rowLast = cursor + LOG_BYTES_PER_ROW - 1;
    dump.append(String.format(" [%05d-%05d]", cursor, rowLast));
    for (int column = 0; column < LOG_BYTES_PER_ROW; column++) {
      // advance the cursor up front so it moves on even when the read below fails
      final long position = cursor;
      cursor++;
      try {
        final byte value = getByte(position);
        dump.append(String.format(" 0x%02x", value));
      } catch (IndexOutOfBoundsException ioob) {
        dump.append(" <ioob>");
      }
    }
    dump.append('\n');
    covered += LOG_BYTES_PER_ROW;
  }
  return dump.toString();
}
/**
 * Get the id assigned to this ArrowBuf for debugging purposes.
 *
 * @return the id of this ArrowBuf
 */
public long getId() {
  return this.id;
}
/**
 * Print information of this buffer into <code>sb</code> at the given indentation and verbosity
 * level.
 *
 * <p>The indented {@code toString()} of this buffer is always appended. The history is appended
 * as well only when this buffer has a historical log and {@code verbosity.includeHistoricalLog}
 * is true (presumably the historical log only exists when BaseAllocator.DEBUG is enabled; that
 * is not visible from this method).
 *
 * @param sb the builder to append to
 * @param indent the indentation level for this buffer's line; the history uses one level more
 * @param verbosity controls whether the history and its stack traces are included
 */
@VisibleForTesting
public void print(StringBuilder sb, int indent, Verbosity verbosity) {
  CommonUtil.indent(sb, indent).append(this.toString());
  if (historicalLog == null || !verbosity.includeHistoricalLog) {
    // no history available or none requested
    return;
  }
  sb.append("\n");
  historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
}
/**
 * Print detailed information of this buffer into <code>sb</code>, using the
 * {@code Verbosity.LOG_WITH_STACKTRACE} verbosity level.
 *
 * @param sb the builder to append to
 * @param indent the indentation level
 */
public void print(StringBuilder sb, int indent) {
  final Verbosity verbosity = Verbosity.LOG_WITH_STACKTRACE;
  print(sb, indent, verbosity);
}
/**
 * Get the index at which the next byte will be read from.
 *
 * @return the current reader index
 */
public long readerIndex() {
  return this.readerIndex;
}
/**
 * Get the index at which the next byte will be written to.
 *
 * @return the current writer index
 */
public long writerIndex() {
  return this.writerIndex;
}
/**
 * Set the reader index for this ArrowBuf. The value is stored as given; no validation is
 * performed by this method.
 *
 * @param readerIndex new reader index
 * @return this ArrowBuf, to allow call chaining
 */
public ArrowBuf readerIndex(long readerIndex) {
  this.readerIndex = readerIndex;
  return this;
}
/**
 * Set the writer index for this ArrowBuf. The value is stored as given; no validation is
 * performed by this method.
 *
 * @param writerIndex new writer index
 * @return this ArrowBuf, to allow call chaining
 */
public ArrowBuf writerIndex(long writerIndex) {
  this.writerIndex = writerIndex;
  return this;
}
/**
 * Zero-out the bytes in this ArrowBuf starting at the given index for the given length. A length
 * of zero is a no-op and skips the index check.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param length number of bytes to zero-out
 * @return this ArrowBuf
 */
public ArrowBuf setZero(long index, long length) {
  if (length == 0) {
    return this;
  }
  checkIndex(index, length);
  final long start = this.addr + index;
  MemoryUtil.setMemory(start, length, (byte) 0);
  return this;
}
/**
 * Sets all bits to one in the specified range. A length of zero is a no-op and skips the index
 * check.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param length number of bytes to set
 * @return this ArrowBuf
 * @deprecated use {@link ArrowBuf#setOne(long, long)} instead.
 */
@Deprecated
public ArrowBuf setOne(int index, int length) {
  if (length == 0) {
    return this;
  }
  checkIndex(index, length);
  final long start = this.addr + index;
  MemoryUtil.setMemory(start, length, (byte) 0xff);
  return this;
}
/**
 * Sets all bits to one in the specified range. A length of zero is a no-op and skips the index
 * check.
 *
 * @param index index (0 based relative to the portion of memory this ArrowBuf has access to)
 * @param length number of bytes to set
 * @return this ArrowBuf
 */
public ArrowBuf setOne(long index, long length) {
  if (length == 0) {
    return this;
  }
  checkIndex(index, length);
  final long start = this.addr + index;
  MemoryUtil.setMemory(start, length, (byte) 0xff);
  return this;
}
/**
 * Returns <code>this</code> if size is less than or equal to {@link #capacity()}, otherwise
 * delegates to {@link BufferManager#replace(ArrowBuf, long)} to get a new buffer.
 *
 * @param size the required capacity in bytes; must be non-negative (validated via
 *     {@code Preconditions.checkArgument})
 * @return this ArrowBuf when it is already large enough, otherwise the buffer returned by the
 *     buffer manager
 * @throws UnsupportedOperationException if a larger buffer is needed but this ArrowBuf has no
 *     buffer manager
 */
public ArrowBuf reallocIfNeeded(final long size) {
  Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative");
  final boolean alreadyLargeEnough = size <= this.capacity();
  if (alreadyLargeEnough) {
    return this;
  }
  if (bufferManager == null) {
    throw new UnsupportedOperationException(
        "Realloc is only available in the context of operator's UDFs");
  }
  return bufferManager.replace(this, size);
}
/**
 * Resets both the reader index and the writer index of this ArrowBuf to zero. Only the two
 * indices are changed by this method.
 *
 * @return this ArrowBuf
 */
public ArrowBuf clear() {
  this.writerIndex = 0;
  this.readerIndex = 0;
  return this;
}
}