/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro;

import java.io.EOFException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.arrow.adapter.avro.consumers.CompositeAvroConsumer;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.ValueVectorUtility;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;

/**
 * VectorSchemaRoot iterator for partially converting Avro data, one batch at a time.
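 *
 * <p>A minimal usage sketch, assuming a caller-supplied {@code BufferAllocator}, Avro
 * {@code Decoder}, and Avro {@code Schema} (the {@code allocator}, {@code decoder}, and
 * {@code avroSchema} variables below are illustrative, not part of this class):
 *
 * <pre>{@code
 * AvroToArrowConfig config = new AvroToArrowConfigBuilder(allocator).build();
 * try (AvroToArrowVectorIterator iterator =
 *     AvroToArrowVectorIterator.create(decoder, avroSchema, config)) {
 *   while (iterator.hasNext()) {
 *     try (VectorSchemaRoot root = iterator.next()) {
 *       // process the batch; next() transfers ownership of the root to the caller
 *     }
 *   }
 * }
 * }</pre>
 */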
public class AvroToArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
  /** Batch size value indicating that all rows should be read into a single batch. */
  public static final int NO_LIMIT_BATCH_SIZE = -1;

  /** Default number of rows per batch. */
  public static final int DEFAULT_BATCH_SIZE = 1024;

  private final Decoder decoder;
  private final Schema schema;
  private final AvroToArrowConfig config;
  private final int targetBatchSize;

  private CompositeAvroConsumer compositeConsumer;
  private org.apache.arrow.vector.types.pojo.Schema rootSchema;
  private VectorSchemaRoot nextBatch;

  /** Constructs an instance. */
private AvroToArrowVectorIterator(Decoder decoder, Schema schema, AvroToArrowConfig config) {
this.decoder = decoder;
this.schema = schema;
this.config = config;
this.targetBatchSize = config.getTargetBatchSize();
}

  /** Creates an AvroToArrowVectorIterator to partially convert data. */
public static AvroToArrowVectorIterator create(
Decoder decoder, Schema schema, AvroToArrowConfig config) {
AvroToArrowVectorIterator iterator = new AvroToArrowVectorIterator(decoder, schema, config);
try {
iterator.initialize();
return iterator;
} catch (Exception e) {
iterator.close();
throw new RuntimeException("Error occurs while creating iterator.", e);
}
}

  private void initialize() {
    // Create consumers for each Avro field and collect the vectors they populate.
    compositeConsumer = AvroToArrowUtils.createCompositeConsumer(schema, config);
    List<FieldVector> vectors = new ArrayList<>();
    compositeConsumer.getConsumers().forEach(c -> vectors.add(c.getVector()));
    List<Field> fields = vectors.stream().map(FieldVector::getField).collect(Collectors.toList());
    VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
    rootSchema = root.getSchema();

    // Eagerly load the first batch so hasNext() is accurate as soon as create() returns.
    load(root);
  }

  private void consumeData(VectorSchemaRoot root) {
    int readRowCount = 0;
    try {
      if (targetBatchSize == NO_LIMIT_BATCH_SIZE) {
        // Unbounded batch: grow vector capacity row by row until the stream is exhausted.
        while (true) {
          ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
          compositeConsumer.consume(decoder);
          readRowCount++;
        }
      } else {
        // Bounded batch: capacity was pre-allocated in load(), so consume up to the target size.
        while (readRowCount < targetBatchSize) {
          compositeConsumer.consume(decoder);
          readRowCount++;
        }
      }
      root.setRowCount(readRowCount);
    } catch (EOFException eof) {
      // Reached the end of the decoder stream; keep the rows read so far.
      root.setRowCount(readRowCount);
    } catch (Exception e) {
      compositeConsumer.close();
      throw new RuntimeException("Error occurred while consuming data.", e);
    }
  }

  // Loads the next batch into nextBatch, or sets it to null when no more rows are available.
  private void load(VectorSchemaRoot root) {
    if (targetBatchSize != NO_LIMIT_BATCH_SIZE) {
      // Pre-allocate vector capacity for a full batch to avoid repeated reallocation.
      ValueVectorUtility.preAllocate(root, targetBatchSize);
    }

    long validConsumerCount =
        compositeConsumer.getConsumers().stream().filter(c -> !c.skippable()).count();
    Preconditions.checkArgument(
        root.getFieldVectors().size() == validConsumerCount,
        "Schema root vector count does not match the number of non-skippable consumers.");
    compositeConsumer.resetConsumerVectors(root);

    // Consume rows into the root; an empty batch means the stream is exhausted.
    consumeData(root);
    if (root.getRowCount() == 0) {
      root.close();
      nextBatch = null;
    } else {
      nextBatch = root;
    }
  }

  @Override
public boolean hasNext() {
return nextBatch != null;
}

  /**
   * Gets the next batch of vectors. The caller takes ownership of the returned
   * {@link VectorSchemaRoot} and is responsible for closing it.
   */
@Override
public VectorSchemaRoot next() {
Preconditions.checkArgument(hasNext());
VectorSchemaRoot returned = nextBatch;
try {
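      // Prefetch the following batch before returning the current one so hasNext() stays accurate.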
load(VectorSchemaRoot.create(rootSchema, config.getAllocator()));
} catch (Exception e) {
returned.close();
throw new RuntimeException("Error occurs while getting next schema root.", e);
}
return returned;
}

  /** Closes the prefetched batch, if any, and releases the underlying consumers. */
@Override
public void close() {
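    // Close the prefetched batch that was never handed out, then release consumer resources.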
if (nextBatch != null) {
nextBatch.close();
}
compositeConsumer.close();
}
}