AllocationManager.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.memory;

import org.apache.arrow.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * An AllocationManager is the implementation of a physical memory allocation.
 *
 * <p>Manages the relationship between the allocators and a particular memory allocation. Ensures
 * that one allocator owns the memory that multiple allocators may be referencing. Manages a
 * BufferLedger between each of its associated allocators. It does not track the reference count;
 * that is the role of {@link BufferLedger} (aka {@link ReferenceManager}).
 *
 * <p>This is a public interface implemented by concrete allocator implementations (e.g. Netty or
 * Unsafe).
 *
 * <p>Threading: AllocationManager manages thread-safety internally. Operations within the context
 * of a single BufferLedger are lockless in nature and can be leveraged by multiple threads.
 * Operations that cross the context of two ledgers will acquire a lock on the AllocationManager
 * instance. Important note, there is one AllocationManager per physical buffer allocation. As such,
 * there will be thousands of these in a typical query. The contention of acquiring a lock on
 * AllocationManager should be very low.
 */
public abstract class AllocationManager {
  // The RootAllocator we are associated with. An allocation can only ever be associated with a
  // single RootAllocator.
  private final BufferAllocator root;
  // An allocation can be tracked by multiple allocators. (This is because an allocator is more like
  // a ledger.)
  // All such allocators track reference counts individually, via BufferLedger instances. When an
  // individual
  // reference count reaches zero, the allocator will be dissociated from this allocation. If that
  // was via the
  // owningLedger, then no more allocators should be tracking this allocation, and the allocation
  // will be freed.
  // ARROW-1627: Trying to minimize memory overhead caused by previously used IdentityHashMap
  private final LowCostIdentityHashMap<BufferAllocator, BufferLedger> map =
      new LowCostIdentityHashMap<>();
  // The primary BufferLedger (i.e. reference count) tracking this allocation.
  // This is mostly a semantic constraint on the API user: if the reference count reaches 0 in the
  // owningLedger, then
  // there are not supposed to be any references through other allocators. In practice, this doesn't
  // do anything
  // as the implementation just forces ownership to be transferred to one of the other extant
  // references.
  private volatile @Nullable BufferLedger owningLedger;

  @SuppressWarnings(
      "nullness:method.invocation") // call to associate(a, b) not allowed on the given receiver
  protected AllocationManager(BufferAllocator accountingAllocator) {
    Preconditions.checkNotNull(accountingAllocator);
    accountingAllocator.assertOpen();

    this.root = accountingAllocator.getRoot();

    // we do a no retain association since our creator will want to retrieve the newly created
    // ledger and will create a reference count at that point
    this.owningLedger = associate(accountingAllocator, false);
  }

  @Nullable
  BufferLedger getOwningLedger() {
    return owningLedger;
  }

  void setOwningLedger(final BufferLedger ledger) {
    this.owningLedger = ledger;
  }

  /**
   * Associate the existing underlying buffer with a new allocator. This will increase the reference
   * count on the corresponding buffer ledger by 1.
   *
   * @param allocator The target allocator to associate this buffer with.
   * @return The reference manager (new or existing) that associates the underlying buffer to this
   *     new ledger.
   */
  BufferLedger associate(final BufferAllocator allocator) {
    return associate(allocator, true);
  }

  private BufferLedger associate(final BufferAllocator allocator, final boolean retain) {
    allocator.assertOpen();
    Preconditions.checkState(
        root == allocator.getRoot(),
        "A buffer can only be associated between two allocators that share the same root");

    synchronized (this) {
      BufferLedger ledger = map.get(allocator);
      if (ledger != null) {
        // We were already being tracked by the given allocator, just return it
        if (retain) {
          // bump the ref count for the ledger
          ledger.increment();
        }
        return ledger;
      }

      // We weren't previously being tracked by the given allocator; create a new ledger
      ledger = new BufferLedger(allocator, this);

      if (retain) {
        // the new reference manager will have a ref count of 1
        ledger.increment();
      }

      // store the mapping for <allocator, reference manager>
      BufferLedger oldLedger = map.put(ledger);
      Preconditions.checkState(
          oldLedger == null,
          "Detected inconsistent state: A reference manager already exists for this allocator");

      if (allocator instanceof BaseAllocator) {
        // needed for debugging only: keep a pointer to reference manager inside allocator
        // to dump state, verify allocator state etc
        ((BaseAllocator) allocator).associateLedger(ledger);
      }
      return ledger;
    }
  }

  /**
   * The way that a particular ReferenceManager (BufferLedger) communicates back to the
   * AllocationManager that it no longer needs to hold a reference to a particular piece of memory.
   * Reference manager needs to hold a lock to invoke this method It is called when the shared
   * refcount of all the ArrowBufs managed by the calling ReferenceManager drops to 0.
   */
  void release(final BufferLedger ledger) {
    final BufferAllocator allocator = ledger.getAllocator();
    allocator.assertOpen();

    // remove the <BaseAllocator, BufferLedger> mapping for the allocator
    // of calling BufferLedger
    Preconditions.checkState(
        map.containsKey(allocator), "Expecting a mapping for allocator and reference manager");
    final BufferLedger oldLedger = map.remove(allocator);
    Preconditions.checkState(
        oldLedger != null, "Expecting a mapping for allocator and reference manager");
    BufferAllocator oldAllocator = oldLedger.getAllocator();
    if (oldAllocator instanceof BaseAllocator) {
      // needed for debug only: tell the allocator that AllocationManager is removing a
      // reference manager associated with this particular allocator
      ((BaseAllocator) oldAllocator).dissociateLedger(oldLedger);
    }

    if (oldLedger == owningLedger) {
      // the release call was made by the owning reference manager
      if (map.isEmpty()) {
        // the only <allocator, reference manager> mapping was for the owner
        // which now has been removed, it implies we can safely destroy the
        // underlying memory chunk as it is no longer being referenced
        oldAllocator.releaseBytes(getSize());
        // free the memory chunk associated with the allocation manager
        release0();
        oldAllocator.getListener().onRelease(getSize());
        owningLedger = null;
      } else {
        // since the refcount dropped to 0 for the owning reference manager and allocation
        // manager will no longer keep a mapping for it, we need to change the owning
        // reference manager to whatever the next available <allocator, reference manager>
        // mapping exists.
        BufferLedger newOwningLedger = map.getNextValue();
        // we'll forcefully transfer the ownership and not worry about whether we
        // exceeded the limit since this consumer can't do anything with this.
        oldLedger.transferBalance(newOwningLedger);
      }
    } else {
      // the release call was made by a non-owning reference manager, so after remove there have
      // to be 1 or more <allocator, reference manager> mappings
      Preconditions.checkState(
          map.size() > 0,
          "The final removal of reference manager should be connected to owning reference manager");
    }
  }

  /**
   * Return the size of underlying chunk of memory managed by this Allocation Manager.
   *
   * <p>The underlying memory chunk managed can be different from the original requested size.
   *
   * @return size of underlying memory chunk
   */
  public abstract long getSize();

  /** Return the absolute memory address pointing to the fist byte of underlying memory chunk. */
  protected abstract long memoryAddress();

  /** Release the underlying memory chunk. */
  protected abstract void release0();

  /**
   * A factory interface for creating {@link AllocationManager}. One may extend this interface to
   * use a user-defined AllocationManager implementation.
   */
  public interface Factory {

    /**
     * Create an {@link AllocationManager}.
     *
     * @param accountingAllocator The allocator that are expected to be associated with newly
     *     created AllocationManager. Currently it is always equivalent to "this"
     * @param size Size (in bytes) of memory managed by the AllocationManager
     * @return The created AllocationManager used by this allocator
     */
    AllocationManager create(BufferAllocator accountingAllocator, long size);

    ArrowBuf empty();
  }
}