// JdbcToArrowUtils.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.adapter.jdbc;

import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
import static org.apache.arrow.vector.types.Types.MinorType;

import java.io.IOException;
import java.math.RoundingMode;
import java.sql.Date;
import java.sql.ParameterMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Function;
import org.apache.arrow.adapter.jdbc.consumer.ArrayConsumer;
import org.apache.arrow.adapter.jdbc.consumer.BigIntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.BinaryConsumer;
import org.apache.arrow.adapter.jdbc.consumer.BitConsumer;
import org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.DateConsumer;
import org.apache.arrow.adapter.jdbc.consumer.Decimal256Consumer;
import org.apache.arrow.adapter.jdbc.consumer.DecimalConsumer;
import org.apache.arrow.adapter.jdbc.consumer.DoubleConsumer;
import org.apache.arrow.adapter.jdbc.consumer.FloatConsumer;
import org.apache.arrow.adapter.jdbc.consumer.IntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.MapConsumer;
import org.apache.arrow.adapter.jdbc.consumer.NullConsumer;
import org.apache.arrow.adapter.jdbc.consumer.SmallIntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.TimeConsumer;
import org.apache.arrow.adapter.jdbc.consumer.TimestampConsumer;
import org.apache.arrow.adapter.jdbc.consumer.TimestampTZConsumer;
import org.apache.arrow.adapter.jdbc.consumer.TinyIntConsumer;
import org.apache.arrow.adapter.jdbc.consumer.VarCharConsumer;
import org.apache.arrow.adapter.jdbc.consumer.exceptions.JdbcConsumerException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.extension.OpaqueType;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ValueVectorUtility;

/**
 * Class that does most of the work to convert JDBC ResultSet data into Arrow columnar format Vector
 * objects.
 *
 * @since 0.10.0
 */
public class JdbcToArrowUtils {

  private static final int JDBC_ARRAY_VALUE_COLUMN = 2;

  /** Returns a new {@link java.util.Calendar} in the UTC time zone with the root locale. */
  public static Calendar getUtcCalendar() {
    final TimeZone utc = TimeZone.getTimeZone("UTC");
    return Calendar.getInstance(utc, Locale.ROOT);
  }

  /**
   * Create an Arrow {@link Schema} object for the given JDBC {@link ResultSetMetaData}.
   *
   * @param rsmd The ResultSetMetaData containing the results, to read the JDBC metadata from.
   * @param calendar The calendar to use the time zone field of, to construct Timestamp fields from.
   * @return {@link Schema}
   * @throws SQLException on error
   */
  public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, Calendar calendar)
      throws SQLException {
    Preconditions.checkNotNull(calendar, "Calendar object can't be null");

    // Delegate to the config-based overload; the allocator is only needed to satisfy the
    // config constructor here.
    final JdbcToArrowConfig config = new JdbcToArrowConfig(new RootAllocator(0), calendar);
    return jdbcToArrowSchema(rsmd, config);
  }

  /**
   * Create Arrow {@link Schema} object for the given JDBC {@link ParameterMetaData}.
   *
   * @param parameterMetaData The ParameterMetaData of a prepared statement, to read the JDBC
   *     parameter types from.
   * @param calendar The calendar to use the time zone field of, to construct Timestamp fields from.
   * @return {@link Schema}
   * @throws SQLException on error
   */
  public static Schema jdbcToArrowSchema(
      final ParameterMetaData parameterMetaData, final Calendar calendar) throws SQLException {
    Preconditions.checkNotNull(calendar, "Calendar object can't be null");
    Preconditions.checkNotNull(parameterMetaData);
    // Hoist the count: ParameterMetaData calls may round-trip to the JDBC driver.
    final int parameterCount = parameterMetaData.getParameterCount();
    final List<Field> parameterFields = new ArrayList<>(parameterCount);
    for (int parameterCounter = 1; parameterCounter <= parameterCount; parameterCounter++) {
      final int jdbcDataType = parameterMetaData.getParameterType(parameterCounter);
      final int jdbcIsNullable = parameterMetaData.isNullable(parameterCounter);
      // Both "nullable" and "nullability unknown" map to nullable on the Arrow side.
      final boolean arrowIsNullable = jdbcIsNullable != ParameterMetaData.parameterNoNulls;
      final int precision = parameterMetaData.getPrecision(parameterCounter);
      final int scale = parameterMetaData.getScale(parameterCounter);
      final ArrowType arrowType =
          getArrowTypeFromJdbcType(new JdbcFieldInfo(jdbcDataType, precision, scale), calendar);
      final FieldType fieldType = new FieldType(arrowIsNullable, arrowType, /*dictionary=*/ null);
      // Parameters are positional, so the resulting fields are unnamed.
      parameterFields.add(new Field(null, fieldType, null));
    }

    return new Schema(parameterFields);
  }

  /**
   * Converts the provided JDBC type to its respective {@link ArrowType} counterpart.
   *
   * @param fieldInfo the {@link JdbcFieldInfo} with information about the original JDBC type.
   * @param calendar the {@link Calendar} to use for datetime data types.
   * @return a new {@link ArrowType}.
   * @throws UnsupportedOperationException for JDBC types with no Arrow mapping.
   */
  public static ArrowType getArrowTypeFromJdbcType(
      final JdbcFieldInfo fieldInfo, final Calendar calendar) {
    final int jdbcType = fieldInfo.getJdbcType();
    switch (jdbcType) {
      case Types.BOOLEAN:
      case Types.BIT:
        return new ArrowType.Bool();
      case Types.TINYINT:
        return new ArrowType.Int(8, true);
      case Types.SMALLINT:
        return new ArrowType.Int(16, true);
      case Types.INTEGER:
        return new ArrowType.Int(32, true);
      case Types.BIGINT:
        return new ArrowType.Int(64, true);
      case Types.NUMERIC:
      case Types.DECIMAL:
        {
          final int precision = fieldInfo.getPrecision();
          final int scale = fieldInfo.getScale();
          // More than 38 digits does not fit in a 128-bit decimal; widen to 256 bits.
          final int bitWidth = precision > 38 ? 256 : 128;
          return new ArrowType.Decimal(precision, scale, bitWidth);
        }
      case Types.REAL:
      case Types.FLOAT:
        return new ArrowType.FloatingPoint(SINGLE);
      case Types.DOUBLE:
        return new ArrowType.FloatingPoint(DOUBLE);
      case Types.CHAR:
      case Types.NCHAR:
      case Types.VARCHAR:
      case Types.NVARCHAR:
      case Types.LONGVARCHAR:
      case Types.LONGNVARCHAR:
      case Types.CLOB:
        return new ArrowType.Utf8();
      case Types.DATE:
        return new ArrowType.Date(DateUnit.DAY);
      case Types.TIME:
        return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
      case Types.TIMESTAMP:
        // Carry the calendar's zone into the Arrow type when one is supplied.
        final String timezone = calendar == null ? null : calendar.getTimeZone().getID();
        return new ArrowType.Timestamp(TimeUnit.MILLISECOND, timezone);
      case Types.BINARY:
      case Types.VARBINARY:
      case Types.LONGVARBINARY:
      case Types.BLOB:
        return new ArrowType.Binary();
      case Types.ARRAY:
        return new ArrowType.List();
      case Types.NULL:
        return new ArrowType.Null();
      case Types.STRUCT:
        return new ArrowType.Struct();
      default:
        throw new UnsupportedOperationException("Unmapped JDBC type: " + jdbcType);
    }
  }

  /**
   * Wrap a JDBC to Arrow type converter such that {@link UnsupportedOperationException} becomes
   * {@link OpaqueType}.
   *
   * @param typeConverter The type converter to wrap.
   * @param vendorName The database name to report as the Opaque type's vendor name.
   */
  public static Function<JdbcFieldInfo, ArrowType> reportUnsupportedTypesAsOpaque(
      Function<JdbcFieldInfo, ArrowType> typeConverter, String vendorName) {
    return fieldInfo -> {
      try {
        return typeConverter.apply(fieldInfo);
      } catch (UnsupportedOperationException ignored) {
        // Unsupported type: fall back to an opaque placeholder carrying the vendor type name.
        return new OpaqueType(MinorType.NULL.getType(), fieldInfo.getTypeName(), vendorName);
      }
    };
  }

  /**
   * Create Arrow {@link Schema} object for the given JDBC {@link java.sql.ResultSetMetaData}.
   *
   * <p>If {@link JdbcToArrowConfig#shouldIncludeMetadata()} returns <code>true</code>, the
   * following fields will be added to the {@link FieldType#getMetadata()}:
   *
   * <ul>
   *   <li>{@link Constants#SQL_CATALOG_NAME_KEY} representing {@link
   *       ResultSetMetaData#getCatalogName(int)}
   *   <li>{@link Constants#SQL_TABLE_NAME_KEY} representing {@link
   *       ResultSetMetaData#getTableName(int)}
   *   <li>{@link Constants#SQL_COLUMN_NAME_KEY} representing {@link
   *       ResultSetMetaData#getColumnLabel(int)}
   *   <li>{@link Constants#SQL_TYPE_KEY} representing {@link
   *       ResultSetMetaData#getColumnTypeName(int)}
   * </ul>
   *
   * <p>If any columns are of type {@link java.sql.Types#ARRAY}, the configuration object will be
   * used to look up the array sub-type field. The {@link
   * JdbcToArrowConfig#getArraySubTypeByColumnIndex(int)} method will be checked first, followed by
   * the {@link JdbcToArrowConfig#getArraySubTypeByColumnName(String)} method.
   *
   * <p>Columns for which the configured type converter returns <code>null</code> are omitted from
   * the resulting schema.
   *
   * @param rsmd The ResultSetMetaData containing the results, to read the JDBC metadata from.
   * @param config The configuration to use when constructing the schema.
   * @return {@link Schema}
   * @throws SQLException on error
   * @throws IllegalArgumentException if <code>rsmd</code> contains an {@link java.sql.Types#ARRAY}
   *     but the <code>config</code> does not have a sub-type definition for it.
   */
  public static Schema jdbcToArrowSchema(ResultSetMetaData rsmd, JdbcToArrowConfig config)
      throws SQLException {
    Preconditions.checkNotNull(rsmd, "JDBC ResultSetMetaData object can't be null");
    Preconditions.checkNotNull(config, "The configuration object must not be null");

    List<Field> fields = new ArrayList<>();
    int columnCount = rsmd.getColumnCount();
    for (int i = 1; i <= columnCount; i++) {
      final String columnName = rsmd.getColumnLabel(i);

      final Map<String, String> columnMetadata =
          config.getColumnMetadataByColumnIndex() != null
              ? config.getColumnMetadataByColumnIndex().get(i)
              : null;
      final Map<String, String> metadata;
      if (config.shouldIncludeMetadata()) {
        metadata = new HashMap<>();
        metadata.put(Constants.SQL_CATALOG_NAME_KEY, rsmd.getCatalogName(i));
        metadata.put(Constants.SQL_SCHEMA_NAME_KEY, rsmd.getSchemaName(i));
        metadata.put(Constants.SQL_TABLE_NAME_KEY, rsmd.getTableName(i));
        metadata.put(Constants.SQL_COLUMN_NAME_KEY, columnName);
        metadata.put(Constants.SQL_TYPE_KEY, rsmd.getColumnTypeName(i));
        // Per-column user metadata overrides the JDBC-derived entries on key collision.
        if (columnMetadata != null && !columnMetadata.isEmpty()) {
          metadata.putAll(columnMetadata);
        }
      } else {
        if (columnMetadata != null && !columnMetadata.isEmpty()) {
          metadata = columnMetadata;
        } else {
          metadata = null;
        }
      }

      final JdbcFieldInfo columnFieldInfo = getJdbcFieldInfoForColumn(rsmd, i, config);
      final ArrowType arrowType = config.getJdbcToArrowTypeConverter().apply(columnFieldInfo);
      // A null ArrowType from the converter means "skip this column entirely".
      if (arrowType != null) {
        final FieldType fieldType =
            new FieldType(
                isColumnNullable(rsmd, i, columnFieldInfo),
                arrowType, /* dictionary encoding */
                null,
                metadata);

        List<Field> children = null;
        if (arrowType.getTypeID() == ArrowType.ArrowTypeID.List) {
          final JdbcFieldInfo arrayFieldInfo = getJdbcFieldInfoForArraySubType(rsmd, i, config);
          if (arrayFieldInfo == null) {
            throw new IllegalArgumentException(
                "Configuration does not provide a mapping for array column " + i);
          }
          children = new ArrayList<>();
          final ArrowType childType = config.getJdbcToArrowTypeConverter().apply(arrayFieldInfo);
          children.add(new Field("child", FieldType.nullable(childType), null));
        } else if (arrowType.getTypeID() == ArrowType.ArrowTypeID.Map) {
          // Maps are modeled as map<utf8, utf8>: a struct entry with non-null key/value.
          FieldType mapType = new FieldType(false, ArrowType.Struct.INSTANCE, null, null);
          FieldType keyType = new FieldType(false, new ArrowType.Utf8(), null, null);
          FieldType valueType = new FieldType(false, new ArrowType.Utf8(), null, null);
          children = new ArrayList<>();
          children.add(
              new Field(
                  "child",
                  mapType,
                  Arrays.asList(
                      new Field(MapVector.KEY_NAME, keyType, null),
                      new Field(MapVector.VALUE_NAME, valueType, null))));
        }

        fields.add(new Field(columnName, fieldType, children));
      }
    }
    return new Schema(fields, config.getSchemaMetadata());
  }

  /**
   * Resolves the {@link JdbcFieldInfo} for a column, preferring an explicit type override from the
   * configuration (first by column index, then by column label) before falling back to the
   * ResultSet metadata itself.
   *
   * @param rsmd the ResultSet metadata to read the column type from.
   * @param columnIndex the 1-based column index.
   * @param config the conversion configuration holding possible overrides.
   * @return the resolved field info; never null.
   * @throws SQLException on metadata access error.
   */
  static JdbcFieldInfo getJdbcFieldInfoForColumn(
      ResultSetMetaData rsmd, int columnIndex, JdbcToArrowConfig config) throws SQLException {
    Preconditions.checkNotNull(rsmd, "ResultSet MetaData object cannot be null");
    Preconditions.checkNotNull(config, "Configuration must not be null");
    Preconditions.checkArgument(
        columnIndex > 0, "ResultSetMetaData columns start with 1; column cannot be less than 1");
    Preconditions.checkArgument(
        columnIndex <= rsmd.getColumnCount(),
        "Column number cannot be more than the number of columns");

    JdbcFieldInfo fieldInfo = config.getExplicitTypeByColumnIndex(columnIndex);
    if (fieldInfo == null) {
      fieldInfo = config.getExplicitTypeByColumnName(rsmd.getColumnLabel(columnIndex));
    }
    if (fieldInfo != null) {
      return fieldInfo;
    }
    return new JdbcFieldInfo(rsmd, columnIndex);
  }

  /**
   * Uses the configuration to determine the array sub-type {@link JdbcFieldInfo}: first by column
   * index, then by column label. Returns null when no sub-type mapping can be found.
   */
  private static JdbcFieldInfo getJdbcFieldInfoForArraySubType(
      ResultSetMetaData rsmd, int arrayColumn, JdbcToArrowConfig config) throws SQLException {

    Preconditions.checkNotNull(rsmd, "ResultSet MetaData object cannot be null");
    Preconditions.checkNotNull(config, "Configuration must not be null");
    Preconditions.checkArgument(
        arrayColumn > 0, "ResultSetMetaData columns start with 1; column cannot be less than 1");
    Preconditions.checkArgument(
        arrayColumn <= rsmd.getColumnCount(),
        "Column number cannot be more than the number of columns");

    final JdbcFieldInfo byIndex = config.getArraySubTypeByColumnIndex(arrayColumn);
    if (byIndex != null) {
      return byIndex;
    }
    return config.getArraySubTypeByColumnName(rsmd.getColumnLabel(arrayColumn));
  }

  /**
   * Iterate the given JDBC {@link ResultSet} object to fetch the data and transpose it to populate
   * the given Arrow Vector objects.
   *
   * @param rs ResultSet to use to fetch the data from underlying database
   * @param root Arrow {@link VectorSchemaRoot} object to populate
   * @param calendar The calendar to use when reading {@link Date}, {@link Time}, or {@link
   *     Timestamp} data types from the {@link ResultSet}; must not be <code>null</code>.
   * @throws SQLException on error
   * @throws IOException on error reading or writing the column values
   */
  public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, Calendar calendar)
      throws SQLException, IOException {

    Preconditions.checkNotNull(calendar, "Calendar object can't be null");

    jdbcToArrowVectors(rs, root, new JdbcToArrowConfig(new RootAllocator(0), calendar));
  }

  /**
   * Determines Arrow nullability for a column: an explicit {@link JdbcFieldInfo} nullability wins
   * unless it is unknown, in which case the ResultSet metadata is consulted. Unknown nullability
   * is treated as nullable.
   */
  static boolean isColumnNullable(
      ResultSetMetaData resultSetMetadata, int index, JdbcFieldInfo info) throws SQLException {
    final int nullability;
    if (info == null || info.isNullable() == ResultSetMetaData.columnNullableUnknown) {
      nullability = resultSetMetadata.isNullable(index);
    } else {
      nullability = info.isNullable();
    }
    return nullability == ResultSetMetaData.columnNullable
        || nullability == ResultSetMetaData.columnNullableUnknown;
  }

  /**
   * Iterate the given JDBC {@link ResultSet} object to fetch the data and transpose it to populate
   * the given Arrow Vector objects. Vectors in {@code root} are looked up by the ResultSet's
   * column labels.
   *
   * @param rs ResultSet to use to fetch the data from underlying database
   * @param root Arrow {@link VectorSchemaRoot} object to populate
   * @param config The configuration to use when reading the data.
   * @throws SQLException on error
   * @throws IOException on error reading or writing the column values
   * @throws JdbcConsumerException on error from VectorConsumer
   */
  public static void jdbcToArrowVectors(
      ResultSet rs, VectorSchemaRoot root, JdbcToArrowConfig config)
      throws SQLException, IOException {

    ResultSetMetaData rsmd = rs.getMetaData();
    int columnCount = rsmd.getColumnCount();

    // Build one consumer per column, matched to the root's vectors by column label.
    JdbcConsumer[] consumers = new JdbcConsumer[columnCount];
    for (int i = 1; i <= columnCount; i++) {
      FieldVector vector = root.getVector(rsmd.getColumnLabel(i));
      final JdbcFieldInfo columnFieldInfo = getJdbcFieldInfoForColumn(rsmd, i, config);
      consumers[i - 1] =
          getConsumer(
              vector.getField().getType(),
              i,
              isColumnNullable(rsmd, i, columnFieldInfo),
              vector,
              config);
    }

    CompositeJdbcConsumer compositeConsumer = null;
    // Only release resources when an error occurs; on success the vectors inside the consumers
    // are still useful and the caller is responsible for closing them.
    try {
      compositeConsumer = new CompositeJdbcConsumer(consumers);
      int readRowCount = 0;
      if (config.getTargetBatchSize() == JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE) {
        // Unbounded read: grow vector capacity row by row as data arrives.
        while (rs.next()) {
          ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
          compositeConsumer.consume(rs);
          readRowCount++;
        }
      } else {
        // Bounded read: stop at the target batch size. No ensureCapacity here — presumably
        // the caller pre-allocates capacity for the batch size; verify against callers.
        while (readRowCount < config.getTargetBatchSize() && rs.next()) {
          compositeConsumer.consume(rs);
          readRowCount++;
        }
      }

      root.setRowCount(readRowCount);
    } catch (Exception e) {
      // An error occurred: clean up consumer resources before propagating.
      if (compositeConsumer != null) {
        compositeConsumer.close();
      }
      throw e;
    }
  }

  /**
   * Default function used for JdbcConsumerFactory. This function gets a JdbcConsumer for the given
   * column based on the Arrow type and provided vector.
   *
   * @param arrowType Arrow type for the column.
   * @param columnIndex Column index to fetch from the ResultSet
   * @param nullable Whether the value is nullable or not
   * @param vector Vector to store the consumed value
   * @param config Associated JdbcToArrowConfig, used mainly for the Calendar.
   * @return {@link JdbcConsumer}, or null for an unsupported Int bit width or FloatingPoint
   *     precision.
   */
  public static JdbcConsumer getConsumer(
      ArrowType arrowType,
      int columnIndex,
      boolean nullable,
      FieldVector vector,
      JdbcToArrowConfig config) {
    final Calendar calendar = config.getCalendar();

    switch (arrowType.getTypeID()) {
      case Bool:
        return BitConsumer.createConsumer((BitVector) vector, columnIndex, nullable);
      case Int:
        {
          final int bitWidth = ((ArrowType.Int) arrowType).getBitWidth();
          if (bitWidth == 8) {
            return TinyIntConsumer.createConsumer((TinyIntVector) vector, columnIndex, nullable);
          }
          if (bitWidth == 16) {
            return SmallIntConsumer.createConsumer((SmallIntVector) vector, columnIndex, nullable);
          }
          if (bitWidth == 32) {
            return IntConsumer.createConsumer((IntVector) vector, columnIndex, nullable);
          }
          if (bitWidth == 64) {
            return BigIntConsumer.createConsumer((BigIntVector) vector, columnIndex, nullable);
          }
          return null; // unsupported integer width
        }
      case Decimal:
        {
          final RoundingMode roundingMode = config.getBigDecimalRoundingMode();
          if (((ArrowType.Decimal) arrowType).getBitWidth() == 256) {
            return Decimal256Consumer.createConsumer(
                (Decimal256Vector) vector, columnIndex, nullable, roundingMode);
          }
          return DecimalConsumer.createConsumer(
              (DecimalVector) vector, columnIndex, nullable, roundingMode);
        }
      case FloatingPoint:
        {
          final ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType;
          if (fpType.getPrecision() == SINGLE) {
            return FloatConsumer.createConsumer((Float4Vector) vector, columnIndex, nullable);
          }
          if (fpType.getPrecision() == DOUBLE) {
            return DoubleConsumer.createConsumer((Float8Vector) vector, columnIndex, nullable);
          }
          return null; // unsupported floating-point precision
        }
      case Utf8:
      case LargeUtf8:
        return VarCharConsumer.createConsumer((VarCharVector) vector, columnIndex, nullable);
      case Binary:
      case LargeBinary:
        return BinaryConsumer.createConsumer((VarBinaryVector) vector, columnIndex, nullable);
      case Date:
        return DateConsumer.createConsumer((DateDayVector) vector, columnIndex, nullable, calendar);
      case Time:
        return TimeConsumer.createConsumer(
            (TimeMilliVector) vector, columnIndex, nullable, calendar);
      case Timestamp:
        // With no calendar, timestamps go into the zone-less milli vector; otherwise the
        // calendar's zone is applied via the TZ variant.
        if (calendar == null) {
          return TimestampConsumer.createConsumer(
              (TimeStampMilliVector) vector, columnIndex, nullable);
        }
        return TimestampTZConsumer.createConsumer(
            (TimeStampMilliTZVector) vector, columnIndex, nullable, calendar);
      case List:
        {
          final ListVector listVector = (ListVector) vector;
          final FieldVector elementVector = listVector.getDataVector();
          // Element values are always read from the JDBC array's value column.
          final JdbcConsumer elementConsumer =
              getConsumer(
                  elementVector.getField().getType(),
                  JDBC_ARRAY_VALUE_COLUMN,
                  elementVector.getField().isNullable(),
                  elementVector,
                  config);
          return ArrayConsumer.createConsumer(listVector, elementConsumer, columnIndex, nullable);
        }
      case Map:
        return MapConsumer.createConsumer((MapVector) vector, columnIndex, nullable);
      case Null:
        return new NullConsumer((NullVector) vector);
      default:
        throw new UnsupportedOperationException("No consumer for Arrow type: " + arrowType);
    }
  }
}