0

我想知道如何创建一个AWS数据管道,它可以从S3获取json文件并将其导入到DynamoDB表中。我能够创建一些实现这一点的Java代码,但我想通过数据管道来完成。我可以看到有从DynamoDB导出到S3并导入备份的模板,但我正在努力弄清楚如何导入普通的json文件。通过AWS数据管道将json文件导入到DynamoDB

回答

1

在文档中,你会发现从DynamoDb导入和导出数据的例子(http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb.html

下面是它是如何与文档中描述:

要创建管道

开放AWS数据管道控制台

您看到的第一个屏幕取决于您是否创建了一个 pi在当前区域的皮肤。

如果您尚未在此区域创建管道,控制台将显示一个介绍性屏幕 。立即选择立即开始。

如果您已在此区域中创建管道,则控制台 将显示一个页面,其中列出了该区域的管道。选择 创建新的管道。

在名称中,输入管道的名称。

(可选)在描述中,为管道输入描述。

对于Source,请选择使用模板构建,然后选择以下模板 :从S3导入DynamoDB备份数据。

在参数下,输入S3文件夹 S3:// elasticmapreduce /样品/商店/ ProductCatalog,这是一个样本 数据源,并设置DynamoDB表名到表格的名称。

在Schedule下,选择管道激活。

在流水线配置下,启用记录日志。在日志的S3位置选择文件夹 图标,选择您的一个存储桶或 文件夹,然后选择选择。

如果您愿意,可以改为禁用日志记录。

在Security/Access下,将IAM角色设置为Default。

点击在建筑师中编辑。

接下来,根据活动的结果配置AWS Data 管道执行的Amazon SNS通知操作。

配置成功和失败操作

在右侧窗格中单击活动。

从添加一个可选字段,选择成功。

从新添加的成功,选择新建:操作。

从添加一个可选字段,选择On Fail。

从新添加的失败中选择新建:操作。

在右侧窗格中,单击其他。

对于DefaultAction1,请执行下列操作:

将名称更改为SuccessSnsAlarm。

从类型中选择SnsAlarm。

在主题Arn中,输入您创建的主题的ARN。

输入主题和消息。

对于DefaultAction2,请执行下列操作:

将名称更改为FailureSnsAlarm。

从类型中选择SnsAlarm。

在主题Arn中,输入您创建的主题的ARN。

输入主题和消息。

公共github网站有一些与DynamoDB合作的示例(https://github.com/awslabs/data-pipeline-samples)。以下是管道定义的一个示例:

{ 
    "objects": [ 
     { 
      "occurrences": "1", 
      "period": "1 Day", 
      "name": "RunOnce", 
      "id": "DefaultSchedule", 
      "type": "Schedule", 
      "startAt": "FIRST_ACTIVATION_DATE_TIME", 
     "maxActiveInstances" : "1" 
     }, 
     { 
      "failureAndRerunMode": "CASCADE", 
      "schedule": { 
       "ref": "DefaultSchedule" 
      }, 
      "resourceRole": "DataPipelineDefaultResourceRole", 
      "role": "DataPipelineDefaultRole", 
      "pipelineLogUri": "s3://", 
      "scheduleType": "cron", 
      "name": "Default", 
      "id": "Default" 
     }, 
     { 
      "maximumRetries": "2", 
      "name": "TableBackupActivity", 
      "step": "s3://dynamodb-emr-us-east-1/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')},#{myDDBTableName},#{myDDBReadThroughputRatio}", 
      "id": "TableBackupActivity", 
      "runsOn": { 
       "ref": "EmrClusterForBackup" 
      }, 
      "type": "EmrActivity" 
     }, 
     { 
      "bootstrapAction": "s3://elasticmapreduce/bootstrap-actions/configure-hadoop, --yarn-key-value, yarn.nodemanager.resource.memory-mb=12800,--yarn-key-value,yarn.scheduler.minimum-allocation-mb=256,--mapred-key-value,mapreduce.map.memory.mb=500,--mapred-key-value,mapreduce.map.java.opts=-Xmx400M,--mapred-key-value,mapreduce.job.reduce.slowstart.completedmaps=1,--mapred-key-value,mapreduce.map.speculative=false", 
      "name": "EmrClusterForBackup", 
      "amiVersion": "3.8.0", 
      "id": "EmrClusterForBackup", 
      "type": "EmrCluster", 
      "masterInstanceType": "m1.medium", 
      "coreInstanceType": "#{myInstanceType}", 
      "coreInstanceCount": "#{myInstanceCount}", 
     "terminateAfter" : "12 hours" 
     } 
    ], 
    "parameters": [ 
     { 
      "description": "OutputS3folder", 
      "id": "myOutputS3Loc", 
      "type": "AWS::S3::ObjectKey" 
     }, 
     { 
      "default": "0.2", 
      "watermark": "Valuebetween0.1-1.0", 
      "description": "DynamoDB Read Throughput Ratio", 
      "id": "myDDBReadThroughputRatio", 
      "type": "Double" 
     }, 
     { 
      "description": "DynamoDB Table Name", 
      "id": "myDDBTableName", 
      "type": "String" 
     }, 
     { 
      "description": "Instance Type", 
      "id": "myInstanceType", 
      "watermark" : "Use m1.medium if Read Capacity Units for the job <= 900. Else use m3.xlarge", 
      "type": "String", 
      "default": "m3.xlarge" 
     }, 
     { 
      "description": "Instance Count", 
     "watermark" : "(Read Capacity Units/300) for m1.medium if RCU <= 900. Else (RCU/1500) for m3.xlarge", 
      "id": "myInstanceCount", 
      "type": "Integer", 
      "default": "1" 
     }, 
    { 
     "description" : "Burst IOPs", 
     "watermark" : "Add IOPS to the DDB table by this percent for the duration of the export job", 
      "id"   : "myBurstIOPS", 
      "type"  : "Double", 
      "default"  : "0.0" 
    } 
    ] 
} 
相关问题