ITTestIPCWithLargeArrowBuffers.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.ipc;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Integration test for reading/writing {@link org.apache.arrow.vector.VectorSchemaRoot} with large
* (more than 2GB) buffers by {@link ArrowReader} and {@link ArrowWriter}. To run this test, please
* make sure there is at least 8GB free memory, and 8GB free disk space in the system.
*/
public class ITTestIPCWithLargeArrowBuffers {

  private static final Logger logger =
      LoggerFactory.getLogger(ITTestIPCWithLargeArrowBuffers.class);

  /** Target size, in bytes, of each vector's data buffer (4GB). */
  static final long BUFFER_SIZE = 4 * 1024 * 1024 * 1024L;

  /** Number of dictionary entries: {@link #BUFFER_SIZE} divided by the big-int type width. */
  static final int DICTIONARY_VECTOR_SIZE = (int) (BUFFER_SIZE / BigIntVector.TYPE_WIDTH);

  /** Number of encoded values: {@link #BUFFER_SIZE} divided by the int type width. */
  static final int ENCODED_VECTOR_SIZE = (int) (BUFFER_SIZE / IntVector.TYPE_WIDTH);

  /** Name of the data file that is written and then read back; relative to the working dir. */
  static final String FILE_NAME = "largeArrowData.data";

  /** Identifier under which the dictionary is registered and later looked up. */
  static final long DICTIONARY_ID = 123L;

  /** Type of the encoded (index) vector: signed 32-bit integer. */
  static final ArrowType.Int ENCODED_VECTOR_TYPE = new ArrowType.Int(32, true);

  /** Dictionary encoding linking the encoded vector to the dictionary {@link #DICTIONARY_ID}. */
  static final DictionaryEncoding DICTIONARY_ENCODING =
      new DictionaryEncoding(DICTIONARY_ID, false, ENCODED_VECTOR_TYPE);

  /** Field type of the encoded vector, carrying {@link #DICTIONARY_ENCODING}. */
  static final FieldType ENCODED_FIELD_TYPE =
      new FieldType(true, ENCODED_VECTOR_TYPE, DICTIONARY_ENCODING, null);

  /** The single field of the schema that is written and verified. */
  static final Field ENCODED_VECTOR_FIELD = new Field("encoded vector", ENCODED_FIELD_TYPE, null);

  /**
   * Populates a large dictionary vector and a large encoded vector, then writes them to {@link
   * #FILE_NAME} as a single record batch.
   *
   * @param streamMode {@code true} to write with {@link ArrowStreamWriter}, {@code false} to write
   *     with {@link ArrowFileWriter}
   * @throws IOException if opening or writing the data file fails
   */
  private void testWriteLargeArrowData(boolean streamMode) throws IOException {
    // simulate encoding big int as int
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        BigIntVector dictVector = new BigIntVector("dic vector", allocator);
        FileOutputStream out = new FileOutputStream(FILE_NAME);
        IntVector encodedVector = (IntVector) ENCODED_VECTOR_FIELD.createVector(allocator)) {

      // prepare dictionary provider.
      DictionaryProvider.MapDictionaryProvider provider =
          new DictionaryProvider.MapDictionaryProvider();
      Dictionary dictionary = new Dictionary(dictVector, DICTIONARY_ENCODING);
      provider.put(dictionary);

      // populate the dictionary vector: entry i holds the value i
      dictVector.allocateNew(DICTIONARY_VECTOR_SIZE);
      for (int i = 0; i < DICTIONARY_VECTOR_SIZE; i++) {
        dictVector.set(i, i);
      }
      dictVector.setValueCount(DICTIONARY_VECTOR_SIZE);
      // the whole point of the test: the buffer must not be addressable with an int
      assertTrue(dictVector.getDataBuffer().capacity() > Integer.MAX_VALUE);
      logger.trace("Populating dictionary vector finished");

      // populate the encoded vector: indices cycle through the dictionary
      encodedVector.allocateNew(ENCODED_VECTOR_SIZE);
      for (int i = 0; i < ENCODED_VECTOR_SIZE; i++) {
        encodedVector.set(i, i % DICTIONARY_VECTOR_SIZE);
      }
      encodedVector.setValueCount(ENCODED_VECTOR_SIZE);
      assertTrue(encodedVector.getDataBuffer().capacity() > Integer.MAX_VALUE);
      logger.trace("Populating encoded vector finished");

      // build vector schema root and write data.
      try (VectorSchemaRoot root =
              new VectorSchemaRoot(
                  Arrays.asList(ENCODED_VECTOR_FIELD),
                  Arrays.asList(encodedVector),
                  ENCODED_VECTOR_SIZE);
          ArrowWriter writer =
              streamMode
                  ? new ArrowStreamWriter(root, provider, out)
                  : new ArrowFileWriter(root, provider, out.getChannel())) {
        writer.start();
        writer.writeBatch();
        writer.end();
        logger.trace("Writing data finished");
      }
    }

    assertTrue(new File(FILE_NAME).exists());
  }

  /**
   * Reads {@link #FILE_NAME} back and verifies the schema, the encoded vector, the dictionary
   * vector, and that exactly one record batch is present.
   *
   * <p>This method does not delete the data file; cleanup is done by {@link
   * #runRoundTrip(boolean)} so that it also happens when writing fails.
   *
   * @param streamMode {@code true} to read with {@link ArrowStreamReader}, {@code false} to read
   *     with {@link ArrowFileReader}
   * @throws IOException if opening or reading the data file fails
   */
  private void testReadLargeArrowData(boolean streamMode) throws IOException {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        FileInputStream in = new FileInputStream(FILE_NAME);
        ArrowReader reader =
            streamMode
                ? new ArrowStreamReader(in, allocator)
                : new ArrowFileReader(in.getChannel(), allocator)) {

      // verify schema
      Schema readSchema = reader.getVectorSchemaRoot().getSchema();
      assertEquals(1, readSchema.getFields().size());
      assertEquals(ENCODED_VECTOR_FIELD, readSchema.getFields().get(0));
      logger.trace("Verifying schema finished");

      // verify vector schema root
      assertTrue(reader.loadNextBatch());
      VectorSchemaRoot root = reader.getVectorSchemaRoot();

      assertEquals(ENCODED_VECTOR_SIZE, root.getRowCount());
      assertEquals(1, root.getFieldVectors().size());
      assertTrue(root.getFieldVectors().get(0) instanceof IntVector);

      IntVector encodedVector = (IntVector) root.getVector(0);
      for (int i = 0; i < ENCODED_VECTOR_SIZE; i++) {
        assertEquals(i % DICTIONARY_VECTOR_SIZE, encodedVector.get(i));
      }
      logger.trace("Verifying encoded vector finished");

      // verify dictionary
      Map<Long, Dictionary> dictVectors = reader.getDictionaryVectors();
      assertEquals(1, dictVectors.size());
      Dictionary dictionary = dictVectors.get(DICTIONARY_ID);
      assertNotNull(dictionary);

      assertTrue(dictionary.getVector() instanceof BigIntVector);
      BigIntVector dictVector = (BigIntVector) dictionary.getVector();
      assertEquals(DICTIONARY_VECTOR_SIZE, dictVector.getValueCount());
      for (int i = 0; i < DICTIONARY_VECTOR_SIZE; i++) {
        assertEquals(i, dictVector.get(i));
      }
      logger.trace("Verifying dictionary vector finished");

      // ensure no more data available
      assertFalse(reader.loadNextBatch());
    }
  }

  /**
   * Writes the large data file and reads it back in the given mode, always removing the file
   * afterwards.
   *
   * <p>The file is deleted in a {@code finally} block so that a failure while writing or reading
   * does not leave a multi-gigabyte file behind. A failed deletion is only logged there, because
   * throwing from {@code finally} would replace the original failure. On the success path the
   * removal is then asserted.
   *
   * @param streamMode {@code true} for the streaming format, {@code false} for the file format
   * @throws IOException if writing or reading the data file fails
   */
  private void runRoundTrip(boolean streamMode) throws IOException {
    File dataFile = new File(FILE_NAME);
    try {
      testWriteLargeArrowData(streamMode);
      testReadLargeArrowData(streamMode);
    } finally {
      if (dataFile.exists() && !dataFile.delete()) {
        logger.warn("Failed to delete data file {}", dataFile.getAbsolutePath());
      }
    }
    // reached only when write and read succeeded, so this cannot mask another failure
    assertFalse(dataFile.exists());
  }

  /**
   * Round-trips large Arrow data through the streaming format and then through the file format.
   *
   * @throws IOException if writing or reading the data file fails
   */
  @Test
  public void testIPC() throws IOException {
    logger.trace("Start testing reading/writing large arrow stream data");
    runRoundTrip(true);
    logger.trace("Finish testing reading/writing large arrow stream data");

    logger.trace("Start testing reading/writing large arrow file data");
    runRoundTrip(false);
    logger.trace("Finish testing reading/writing large arrow file data");
  }
}