Доступ к общедоступному файлу Amazon S3 из Apache Spark

У меня есть общедоступный ресурс Amazon s3 (текстовый файл), и я хочу получить к нему доступ из spark. Это означает - у меня нет учетных данных Amazon - все работает нормально, если я хочу просто загрузить его:

val bucket = "<my-bucket>"
val key = "<my-key>"

val client = new AmazonS3Client
val o = client.getObject(bucket, key)
val content = o.getObjectContent // <= can be read and used as input stream

Однако, когда я пытаюсь получить доступ к тому же ресурсу из контекста spark

val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val f = sc.textFile(s"s3a://$bucket/$key")
println(f.count())

Я получаю следующую ошибку со stacktrace:

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
    at com.example.Main$.main(Main.scala:14)
    at com.example.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Я не хочу предоставлять учетные данные AWS - я просто хочу получить доступ к ресурсу анонимно (пока) - как этого добиться? Мне, вероятно, нужно заставить его использовать что-то вроде AnonymousAWSCredentialsProvider - но как поместить его в spark или hadoop?

P.S. Мой build.sbt на всякий случай

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.hadoop" % "hadoop-aws" % "2.7.1"
)

ОБНОВЛЕНО: После того, как я провел некоторые исследования - я вижу причину, по которой он не работает.

Прежде всего, S3AFileSystem создает клиента AWS со следующим порядком учетных данных:

AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
    new BasicAWSCredentialsProvider(accessKey, secretKey),
    new InstanceProfileCredentialsProvider(),
    new AnonymousAWSCredentialsProvider()
);

Значения «accessKey» и «secretKey» берутся из экземпляра spark conf (ключи должны быть «fs.s3a»). .access.key "и" fs.s3a.secret.key "или org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY и org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, что более удобно) .

Второе - вы, вероятно, видите, что AnonymousAWSCredentialsProvider - это третий вариант (последний приоритет) - что может быть не так с этим? См. Реализацию AnonymousAWSCredentials:

public class AnonymousAWSCredentials implements AWSCredentials {

    public String getAWSAccessKeyId() {
        return null;
    }

    public String getAWSSecretKey() {
        return null;
    }
}

Он просто возвращает ноль как для ключа доступа, так и для секретного ключа. Звучит разумно. Но загляните внутрь AWSCredentialsProviderChain:

AWSCredentials credentials = provider.getCredentials();

if (credentials.getAWSAccessKeyId() != null &&
    credentials.getAWSSecretKey() != null) {
    log.debug("Loading credentials from " + provider.toString());

    lastUsedProvider = provider;
    return credentials;
}

Он не выбирает провайдера, если оба ключа нулевые - это означает, что анонимные учетные данные не могут работать. Похоже, ошибка внутри aws-java-sdk-1.7.4. Я пытался использовать последнюю версию - но она несовместима с hadoop-aws-2.7.1.

Есть еще идеи?

10
задан pkozlov 19 July 2015 в 11:20
поделиться