Spark Streaming Kinesis integration: Error initializing LeaseCoordinator in Worker

I'm having some problems running a simple vanilla Spark Streaming application with Kinesis in Scala. I followed the basic guidance from tutorials such as Snowplow and the WordCountASL example,

but I still can't get it to work because of this Kinesis worker error:

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator 
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318) 
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174) 
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822) 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118) 
    ... 4 more 

Here is my sample code:

import com.amazonaws.auth.BasicAWSCredentials 
import com.amazonaws.internal.StaticCredentialsProvider 
import com.amazonaws.regions.RegionUtils 
import com.amazonaws.services.kinesis.AmazonKinesisClient 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kinesis.KinesisUtils 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Milliseconds, StreamingContext} 

/** 
    * Created by franco on 11/11/16. 
    */ 
object TestApp { 
    // === Configurations for Kinesis streams === 
    val awsAccessKeyId = "XXXXXX" 
    val awsSecretKey = "XXXXXXX" 
    val kinesisStreamName = "MyStream" 
    val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com" 
    val appName = "MyAppName" 

    def main(args: Array[String]): Unit = {

        val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)

        val provider = new StaticCredentialsProvider(credentials)

        val kinesisClient = new AmazonKinesisClient(provider)
        kinesisClient.setEndpoint(kinesisEndpointUrl)

        val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

        val streams = shards

        val batchInterval = Milliseconds(2000)

        val kinesisCheckpointInterval = batchInterval

        val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

        val cores: Int = Runtime.getRuntime.availableProcessors()
        println("Available Cores : " + cores.toString)
        val config = new SparkConf().setAppName(appName).setMaster("local[" + (cores / 2) + "]")
        val ssc = new StreamingContext(config, batchInterval)

        // Create the Kinesis DStreams
        val kinesisStreams = (0 until streams).map { i =>
            KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
                InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
        }

        ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
        // Start the streaming context and await termination
        ssc.start()
        ssc.awaitTermination()
    }
}

My IAM policy looks like this:

{ 
    "Version": "2012-10-17", 
    "Statement": [ 
     { 
      "Sid": "Stmt123", 
      "Effect": "Allow", 
      "Action": [ 
       "kinesis:DescribeStream", 
       "kinesis:GetShardIterator", 
       "kinesis:GetRecords" 
      ], 
      "Resource": [ 
       "arn:aws:kinesis:region:account:stream/name" 
      ] 
     }, 
     { 
      "Sid": "Stmt456", 
      "Effect": "Allow", 
      "Action": [ 
       "dynamodb:CreateTable", 
       "dynamodb:DeleteItem", 
       "dynamodb:DescribeTable", 
       "dynamodb:GetItem", 
       "dynamodb:PutItem", 
       "dynamodb:Scan", 
       "dynamodb:UpdateItem" 
      ], 
      "Resource": [ 
       "arn:aws:dynamodb:region:account:table/name" 
      ] 
     }, 
     { 
      "Sid": "Stmt789", 
      "Effect": "Allow", 
      "Action": [ 
       "cloudwatch:PutMetricData" 
      ], 
      "Resource": [ 
       "*" 
      ] 
     } 
    ] 
} 

I don't understand what's wrong with this application. Any guidance on this issue would be greatly appreciated.

Answers


There are other DStream constructors that let you pass in the AWS access key and secret key.

For example, the first and fifth constructors at the link below let you pass the credentials in directly (and they are propagated through the system for you) instead of having to set system properties; a sketch follows the link.

KinesisUtils Constructors
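
For reference, a minimal sketch of what that looks like, assuming the KinesisUtils.createStream overload in spark-streaming-kinesis-asl that takes awsAccessKeyId and awsSecretKey as its last two parameters, and reusing the configuration values and StreamingContext from the question's code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// Passing the keys here hands them to the receiver's KCL worker as well,
// so it no longer falls back to the default credentials provider chain.
val stream = KinesisUtils.createStream(
    ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
    InitialPositionInStream.LATEST, kinesisCheckpointInterval,
    StorageLevel.MEMORY_AND_DISK_2,
    awsAccessKeyId, awsSecretKey)

Unlike JVM system properties, these values are carried along with the stream definition, so they also reach receivers running on remote executors.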


I eventually got it working by setting the credential values as system properties:

System.setProperty("aws.accessKeyId","XXXXXX") 
System.setProperty("aws.secretKey","XXXXXX") 

However, I'm not happy with this solution.

Do you see any problems with this approach?