BaseAllocator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.memory;

import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.arrow.memory.rounding.DefaultRoundingPolicy;
import org.apache.arrow.memory.rounding.RoundingPolicy;
import org.apache.arrow.memory.util.AssertionUtil;
import org.apache.arrow.memory.util.CommonUtil;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyFor;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.immutables.value.Value;

/**
 * A base-class that implements all functionality of {@linkplain BufferAllocator}s.
 *
 * <p>The class is abstract to enforce usage of {@linkplain RootAllocator}/{@linkplain
 * ChildAllocator} facades.
 */
abstract class BaseAllocator extends Accountant implements BufferAllocator {

  public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
  public static final int DEBUG_LOG_LENGTH = 6;
  public static final boolean DEBUG;
  private static final org.slf4j.Logger logger =
      org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);

  // Initialize this before DEFAULT_CONFIG as DEFAULT_CONFIG will eventually initialize the
  // allocation manager,
  // which in turn allocates an ArrowBuf, which requires DEBUG to have been properly initialized
  static {
    // the system property takes precedence.
    String propValue = System.getProperty(DEBUG_ALLOCATOR);
    if (propValue != null) {
      DEBUG = Boolean.parseBoolean(propValue);
    } else {
      DEBUG = false;
    }
    logger.info(
        "Debug mode "
            + (DEBUG
                ? "enabled."
                : "disabled. Enable with the VM option -Darrow.memory.debug.allocator=true."));
  }

  public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build();

  // Package exposed for sharing between AllocatorManger and BaseAllocator objects
  private final String name;
  private final RootAllocator root;
  private final Object DEBUG_LOCK = new Object();
  private final AllocationListener listener;
  private final @Nullable BaseAllocator parentAllocator;
  private final Map<BaseAllocator, Object> childAllocators;
  private final ArrowBuf empty;
  // members used purely for debugging
  private final @Nullable IdentityHashMap<BufferLedger, @Nullable Object> childLedgers;
  private final @Nullable IdentityHashMap<Reservation, Object> reservations;
  private final @Nullable HistoricalLog historicalLog;
  private final RoundingPolicy roundingPolicy;
  private final AllocationManager.@NonNull Factory allocationManagerFactory;

  private volatile boolean isClosed = false; // the allocator has been closed

  /**
   * Initialize an allocator.
   *
   * @param parentAllocator parent allocator. null if defining a root allocator
   * @param name name of this allocator
   * @param config configuration including other options of this allocator
   * @see Config
   */
  @SuppressWarnings({"nullness:method.invocation", "nullness:cast.unsafe"})
  // {"call to hist(,...) not allowed on the given receiver.", "cast cannot be statically verified"}
  protected BaseAllocator(
      final @Nullable BaseAllocator parentAllocator, final String name, final Config config)
      throws OutOfMemoryException {
    super(parentAllocator, name, config.getInitReservation(), config.getMaxAllocation());

    this.listener = config.getListener();
    this.allocationManagerFactory = config.getAllocationManagerFactory();

    if (parentAllocator != null) {
      this.root = parentAllocator.root;
      empty = parentAllocator.empty;
    } else if (this instanceof RootAllocator) {
      this.root = (@Initialized RootAllocator) this;
      empty = createEmpty();
    } else {
      throw new IllegalStateException(
          "An parent allocator must either carry a root or be the " + "root.");
    }

    this.parentAllocator = parentAllocator;
    this.name = name;

    this.childAllocators = Collections.synchronizedMap(new IdentityHashMap<>());

    if (DEBUG) {
      reservations = new IdentityHashMap<>();
      childLedgers = new IdentityHashMap<>();
      historicalLog = new HistoricalLog(DEBUG_LOG_LENGTH, "allocator[%s]", name);
      hist("created by \"%s\", owned = %d", name, this.getAllocatedMemory());
    } else {
      reservations = null;
      historicalLog = null;
      childLedgers = null;
    }
    this.roundingPolicy = config.getRoundingPolicy();
  }

  @Override
  public AllocationListener getListener() {
    return listener;
  }

  @Override
  public @Nullable BaseAllocator getParentAllocator() {
    return parentAllocator;
  }

  @Override
  public Collection<BufferAllocator> getChildAllocators() {
    synchronized (childAllocators) {
      return new HashSet<>(childAllocators.keySet());
    }
  }

  private static String createErrorMsg(
      final BufferAllocator allocator, final long rounded, final long requested) {
    if (rounded != requested) {
      return String.format(
          "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current "
              + "allocation: %d",
          rounded, requested, allocator.getAllocatedMemory());
    } else {
      return String.format(
          "Unable to allocate buffer of size %d due to memory limit. Current " + "allocation: %d",
          rounded, allocator.getAllocatedMemory());
    }
  }

  public static boolean isDebug() {
    return DEBUG;
  }

  @Override
  public void assertOpen() {
    if (AssertionUtil.ASSERT_ENABLED) {
      if (isClosed) {
        throw new IllegalStateException(
            "Attempting operation on allocator when allocator is closed.\n" + toVerboseString());
      }
    }
  }

  @Override
  public String getName() {
    return name;
  }

  @Override
  public ArrowBuf getEmpty() {
    return empty;
  }

  /**
   * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we
   * have a new ledger associated with this allocator.
   */
  void associateLedger(BufferLedger ledger) {
    assertOpen();
    if (DEBUG) {
      synchronized (DEBUG_LOCK) {
        if (childLedgers != null) {
          childLedgers.put(ledger, null);
        }
      }
    }
  }

  /**
   * For debug/verification purposes only. Allows an AllocationManager to tell the allocator that we
   * are removing a ledger associated with this allocator
   */
  void dissociateLedger(BufferLedger ledger) {
    assertOpen();
    if (DEBUG) {
      synchronized (DEBUG_LOCK) {
        Preconditions.checkState(childLedgers != null, "childLedgers must not be null");
        if (!childLedgers.containsKey(ledger)) {
          throw new IllegalStateException("Trying to remove a child ledger that doesn't exist.");
        }
        childLedgers.remove(ledger);
      }
    }
  }

  /**
   * Track when a ChildAllocator of this BaseAllocator is closed. Used for debugging purposes.
   *
   * @param childAllocator The child allocator that has been closed.
   */
  private void childClosed(final BaseAllocator childAllocator) {
    assertOpen();

    if (DEBUG) {
      Preconditions.checkArgument(childAllocator != null, "child allocator can't be null");

      synchronized (DEBUG_LOCK) {
        final Object object = childAllocators.remove(childAllocator);
        if (object == null) {
          if (childAllocator.historicalLog != null) {
            childAllocator.historicalLog.logHistory(logger);
          }
          throw new IllegalStateException(
              "Child allocator["
                  + childAllocator.name
                  + "] not found in parent allocator["
                  + name
                  + "]'s childAllocators");
        }
      }
    } else {
      childAllocators.remove(childAllocator);
    }
    listener.onChildRemoved(this, childAllocator);
  }

  @Override
  public ArrowBuf wrapForeignAllocation(ForeignAllocation allocation) {
    assertOpen();
    final long size = allocation.getSize();
    listener.onPreAllocation(size);
    AllocationOutcome outcome = this.allocateBytes(size);
    if (!outcome.isOk()) {
      if (listener.onFailedAllocation(size, outcome)) {
        // Second try, in case the listener can do something about it
        outcome = this.allocateBytes(size);
      }
      if (!outcome.isOk()) {
        throw new OutOfMemoryException(createErrorMsg(this, size, size), outcome.getDetails());
      }
    }
    try {
      final AllocationManager manager = new ForeignAllocationManager(this, allocation);
      final BufferLedger ledger = manager.associate(this);
      final ArrowBuf buf =
          new ArrowBuf(ledger, /*bufferManager=*/ null, size, allocation.memoryAddress());
      buf.writerIndex(size);
      listener.onAllocation(size);
      return buf;
    } catch (Throwable t) {
      try {
        releaseBytes(size);
      } catch (Throwable e) {
        t.addSuppressed(e);
      }
      try {
        allocation.release0();
      } catch (Throwable e) {
        t.addSuppressed(e);
      }
      throw t;
    }
  }

  @Override
  public ArrowBuf buffer(final long initialRequestSize) {
    assertOpen();

    return buffer(initialRequestSize, null);
  }

  @SuppressWarnings("nullness:dereference.of.nullable") // dereference of possibly-null reference
  // allocationManagerFactory
  private ArrowBuf createEmpty() {
    return allocationManagerFactory.empty();
  }

  @Override
  public ArrowBuf buffer(final long initialRequestSize, @Nullable BufferManager manager) {
    assertOpen();

    Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative");

    if (initialRequestSize == 0) {
      return getEmpty();
    }

    // round the request size according to the rounding policy
    final long actualRequestSize = roundingPolicy.getRoundedSize(initialRequestSize);

    listener.onPreAllocation(actualRequestSize);

    AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
    if (!outcome.isOk()) {
      if (listener.onFailedAllocation(actualRequestSize, outcome)) {
        // Second try, in case the listener can do something about it
        outcome = this.allocateBytes(actualRequestSize);
      }
      if (!outcome.isOk()) {
        throw new OutOfMemoryException(
            createErrorMsg(this, actualRequestSize, initialRequestSize), outcome.getDetails());
      }
    }

    boolean success = false;
    try {
      ArrowBuf buffer = bufferWithoutReservation(actualRequestSize, manager);
      success = true;
      listener.onAllocation(actualRequestSize);
      return buffer;
    } catch (OutOfMemoryError e) {
      throw e;
    } finally {
      if (!success) {
        releaseBytes(actualRequestSize);
      }
    }
  }

  /**
   * Used by usual allocation as well as for allocating a pre-reserved buffer. Skips the typical
   * accounting associated with creating a new buffer.
   */
  private ArrowBuf bufferWithoutReservation(final long size, @Nullable BufferManager bufferManager)
      throws OutOfMemoryException {
    assertOpen();

    final AllocationManager manager = newAllocationManager(size);
    final BufferLedger ledger = manager.associate(this); // +1 ref cnt (required)
    final ArrowBuf buffer = ledger.newArrowBuf(size, bufferManager);

    // make sure that our allocation is equal to what we expected.
    Preconditions.checkArgument(
        buffer.capacity() == size,
        "Allocated capacity %d was not equal to requested capacity %d.",
        buffer.capacity(),
        size);

    return buffer;
  }

  private AllocationManager newAllocationManager(long size) {
    return newAllocationManager(this, size);
  }

  private AllocationManager newAllocationManager(BaseAllocator accountingAllocator, long size) {
    return allocationManagerFactory.create(accountingAllocator, size);
  }

  @Override
  public BufferAllocator getRoot() {
    return root;
  }

  @Override
  public BufferAllocator newChildAllocator(
      final String name, final long initReservation, final long maxAllocation) {
    return newChildAllocator(name, this.listener, initReservation, maxAllocation);
  }

  @Override
  public BufferAllocator newChildAllocator(
      final String name,
      final AllocationListener listener,
      final long initReservation,
      final long maxAllocation) {
    assertOpen();

    final ChildAllocator childAllocator =
        new ChildAllocator(
            this,
            name,
            configBuilder()
                .listener(listener)
                .initReservation(initReservation)
                .maxAllocation(maxAllocation)
                .roundingPolicy(roundingPolicy)
                .allocationManagerFactory(allocationManagerFactory)
                .build());

    if (DEBUG) {
      synchronized (DEBUG_LOCK) {
        childAllocators.put(childAllocator, childAllocator);
        if (historicalLog != null) {
          historicalLog.recordEvent(
              "allocator[%s] created new child allocator[%s]", name, childAllocator.getName());
        }
      }
    } else {
      childAllocators.put(childAllocator, childAllocator);
    }
    this.listener.onChildAdded(this, childAllocator);

    return childAllocator;
  }

  @Override
  public AllocationReservation newReservation() {
    assertOpen();

    return new Reservation();
  }

  @Override
  public synchronized void close() {
    /*
     * Some owners may close more than once because of complex cleanup and shutdown
     * procedures.
     */
    if (isClosed) {
      return;
    }

    isClosed = true;

    StringBuilder outstandingChildAllocators = new StringBuilder();
    if (DEBUG) {
      synchronized (DEBUG_LOCK) {
        verifyAllocator();

        // are there outstanding child allocators?
        if (!childAllocators.isEmpty()) {
          for (final BaseAllocator childAllocator : childAllocators.keySet()) {
            if (childAllocator.isClosed) {
              logger.warn(
                  String.format(
                      "Closed child allocator[%s] on parent allocator[%s]'s child list.\n%s",
                      childAllocator.name, name, toString()));
            }
          }

          throw new IllegalStateException(
              String.format(
                  "Allocator[%s] closed with outstanding child allocators.\n%s", name, toString()));
        }

        // are there outstanding buffers?
        final int allocatedCount = childLedgers != null ? childLedgers.size() : 0;
        if (allocatedCount > 0) {
          throw new IllegalStateException(
              String.format(
                  "Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
                  name, allocatedCount, toString()));
        }

        if (reservations != null && reservations.size() != 0) {
          throw new IllegalStateException(
              String.format(
                  "Allocator[%s] closed with outstanding reservations (%d).\n%s",
                  name, reservations.size(), toString()));
        }
      }
    } else {
      if (!childAllocators.isEmpty()) {
        outstandingChildAllocators.append("Outstanding child allocators : \n");
        synchronized (childAllocators) {
          for (final BaseAllocator childAllocator : childAllocators.keySet()) {
            outstandingChildAllocators.append(String.format("  %s", childAllocator.toString()));
          }
        }
      }
    }

    // Is there unaccounted-for outstanding allocation?
    final long allocated = getAllocatedMemory();
    if (allocated > 0) {
      if (parent != null && reservation > allocated) {
        parent.releaseBytes(reservation - allocated);
      }
      String msg =
          String.format(
              "Memory was leaked by query. Memory leaked: (%d)\n%s%s",
              allocated, outstandingChildAllocators.toString(), toString());
      logger.error(msg);
      throw new IllegalStateException(msg);
    }

    // we need to release our memory to our parent before we tell it we've closed.
    super.close();

    // Inform our parent allocator that we've closed
    if (parentAllocator != null) {
      parentAllocator.childClosed(this);
    }

    if (DEBUG) {
      if (historicalLog != null) {
        historicalLog.recordEvent("closed");
      }
      logger.debug(String.format("closed allocator[%s].", name));
    }
  }

  @Override
  public String toString() {
    final Verbosity verbosity =
        logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE : Verbosity.BASIC;
    final StringBuilder sb = new StringBuilder();
    print(sb, 0, verbosity);
    return sb.toString();
  }

  /**
   * Provide a verbose string of the current allocator state. Includes the state of all child
   * allocators, along with historical logs of each object and including stacktraces.
   *
   * @return A Verbose string of current allocator state.
   */
  @Override
  public String toVerboseString() {
    final StringBuilder sb = new StringBuilder();
    print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
    return sb.toString();
  }

  @FormatMethod
  private void hist(@FormatString String noteFormat, Object... args) {
    if (historicalLog != null) {
      historicalLog.recordEvent(noteFormat, args);
    }
  }

  /**
   * Verifies the accounting state of the allocator. Only works for DEBUG.
   *
   * @throws IllegalStateException when any problems are found
   */
  void verifyAllocator() {
    final IdentityHashMap<AllocationManager, BaseAllocator> seen = new IdentityHashMap<>();
    verifyAllocator(seen);
  }

  /**
   * Verifies the accounting state of the allocator (Only works for DEBUG) This overload is used for
   * recursive calls, allowing for checking that ArrowBufs are unique across all allocators that are
   * checked.
   *
   * @param buffersSeen a map of buffers that have already been seen when walking a tree of
   *     allocators
   * @throws IllegalStateException when any problems are found
   */
  private void verifyAllocator(
      final IdentityHashMap<AllocationManager, BaseAllocator> buffersSeen) {
    // The remaining tests can only be performed if we're in debug mode.
    if (!DEBUG) {
      return;
    }

    synchronized (DEBUG_LOCK) {
      final long allocated = getAllocatedMemory();

      // verify my direct descendants
      final Set<BaseAllocator> childSet = childAllocators.keySet();
      for (final BaseAllocator childAllocator : childSet) {
        childAllocator.verifyAllocator(buffersSeen);
      }

      /*
       * Verify my relationships with my descendants.
       *
       * The sum of direct child allocators' owned memory must be <= my allocated memory; my
       * allocated memory also
       * includes ArrowBuf's directly allocated by me.
       */
      long childTotal = 0;
      for (final BaseAllocator childAllocator : childSet) {
        childTotal += Math.max(childAllocator.getAllocatedMemory(), childAllocator.reservation);
      }
      if (childTotal > getAllocatedMemory()) {
        if (historicalLog != null) {
          historicalLog.logHistory(logger);
        }
        logger.debug("allocator[" + name + "] child event logs BEGIN");
        for (final BaseAllocator childAllocator : childSet) {
          if (childAllocator.historicalLog != null) {
            childAllocator.historicalLog.logHistory(logger);
          }
        }
        logger.debug("allocator[" + name + "] child event logs END");
        throw new IllegalStateException(
            "Child allocators own more memory ("
                + childTotal
                + ") than their parent (name = "
                + name
                + " ) has allocated ("
                + getAllocatedMemory()
                + ')');
      }

      // Furthermore, the amount I've allocated should be that plus buffers I've allocated.
      long bufferTotal = 0;

      final Set<@KeyFor("this.childLedgers") BufferLedger> ledgerSet =
          childLedgers != null ? childLedgers.keySet() : null;
      if (ledgerSet != null) {
        for (final BufferLedger ledger : ledgerSet) {
          if (!ledger.isOwningLedger()) {
            continue;
          }

          final AllocationManager am = ledger.getAllocationManager();
          /*
           * Even when shared, ArrowBufs are rewrapped, so we should never see the same instance
           * twice.
           */
          final BaseAllocator otherOwner = buffersSeen.get(am);
          if (otherOwner != null) {
            throw new IllegalStateException(
                "This allocator's ArrowBuf already owned by another " + "allocator");
          }
          buffersSeen.put(am, this);

          bufferTotal += am.getSize();
        }
      }

      // Preallocated space has to be accounted for
      final Set<@KeyFor("this.reservations") Reservation> reservationSet =
          reservations != null ? reservations.keySet() : null;
      long reservedTotal = 0;
      if (reservationSet != null) {
        for (final Reservation reservation : reservationSet) {
          if (!reservation.isUsed()) {
            reservedTotal += reservation.getSize();
          }
        }
      }

      if (bufferTotal + reservedTotal + childTotal != getAllocatedMemory()) {
        final StringBuilder sb = new StringBuilder();
        sb.append("allocator[");
        sb.append(name);
        sb.append("]\nallocated: ");
        sb.append(Long.toString(allocated));
        sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
        sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
        sb.append('\n');

        if (bufferTotal != 0) {
          sb.append("buffer total: ");
          sb.append(Long.toString(bufferTotal));
          sb.append('\n');
          dumpBuffers(sb, ledgerSet);
        }

        if (childTotal != 0) {
          sb.append("child total: ");
          sb.append(Long.toString(childTotal));
          sb.append('\n');

          for (final BaseAllocator childAllocator : childSet) {
            sb.append("child allocator[");
            sb.append(childAllocator.name);
            sb.append("] owned ");
            sb.append(Long.toString(childAllocator.getAllocatedMemory()));
            sb.append('\n');
          }
        }

        if (reservedTotal != 0) {
          sb.append(String.format("reserved total : %d bytes.", reservedTotal));
          if (reservationSet != null) {
            for (final Reservation reservation : reservationSet) {
              if (reservation.historicalLog != null) {
                reservation.historicalLog.buildHistory(sb, 0, true);
              }
              sb.append('\n');
            }
          }
        }

        logger.debug(sb.toString());

        final long allocated2 = getAllocatedMemory();

        if (allocated2 != allocated) {
          throw new IllegalStateException(
              String.format(
                  "allocator[%s]: allocated t1 (%d) + allocated t2 (%d). Someone released memory while in verification.",
                  name, allocated, allocated2));
        }
        throw new IllegalStateException(
            String.format(
                "allocator[%s]: buffer space (%d) + prealloc space (%d) + child space (%d) != allocated (%d)",
                name, bufferTotal, reservedTotal, childTotal, allocated));
      }
    }
  }

  void print(StringBuilder sb, int level, Verbosity verbosity) {

    CommonUtil.indent(sb, level)
        .append("Allocator(")
        .append(name)
        .append(") ")
        .append(reservation)
        .append('/')
        .append(getAllocatedMemory())
        .append('/')
        .append(getPeakMemoryAllocation())
        .append('/')
        .append(getLimit())
        .append(" (res/actual/peak/limit)")
        .append('\n');

    if (DEBUG) {
      CommonUtil.indent(sb, level + 1)
          .append(String.format("child allocators: %d\n", childAllocators.size()));
      for (BaseAllocator child : childAllocators.keySet()) {
        child.print(sb, level + 2, verbosity);
      }

      CommonUtil.indent(sb, level + 1)
          .append(String.format("ledgers: %d\n", childLedgers != null ? childLedgers.size() : 0));
      if (childLedgers != null) {
        for (BufferLedger ledger : childLedgers.keySet()) {
          ledger.print(sb, level + 2, verbosity);
        }
      }

      final Set<@KeyFor("this.reservations") Reservation> reservations =
          this.reservations != null ? this.reservations.keySet() : null;
      CommonUtil.indent(sb, level + 1)
          .append(
              String.format("reservations: %d\n", reservations != null ? reservations.size() : 0));
      if (reservations != null) {
        for (final Reservation reservation : reservations) {
          if (verbosity.includeHistoricalLog) {
            if (reservation.historicalLog != null) {
              reservation.historicalLog.buildHistory(sb, level + 3, true);
            }
          }
        }
      }
    }
  }

  private void dumpBuffers(
      final StringBuilder sb,
      final @Nullable Set<@KeyFor("this.childLedgers") BufferLedger> ledgerSet) {
    if (ledgerSet != null) {
      for (final BufferLedger ledger : ledgerSet) {
        if (!ledger.isOwningLedger()) {
          continue;
        }
        final AllocationManager am = ledger.getAllocationManager();
        sb.append("UnsafeDirectLittleEndian[identityHashCode == ");
        sb.append(Integer.toString(System.identityHashCode(am)));
        sb.append("] size ");
        sb.append(Long.toString(am.getSize()));
        sb.append('\n');
      }
    }
  }

  /** Enum for logging verbosity. */
  public enum Verbosity {
    BASIC(false, false), // only include basic information
    LOG(true, false), // include basic
    LOG_WITH_STACKTRACE(true, true) //
  ;

    public final boolean includeHistoricalLog;
    public final boolean includeStackTraces;

    Verbosity(boolean includeHistoricalLog, boolean includeStackTraces) {
      this.includeHistoricalLog = includeHistoricalLog;
      this.includeStackTraces = includeStackTraces;
    }
  }

  /**
   * Returns a default {@link Config} instance.
   *
   * @see ImmutableConfig.Builder
   */
  public static Config defaultConfig() {
    return DEFAULT_CONFIG;
  }

  /** Returns a builder class for configuring BaseAllocator's options. */
  public static ImmutableConfig.Builder configBuilder() {
    return ImmutableConfig.builder();
  }

  @Override
  public RoundingPolicy getRoundingPolicy() {
    return roundingPolicy;
  }

  /** Config class of {@link BaseAllocator}. */
  @Value.Immutable
  abstract static class Config {
    /** Factory for creating {@link AllocationManager} instances. */
    @Value.Default
    AllocationManager.Factory getAllocationManagerFactory() {
      return DefaultAllocationManagerOption.getDefaultAllocationManagerFactory();
    }

    /** Listener callback. Must be non-null. */
    @Value.Default
    AllocationListener getListener() {
      return AllocationListener.NOOP;
    }

    /** Initial reservation size (in bytes) for this allocator. */
    @Value.Default
    long getInitReservation() {
      return 0;
    }

    /**
     * Max allocation size (in bytes) for this allocator, allocations past this limit fail. Can be
     * modified after construction.
     */
    @Value.Default
    long getMaxAllocation() {
      return Long.MAX_VALUE;
    }

    /** The policy for rounding the buffer size. */
    @Value.Default
    RoundingPolicy getRoundingPolicy() {
      return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY;
    }
  }

  /**
   * Implementation of {@link AllocationReservation} that supports history tracking under
   * {@linkplain #DEBUG} is true.
   */
  public class Reservation implements AllocationReservation {

    private final @Nullable HistoricalLog historicalLog;
    private int nBytes = 0;
    private boolean used = false;
    private boolean closed = false;

    /**
     * Creates a new reservation.
     *
     * <p>If {@linkplain #DEBUG} is true this will capture a historical log of events relevant to
     * this Reservation.
     */
    @SuppressWarnings("nullness:argument") // to handle null assignment on third party dependency:
    // System.identityHashCode
    public Reservation() {
      if (DEBUG) {
        historicalLog =
            new HistoricalLog(
                "Reservation[allocator[%s], %d]", name, System.identityHashCode(this));
        historicalLog.recordEvent("created");
        synchronized (DEBUG_LOCK) {
          if (reservations != null) {
            reservations.put(this, this);
          }
        }
      } else {
        historicalLog = null;
      }
    }

    @Override
    public boolean add(final int nBytes) {
      assertOpen();

      Preconditions.checkArgument(nBytes >= 0, "nBytes(%d) < 0", nBytes);
      Preconditions.checkState(
          !closed, "Attempt to increase reservation after reservation has been closed");
      Preconditions.checkState(
          !used, "Attempt to increase reservation after reservation has been used");

      // we round up to next power of two since all reservations are done in powers of two. This
      // may overestimate the
      // preallocation since someone may perceive additions to be power of two. If this becomes a
      // problem, we can look
      // at
      // modifying this behavior so that we maintain what we reserve and what the user asked for
      // and make sure to only
      // round to power of two as necessary.
      final int nBytesTwo = CommonUtil.nextPowerOfTwo(nBytes);
      if (!reserve(nBytesTwo)) {
        return false;
      }

      this.nBytes += nBytesTwo;
      return true;
    }

    @Override
    public ArrowBuf allocateBuffer() {
      assertOpen();

      Preconditions.checkState(!closed, "Attempt to allocate after closed");
      Preconditions.checkState(!used, "Attempt to allocate more than once");

      final ArrowBuf arrowBuf = allocate(nBytes);
      used = true;
      return arrowBuf;
    }

    @Override
    public int getSize() {
      return nBytes;
    }

    @Override
    public boolean isUsed() {
      return used;
    }

    @Override
    public boolean isClosed() {
      return closed;
    }

    @Override
    public void close() {
      assertOpen();

      if (closed) {
        return;
      }

      if (DEBUG) {
        if (!isClosed()) {
          final Object object;
          synchronized (DEBUG_LOCK) {
            object = reservations != null ? reservations.remove(this) : null;
          }
          if (object == null) {
            final StringBuilder sb = new StringBuilder();
            print(sb, 0, Verbosity.LOG_WITH_STACKTRACE);
            logger.debug(sb.toString());
            throw new IllegalStateException(
                String.format(
                    "Didn't find closing reservation[%d]", System.identityHashCode(this)));
          }

          if (historicalLog != null) {
            historicalLog.recordEvent("closed");
          }
        }
      }

      if (!used) {
        releaseReservation(nBytes);
      }

      closed = true;
    }

    @Override
    public boolean reserve(int nBytes) {
      assertOpen();

      final AllocationOutcome outcome = BaseAllocator.this.allocateBytes(nBytes);

      if (historicalLog != null) {
        historicalLog.recordEvent("reserve(%d) => %s", nBytes, Boolean.toString(outcome.isOk()));
      }

      return outcome.isOk();
    }

    /**
     * Allocate a buffer of the requested size.
     *
     * <p>The implementation of the allocator's inner class provides this.
     *
     * @param nBytes the size of the buffer requested
     * @return the buffer, or null, if the request cannot be satisfied
     */
    private ArrowBuf allocate(int nBytes) {
      assertOpen();

      boolean success = false;

      /*
       * The reservation already added the requested bytes to the allocators owned and allocated
       * bytes via reserve().
       * This ensures that they can't go away. But when we ask for the buffer here, that will add
       * to the allocated bytes
       * as well, so we need to return the same number back to avoid double-counting them.
       */
      try {
        final ArrowBuf arrowBuf = BaseAllocator.this.bufferWithoutReservation(nBytes, null);

        listener.onAllocation(nBytes);
        if (historicalLog != null) {
          historicalLog.recordEvent(
              "allocate() => %s", String.format("ArrowBuf[%d]", arrowBuf.getId()));
        }
        success = true;
        return arrowBuf;
      } finally {
        if (!success) {
          releaseBytes(nBytes);
        }
      }
    }

    /**
     * Return the reservation back to the allocator without having used it.
     *
     * @param nBytes the size of the reservation
     */
    private void releaseReservation(int nBytes) {
      assertOpen();

      releaseBytes(nBytes);

      if (historicalLog != null) {
        historicalLog.recordEvent("releaseReservation(%d)", nBytes);
      }
    }
  }
}