0

我试图实现的目标是将S3中的对象从一个帐户(A1 - 不受我控制)复制到另一个帐户中的S3(A2 - 受控由我)。 对于A1中的OPS,我提供了一个角色,我可以使用boto3库。AWS提供从另一个帐户访问假定的角色以访问我帐户中的S3

session = boto3.Session() 
sts_client = session.client('sts') 

assumed_role = sts_client.assume_role(
    RoleArn="arn:aws:iam::1234567890123:role/organization", 
    RoleSessionName="blahblahblah" 
) 

这部分没有问题。 问题是从S3到S3的直接复制失败,因为假设的角色无法访问我的S3。

s3 = boto3.resource('s3') 
copy_source = { 
    'Bucket': a1_bucket_name, 
    'Key': key_name 
} 

bucket = s3.Bucket(a2_bucket_name) 
bucket.copy(copy_source, hardcoded_key) 

由于这个结果我得到

botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden 

在这行代码:

bucket.copy(copy_source, hardcoded_key) 

有没有什么办法可以授予访问我的S3为假定角色? 我真的很想在没有在本地下载文件之前直接上传S3到S3副本。

请告知是否有比这更好的方法。

想法是让此脚本每天在AWS数据管道内运行,例如。

回答

1

要从一个S3桶对象复制到另一个S3桶,则需要使用一个组的两个S3桶之间传送数据可以访问这两个存储桶的AWS凭证。

如果这些分组在不同的AWS帐号,你需要两件事情:

  1. 凭据目标桶,并
  2. 水桶政策上的源桶允许读取目标AWS帐户。

单凭这些,您可以复制对象。您不需要源帐户的凭据。

  1. 添加水桶政策,您桶允许读取目标AWS账户。

下面是一个简单的政策:

{ 
    "Version": "2012-10-17", 
    "Statement": [ 
     { 
      "Sid": "DelegateS3Access", 
      "Effect": "Allow", 
      "Principal": { 
       "AWS": "arn:aws:iam::123456789012:root" 
      }, 
      "Action": "s3:*", 
      "Resource": [ 
       "arn:aws:s3:::BUCKET_NAME", 
       "arn:aws:s3:::BUCKET_NAME/*" 
      ] 
     } 
    ] 
} 

一定要与您的桶名称来代替BUCKET_NAME。并将123456789012替换为您的目标 AWS账号。

  1. 为您的目标使用凭据 AWS账户(目标存储桶的所有者)执行复制。

其他注意事项:

您也可以通过颠倒两个要求复制对象:

  1. 凭据源AWS账户,并
  2. 目标水桶的桶和政策允许编写访问源AWS账户。

但是,这样做时,对象元数据不会被正确复制。我已经与AWS Support讨论过这个问题,他们建议阅读外部帐户而不是写入外部帐户以避免此问题。

0

这是一个示例代码来使用2个不同的AWS帐户博托3.

from boto.s3.connection import S3Connection 
from boto.s3.key import Key 
from Queue import LifoQueue 
import threading 

source_aws_key = '*******************' 
source_aws_secret_key = '*******************' 
dest_aws_key = '*******************' 
dest_aws_secret_key = '*******************' 
srcBucketName = '*******************' 
dstBucketName = '*******************' 

class Worker(threading.Thread): 
    def __init__(self, queue): 
     threading.Thread.__init__(self) 
     self.source_conn = S3Connection(source_aws_key, source_aws_secret_key) 
     self.dest_conn = S3Connection(dest_aws_key, dest_aws_secret_key) 
     self.srcBucket = self.source_conn.get_bucket(srcBucketName) 
     self.dstBucket = self.dest_conn.get_bucket(dstBucketName) 
     self.queue = queue 

    def run(self): 
     while True: 
      key_name = self.queue.get() 
      k = Key(self.srcBucket, key_name) 
      dist_key = Key(self.dstBucket, k.key) 
      if not dist_key.exists() or k.etag != dist_key.etag: 
       print 'copy: ' + k.key 
       self.dstBucket.copy_key(k.key, srcBucketName, k.key, storage_class=k.storage_class) 
      else: 
       print 'exists and etag matches: ' + k.key 

      self.queue.task_done() 

def copyBucket(maxKeys = 1000): 
    print 'start' 

    s_conn = S3Connection(source_aws_key, source_aws_secret_key) 
    srcBucket = s_conn.get_bucket(srcBucketName) 

    resultMarker = '' 
    q = LifoQueue(maxsize=5000) 

    for i in range(10): 
     print 'adding worker' 
     t = Worker(q) 
     t.daemon = True 
     t.start() 

    while True: 
     print 'fetch next 1000, backlog currently at %i' % q.qsize() 
     keys = srcBucket.get_all_keys(max_keys = maxKeys, marker = resultMarker) 
     for k in keys: 
      q.put(k.key) 
     if len(keys) < maxKeys: 
      print 'Done' 
      break 
     resultMarker = keys[maxKeys - 1].key 

    q.join() 
    print 'done' 

if __name__ == "__main__": 
    copyBucket()