diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml index 43aa7bace..5b09ad8de 100644 --- a/xtable-api/pom.xml +++ b/xtable-api/pom.xml @@ -29,6 +29,10 @@ XTable Project API + + org.apache.commons + commons-lang3 + com.fasterxml.jackson.core jackson-annotations @@ -88,5 +92,13 @@ org.mockito mockito-junit-jupiter + + org.apache.iceberg + iceberg-core + + + org.apache.iceberg + iceberg-api + diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java index a6c97d8fa..c75eaf522 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java @@ -24,6 +24,7 @@ import lombok.Getter; import lombok.NonNull; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; @@ -50,22 +51,26 @@ class ExternalTable { /** Optional, additional properties that can be used to define interactions with the table */ protected final Properties additionalProperties; + protected final Configuration hadoopConf; + ExternalTable( @NonNull String name, @NonNull String formatName, @NonNull String basePath, String[] namespace, CatalogConfig catalogConfig, - Properties additionalProperties) { + Properties additionalProperties, + Configuration hadoopConf) { this.name = name; this.formatName = formatName; this.basePath = sanitizeBasePath(basePath); this.namespace = namespace; this.catalogConfig = catalogConfig; this.additionalProperties = additionalProperties; + this.hadoopConf = hadoopConf; } - protected String sanitizeBasePath(String tableBasePath) { + protected static String sanitizeBasePath(String tableBasePath) { Path path = new Path(tableBasePath); Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute"); if (path.isAbsoluteAndSchemeAuthorityNull()) { diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java index f3e1c3599..f287ed5b4 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java @@ -18,6 +18,8 @@ package org.apache.xtable.conversion; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Properties; import lombok.Builder; @@ -25,12 +27,16 @@ import lombok.Getter; import lombok.NonNull; +import org.apache.hadoop.conf.Configuration; + @EqualsAndHashCode(callSuper = true) @Getter public class SourceTable extends ExternalTable { /** The path to the data files, defaults to the basePath */ @NonNull private final String dataPath; + private final transient Configuration hadoopConf; + @Builder(toBuilder = true) public SourceTable( String name, @@ -39,8 +45,38 @@ public SourceTable( String dataPath, String[] namespace, CatalogConfig catalogConfig, - Properties additionalProperties) { - super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); + Properties additionalProperties, + Configuration hadoopConf) { + super(name, formatName, basePath, namespace, catalogConfig, additionalProperties, hadoopConf); this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath); + this.hadoopConf = hadoopConf; + } + + public SourceTable( + @NonNull String name, + @NonNull String basePath, + String dataPath, + String[] namespace, + CatalogConfig catalogConfig, + Properties additionalProperties, + Configuration hadoopConf) { + super( + name, + resolveFormatOrThrow(basePath, hadoopConf), + basePath, + namespace, + catalogConfig, + additionalProperties, + hadoopConf); + this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath); + this.hadoopConf = hadoopConf; + } + + private static String resolveFormatOrThrow(String basePath, Configuration hadoopConf) { + try { + return SourceTableFormatDetector.detectFormat(basePath, hadoopConf); + } catch (IOException e) { + throw new UncheckedIOException("Failed to auto-detect source table format", e); + } } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTableFormatDetector.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTableFormatDetector.java new file mode 100644 index 000000000..87613d5a4 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTableFormatDetector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; + +import org.apache.xtable.model.storage.TableFormat; + +public class SourceTableFormatDetector { + // private constructor to prevent instantiation + private SourceTableFormatDetector() { + throw new UnsupportedOperationException("This class cannot be instantiated"); + } + // helper method to detect input format + public static String detectFormat(String pathStr, Configuration conf) throws IOException { + String sanitizeBasePath = ExternalTable.sanitizeBasePath(pathStr); + Path basePath = new Path(sanitizeBasePath); + FileSystem fs = basePath.getFileSystem(conf); + + List matches = new ArrayList<>(); + + if (fs.exists(new Path(basePath, "_delta_log"))) { + matches.add(TableFormat.DELTA); + } + + if (fs.exists(new Path(basePath, ".hoodie"))) { + matches.add(TableFormat.HUDI); + } + + try { + HadoopTables tables = new HadoopTables(conf); + Table table = tables.load(pathStr); + if (table != null) { + matches.add(TableFormat.ICEBERG); + } + } catch (Exception e) { + // throw new IllegalArgumentException("Failed to inspect Iceberg table at " + pathStr, e); + } + + if (matches.size() == 1) { + return matches.get(0); + } + + if (matches.size() > 1) { + throw new IllegalArgumentException( + "Multiple table formats detected at path '" + + pathStr + + "': " + + matches + + ". Please provide one source format explicitly."); + } + throw new IllegalArgumentException("Unable to detect table format for path: " + pathStr); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java index 6256da2c6..4ac9ede1b 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -26,6 +26,8 @@ import lombok.EqualsAndHashCode; import lombok.Getter; +import org.apache.hadoop.conf.Configuration; + @Getter @EqualsAndHashCode(callSuper = true) public class TargetTable extends ExternalTable { @@ -39,8 +41,9 @@ public TargetTable( String[] namespace, CatalogConfig catalogConfig, Duration metadataRetention, - Properties additionalProperties) { - super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); + Properties additionalProperties, + Configuration hadoopConf) { + super(name, formatName, basePath, namespace, catalogConfig, additionalProperties, hadoopConf); this.metadataRetention = metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention; } diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java index 5422b0a7f..d34fc752a 100644 --- a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java @@ -21,21 +21,24 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; public class TestExternalTable { + Configuration hadoopConf = new Configuration(); + @Test void sanitizePath() { ExternalTable tooManySlashes = - new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null); + new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null, hadoopConf); assertEquals("s3://bucket/path", tooManySlashes.getBasePath()); ExternalTable localFilePath = - new ExternalTable("name", "hudi", "/local/data//path", null, null, null); + new ExternalTable("name", "hudi", "/local/data//path", null, null, null, hadoopConf); assertEquals("file:///local/data/path", localFilePath.getBasePath()); ExternalTable properLocalFilePath = - new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null); + new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null, hadoopConf); assertEquals("file:///local/data/path", properLocalFilePath.getBasePath()); } @@ -43,14 +46,14 @@ void sanitizePath() { void errorIfRequiredArgsNotSet() { assertThrows( NullPointerException.class, - () -> new ExternalTable("name", "hudi", null, null, null, null)); + () -> new ExternalTable("name", "hudi", null, null, null, null, hadoopConf)); assertThrows( NullPointerException.class, - () -> new ExternalTable("name", null, "file://bucket/path", null, null, null)); + () -> new ExternalTable("name", null, "file://bucket/path", null, null, null, hadoopConf)); assertThrows( NullPointerException.class, - () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null)); + () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null, hadoopConf)); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java index f21be6702..198713f7c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java @@ -28,6 +28,7 @@ public static SourceTable convertToSourceTable(TargetTable table) { table.getBasePath(), table.getNamespace(), table.getCatalogConfig(), - table.getAdditionalProperties()); + table.getAdditionalProperties(), + table.hadoopConf); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 2da3078b6..5af3e5357 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -1126,6 +1126,7 @@ private static ConversionConfig getTableSyncConfig( .basePath(table.getBasePath()) .dataPath(table.getDataPath()) .additionalProperties(sourceProperties) + .hadoopConf(jsc.hadoopConfiguration()) .build(); List targetTables = @@ -1138,6 +1139,7 @@ private static ConversionConfig getTableSyncConfig( // set the metadata path to the data path as the default (required by Hudi) .basePath(table.getDataPath()) .metadataRetention(metadataRetention) + .hadoopConf(jsc.hadoopConfiguration()) .build()) .collect(Collectors.toList()); diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index facbcf3a6..5865a0e42 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -57,6 +57,7 @@ import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.SourceTableFormatDetector; import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.hudi.HudiSourceConfig; import org.apache.xtable.iceberg.IcebergCatalogConfig; @@ -121,18 +122,34 @@ static SourceTable sourceTableBuilder( @NonNull DatasetConfig.Table table, CatalogConfig catalogConfig, @NonNull DatasetConfig datasetConfig, - Properties sourceProperties) { - SourceTable sourceTable = - SourceTable.builder() - .name(table.getTableName()) - .basePath(table.getTableBasePath()) - .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\.")) - .dataPath(table.getTableDataPath()) - .catalogConfig(catalogConfig) - .additionalProperties(sourceProperties) - .formatName(datasetConfig.sourceFormat) - .build(); - return sourceTable; + Properties sourceProperties, + Configuration hadoopConf) + throws IOException { + if (datasetConfig.sourceFormat != null) { + SourceTable sourceTable = + SourceTable.builder() + .name(table.getTableName()) + .basePath(table.getTableBasePath()) + .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\.")) + .dataPath(table.getTableDataPath()) + .catalogConfig(catalogConfig) + .additionalProperties(sourceProperties) + .formatName(datasetConfig.sourceFormat) + .hadoopConf(hadoopConf) + .build(); + return sourceTable; + } else { + SourceTable sourceTable = + new SourceTable( + table.getTableName(), + table.getTableBasePath(), + table.getTableDataPath(), + table.getNamespace() == null ? null : table.getNamespace().split("\\."), + catalogConfig, + sourceProperties, + hadoopConf); + return sourceTable; + } } static List targetTableBuilder( @@ -160,7 +177,8 @@ static void syncTableMetdata( List tableFormatList, CatalogConfig catalogConfig, Configuration hadoopConf, - ConversionSourceProvider conversionSourceProvider) { + ConversionSourceProvider conversionSourceProvider) + throws IOException { ConversionController conversionController = new ConversionController(hadoopConf); for (DatasetConfig.Table table : datasetConfig.getDatasets()) { log.info( @@ -174,7 +192,7 @@ static void syncTableMetdata( } SourceTable sourceTable = - sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); + sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties, hadoopConf); List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList); ConversionConfig conversionConfig = ConversionConfig.builder() @@ -214,10 +232,28 @@ static CatalogConfig getIcebergCatalogConfig(String icebergCatalogConfigPath) th static ConversionSourceProvider getConversionSourceProvider( String conversionProviderConfigpath, DatasetConfig datasetConfig, Configuration hadoopConf) throws IOException { - // Process source format String sourceFormat = datasetConfig.sourceFormat; byte[] customConfig = getCustomConfigurations(conversionProviderConfigpath); TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig); + if (sourceFormat == null + && datasetConfig.getDatasets() != null + && !datasetConfig.getDatasets().isEmpty()) { + DatasetConfig.Table firstTable = datasetConfig.getDatasets().get(0); + if (firstTable.getTableBasePath() != null) { + try { + String tablePath = firstTable.getTableBasePath(); + sourceFormat = SourceTableFormatDetector.detectFormat(tablePath, hadoopConf); + log.info( + "Source format was omitted in config. Auto-detected table format: {}", sourceFormat); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Source format %s is not supported. Known source and target formats are %s", + sourceFormat, tableFormatConverters.getTableFormatConverters().keySet())); + } + } + } + TableFormatConverters.ConversionConfig sourceConversionConfig = tableFormatConverters.getTableFormatConverters().get(sourceFormat); if (sourceConversionConfig == null) { diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java index f18ce867f..92aaa9700 100644 --- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java @@ -19,6 +19,7 @@ package org.apache.xtable.utilities; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.File; import java.io.IOException; @@ -93,6 +94,32 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException { } } + @Test + void testSingleSyncModeWithoutInputTableFormat(@TempDir Path tempDir) throws IOException { + String tableName = "test-table"; + try (GenericTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + table.insertRows(10); + File configFile = writeConfigFileWithoutSourceTableFormat(tempDir, table, tableName); + String[] args = new String[] {"--datasetConfig", configFile.getPath()}; + RunSync.main(args); + Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata")); + waitForNumIcebergCommits(icebergMetadataPath, 3); + } + } + + @Test + void testSingleSyncModeWithoutInputTableFormatAndEmptyPath(@TempDir Path tempDir) + throws IOException { + String tableName = "test-table"; + + File configFile = writeConfigFileWithoutSourceTableFormatAndEmptyPath(tempDir, tableName); + String[] args = new String[] {"--datasetConfig", configFile.getPath()}; + + assertThrows(IllegalArgumentException.class, () -> RunSync.main(args)); + } + private static File writeConfigFile(Path tempDir, GenericTable table, String tableName) throws IOException { RunSync.DatasetConfig config = @@ -106,7 +133,41 @@ private static File writeConfigFile(Path tempDir, GenericTable table, String tab .tableName(tableName) .build())) .build(); - File configFile = new File(tempDir + "config.yaml"); + File configFile = tempDir.resolve("config.yaml").toFile(); + RunSync.YAML_MAPPER.writeValue(configFile, config); + return configFile; + } + + private static File writeConfigFileWithoutSourceTableFormatAndEmptyPath( + Path tempDir, String tableName) throws IOException { + RunSync.DatasetConfig config = + RunSync.DatasetConfig.builder() + .targetFormats(Collections.singletonList("ICEBERG")) + .datasets( + Collections.singletonList( + RunSync.DatasetConfig.Table.builder() + .tableBasePath("") + .tableName(tableName) + .build())) + .build(); + File configFile = tempDir.resolve("config.yaml").toFile(); + RunSync.YAML_MAPPER.writeValue(configFile, config); + return configFile; + } + + private static File writeConfigFileWithoutSourceTableFormat( + Path tempDir, GenericTable table, String tableName) throws IOException { + RunSync.DatasetConfig config = + RunSync.DatasetConfig.builder() + .targetFormats(Collections.singletonList("ICEBERG")) + .datasets( + Collections.singletonList( + RunSync.DatasetConfig.Table.builder() + .tableBasePath(table.getBasePath()) + .tableName(tableName) + .build())) + .build(); + File configFile = tempDir.resolve("config.yaml").toFile(); RunSync.YAML_MAPPER.writeValue(configFile, config); return configFile; }