MessageChannelReader.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.vector.ipc.message;

import java.io.IOException;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ReadChannel;

/**
 * Sequential message reader backed by a {@link ReadChannel}: each call to {@link #readNext()}
 * pulls one message (metadata plus optional body) off the channel.
 */
public class MessageChannelReader implements AutoCloseable {
  /** Channel the messages are read from; closed by {@link #close()}. */
  protected ReadChannel in;

  /** Allocator handed to the serializer when a message body has to be read. */
  protected BufferAllocator allocator;

  /**
   * Create a reader that consumes streaming messages from an already-open ReadChannel.
   *
   * @param in channel supplying the messages
   * @param allocator BufferAllocator used when reading a message body into an ArrowBuf
   */
  public MessageChannelReader(ReadChannel in, BufferAllocator allocator) {
    this.in = in;
    this.allocator = allocator;
  }

  /**
   * Read the next message from the channel.
   *
   * <p>The flatbuf metadata is read first. When the serializer reports no further message
   * (end-of-stream), {@code null} is returned. Otherwise the body is read only if the metadata
   * says the message has one; for a body-less message the returned MessageResult carries a null
   * body buffer.
   *
   * @return the message metadata paired with its body buffer (possibly null), or {@code null} at
   *     end-of-stream
   * @throws IOException on error
   */
  public MessageResult readNext() throws IOException {
    final MessageMetadataResult metadata = MessageSerializer.readMessage(in);
    if (metadata == null) {
      // Nothing left on the channel.
      return null;
    }

    final Message header = metadata.getMessage();

    // The body is optional; only touch the channel again when the metadata declares one.
    final ArrowBuf body =
        metadata.messageHasBody()
            ? MessageSerializer.readMessageBody(in, metadata.getMessageBodyLength(), allocator)
            : null;

    return new MessageResult(header, body);
  }

  /**
   * Report how many bytes have been read from the ReadChannel so far.
   *
   * @return number of bytes, as reported by the channel
   */
  public long bytesRead() {
    final long consumed = in.bytesRead();
    return consumed;
  }

  /**
   * Close the underlying ReadChannel.
   *
   * @throws IOException on error
   */
  @Override
  public void close() throws IOException {
    in.close();
  }
}