ArrowFlightJdbcDriver.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc;
import static org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.replaceSemiColons;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.logging.Logger;
import org.apache.arrow.driver.jdbc.utils.ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty;
import org.apache.arrow.driver.jdbc.utils.UrlParser;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.DriverVersion;
import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.UnregisteredDriver;
/** JDBC driver for querying data from an Apache Arrow Flight server. */
public class ArrowFlightJdbcDriver extends UnregisteredDriver {
private static final String CONNECT_STRING_PREFIX = "jdbc:arrow-flight-sql://";
private static final String CONNECT_STRING_PREFIX_DEPRECATED = "jdbc:arrow-flight://";
private static final String CONNECTION_STRING_EXPECTED =
"jdbc:arrow-flight-sql://[host][:port][?param1=value&...]";
private static DriverVersion version;
static {
// Special code for supporting Java9 and higher.
// Netty requires some extra properties to unlock some native memory management api
// Setting this property if not already set externally
// This has to be done before any netty class is being loaded
final String key = "io.netty.tryReflectionSetAccessible";
final String tryReflectionSetAccessible = System.getProperty(key);
if (tryReflectionSetAccessible == null) {
System.setProperty(key, Boolean.TRUE.toString());
}
new ArrowFlightJdbcDriver().register();
}
@Override
public Logger getParentLogger() {
// Return the logger associated with the driver package ('org.apache.arrow.driver.jdbc')
// When packaged in flight-sql-jdbc-driver, it will also apply to all shaded dependencies
return Logger.getLogger(getClass().getPackage().getName());
}
@Override
public ArrowFlightConnection connect(final String url, final Properties info)
throws SQLException {
final Properties properties = new Properties(info);
properties.putAll(info);
if (url != null) {
final Optional<Map<Object, Object>> maybeProperties = getUrlsArgs(url);
if (!maybeProperties.isPresent()) {
return null;
}
final Map<Object, Object> propertiesFromUrl = maybeProperties.get();
properties.putAll(propertiesFromUrl);
}
try {
return ArrowFlightConnection.createNewConnection(
this, factory, url, lowerCasePropertyKeys(properties), new RootAllocator(Long.MAX_VALUE));
} catch (final FlightRuntimeException e) {
throw new SQLException("Failed to connect.", e);
}
}
@Override
protected String getFactoryClassName(final JdbcVersion jdbcVersion) {
return ArrowFlightJdbcFactory.class.getName();
}
@Override
@SuppressWarnings("StringSplitter")
protected DriverVersion createDriverVersion() {
if (version == null) {
final InputStream flightProperties =
this.getClass().getResourceAsStream("/properties/flight.properties");
if (flightProperties == null) {
throw new RuntimeException(
"Flight Properties not found. Ensure the JAR was built properly.");
}
try (final Reader reader =
new BufferedReader(new InputStreamReader(flightProperties, StandardCharsets.UTF_8))) {
final Properties properties = new Properties();
properties.load(reader);
final String parentName = properties.getProperty("org.apache.arrow.flight.name");
final String parentVersion = properties.getProperty("org.apache.arrow.flight.version");
final String[] pVersion = parentVersion.split("\\.");
final int parentMajorVersion = Integer.parseInt(pVersion[0]);
final int parentMinorVersion = Integer.parseInt(pVersion[1]);
final String childName = properties.getProperty("org.apache.arrow.flight.jdbc-driver.name");
final String childVersion =
properties.getProperty("org.apache.arrow.flight.jdbc-driver.version");
final String[] cVersion = childVersion.split("\\.");
final int childMajorVersion = Integer.parseInt(cVersion[0]);
final int childMinorVersion = Integer.parseInt(cVersion[1]);
version =
new DriverVersion(
childName,
childVersion,
parentName,
parentVersion,
true,
childMajorVersion,
childMinorVersion,
parentMajorVersion,
parentMinorVersion);
} catch (final IOException e) {
throw new RuntimeException("Failed to load driver version.", e);
}
}
return version;
}
@Override
public Meta createMeta(final AvaticaConnection connection) {
return new ArrowFlightMetaImpl(connection);
}
@Override
protected String getConnectStringPrefix() {
return CONNECT_STRING_PREFIX;
}
@Override
public boolean acceptsURL(final String url) {
Preconditions.checkNotNull(url);
return url.startsWith(CONNECT_STRING_PREFIX)
|| url.startsWith(CONNECT_STRING_PREFIX_DEPRECATED);
}
/**
* Parses the provided url based on the format this driver accepts, retrieving arguments after the
* {@link #CONNECT_STRING_PREFIX}.
*
* <p>This method gets the args if the provided URL follows this pattern: {@code
* jdbc:arrow-flight-sql://<host>:<port>[/?key1=val1&key2=val2&(...)]}
*
* <table border="1">
* <tr>
* <td>Group</td>
* <td>Definition</td>
* <td>Value</td>
* </tr>
* <tr>
* <td>? ��� inaccessible</td>
* <td>{@link #getConnectStringPrefix}</td>
* <td>
* the URL prefix accepted by this driver, i.e.,
* {@code "jdbc:arrow-flight-sql://"}
* </td>
* </tr>
* <tr>
* <td>1</td>
* <td>IPv4 host name</td>
* <td>
* first word after previous group and before "{@code :}"
* </td>
* </tr>
* <tr>
* <td>2</td>
* <td>IPv4 port number</td>
* <td>
* first number after previous group and before "{@code /?}"
* </td>
* </tr>
* <tr>
* <td>3</td>
* <td>custom call parameters</td>
* <td>
* all parameters provided after "{@code /?}" ��� must follow the
* pattern: "{@code key=value}" with "{@code &}" separating a
* parameter from another
* </td>
* </tr>
* </table>
*
* @param url The url to parse.
* @return the parsed arguments, or an empty optional if the driver does not handle this URL.
* @throws SQLException If an error occurs while trying to parse the URL.
*/
@VisibleForTesting // ArrowFlightJdbcDriverTest
Optional<Map<Object, Object>> getUrlsArgs(String url) throws SQLException {
/*
*
* Perhaps this logic should be inside a utility class, separated from this
* one, so as to better delegate responsibilities and concerns throughout
* the code and increase maintainability.
*
* =====
*
* Keep in mind that the URL must ALWAYS follow the pattern:
* "jdbc:arrow-flight-sql://<host>:<port>[/?param1=value1¶m2=value2&(...)]."
*
*/
final Properties resultMap = new Properties();
url = replaceSemiColons(url);
if (!url.startsWith("jdbc:")) {
throw new SQLException(
"Connection string must start with 'jdbc:'. Expected format: "
+ CONNECTION_STRING_EXPECTED);
}
// It's necessary to use a string without "jdbc:" at the beginning to be parsed as a valid URL.
url = url.substring(5);
final URI uri;
try {
uri = URI.create(url);
} catch (final IllegalArgumentException e) {
throw new SQLException("Malformed/invalid URL!", e);
}
if (!Objects.equals(uri.getScheme(), "arrow-flight")
&& !Objects.equals(uri.getScheme(), "arrow-flight-sql")) {
return Optional.empty();
}
if (uri.getHost() == null) {
throw new SQLException(
"URL must have a host. Expected format: " + CONNECTION_STRING_EXPECTED);
} else if (uri.getPort() < 0) {
throw new SQLException(
"URL must have a port. Expected format: " + CONNECTION_STRING_EXPECTED);
}
resultMap.put(ArrowFlightConnectionProperty.HOST.camelName(), uri.getHost()); // host
resultMap.put(ArrowFlightConnectionProperty.PORT.camelName(), uri.getPort()); // port
final String extraParams = uri.getRawQuery(); // optional params
if (extraParams != null) {
final Map<String, String> keyValuePairs = UrlParser.parse(extraParams, "&");
resultMap.putAll(keyValuePairs);
}
return Optional.of(resultMap);
}
static Properties lowerCasePropertyKeys(final Properties properties) {
final Properties resultProperty = new Properties();
properties.forEach((k, v) -> resultProperty.put(k.toString().toLowerCase(), v));
return resultProperty;
}
}