diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml
index 43aa7bace..5b09ad8de 100644
--- a/xtable-api/pom.xml
+++ b/xtable-api/pom.xml
@@ -29,6 +29,10 @@
XTable Project API
+
+ org.apache.commons
+ commons-lang3
+
com.fasterxml.jackson.core
jackson-annotations
@@ -88,5 +92,13 @@
org.mockito
mockito-junit-jupiter
+
+ org.apache.iceberg
+ iceberg-core
+
+
+ org.apache.iceberg
+ iceberg-api
+
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index a6c97d8fa..c75eaf522 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -24,6 +24,7 @@
import lombok.Getter;
import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
@@ -50,22 +51,26 @@ class ExternalTable {
/** Optional, additional properties that can be used to define interactions with the table */
protected final Properties additionalProperties;
+ protected final Configuration hadoopConf;
+
ExternalTable(
@NonNull String name,
@NonNull String formatName,
@NonNull String basePath,
String[] namespace,
CatalogConfig catalogConfig,
- Properties additionalProperties) {
+ Properties additionalProperties,
+ Configuration hadoopConf) {
this.name = name;
this.formatName = formatName;
this.basePath = sanitizeBasePath(basePath);
this.namespace = namespace;
this.catalogConfig = catalogConfig;
this.additionalProperties = additionalProperties;
+ this.hadoopConf = hadoopConf;
}
- protected String sanitizeBasePath(String tableBasePath) {
+ protected static String sanitizeBasePath(String tableBasePath) {
Path path = new Path(tableBasePath);
Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute");
if (path.isAbsoluteAndSchemeAuthorityNull()) {
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index f3e1c3599..f287ed5b4 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -18,6 +18,8 @@
package org.apache.xtable.conversion;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Properties;
import lombok.Builder;
@@ -25,12 +27,16 @@
import lombok.Getter;
import lombok.NonNull;
+import org.apache.hadoop.conf.Configuration;
+
@EqualsAndHashCode(callSuper = true)
@Getter
public class SourceTable extends ExternalTable {
/** The path to the data files, defaults to the basePath */
@NonNull private final String dataPath;
+ private final transient Configuration hadoopConf;
+
@Builder(toBuilder = true)
public SourceTable(
String name,
@@ -39,8 +45,38 @@ public SourceTable(
String dataPath,
String[] namespace,
CatalogConfig catalogConfig,
- Properties additionalProperties) {
- super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
+ Properties additionalProperties,
+ Configuration hadoopConf) {
+ super(name, formatName, basePath, namespace, catalogConfig, additionalProperties, hadoopConf);
this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
+ this.hadoopConf = hadoopConf;
+ }
+
+ public SourceTable(
+ @NonNull String name,
+ @NonNull String basePath,
+ String dataPath,
+ String[] namespace,
+ CatalogConfig catalogConfig,
+ Properties additionalProperties,
+ Configuration hadoopConf) {
+ super(
+ name,
+ resolveFormatOrThrow(basePath, hadoopConf),
+ basePath,
+ namespace,
+ catalogConfig,
+ additionalProperties,
+ hadoopConf);
+ this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
+ this.hadoopConf = hadoopConf;
+ }
+
+ private static String resolveFormatOrThrow(String basePath, Configuration hadoopConf) {
+ try {
+ return SourceTableFormatDetector.detectFormat(basePath, hadoopConf);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to auto-detect source table format", e);
+ }
}
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTableFormatDetector.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTableFormatDetector.java
new file mode 100644
index 000000000..87613d5a4
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTableFormatDetector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.conversion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import org.apache.xtable.model.storage.TableFormat;
+
+public class SourceTableFormatDetector {
+ // private constructor to prevent instantiation
+ private SourceTableFormatDetector() {
+ throw new UnsupportedOperationException("This class cannot be instantiated");
+ }
+ // helper method to detect input format
+ public static String detectFormat(String pathStr, Configuration conf) throws IOException {
+ String sanitizeBasePath = ExternalTable.sanitizeBasePath(pathStr);
+ Path basePath = new Path(sanitizeBasePath);
+ FileSystem fs = basePath.getFileSystem(conf);
+
+ List matches = new ArrayList<>();
+
+ if (fs.exists(new Path(basePath, "_delta_log"))) {
+ matches.add(TableFormat.DELTA);
+ }
+
+ if (fs.exists(new Path(basePath, ".hoodie"))) {
+ matches.add(TableFormat.HUDI);
+ }
+
+ try {
+ HadoopTables tables = new HadoopTables(conf);
+ Table table = tables.load(pathStr);
+ if (table != null) {
+ matches.add(TableFormat.ICEBERG);
+ }
+ } catch (Exception e) {
+ // throw new IllegalArgumentException("Failed to inspect Iceberg table at " + pathStr, e);
+ }
+
+ if (matches.size() == 1) {
+ return matches.get(0);
+ }
+
+ if (matches.size() > 1) {
+ throw new IllegalArgumentException(
+ "Multiple table formats detected at path '"
+ + pathStr
+ + "': "
+ + matches
+ + ". Please provide one source format explicitly.");
+ }
+ throw new IllegalArgumentException("Unable to detect table format for path: " + pathStr);
+ }
+}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 6256da2c6..4ac9ede1b 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -26,6 +26,8 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import org.apache.hadoop.conf.Configuration;
+
@Getter
@EqualsAndHashCode(callSuper = true)
public class TargetTable extends ExternalTable {
@@ -39,8 +41,9 @@ public TargetTable(
String[] namespace,
CatalogConfig catalogConfig,
Duration metadataRetention,
- Properties additionalProperties) {
- super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
+ Properties additionalProperties,
+ Configuration hadoopConf) {
+ super(name, formatName, basePath, namespace, catalogConfig, additionalProperties, hadoopConf);
this.metadataRetention =
metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention;
}
diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
index 5422b0a7f..d34fc752a 100644
--- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
@@ -21,21 +21,24 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
public class TestExternalTable {
+ Configuration hadoopConf = new Configuration();
+
@Test
void sanitizePath() {
ExternalTable tooManySlashes =
- new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null);
+ new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null, hadoopConf);
assertEquals("s3://bucket/path", tooManySlashes.getBasePath());
ExternalTable localFilePath =
- new ExternalTable("name", "hudi", "/local/data//path", null, null, null);
+ new ExternalTable("name", "hudi", "/local/data//path", null, null, null, hadoopConf);
assertEquals("file:///local/data/path", localFilePath.getBasePath());
ExternalTable properLocalFilePath =
- new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null);
+ new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null, hadoopConf);
assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
}
@@ -43,14 +46,14 @@ void sanitizePath() {
void errorIfRequiredArgsNotSet() {
assertThrows(
NullPointerException.class,
- () -> new ExternalTable("name", "hudi", null, null, null, null));
+ () -> new ExternalTable("name", "hudi", null, null, null, null, hadoopConf));
assertThrows(
NullPointerException.class,
- () -> new ExternalTable("name", null, "file://bucket/path", null, null, null));
+ () -> new ExternalTable("name", null, "file://bucket/path", null, null, null, hadoopConf));
assertThrows(
NullPointerException.class,
- () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null));
+ () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null, hadoopConf));
}
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
index f21be6702..198713f7c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
@@ -28,6 +28,7 @@ public static SourceTable convertToSourceTable(TargetTable table) {
table.getBasePath(),
table.getNamespace(),
table.getCatalogConfig(),
- table.getAdditionalProperties());
+ table.getAdditionalProperties(),
+ table.hadoopConf);
}
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 2da3078b6..5af3e5357 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -1126,6 +1126,7 @@ private static ConversionConfig getTableSyncConfig(
.basePath(table.getBasePath())
.dataPath(table.getDataPath())
.additionalProperties(sourceProperties)
+ .hadoopConf(jsc.hadoopConfiguration())
.build();
List targetTables =
@@ -1138,6 +1139,7 @@ private static ConversionConfig getTableSyncConfig(
// set the metadata path to the data path as the default (required by Hudi)
.basePath(table.getDataPath())
.metadataRetention(metadataRetention)
+ .hadoopConf(jsc.hadoopConfiguration())
.build())
.collect(Collectors.toList());
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index facbcf3a6..5865a0e42 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -57,6 +57,7 @@
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.SourceTableFormatDetector;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiSourceConfig;
import org.apache.xtable.iceberg.IcebergCatalogConfig;
@@ -121,18 +122,34 @@ static SourceTable sourceTableBuilder(
@NonNull DatasetConfig.Table table,
CatalogConfig catalogConfig,
@NonNull DatasetConfig datasetConfig,
- Properties sourceProperties) {
- SourceTable sourceTable =
- SourceTable.builder()
- .name(table.getTableName())
- .basePath(table.getTableBasePath())
- .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
- .dataPath(table.getTableDataPath())
- .catalogConfig(catalogConfig)
- .additionalProperties(sourceProperties)
- .formatName(datasetConfig.sourceFormat)
- .build();
- return sourceTable;
+ Properties sourceProperties,
+ Configuration hadoopConf)
+ throws IOException {
+ if (datasetConfig.sourceFormat != null) {
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(table.getTableName())
+ .basePath(table.getTableBasePath())
+ .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
+ .dataPath(table.getTableDataPath())
+ .catalogConfig(catalogConfig)
+ .additionalProperties(sourceProperties)
+ .formatName(datasetConfig.sourceFormat)
+ .hadoopConf(hadoopConf)
+ .build();
+ return sourceTable;
+ } else {
+ SourceTable sourceTable =
+ new SourceTable(
+ table.getTableName(),
+ table.getTableBasePath(),
+ table.getTableDataPath(),
+ table.getNamespace() == null ? null : table.getNamespace().split("\\."),
+ catalogConfig,
+ sourceProperties,
+ hadoopConf);
+ return sourceTable;
+ }
}
static List targetTableBuilder(
@@ -160,7 +177,8 @@ static void syncTableMetdata(
List tableFormatList,
CatalogConfig catalogConfig,
Configuration hadoopConf,
- ConversionSourceProvider conversionSourceProvider) {
+ ConversionSourceProvider conversionSourceProvider)
+ throws IOException {
ConversionController conversionController = new ConversionController(hadoopConf);
for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
log.info(
@@ -174,7 +192,7 @@ static void syncTableMetdata(
}
SourceTable sourceTable =
- sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
+ sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties, hadoopConf);
List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
ConversionConfig conversionConfig =
ConversionConfig.builder()
@@ -214,10 +232,28 @@ static CatalogConfig getIcebergCatalogConfig(String icebergCatalogConfigPath) th
static ConversionSourceProvider> getConversionSourceProvider(
String conversionProviderConfigpath, DatasetConfig datasetConfig, Configuration hadoopConf)
throws IOException {
- // Process source format
String sourceFormat = datasetConfig.sourceFormat;
byte[] customConfig = getCustomConfigurations(conversionProviderConfigpath);
TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
+ if (sourceFormat == null
+ && datasetConfig.getDatasets() != null
+ && !datasetConfig.getDatasets().isEmpty()) {
+ DatasetConfig.Table firstTable = datasetConfig.getDatasets().get(0);
+ if (firstTable.getTableBasePath() != null) {
+ try {
+ String tablePath = firstTable.getTableBasePath();
+ sourceFormat = SourceTableFormatDetector.detectFormat(tablePath, hadoopConf);
+ log.info(
+ "Source format was omitted in config. Auto-detected table format: {}", sourceFormat);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Source format %s is not supported. Known source and target formats are %s",
+ sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
+ }
+ }
+ }
+
TableFormatConverters.ConversionConfig sourceConversionConfig =
tableFormatConverters.getTableFormatConverters().get(sourceFormat);
if (sourceConversionConfig == null) {
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
index f18ce867f..92aaa9700 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
@@ -19,6 +19,7 @@
package org.apache.xtable.utilities;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.File;
import java.io.IOException;
@@ -93,6 +94,32 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
}
}
+ @Test
+ void testSingleSyncModeWithoutInputTableFormat(@TempDir Path tempDir) throws IOException {
+ String tableName = "test-table";
+ try (GenericTable table =
+ TestJavaHudiTable.forStandardSchema(
+ tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+ table.insertRows(10);
+ File configFile = writeConfigFileWithoutSourceTableFormat(tempDir, table, tableName);
+ String[] args = new String[] {"--datasetConfig", configFile.getPath()};
+ RunSync.main(args);
+ Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
+ waitForNumIcebergCommits(icebergMetadataPath, 3);
+ }
+ }
+
+ @Test
+ void testSingleSyncModeWithoutInputTableFormatAndEmptyPath(@TempDir Path tempDir)
+ throws IOException {
+ String tableName = "test-table";
+
+ File configFile = writeConfigFileWithoutSourceTableFormatAndEmptyPath(tempDir, tableName);
+ String[] args = new String[] {"--datasetConfig", configFile.getPath()};
+
+ assertThrows(IllegalArgumentException.class, () -> RunSync.main(args));
+ }
+
private static File writeConfigFile(Path tempDir, GenericTable table, String tableName)
throws IOException {
RunSync.DatasetConfig config =
@@ -106,7 +133,41 @@ private static File writeConfigFile(Path tempDir, GenericTable table, String tab
.tableName(tableName)
.build()))
.build();
- File configFile = new File(tempDir + "config.yaml");
+ File configFile = tempDir.resolve("config.yaml").toFile();
+ RunSync.YAML_MAPPER.writeValue(configFile, config);
+ return configFile;
+ }
+
+ private static File writeConfigFileWithoutSourceTableFormatAndEmptyPath(
+ Path tempDir, String tableName) throws IOException {
+ RunSync.DatasetConfig config =
+ RunSync.DatasetConfig.builder()
+ .targetFormats(Collections.singletonList("ICEBERG"))
+ .datasets(
+ Collections.singletonList(
+ RunSync.DatasetConfig.Table.builder()
+ .tableBasePath("")
+ .tableName(tableName)
+ .build()))
+ .build();
+ File configFile = tempDir.resolve("config.yaml").toFile();
+ RunSync.YAML_MAPPER.writeValue(configFile, config);
+ return configFile;
+ }
+
+ private static File writeConfigFileWithoutSourceTableFormat(
+ Path tempDir, GenericTable table, String tableName) throws IOException {
+ RunSync.DatasetConfig config =
+ RunSync.DatasetConfig.builder()
+ .targetFormats(Collections.singletonList("ICEBERG"))
+ .datasets(
+ Collections.singletonList(
+ RunSync.DatasetConfig.Table.builder()
+ .tableBasePath(table.getBasePath())
+ .tableName(tableName)
+ .build()))
+ .build();
+ File configFile = tempDir.resolve("config.yaml").toFile();
RunSync.YAML_MAPPER.writeValue(configFile, config);
return configFile;
}