NonNullableStructVector.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.vector.complex;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.util.ByteFunctionHelpers;
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueIterableVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.SingleStructReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.ComplexHolder;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.JsonStringHashMap;
import org.apache.arrow.vector.util.TransferPair;

/**
 * A struct vector that has no null values (and no validity buffer). Child Vectors are handled in
 * {@link AbstractStructVector}.
 */
public class NonNullableStructVector extends AbstractStructVector
    implements ValueIterableVector<Map<String, ?>> {

  /**
   * Creates a new empty struct vector which replaces an existing field with the new one in case of
   * name conflict ({@code ConflictPolicy.CONFLICT_REPLACE}).
   *
   * @param name the name given to the new vector
   * @param allocator the allocator passed to the new vector
   * @return a new vector with a non-nullable struct field type, a {@code null} callback and {@code
   *     allowConflictPolicyChanges} set to {@code false}
   */
  public static NonNullableStructVector empty(String name, BufferAllocator allocator) {
    return new NonNullableStructVector(
        name,
        allocator,
        // nullable = false, struct type; the two remaining FieldType arguments are left null.
        new FieldType(false, ArrowType.Struct.INSTANCE, null, null),
        null,
        ConflictPolicy.CONFLICT_REPLACE,
        false);
  }

  /**
   * Creates a new empty struct vector which preserves fields with identical names ({@code
   * ConflictPolicy.CONFLICT_APPEND}).
   *
   * @param name the name given to the new vector
   * @param allocator the allocator passed to the new vector
   * @return a new vector with a non-nullable struct field type, a {@code null} callback and {@code
   *     allowConflictPolicyChanges} set to {@code true}
   */
  public static NonNullableStructVector emptyWithDuplicates(
      String name, BufferAllocator allocator) {
    return new NonNullableStructVector(
        name,
        allocator,
        // nullable = false, struct type; the two remaining FieldType arguments are left null.
        new FieldType(false, ArrowType.Struct.INSTANCE, null, null),
        null,
        ConflictPolicy.CONFLICT_APPEND,
        true);
  }

  // Single reader bound to this vector; returned by getReader() and by get(int, ComplexHolder).
  private final SingleStructReaderImpl reader = new SingleStructReaderImpl(this);
  // Field describing this vector; getField() replaces it when the children's fields differ.
  protected Field field;
  // Number of values in this vector, as reported by getValueCount().
  public int valueCount;

  /**
   * Constructs a new instance.
   *
   * <p>Wraps {@code name} and {@code fieldType} in a {@link Field} created with {@code null}
   * children and delegates to {@link #NonNullableStructVector(Field, BufferAllocator, CallBack)}.
   *
   * @param name The name of the instance.
   * @param allocator The allocator to use for allocating/reallocating buffers.
   * @param fieldType The type of this struct.
   * @param callBack A schema change callback.
   */
  public NonNullableStructVector(
      String name, BufferAllocator allocator, FieldType fieldType, CallBack callBack) {
    this(new Field(name, fieldType, null), allocator, callBack);
  }

  /**
   * Constructs a new instance.
   *
   * <p>Delegates to the five-argument constructor with a {@code null} conflict policy and {@code
   * allowConflictPolicyChanges} set to {@code true}.
   *
   * @param field The field materialized by this vector.
   * @param allocator The allocator to use for allocating/reallocating buffers.
   * @param callBack A schema change callback.
   */
  public NonNullableStructVector(Field field, BufferAllocator allocator, CallBack callBack) {
    this(field, allocator, callBack, null, true);
  }

  /**
   * Constructs a new instance.
   *
   * <p>Wraps {@code name} and {@code fieldType} in a {@link Field} created with {@code null}
   * children and delegates to the {@link Field}-based constructor with the same remaining
   * arguments.
   *
   * @param name The name of the instance.
   * @param allocator The allocator to use for allocating/reallocating buffers.
   * @param fieldType The type of this struct.
   * @param callBack A schema change callback.
   * @param conflictPolicy How to handle duplicate field names in the struct.
   * @param allowConflictPolicyChanges Passed through to the superclass unchanged; presumably
   *     controls whether the conflict policy may be changed later (TODO confirm against {@link
   *     AbstractStructVector}).
   */
  public NonNullableStructVector(
      String name,
      BufferAllocator allocator,
      FieldType fieldType,
      CallBack callBack,
      ConflictPolicy conflictPolicy,
      boolean allowConflictPolicyChanges) {
    this(
        new Field(name, fieldType, null),
        allocator,
        callBack,
        conflictPolicy,
        allowConflictPolicyChanges);
  }

  /**
   * Constructs a new instance from a {@link Field}.
   *
   * <p>The field's name, the allocator, the callback, the conflict policy and the {@code
   * allowConflictPolicyChanges} flag are forwarded to the superclass; the field itself is kept and
   * the value count starts at zero.
   *
   * @param field The field materialized by this vector.
   * @param allocator The allocator to use for allocating/reallocating buffers.
   * @param callBack A schema change callback.
   * @param conflictPolicy How to handle duplicate field names in the struct.
   * @param allowConflictPolicyChanges Forwarded to the superclass unchanged.
   */
  public NonNullableStructVector(
      Field field,
      BufferAllocator allocator,
      CallBack callBack,
      ConflictPolicy conflictPolicy,
      boolean allowConflictPolicyChanges) {
    super(field.getName(), allocator, callBack, conflictPolicy, allowConflictPolicyChanges);
    // A freshly built struct holds no values yet.
    valueCount = 0;
    this.field = field;
  }

  /** Returns the {@link SingleStructReaderImpl} that was created for this vector. */
  @Override
  public FieldReader getReader() {
    return reader;
  }

  // Transfer pair cached by copyFrom(); it is reused while the source vector stays the same and is
  // reset to null whenever a StructTransferPair targeting this vector is constructed.
  private transient StructTransferPair ephPair;

  /**
   * Copies the element at fromIndex in the provided vector to thisIndex. Reallocates buffers if
   * thisIndex is larger than current capacity.
   *
   * <p>The transfer pair used for the copy is cached and only rebuilt when {@code from} is a
   * different vector than the one used by the previous call.
   *
   * @param fromIndex index of the element to read from {@code from}
   * @param thisIndex index in this vector to write to
   * @param from the source vector; its minor type must equal this vector's minor type
   */
  @Override
  public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
    Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
    final boolean cachedPairMatches = ephPair != null && ephPair.from == from;
    if (!cachedPairMatches) {
      ephPair = (StructTransferPair) from.makeTransferPair(this);
    }
    ephPair.copyValueSafe(fromIndex, thisIndex);
  }

  /**
   * Same as {@link #copyFrom(int, int, ValueVector)}, to which this method delegates unchanged.
   *
   * @param fromIndex index of the element to read from {@code from}
   * @param thisIndex index in this vector to write to
   * @param from the source vector
   */
  @Override
  public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
    copyFrom(fromIndex, thisIndex, from);
  }

  /** Always returns {@code true} for this vector type. */
  @Override
  protected boolean supportsDirectRead() {
    return true;
  }

  /** Returns an iterator over the names reported by {@code getChildFieldNames()}. */
  public Iterator<String> fieldNameIterator() {
    return getChildFieldNames().iterator();
  }

  /**
   * Applies the given initial capacity to every vector obtained by iterating over this struct.
   *
   * @param numRecords the initial capacity forwarded to each child
   */
  @Override
  public void setInitialCapacity(int numRecords) {
    forEach(child -> child.setInitialCapacity(numRecords));
  }

  /**
   * Applies the given initial capacity to every vector obtained by iterating over this struct,
   * passing the density along to children that are {@link DensityAwareVector}s.
   *
   * @param valueCount the initial capacity forwarded to each child (shadows the field of the same
   *     name, which is not touched here)
   * @param density forwarded only to children implementing {@link DensityAwareVector}
   */
  @Override
  public void setInitialCapacity(int valueCount, double density) {
    for (final ValueVector child : this) {
      if (!(child instanceof DensityAwareVector)) {
        // No density support: fall back to the plain capacity setter.
        child.setInitialCapacity(valueCount);
        continue;
      }
      ((DensityAwareVector) child).setInitialCapacity(valueCount, density);
    }
  }

  /**
   * Returns the combined buffer size of all vectors obtained by iterating over this struct.
   *
   * @return {@code 0} when the value count is zero or the struct has no children, otherwise the
   *     sum of the children's {@code getBufferSize()} values
   * @throws ArithmeticException if the sum does not fit in an {@code int}
   */
  @Override
  public int getBufferSize() {
    if (valueCount == 0 || size() == 0) {
      return 0;
    }
    long totalBytes = 0;
    for (final ValueVector child : this) {
      totalBytes += child.getBufferSize();
    }

    // Fail loudly instead of silently wrapping around when the total exceeds the int range.
    return Math.toIntExact(totalBytes);
  }

  /**
   * Returns the combined buffer size the children report for the given number of values.
   *
   * @param valueCount the number of values to size for (shadows the field of the same name)
   * @return {@code 0} when {@code valueCount} is zero, otherwise the sum of the children's {@code
   *     getBufferSizeFor(valueCount)} values
   * @throws ArithmeticException if the sum does not fit in an {@code int}
   */
  @Override
  public int getBufferSizeFor(final int valueCount) {
    if (valueCount == 0) {
      return 0;
    }

    long totalBytes = 0;
    for (final ValueVector child : this) {
      totalBytes += child.getBufferSizeFor(valueCount);
    }

    // Fail loudly instead of silently wrapping around when the total exceeds the int range.
    return Math.toIntExact(totalBytes);
  }

  /**
   * Not supported: this vector has no validity buffer.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public ArrowBuf getValidityBuffer() {
    throw new UnsupportedOperationException();
  }

  /**
   * Not supported by this vector.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public ArrowBuf getDataBuffer() {
    throw new UnsupportedOperationException();
  }

  /**
   * Not supported by this vector.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public ArrowBuf getOffsetBuffer() {
    throw new UnsupportedOperationException();
  }

  /**
   * Creates a transfer pair whose target keeps this vector's name and has no schema change
   * callback.
   *
   * @param allocator the allocator for the target vector
   * @return the result of {@link #getTransferPair(String, BufferAllocator, CallBack)} called with
   *     this vector's name and a {@code null} callback
   */
  @Override
  public TransferPair getTransferPair(BufferAllocator allocator) {
    return getTransferPair(name, allocator, null);
  }

  /**
   * Creates a transfer pair whose target is a new, empty {@link NonNullableStructVector}.
   *
   * <p>The target reuses this vector's field type, conflict policy and {@code
   * allowConflictPolicyChanges} setting. Child vectors added to the target while the pair is built
   * are not allocated.
   *
   * @param ref the name of the target vector
   * @param allocator the allocator for the target vector
   * @param callBack the schema change callback for the target vector
   * @return a transfer pair from this vector to the new target
   */
  @Override
  public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
    return new StructTransferPair(
        this,
        new NonNullableStructVector(
            // Honor the requested name, as the (String, BufferAllocator) overload does.
            ref,
            allocator,
            field.getFieldType(),
            callBack,
            getConflictPolicy(),
            allowConflictPolicyChanges),
        false);
  }

  /**
   * Creates a transfer pair from this vector into an existing target.
   *
   * @param to the target vector; it is cast to {@link NonNullableStructVector}, so any other type
   *     fails with a {@link ClassCastException}
   * @return a new {@link StructTransferPair} built with its two-argument constructor
   */
  @Override
  public TransferPair makeTransferPair(ValueVector to) {
    return new StructTransferPair(this, (NonNullableStructVector) to);
  }

  /**
   * Creates a transfer pair whose target is a new, empty {@link NonNullableStructVector} named
   * {@code ref}.
   *
   * <p>The target reuses this vector's field type, callback, conflict policy and {@code
   * allowConflictPolicyChanges} setting. Child vectors added to the target while the pair is built
   * are not allocated.
   *
   * @param ref the name of the target vector
   * @param allocator the allocator for the target vector
   * @return a transfer pair from this vector to the new target
   */
  @Override
  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
    final NonNullableStructVector target =
        new NonNullableStructVector(
            ref,
            allocator,
            field.getFieldType(),
            callBack,
            getConflictPolicy(),
            allowConflictPolicyChanges);
    return new StructTransferPair(this, target, false);
  }

  /**
   * Creates a transfer pair whose target is a new, empty {@link NonNullableStructVector} built from
   * the given field.
   *
   * <p>The target reuses this vector's callback, conflict policy and {@code
   * allowConflictPolicyChanges} setting. Child vectors added to the target while the pair is built
   * are not allocated.
   *
   * @param field the field for the target vector (shadows this vector's own {@code field})
   * @param allocator the allocator for the target vector
   * @return a transfer pair from this vector to the new target
   */
  @Override
  public TransferPair getTransferPair(Field field, BufferAllocator allocator) {
    final NonNullableStructVector target =
        new NonNullableStructVector(
            field, allocator, callBack, getConflictPolicy(), allowConflictPolicyChanges);
    return new StructTransferPair(this, target, false);
  }

  /**
   * Creates a transfer pair whose target is a new, empty {@link NonNullableStructVector} built from
   * the given field and callback.
   *
   * <p>The target reuses this vector's conflict policy and {@code allowConflictPolicyChanges}
   * setting. Child vectors added to the target while the pair is built are not allocated.
   *
   * @param field the field for the target vector (shadows this vector's own {@code field})
   * @param allocator the allocator for the target vector
   * @param callBack the schema change callback for the target vector
   * @return a transfer pair from this vector to the new target
   */
  @Override
  public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
    final NonNullableStructVector target =
        new NonNullableStructVector(
            field, allocator, callBack, getConflictPolicy(), allowConflictPolicyChanges);
    return new StructTransferPair(this, target, false);
  }

  /**
   * {@link TransferPair} for this class.
   *
   * <p>Holds one child transfer pair per child of the source struct and applies every operation to
   * each of them in turn.
   */
  protected static class StructTransferPair implements TransferPair {
    // One entry per source child that could be paired; never contains null.
    private final TransferPair[] pairs;
    private final NonNullableStructVector from;
    private final NonNullableStructVector to;

    /**
     * Creates a pair that allocates every child vector it has to add to {@code to}.
     *
     * @param from the source struct
     * @param to the target struct
     */
    public StructTransferPair(NonNullableStructVector from, NonNullableStructVector to) {
      this(from, to, true);
    }

    /**
     * Creates a pair, adding to {@code to} any child of {@code from} that it is missing.
     *
     * @param from the source struct
     * @param to the target struct; its cached copy pair is reset
     * @param allocate whether a child that this constructor newly adds to {@code to} is allocated
     */
    protected StructTransferPair(
        NonNullableStructVector from, NonNullableStructVector to, boolean allocate) {
      this.from = from;
      this.to = to;
      this.to.ephPair = null;

      // Collect into a list so that a skipped child cannot leave a null slot behind; a null entry
      // would make transfer(), copyValueSafe() and splitAndTransfer() fail with an NPE.
      final List<TransferPair> childPairs = new ArrayList<>(from.size());
      for (String child : from.getChildFieldNames()) {
        final int preSize = to.size();
        final FieldVector vector = from.getChild(child);
        if (vector == null) {
          continue;
        }
        // DRILL-1872: we add the child fields for the vector, looking up the field by name. For a
        // map vector, the child fields may be nested fields of the top level child. For example if
        // the structure of a child field is oa.oab.oabc then we add oa, then add oab to oa then
        // oabc to oab. But the children member of a Materialized field is a HashSet. If the fields
        // are added in the children HashSet, and the hashCode of the Materialized field includes
        // the hash code of the children, the hashCode value of oa changes *after* the field has
        // been added to the HashSet. (This is similar to what happens in ScanBatch where the
        // children cannot be added till they are read). To take care of this, we ensure that the
        // hashCode of the MaterializedField does not include the hashCode of the children but is
        // based only on MaterializedField$key.
        final FieldVector newVector =
            to.addOrGet(child, vector.getField().getFieldType(), vector.getClass());
        // Only allocate when the target actually grew, i.e. the child did not exist before.
        if (allocate && to.size() != preSize) {
          newVector.allocateNew();
        }
        childPairs.add(vector.makeTransferPair(newVector));
      }
      this.pairs = childPairs.toArray(new TransferPair[0]);
    }

    /**
     * Transfers every child, copies the source's value count to the target and then clears the
     * source.
     */
    @Override
    public void transfer() {
      for (final TransferPair p : pairs) {
        p.transfer();
      }
      to.valueCount = from.valueCount;
      from.clear();
    }

    /** Returns the target struct of this pair. */
    @Override
    public ValueVector getTo() {
      return to;
    }

    /**
     * Copies one element through every child pair.
     *
     * @param from index to read in the source (shadows the {@code from} field)
     * @param to index to write in the target (shadows the {@code to} field)
     */
    @Override
    public void copyValueSafe(int from, int to) {
      for (TransferPair p : pairs) {
        p.copyValueSafe(from, to);
      }
    }

    /**
     * Splits the given range out of every child and sets the target's value count to {@code
     * length}.
     *
     * @param startIndex first index of the range, forwarded to each child pair
     * @param length number of values in the range
     */
    @Override
    public void splitAndTransfer(int startIndex, int length) {
      for (TransferPair p : pairs) {
        p.splitAndTransfer(startIndex, length);
      }
      to.setValueCount(length);
    }
  }

  /**
   * Returns the smallest value capacity among the child vectors.
   *
   * @return {@code 0} when {@code size()} is zero, otherwise the minimum of the children's {@code
   *     getValueCapacity()} values
   */
  @Override
  public int getValueCapacity() {
    if (size() != 0) {
      // The struct can only hold as many values as its smallest child.
      return getChildren().stream().mapToInt(ValueVector::getValueCapacity).min().getAsInt();
    }
    return 0;
  }

  /**
   * Builds a map from child field name to that child's object at {@code index}.
   *
   * <p>A child is left out of the map when it cannot be looked up, when {@code index} is not below
   * its value count, or when its object at {@code index} is {@code null}.
   *
   * @param index the position to read
   * @return a new {@link JsonStringHashMap} holding the non-null child values
   */
  @Override
  public Map<String, ?> getObject(int index) {
    final Map<String, Object> result = new JsonStringHashMap<>();
    for (final String childName : getChildFieldNames()) {
      final ValueVector childVector = getChild(childName);
      if (childVector == null || index >= childVector.getValueCount()) {
        continue;
      }
      final Object value = childVector.getObject(index);
      if (value != null) {
        result.put(childName, value);
      }
    }
    return result;
  }

  /**
   * Hashes the struct value at {@code index} by delegating to {@link #hashCode(int,
   * ArrowBufHasher)} with a {@code null} hasher.
   */
  @Override
  public int hashCode(int index) {
    return hashCode(index, null);
  }

  /**
   * Hashes the struct value at {@code index} by combining the hashes of its children.
   *
   * <p>Starting from zero, the hash of each child at {@code index} is folded in with {@code
   * ByteFunctionHelpers.combineHash}; children whose value count is not above {@code index} are
   * skipped.
   *
   * @param index the position to hash
   * @param hasher forwarded to each child's {@code hashCode(int, ArrowBufHasher)}
   * @return the combined hash
   */
  @Override
  public int hashCode(int index, ArrowBufHasher hasher) {
    int combined = 0;
    for (final FieldVector child : getChildren()) {
      if (index >= child.getValueCount()) {
        continue;
      }
      combined = ByteFunctionHelpers.combineHash(combined, child.hashCode(index, hasher));
    }
    return combined;
  }

  /** Dispatches to {@code visitor.visit(this, value)} and returns its result. */
  @Override
  public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
    return visitor.visit(this, value);
  }

  /** Always returns {@code false}: this struct vector has no null values. */
  @Override
  public boolean isNull(int index) {
    return false;
  }

  /** Always returns {@code 0}: this struct vector has no null values. */
  @Override
  public int getNullCount() {
    return 0;
  }

  /**
   * Positions this vector's reader at {@code index} and stores that reader in {@code holder}.
   *
   * <p>The same reader instance is used by every call, so a later call repositions the reader
   * already handed to earlier holders.
   *
   * @param index the position to point the reader at
   * @param holder receives the reader in its {@code reader} field
   */
  public void get(int index, ComplexHolder holder) {
    reader.setPosition(index);
    holder.reader = reader;
  }

  /** Returns the current value of the {@code valueCount} field. */
  @Override
  public int getValueCount() {
    return valueCount;
  }

  /** Gets a child vector by ordinal position, as returned by {@code getChildByOrdinal(id)}. */
  public ValueVector getVectorById(int id) {
    return getChildByOrdinal(id);
  }

  /**
   * Gets a child vector by ordinal position and casts it to the specified class.
   *
   * @param id the ordinal of the child
   * @param clazz the expected type of the child
   * @return the child at {@code id}, typed as {@code clazz}
   * @throws ClassCastException if the child is not an instance of {@code clazz}
   */
  public <V extends ValueVector> V getVectorById(int id, Class<V> clazz) {
    final ValueVector untyped = getVectorById(id);
    if (!clazz.isInstance(untyped)) {
      throw new ClassCastException(
          "Id "
              + id
              + " had the wrong type. Expected "
              + clazz.getCanonicalName()
              + " but was "
              + untyped.getClass().getCanonicalName());
    }
    return clazz.cast(untyped);
  }

  /**
   * Sets the value count on every child vector and then on this struct.
   *
   * @param valueCount the new number of values
   */
  @Override
  public void setValueCount(int valueCount) {
    getChildren().forEach(child -> child.setValueCount(valueCount));
    this.valueCount = valueCount;
  }

  /** Clears every child vector and sets this struct's value count back to zero. */
  @Override
  public void clear() {
    getChildren().forEach(child -> child.clear());
    valueCount = 0;
  }

  /** Resets every child vector and sets this struct's value count back to zero. */
  @Override
  public void reset() {
    getChildren().forEach(child -> child.reset());
    valueCount = 0;
  }

  /**
   * Returns the field describing this vector, refreshed to match the current children.
   *
   * <p>The stored field is returned as-is when there are no children or when its child list
   * already equals the children's current fields. Otherwise a new {@link Field} with the same name
   * and field type but the current child fields is built, stored and returned.
   */
  @Override
  public Field getField() {
    final List<Field> childFields = new ArrayList<>();
    getChildren().forEach(child -> childFields.add(child.getField()));

    final boolean upToDate = childFields.isEmpty() || field.getChildren().equals(childFields);
    if (!upToDate) {
      field = new Field(field.getName(), field.getFieldType(), childFields);
    }
    return field;
  }

  /** Always returns {@link MinorType#STRUCT}. */
  @Override
  public MinorType getMinorType() {
    return MinorType.STRUCT;
  }

  /**
   * Closes every child vector, sets the value count back to zero and then closes the superclass.
   */
  @Override
  public void close() {
    final Collection<FieldVector> children = getChildren();
    children.forEach(child -> child.close());
    children.clear();

    valueCount = 0;

    super.close();
  }

  /**
   * Initializes the struct's members from the given Fields.
   *
   * <p>For each field a child vector is added under the field's name and field type, and that
   * child is then initialized from the field's own children.
   *
   * @param children the fields describing the child vectors to add
   */
  public void initializeChildrenFromFields(List<Field> children) {
    for (final Field childField : children) {
      final FieldVector childVector =
          (FieldVector) this.add(childField.getName(), childField.getFieldType());
      childVector.initializeChildrenFromFields(childField.getChildren());
    }
  }

  /** Returns the child vectors, exactly as returned by {@code getChildren()}. */
  public List<FieldVector> getChildrenFromFields() {
    return getChildren();
  }
}