BackpressureStrategy.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.flight;

import com.google.common.base.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;

/**
 * Helper interface to dynamically handle backpressure when implementing FlightProducers. This must
 * only be used in FlightProducer implementations that are non-blocking.
 */
public interface BackpressureStrategy {
  /** The state of the client after a call to waitForListener. */
  enum WaitResult {
    /** Listener is ready. */
    READY,

    /** Listener was cancelled by the client. */
    CANCELLED,

    /** Timed out waiting for the listener to change state. */
    TIMEOUT,

    /** Indicates that the wait was interrupted for a reason unrelated to the listener itself. */
    OTHER
  }

  /**
   * Set up operations to work against the given listener.
   *
   * <p>This must be called exactly once and before any calls to {@link #waitForListener(long)} and
   * {@link OutboundStreamListener#start(VectorSchemaRoot)}
   *
   * @param listener The listener this strategy applies to.
   */
  void register(FlightProducer.ServerStreamListener listener);

  /**
   * Waits for the listener to be ready or cancelled up to the given timeout.
   *
   * @param timeout The timeout in milliseconds. Infinite if timeout is <= 0.
   * @return The result of the wait.
   */
  WaitResult waitForListener(long timeout);

  /**
   * A back pressure strategy that uses callbacks to notify when the client is ready or cancelled.
   */
  class CallbackBackpressureStrategy implements BackpressureStrategy {
    private final Object lock = new Object();
    private FlightProducer.ServerStreamListener listener;

    @Override
    public void register(FlightProducer.ServerStreamListener listener) {
      this.listener = listener;
      listener.setOnReadyHandler(this::onReady);
      listener.setOnCancelHandler(this::onCancel);
    }

    @Override
    public WaitResult waitForListener(long timeout) {
      Preconditions.checkNotNull(listener);
      long remainingTimeout = timeout;
      final long startTime = System.currentTimeMillis();
      synchronized (lock) {
        while (!listener.isReady() && !listener.isCancelled()) {
          try {
            lock.wait(remainingTimeout);
            if (timeout != 0) { // If timeout was zero explicitly, we should never report timeout.
              remainingTimeout = startTime + timeout - System.currentTimeMillis();
              if (remainingTimeout <= 0) {
                return WaitResult.TIMEOUT;
              }
            }
            if (!shouldContinueWaiting(listener, remainingTimeout)) {
              return WaitResult.OTHER;
            }
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return WaitResult.OTHER;
          }
        }

        if (listener.isReady()) {
          return WaitResult.READY;
        } else if (listener.isCancelled()) {
          return WaitResult.CANCELLED;
        } else if (System.currentTimeMillis() > startTime + timeout) {
          return WaitResult.TIMEOUT;
        }
        throw new RuntimeException("Invalid state when waiting for listener.");
      }
    }

    /**
     * Interrupt waiting on the listener to change state.
     *
     * <p>This method can be used in conjunction with {@link
     * #shouldContinueWaiting(FlightProducer.ServerStreamListener, long)} to allow FlightProducers
     * to terminate streams internally and notify clients.
     */
    public void interruptWait() {
      synchronized (lock) {
        lock.notifyAll();
      }
    }

    /**
     * Callback function to run to check if the listener should continue to be waited on if it
     * leaves the waiting state without being cancelled, ready, or timed out.
     *
     * <p>This method should be used to determine if the wait on the listener was interrupted
     * explicitly using a call to {@link #interruptWait()} or if it was woken up due to a spurious
     * wake.
     */
    protected boolean shouldContinueWaiting(
        FlightProducer.ServerStreamListener listener, long remainingTimeout) {
      return true;
    }

    /** Callback to execute when the listener becomes ready. */
    protected void readyCallback() {}

    /** Callback to execute when the listener is cancelled. */
    protected void cancelCallback() {}

    private void onReady() {
      synchronized (lock) {
        readyCallback();
        lock.notifyAll();
      }
    }

    private void onCancel() {
      synchronized (lock) {
        cancelCallback();
        lock.notifyAll();
      }
    }
  }
}