Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions xtable-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<name>XTable Project API</name>

<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -88,5 +92,13 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import lombok.Getter;
import lombok.NonNull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import com.google.common.base.Preconditions;
Expand All @@ -50,22 +51,26 @@ class ExternalTable {
/** Optional, additional properties that can be used to define interactions with the table */
protected final Properties additionalProperties;

protected final Configuration hadoopConf;

ExternalTable(
@NonNull String name,
@NonNull String formatName,
@NonNull String basePath,
String[] namespace,
CatalogConfig catalogConfig,
Properties additionalProperties) {
Properties additionalProperties,
Configuration hadoopConf) {
this.name = name;
this.formatName = formatName;
this.basePath = sanitizeBasePath(basePath);
this.namespace = namespace;
this.catalogConfig = catalogConfig;
this.additionalProperties = additionalProperties;
this.hadoopConf = hadoopConf;
}

protected String sanitizeBasePath(String tableBasePath) {
protected static String sanitizeBasePath(String tableBasePath) {
Path path = new Path(tableBasePath);
Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute");
if (path.isAbsoluteAndSchemeAuthorityNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,25 @@

package org.apache.xtable.conversion;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;

import org.apache.hadoop.conf.Configuration;

@EqualsAndHashCode(callSuper = true)
@Getter
public class SourceTable extends ExternalTable {
/** The path to the data files, defaults to the basePath */
@NonNull private final String dataPath;

private final transient Configuration hadoopConf;

@Builder(toBuilder = true)
public SourceTable(
String name,
Expand All @@ -39,8 +45,38 @@ public SourceTable(
String dataPath,
String[] namespace,
CatalogConfig catalogConfig,
Properties additionalProperties) {
super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
Properties additionalProperties,
Configuration hadoopConf) {
super(name, formatName, basePath, namespace, catalogConfig, additionalProperties, hadoopConf);
this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
this.hadoopConf = hadoopConf;
}

public SourceTable(
@NonNull String name,
@NonNull String basePath,
String dataPath,
String[] namespace,
CatalogConfig catalogConfig,
Properties additionalProperties,
Configuration hadoopConf) {
super(
name,
resolveFormatOrThrow(basePath, hadoopConf),
basePath,
namespace,
catalogConfig,
additionalProperties,
hadoopConf);
this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
this.hadoopConf = hadoopConf;
}

private static String resolveFormatOrThrow(String basePath, Configuration hadoopConf) {
try {
return SourceTableFormatDetector.detectFormat(basePath, hadoopConf);
} catch (IOException e) {
throw new UncheckedIOException("Failed to auto-detect source table format", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.conversion;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

import org.apache.xtable.model.storage.TableFormat;

public class SourceTableFormatDetector {
// private constructor to prevent instantiation
private SourceTableFormatDetector() {
throw new UnsupportedOperationException("This class cannot be instantiated");
}
// helper method to detect input format
public static String detectFormat(String pathStr, Configuration conf) throws IOException {
String sanitizeBasePath = ExternalTable.sanitizeBasePath(pathStr);
Path basePath = new Path(sanitizeBasePath);
FileSystem fs = basePath.getFileSystem(conf);

List<String> matches = new ArrayList<>();

if (fs.exists(new Path(basePath, "_delta_log"))) {
matches.add(TableFormat.DELTA);
}

if (fs.exists(new Path(basePath, ".hoodie"))) {
matches.add(TableFormat.HUDI);
}

try {
HadoopTables tables = new HadoopTables(conf);
Table table = tables.load(pathStr);
if (table != null) {
matches.add(TableFormat.ICEBERG);
}
} catch (Exception e) {
// throw new IllegalArgumentException("Failed to inspect Iceberg table at " + pathStr, e);
}

if (matches.size() == 1) {
return matches.get(0);
}

if (matches.size() > 1) {
throw new IllegalArgumentException(
"Multiple table formats detected at path '"
+ pathStr
+ "': "
+ matches
+ ". Please provide one source format explicitly.");
}
throw new IllegalArgumentException("Unable to detect table format for path: " + pathStr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;

import org.apache.hadoop.conf.Configuration;

@Getter
@EqualsAndHashCode(callSuper = true)
public class TargetTable extends ExternalTable {
Expand All @@ -39,8 +41,9 @@ public TargetTable(
String[] namespace,
CatalogConfig catalogConfig,
Duration metadataRetention,
Properties additionalProperties) {
super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
Properties additionalProperties,
Configuration hadoopConf) {
super(name, formatName, basePath, namespace, catalogConfig, additionalProperties, hadoopConf);
this.metadataRetention =
metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,39 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;

public class TestExternalTable {
Configuration hadoopConf = new Configuration();

@Test
void sanitizePath() {
ExternalTable tooManySlashes =
new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null);
new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null, hadoopConf);
assertEquals("s3://bucket/path", tooManySlashes.getBasePath());

ExternalTable localFilePath =
new ExternalTable("name", "hudi", "/local/data//path", null, null, null);
new ExternalTable("name", "hudi", "/local/data//path", null, null, null, hadoopConf);
assertEquals("file:///local/data/path", localFilePath.getBasePath());

ExternalTable properLocalFilePath =
new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null);
new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null, hadoopConf);
assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
}

@Test
void errorIfRequiredArgsNotSet() {
assertThrows(
NullPointerException.class,
() -> new ExternalTable("name", "hudi", null, null, null, null));
() -> new ExternalTable("name", "hudi", null, null, null, null, hadoopConf));

assertThrows(
NullPointerException.class,
() -> new ExternalTable("name", null, "file://bucket/path", null, null, null));
() -> new ExternalTable("name", null, "file://bucket/path", null, null, null, hadoopConf));

assertThrows(
NullPointerException.class,
() -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null));
() -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null, hadoopConf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static SourceTable convertToSourceTable(TargetTable table) {
table.getBasePath(),
table.getNamespace(),
table.getCatalogConfig(),
table.getAdditionalProperties());
table.getAdditionalProperties(),
table.hadoopConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ private static ConversionConfig getTableSyncConfig(
.basePath(table.getBasePath())
.dataPath(table.getDataPath())
.additionalProperties(sourceProperties)
.hadoopConf(jsc.hadoopConfiguration())
.build();

List<TargetTable> targetTables =
Expand All @@ -1138,6 +1139,7 @@ private static ConversionConfig getTableSyncConfig(
// set the metadata path to the data path as the default (required by Hudi)
.basePath(table.getDataPath())
.metadataRetention(metadataRetention)
.hadoopConf(jsc.hadoopConfiguration())
.build())
.collect(Collectors.toList());

Expand Down
Loading