FlightInfo.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight;
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.validate.MetadataV4UnionChecker;
/** A POJO representation of a FlightInfo, metadata associated with a set of data records. */
public class FlightInfo {
private final Schema schema;
private final FlightDescriptor descriptor;
private final List<FlightEndpoint> endpoints;
private final long bytes;
private final long records;
private final boolean ordered;
private final IpcOption option;
private final byte[] appMetadata;
/**
* Constructs a new instance.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
* @param bytes The number of bytes in the flight
* @param records The number of records in the flight.
*/
public FlightInfo(
Schema schema,
FlightDescriptor descriptor,
List<FlightEndpoint> endpoints,
long bytes,
long records) {
this(schema, descriptor, endpoints, bytes, records, /*ordered*/ false, IpcOption.DEFAULT);
}
/**
* Constructs a new instance.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
* @param bytes The number of bytes in the flight
* @param records The number of records in the flight.
* @param option IPC write options.
*/
public FlightInfo(
Schema schema,
FlightDescriptor descriptor,
List<FlightEndpoint> endpoints,
long bytes,
long records,
IpcOption option) {
this(schema, descriptor, endpoints, bytes, records, /*ordered*/ false, option);
}
/**
* Constructs a new instance.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
* @param bytes The number of bytes in the flight
* @param records The number of records in the flight.
* @param ordered Whether the endpoints in this flight are ordered.
* @param option IPC write options.
*/
public FlightInfo(
Schema schema,
FlightDescriptor descriptor,
List<FlightEndpoint> endpoints,
long bytes,
long records,
boolean ordered,
IpcOption option) {
this(schema, descriptor, endpoints, bytes, records, ordered, option, null);
}
/**
* Constructs a new instance.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
* @param bytes The number of bytes in the flight
* @param records The number of records in the flight.
* @param ordered Whether the endpoints in this flight are ordered.
* @param option IPC write options.
* @param appMetadata Metadata to send along with the flight
*/
public FlightInfo(
Schema schema,
FlightDescriptor descriptor,
List<FlightEndpoint> endpoints,
long bytes,
long records,
boolean ordered,
IpcOption option,
byte[] appMetadata) {
Objects.requireNonNull(descriptor);
Objects.requireNonNull(endpoints);
if (schema != null) {
MetadataV4UnionChecker.checkForUnion(schema.getFields().iterator(), option.metadataVersion);
}
this.schema = schema;
this.descriptor = descriptor;
this.endpoints = endpoints;
this.bytes = bytes;
this.records = records;
this.ordered = ordered;
this.option = option;
this.appMetadata = appMetadata;
}
/** Constructs from the protocol buffer representation. */
FlightInfo(Flight.FlightInfo pbFlightInfo) throws URISyntaxException {
try {
final ByteBuffer schemaBuf = pbFlightInfo.getSchema().asReadOnlyByteBuffer();
schema =
pbFlightInfo.getSchema().size() > 0
? MessageSerializer.deserializeSchema(
new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf))))
: null;
} catch (IOException e) {
throw new RuntimeException(e);
}
descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor());
endpoints = new ArrayList<>();
for (final Flight.FlightEndpoint endpoint : pbFlightInfo.getEndpointList()) {
endpoints.add(new FlightEndpoint(endpoint));
}
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
ordered = pbFlightInfo.getOrdered();
appMetadata =
(pbFlightInfo.getAppMetadata().size() == 0
? null
: pbFlightInfo.getAppMetadata().toByteArray());
option = IpcOption.DEFAULT;
}
public Optional<Schema> getSchemaOptional() {
return Optional.ofNullable(schema);
}
/**
* Returns the schema, or an empty schema if no schema is present.
*
* @deprecated Deprecated. Use {@link #getSchemaOptional()} instead.
*/
@Deprecated
public Schema getSchema() {
return schema != null ? schema : new Schema(Collections.emptyList());
}
public long getBytes() {
return bytes;
}
public long getRecords() {
return records;
}
public FlightDescriptor getDescriptor() {
return descriptor;
}
public List<FlightEndpoint> getEndpoints() {
return endpoints;
}
public boolean getOrdered() {
return ordered;
}
public byte[] getAppMetadata() {
return appMetadata;
}
/** Converts to the protocol buffer representation. */
Flight.FlightInfo toProtocol() {
Flight.FlightInfo.Builder builder =
Flight.FlightInfo.newBuilder()
.addAllEndpoint(
endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList()))
.setFlightDescriptor(descriptor.toProtocol())
.setTotalBytes(FlightInfo.this.bytes)
.setTotalRecords(records)
.setOrdered(ordered);
if (schema != null) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema, option);
builder.setSchema(ByteString.copyFrom(baos.toByteArray()));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (appMetadata != null) {
builder.setAppMetadata(ByteString.copyFrom(appMetadata));
}
return builder.build();
}
/**
* Get the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing non-Flight services to still return Flight
* types.
*/
public ByteBuffer serialize() {
return ByteBuffer.wrap(toProtocol().toByteArray());
}
/**
* Parse the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing Flight clients to obtain stream info from
* non-Flight services.
*
* @param serialized The serialized form of the FlightInfo, as returned by {@link #serialize()}.
* @return The deserialized FlightInfo.
* @throws IOException if the serialized form is invalid.
* @throws URISyntaxException if the serialized form contains an unsupported URI format.
*/
public static FlightInfo deserialize(ByteBuffer serialized)
throws IOException, URISyntaxException {
return new FlightInfo(Flight.FlightInfo.parseFrom(serialized));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FlightInfo)) {
return false;
}
FlightInfo that = (FlightInfo) o;
return bytes == that.bytes
&& records == that.records
&& schema.equals(that.schema)
&& descriptor.equals(that.descriptor)
&& endpoints.equals(that.endpoints)
&& ordered == that.ordered
&& Arrays.equals(appMetadata, that.appMetadata);
}
@Override
public int hashCode() {
return Objects.hash(
schema, descriptor, endpoints, bytes, records, ordered, Arrays.hashCode(appMetadata));
}
@Override
public String toString() {
return "FlightInfo{"
+ "schema="
+ schema
+ ", descriptor="
+ descriptor
+ ", endpoints="
+ endpoints
+ ", bytes="
+ bytes
+ ", records="
+ records
+ ", ordered="
+ ordered
+ ", appMetadata="
+ (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata))
+ '}';
}
/**
* Create a builder for FlightInfo.
*
* @param schema The schema of the Flight
* @param descriptor An identifier for the Flight.
* @param endpoints A list of endpoints that have the flight available.
*/
public static Builder builder(
Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints) {
return new Builder(schema, descriptor, endpoints);
}
/** Builder for FlightInfo. */
public static final class Builder {
private final Schema schema;
private final FlightDescriptor descriptor;
private final List<FlightEndpoint> endpoints;
private long bytes = -1;
private long records = -1;
private boolean ordered = false;
private IpcOption option = IpcOption.DEFAULT;
private byte[] appMetadata = null;
private Builder(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints) {
this.schema = schema;
this.descriptor = descriptor;
this.endpoints = endpoints;
}
/**
* Set the number of bytes for the flight. Default to -1 for unknown.
*
* @param bytes The number of bytes in the flight
*/
public Builder setBytes(long bytes) {
this.bytes = bytes;
return this;
}
/**
* Set the number of records for the flight. Default to -1 for unknown.
*
* @param records The number of records in the flight.
*/
public Builder setRecords(long records) {
this.records = records;
return this;
}
/**
* Set whether the flight endpoints are ordered. Default is false.
*
* @param ordered Whether the endpoints in this flight are ordered.
*/
public Builder setOrdered(boolean ordered) {
this.ordered = ordered;
return this;
}
/**
* Set IPC write options. Default is IpcOption.DEFAULT
*
* @param option IPC write options.
*/
public Builder setOption(IpcOption option) {
this.option = option;
return this;
}
/**
* Set the app metadata to send along with the flight. Default is null.
*
* @param appMetadata Metadata to send along with the flight
*/
public Builder setAppMetadata(byte[] appMetadata) {
this.appMetadata = appMetadata;
return this;
}
/** Build FlightInfo object. */
public FlightInfo build() {
return new FlightInfo(
schema, descriptor, endpoints, bytes, records, ordered, option, appMetadata);
}
}
}