diff --git a/README.md b/README.md
index 9d22137..772abda 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ In every run mode, the tool offers following universal features:
 - Backup is deleted after a successful run(can be overridden to keep the backup)
 - Currently supported file systems:
   - S3
+  - S3a
   - Unix
   - HDFS
@@ -142,6 +143,12 @@ To be able to perform any operation on S3 you must provide AWS credentials. The
 `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The application will read them automatically. For more information, as well as other
 ways to provide credentials, see [Using credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html)
 
+### S3a Set up
+To be able to perform any operation on S3a you must provide AWS credentials. The easiest way to do so is to set environment variables
+`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. The application will read them automatically. For more information, as well as other
+ways to provide credentials, see [Using credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html)
+Additionally, to override the provider endpoint, the environment variable `AWS_ENDPOINT_URL` has to be set.
+
 ### HDFS Set up
 To be able to perform any operation on HDFS you must set environment variable `HADOOP_CONF_DIR`.
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
index eb1ca2a..3364512 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/Application.scala
@@ -24,24 +24,9 @@ import org.log4s.Logger
 import software.amazon.awssdk.services.s3.S3Client
 import za.co.absa.spark_metadata_tool.LoggingImplicits._
 import za.co.absa.spark_metadata_tool.io.{FileManager, HdfsFileManager, S3FileManager, UnixFileManager}
-import za.co.absa.spark_metadata_tool.model.{
-  AppConfig,
-  AppError,
-  AppErrorWithThrowable,
-  CompareMetadataWithData,
-  CreateMetadata,
-  FixPaths,
-  Hdfs,
-  InitializationError,
-  Merge,
-  NotFoundError,
-  S3,
-  SinkFileStatus,
-  TargetFilesystem,
-  Unix,
-  UnknownError
-}
+import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, AppErrorWithThrowable, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, NotFoundError, S3, S3a, SinkFileStatus, TargetFilesystem, Unix, UnknownError}
+import java.net.URI
 
 import scala.util.Try
 import scala.util.chaining._
@@ -223,8 +208,15 @@ object Application extends App {
     } yield ()).tap(_.logInfo(s"Done processing file ${path.toString}"))
 
   def initS3(): Either[AppError, S3Client] = Try {
-    S3Client.builder().build()
+    // This is done because the aws sdk does not support overriding the aws endpoint url via an env variable:
+    // https://docs.aws.amazon.com/sdkref/latest/guide/settings-reference.html#EVarSettings
+    // https://github.com/aws/aws-sdk-java-v2/issues/4501
+    val endpoint = sys.env.get("AWS_ENDPOINT_URL").filter(_.nonEmpty)
+    val builder = S3Client.builder()
+    endpoint.foreach(url => builder.endpointOverride(new URI(url)))
+
+    builder.build()
   }.toEither.leftMap(err => InitializationError("Failed to initialize S3 Client", err.some))
 
   def initHdfs(): Either[AppError, FileSystem] = Try {
     val hadoopConfDir = sys.env("HADOOP_CONF_DIR")
@@ -242,7 +234,8 @@
     (fs match {
       case Unix => UnixFileManager.asRight
       case Hdfs => initHdfs().map(hdfs => HdfsFileManager(hdfs))
-      case S3 => initS3().map(client => S3FileManager(client))
+      case S3 => initS3().map(client => S3FileManager(client, "s3"))
+      case S3a => initS3().map(client => S3FileManager(client, "s3a"))
     }).tap(fm => logger.debug(s"Initialized file manager : $fm"))
 
 }
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
index 60a1bd0..46fbd15 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/ArgumentParser.scala
@@ -25,19 +25,7 @@ import org.apache.log4j.PatternLayout
 import org.log4s.Logger
 import scopt.OParser
 import za.co.absa.spark_metadata_tool.LoggingImplicits._
-import za.co.absa.spark_metadata_tool.model.AppConfig
-import za.co.absa.spark_metadata_tool.model.AppError
-import za.co.absa.spark_metadata_tool.model.CompareMetadataWithData
-import za.co.absa.spark_metadata_tool.model.FixPaths
-import za.co.absa.spark_metadata_tool.model.Hdfs
-import za.co.absa.spark_metadata_tool.model.InitializationError
-import za.co.absa.spark_metadata_tool.model.Merge
-import za.co.absa.spark_metadata_tool.model.ParsingError
-import za.co.absa.spark_metadata_tool.model.S3
-import za.co.absa.spark_metadata_tool.model.TargetFilesystem
-import za.co.absa.spark_metadata_tool.model.Unix
-import za.co.absa.spark_metadata_tool.model.UnknownFileSystemError
-import za.co.absa.spark_metadata_tool.model.CreateMetadata
+import za.co.absa.spark_metadata_tool.model.{AppConfig, AppError, CompareMetadataWithData, CreateMetadata, FixPaths, Hdfs, InitializationError, Merge, ParsingError, S3, S3a, TargetFilesystem, Unix, UnknownFileSystemError}
 
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
@@ -182,6 +170,7 @@
       case _ if path.startsWith("/") => Unix.asRight
       case _ if path.startsWith("hdfs://") => Hdfs.asRight
       case _ if path.startsWith("s3://") => S3.asRight
+      case _ if path.startsWith("s3a://") => S3a.asRight
       case _ =>
         UnknownFileSystemError(
           s"Couldn't extract filesystem from path $path"
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/io/S3FileManager.scala b/src/main/scala/za/co/absa/spark_metadata_tool/io/S3FileManager.scala
index 0c0dad0..4606e65 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/io/S3FileManager.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/io/S3FileManager.scala
@@ -45,7 +45,7 @@ import scala.util.Try
 import scala.util.Using
 import scala.util.chaining._
 
-case class S3FileManager(s3: S3Client) extends FileManager {
+case class S3FileManager(s3: S3Client, scheme: String) extends FileManager {
   import S3FileManager._
 
   implicit private val logger: Logger = org.log4s.getLogger
@@ -124,7 +124,7 @@
     val bucket = getBucket(baseDir)
     val prefix = ensureTrailingSlash(getKey(baseDir))
 
-    val builder = new URIBuilder().setScheme("s3").setHost(bucket)
+    val builder = new URIBuilder().setScheme(scheme).setHost(bucket)
 
     val request = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()
@@ -171,7 +171,7 @@
 
   private def listBucket(path: Path, filter: FileType): Either[IoError, Seq[Path]] = Try {
     val bucket = getBucket(path)
-    val pathPrefix = s"s3://$bucket/"
+    val pathPrefix = s"$scheme://$bucket/"
     val rootKey = path.toString.stripPrefix(pathPrefix)
 
     val req = ListObjectsV2Request
diff --git a/src/main/scala/za/co/absa/spark_metadata_tool/model/TargetFileSystem.scala b/src/main/scala/za/co/absa/spark_metadata_tool/model/TargetFileSystem.scala
index 394fca1..d8f6180 100644
--- a/src/main/scala/za/co/absa/spark_metadata_tool/model/TargetFileSystem.scala
+++ b/src/main/scala/za/co/absa/spark_metadata_tool/model/TargetFileSystem.scala
@@ -29,3 +29,6 @@ case object Hdfs extends TargetFilesystem {
 case object S3 extends TargetFilesystem {
   override def pathPrefix: String = "s3://"
 }
+case object S3a extends TargetFilesystem {
+  override def pathPrefix: String = "s3a://"
+}
diff --git a/src/test/scala/za/co/absa/spark_metadata_tool/io/S3FileManagerSpec.scala b/src/test/scala/za/co/absa/spark_metadata_tool/io/S3FileManagerSpec.scala
index 62d85c5..2a02585 100644
--- a/src/test/scala/za/co/absa/spark_metadata_tool/io/S3FileManagerSpec.scala
+++ b/src/test/scala/za/co/absa/spark_metadata_tool/io/S3FileManagerSpec.scala
@@ -45,7 +45,7 @@ class S3FileManagerSpec extends AnyFlatSpec with Matchers with OptionValues with
 
   private val s3 = mock[S3Client]
 
-  private val io = S3FileManager(s3)
+  private val io = S3FileManager(s3, "s3")
 
   private val TMinus10 = Instant.now().minus(Duration.ofMinutes(10))
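Note on the initS3() change above: a minimal standalone sketch of the intended endpoint override, assuming only the AWS SDK v2 S3Client builder API. Reading the variable through sys.env keeps the check null-safe when AWS_ENDPOINT_URL is not set (a plain System.getenv(...).nonEmpty would throw a NullPointerException in that case); the helper name buildS3Client is illustrative only.

import java.net.URI
import software.amazon.awssdk.services.s3.S3Client

// Build an S3 client, overriding the endpoint only when AWS_ENDPOINT_URL is set
// and non-empty; otherwise fall back to the SDK's default endpoint resolution.
def buildS3Client(): S3Client = {
  val builder = S3Client.builder()
  sys.env.get("AWS_ENDPOINT_URL").filter(_.nonEmpty).foreach { url =>
    builder.endpointOverride(new URI(url))
  }
  builder.build()
}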
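The scheme parameter added to S3FileManager only changes how listed objects are rendered back into fully qualified paths. A hedged sketch of the resulting URIs, assuming the Apache HttpClient URIBuilder already used by the class (bucket and key names below are made up):

import org.apache.http.client.utils.URIBuilder

// scheme = "s3" for the S3 target, "s3a" for the S3a target
val uri = new URIBuilder()
  .setScheme("s3a")
  .setHost("my-bucket")
  .setPath("/data/_spark_metadata/0")
  .build()

// uri.toString == "s3a://my-bucket/data/_spark_metadata/0"
// With scheme = "s3" the same call yields "s3://my-bucket/data/_spark_metadata/0"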
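Filesystem resolution stays purely prefix-based, so the new s3a:// case cannot collide with the existing s3:// case: "s3a://..." does not start with "s3://" because the characters differ at index 2. A simplified, standalone sketch of that resolution (names here are illustrative; the real logic is the pattern match in ArgumentParser.scala and the pathPrefix members in TargetFileSystem.scala):

sealed trait Fs { def pathPrefix: String }
case object UnixFs extends Fs { val pathPrefix = "/" }
case object HdfsFs extends Fs { val pathPrefix = "hdfs://" }
case object S3Fs extends Fs { val pathPrefix = "s3://" }
case object S3aFs extends Fs { val pathPrefix = "s3a://" }

// First matching prefix wins; order does not matter here because none of
// these prefixes can match a path that also matches another one.
def resolveFs(path: String): Option[Fs] =
  Seq(HdfsFs, S3Fs, S3aFs, UnixFs).find(fs => path.startsWith(fs.pathPrefix))

assert(resolveFs("s3a://bucket/data").contains(S3aFs))
assert(resolveFs("s3://bucket/data").contains(S3Fs))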