StreamPipe.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight;
import io.grpc.stub.StreamObserver;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.arrow.flight.FlightProducer.StreamListener;
import org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.util.AutoCloseables;
/**
* Shim listener to avoid exposing GRPC internals.
*
* @param <FROM> From Type
* @param <TO> To Type
*/
class StreamPipe<FROM, TO> implements StreamListener<FROM> {
private final StreamObserver<TO> delegate;
private final Function<FROM, TO> mapFunction;
private final Consumer<Throwable> errorHandler;
private AutoCloseable resource;
private boolean closed = false;
/**
* Wrap the given gRPC StreamObserver with a transformation function.
*
* @param delegate The {@link StreamObserver} to wrap.
* @param func The transformation function.
* @param errorHandler A handler for uncaught exceptions (e.g. if something tries to double-close
* this stream).
* @param <FROM> The source type.
* @param <TO> The output type.
* @return A wrapped listener.
*/
public static <FROM, TO> StreamPipe<FROM, TO> wrap(
StreamObserver<TO> delegate, Function<FROM, TO> func, Consumer<Throwable> errorHandler) {
return new StreamPipe<>(delegate, func, errorHandler);
}
public StreamPipe(
StreamObserver<TO> delegate, Function<FROM, TO> func, Consumer<Throwable> errorHandler) {
super();
this.delegate = delegate;
this.mapFunction = func;
this.errorHandler = errorHandler;
this.resource = null;
}
/** Set an AutoCloseable resource to be cleaned up when the gRPC observer is to be completed. */
void setAutoCloseable(AutoCloseable ac) {
resource = ac;
}
@Override
public void onNext(FROM val) {
delegate.onNext(mapFunction.apply(val));
}
@Override
public void onError(Throwable t) {
if (closed) {
errorHandler.accept(t);
return;
}
try {
AutoCloseables.close(resource);
} catch (Exception e) {
errorHandler.accept(e);
} finally {
// Set closed to true in case onError throws, so that we don't try to close again
closed = true;
delegate.onError(StatusUtils.toGrpcException(t));
}
}
@Override
public void onCompleted() {
if (closed) {
errorHandler.accept(new IllegalStateException("Tried to complete already-completed call"));
return;
}
try {
AutoCloseables.close(resource);
} catch (Exception e) {
errorHandler.accept(e);
} finally {
// Set closed to true in case onCompleted throws, so that we don't try to close again
closed = true;
delegate.onCompleted();
}
}
/** Ensure this stream has been completed. */
void ensureCompleted() {
if (!closed) {
onCompleted();
}
}
}