VectorSchemaRootTransformer.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.driver.jdbc.utils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

/** Converts Arrow's {@link VectorSchemaRoot} format to one JDBC would expect. */
@FunctionalInterface
public interface VectorSchemaRootTransformer {
  VectorSchemaRoot transform(VectorSchemaRoot originalRoot, VectorSchemaRoot transformedRoot)
      throws Exception;

  /** Transformer's helper class; builds a new {@link VectorSchemaRoot}. */
  class Builder {

    private final Schema schema;
    private final BufferAllocator bufferAllocator;
    private final List<Field> newFields = new ArrayList<>();
    private final Collection<Task> tasks = new ArrayList<>();

    /**
     * Constructor for the VectorSchemaRootTransformer's Builder.
     *
     * @param schema The Arrow schema.
     * @param bufferAllocator The BufferAllocator to use for allocating memory.
     */
    public Builder(final Schema schema, final BufferAllocator bufferAllocator) {
      this.schema = schema;
      this.bufferAllocator =
          bufferAllocator.newChildAllocator(
              "VectorSchemaRootTransformer", 0, bufferAllocator.getLimit());
    }

    /**
     * Add task to transform a vector to a new vector renaming it. This also adds
     * transformedVectorName to the transformed {@link VectorSchemaRoot} schema.
     *
     * @param originalVectorName Name of the original vector to be transformed.
     * @param transformedVectorName Name of the vector that is the result of the transformation.
     * @return a VectorSchemaRoot instance with a task to rename a field vector.
     */
    public Builder renameFieldVector(
        final String originalVectorName, final String transformedVectorName) {
      tasks.add(
          (originalRoot, transformedRoot) -> {
            final FieldVector originalVector = originalRoot.getVector(originalVectorName);
            final FieldVector transformedVector = transformedRoot.getVector(transformedVectorName);

            final ArrowType originalType = originalVector.getField().getType();
            final ArrowType transformedType = transformedVector.getField().getType();
            if (!originalType.equals(transformedType)) {
              throw new IllegalArgumentException(
                  String.format(
                      "Cannot transfer vector with field type %s to %s",
                      originalType, transformedType));
            }

            if (originalVector instanceof BaseVariableWidthVector) {
              ((BaseVariableWidthVector) originalVector)
                  .transferTo(((BaseVariableWidthVector) transformedVector));
            } else if (originalVector instanceof BaseFixedWidthVector) {
              ((BaseFixedWidthVector) originalVector)
                  .transferTo(((BaseFixedWidthVector) transformedVector));
            } else {
              throw new IllegalStateException(
                  String.format("Cannot transfer vector of type %s", originalVector.getClass()));
            }
          });

      final Field originalField = schema.findField(originalVectorName);
      newFields.add(
          new Field(
              transformedVectorName,
              new FieldType(
                  originalField.isNullable(),
                  originalField.getType(),
                  originalField.getDictionary(),
                  originalField.getMetadata()),
              originalField.getChildren()));

      return this;
    }

    /**
     * Adds an empty field to the transformed {@link VectorSchemaRoot} schema.
     *
     * @param fieldName Name of the field to be added.
     * @param fieldType Type of the field to be added.
     * @return a VectorSchemaRoot instance with the current tasks.
     */
    public Builder addEmptyField(final String fieldName, final Types.MinorType fieldType) {
      newFields.add(Field.nullable(fieldName, fieldType.getType()));

      return this;
    }

    /**
     * Adds an empty field to the transformed {@link VectorSchemaRoot} schema.
     *
     * @param fieldName Name of the field to be added.
     * @param fieldType Type of the field to be added.
     * @return a VectorSchemaRoot instance with the current tasks.
     */
    public Builder addEmptyField(final String fieldName, final ArrowType fieldType) {
      newFields.add(Field.nullable(fieldName, fieldType));

      return this;
    }

    /**
     * Build the {@link VectorSchemaRoot} with applied transformation tasks.
     *
     * @return The built {@link VectorSchemaRoot}.
     */
    public VectorSchemaRootTransformer build() {
      return (originalRoot, transformedRoot) -> {
        if (transformedRoot == null) {
          transformedRoot = VectorSchemaRoot.create(new Schema(newFields), bufferAllocator);
        }

        for (final Task task : tasks) {
          task.run(originalRoot, transformedRoot);
        }

        transformedRoot.setRowCount(originalRoot.getRowCount());

        originalRoot.clear();
        return transformedRoot;
      };
    }

    /**
     * Functional interface used to a task to transform a VectorSchemaRoot into a new
     * VectorSchemaRoot.
     */
    @FunctionalInterface
    interface Task {
      void run(VectorSchemaRoot originalRoot, VectorSchemaRoot transformedRoot);
    }
  }
}