FlightEndpoint.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.arrow.flight.impl.Flight;
/** POJO to convert to/from the underlying protobuf FlightEndpoint. */
public class FlightEndpoint {
private final List<Location> locations;
private final Ticket ticket;
private final Instant expirationTime;
private final byte[] appMetadata;
/**
* Constructs a new endpoint with no expiration time.
*
* @param ticket A ticket that describe the key of a data stream.
* @param locations The possible locations the stream can be retrieved from.
*/
public FlightEndpoint(Ticket ticket, Location... locations) {
this(ticket, /*expirationTime*/ null, locations);
}
/**
* Constructs a new endpoint with an expiration time.
*
* @param ticket A ticket that describe the key of a data stream.
* @param expirationTime (optional) When this endpoint expires.
* @param locations The possible locations the stream can be retrieved from.
*/
public FlightEndpoint(Ticket ticket, Instant expirationTime, Location... locations) {
this(
ticket,
expirationTime,
null,
Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations))));
}
/** Private constructor with all parameters. Should only be called by Builder. */
private FlightEndpoint(
Ticket ticket, Instant expirationTime, byte[] appMetadata, List<Location> locations) {
Objects.requireNonNull(ticket);
this.locations = locations;
this.expirationTime = expirationTime;
this.ticket = ticket;
this.appMetadata = appMetadata;
}
/** Constructs from the protocol buffer representation. */
FlightEndpoint(Flight.FlightEndpoint flt) throws URISyntaxException {
this.locations = new ArrayList<>();
for (final Flight.Location location : flt.getLocationList()) {
this.locations.add(new Location(location.getUri()));
}
if (flt.hasExpirationTime()) {
this.expirationTime =
Instant.ofEpochSecond(
flt.getExpirationTime().getSeconds(), Timestamps.toNanos(flt.getExpirationTime()));
} else {
this.expirationTime = null;
}
this.appMetadata = (flt.getAppMetadata().isEmpty() ? null : flt.getAppMetadata().toByteArray());
this.ticket = new Ticket(flt.getTicket());
}
public List<Location> getLocations() {
return locations;
}
public Ticket getTicket() {
return ticket;
}
public Optional<Instant> getExpirationTime() {
return Optional.ofNullable(expirationTime);
}
public byte[] getAppMetadata() {
return appMetadata;
}
/** Converts to the protocol buffer representation. */
Flight.FlightEndpoint toProtocol() {
Flight.FlightEndpoint.Builder b =
Flight.FlightEndpoint.newBuilder().setTicket(ticket.toProtocol());
for (Location l : locations) {
b.addLocation(l.toProtocol());
}
if (expirationTime != null) {
b.setExpirationTime(
Timestamp.newBuilder()
.setSeconds(expirationTime.getEpochSecond())
.setNanos(expirationTime.getNano())
.build());
}
if (appMetadata != null) {
b.setAppMetadata(ByteString.copyFrom(appMetadata));
}
return b.build();
}
/**
* Get the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing non-Flight services to still return Flight
* types.
*/
public ByteBuffer serialize() {
return ByteBuffer.wrap(toProtocol().toByteArray());
}
/**
* Parse the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing Flight clients to obtain stream info from
* non-Flight services.
*
* @param serialized The serialized form of the message, as returned by {@link #serialize()}.
* @return The deserialized message.
* @throws IOException if the serialized form is invalid.
* @throws URISyntaxException if the serialized form contains an unsupported URI format.
*/
public static FlightEndpoint deserialize(ByteBuffer serialized)
throws IOException, URISyntaxException {
return new FlightEndpoint(Flight.FlightEndpoint.parseFrom(serialized));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FlightEndpoint)) {
return false;
}
FlightEndpoint that = (FlightEndpoint) o;
return locations.equals(that.locations)
&& ticket.equals(that.ticket)
&& Objects.equals(expirationTime, that.expirationTime)
&& Arrays.equals(appMetadata, that.appMetadata);
}
@Override
public int hashCode() {
return Objects.hash(locations, ticket, expirationTime, Arrays.hashCode(appMetadata));
}
@Override
public String toString() {
return "FlightEndpoint{"
+ "locations="
+ locations
+ ", ticket="
+ ticket
+ ", expirationTime="
+ (expirationTime == null ? "(none)" : expirationTime.toString())
+ ", appMetadata="
+ (appMetadata == null ? "(none)" : Base64.getEncoder().encodeToString(appMetadata))
+ '}';
}
/**
* Create a builder for FlightEndpoint.
*
* @param ticket A ticket that describe the key of a data stream.
* @param locations The possible locations the stream can be retrieved from.
*/
public static Builder builder(Ticket ticket, Location... locations) {
return new Builder(ticket, locations);
}
/** Builder for FlightEndpoint. */
public static final class Builder {
private final Ticket ticket;
private final List<Location> locations;
private Instant expirationTime = null;
private byte[] appMetadata = null;
private Builder(Ticket ticket, Location... locations) {
this.ticket = ticket;
this.locations = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(locations)));
}
/**
* Set expiration time for the endpoint. Default is null, which means don't expire.
*
* @param expirationTime (optional) When this endpoint expires.
*/
public Builder setExpirationTime(Instant expirationTime) {
this.expirationTime = expirationTime;
return this;
}
/**
* Set the app metadata to send along with the flight. Default is null;
*
* @param appMetadata Metadata to send along with the flight
*/
public Builder setAppMetadata(byte[] appMetadata) {
this.appMetadata = appMetadata;
return this;
}
/** Build FlightEndpoint object. */
public FlightEndpoint build() {
return new FlightEndpoint(ticket, expirationTime, appMetadata, locations);
}
}
}