2017-06-06 97 views
1

我试图从S3中读取大块文件到块中,而没有为并行处理切割任何行。如何从S3使用aws-java-sdk读取块的文件块

让我通过示例来解释: S3上有1G大小的文件。我想把这个文件分成64MB的卡盘。这很容易,我可以这样做:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key)); 

InputStream stream = s3object.getObjectContent(); 

byte[] content = new byte[64*1024*1024]; 

while (stream.read(content) != -1) { 

//process content here 

} 

但块的问题是它可能有100个完整的行和一个不完整的。但我不能处理不完整的行,不想丢弃它。

是否有办法处理这种情况?意味着所有的卡盘都没有偏线。

回答

1

aws-java-sdk已经为您的S3对象提供了流式传输功能。你必须调用“getObject”,结果将是一个InputStream。

1)AmazonS3Client.getObject(GetObjectRequest getObjectRequest) - > S3Object

2)S3Object.getObjectContent()

注意:该方法是一个简单的吸气剂,并不实际创建 流。如果您检索S3Object,应尽快关闭此输入流,因为对象内容不是缓冲在内存中的 ,而是直接从Amazon S3流式传输。此外, 未能关闭此流可能导致请求池变为 被阻止。

aws java docs

1

100整条生产线和一个不完整的

你的意思是,你需要逐行读取流线?如果是这样,而不是使用InputStream尝试使用BufferedReader读取s3对象流,以便您可以逐行读取流,但我认为这会比chunk慢一点。

 S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key)); 
     BufferedReader in = new BufferedReader(new InputStreamReader(s3object.getObjectContent())); 
     String line; 
     while ((line = in.readLine()) != null) { 

//process line here 

     } 
0

我通常的方法(InputStream - >BufferedReader.lines() - 线>批次 - >CompletableFuture)不会在这里工作,因为底层S3ObjectInputStream超时最终的大文件。

所以我创建了一个新的类S3InputStream,它并不关心它是如何长的开放和利用短暂的AWS SDK调用按需读取字节块。你提供了一个byte[]将被重用。 new byte[1 << 24](16Mb)似乎运作良好。

package org.harrison; 

import java.io.IOException; 
import java.io.InputStream; 

import com.amazonaws.services.s3.AmazonS3; 
import com.amazonaws.services.s3.AmazonS3ClientBuilder; 
import com.amazonaws.services.s3.model.GetObjectRequest; 

/** 
* An {@link InputStream} for S3 files that does not care how big the file is. 
* 
* @author stephen harrison 
*/ 
public class S3InputStream extends InputStream { 
    private static class LazyHolder { 
     private static final AmazonS3 S3 = AmazonS3ClientBuilder.defaultClient(); 
    } 

    private final String bucket; 
    private final String file; 
    private final byte[] buffer; 
    private long lastByteOffset; 

    private long offset = 0; 
    private int next = 0; 
    private int length = 0; 

    public S3InputStream(final String bucket, final String file, final byte[] buffer) { 
     this.bucket = bucket; 
     this.file = file; 
     this.buffer = buffer; 
     this.lastByteOffset = LazyHolder.S3.getObjectMetadata(bucket, file).getContentLength() - 1; 
    } 

    @Override 
    public int read() throws IOException { 
     if (next >= length) { 
      fill(); 

      if (length <= 0) { 
       return -1; 
      } 

      next = 0; 
     } 

     if (next >= length) { 
      return -1; 
     } 

     return buffer[this.next++]; 
    } 

    public void fill() throws IOException { 
     if (offset >= lastByteOffset) { 
      length = -1; 
     } else { 
      try (final InputStream inputStream = s3Object()) { 
       length = 0; 
       int b; 

       while ((b = inputStream.read()) != -1) { 
        buffer[length++] = (byte) b; 
       } 

       if (length > 0) { 
        offset += length; 
       } 
      } 
     } 
    } 

    private InputStream s3Object() { 
     final GetObjectRequest request = new GetObjectRequest(bucket, file).withRange(offset, 
       offset + buffer.length - 1); 

     return LazyHolder.S3.getObject(request).getObjectContent(); 
    } 
}