2013-08-21

I need to store the output of a map-reduce program in a database (specifically, storing Apache Hadoop output in a MySQL database). Is there any way to do this?

If so, is it possible to store the output in multiple columns & tables as needed?

Please suggest some solutions.

Thanks.

Check this out: http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/, specifically the "Writing the results back to the database" section. – Amar

You can also use ['Sqoop'](http://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html) for transferring data back and forth. – Amar

Thank you, Amar. – HarshaKP

Answers


There is a great example shown on this blog. I tried it out, and it works really well. I quote the most important parts of the code below.

First, you must create a class representing the data you want to store. This class must implement the DBWritable interface:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBOutputWritable implements Writable, DBWritable
{
    private String name;
    private int count;

    public DBOutputWritable(String name, int count) {
        this.name = name;
        this.count = count;
    }

    public void readFields(DataInput in) throws IOException { }

    public void readFields(ResultSet rs) throws SQLException {
        name = rs.getString(1);
        count = rs.getInt(2);
    }

    public void write(DataOutput out) throws IOException { }

    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, name);
        ps.setInt(2, count);
    }
}

In the reducer, create an object of the previously defined class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        try {
            ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
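The mapper itself is not shown in the quoted code. As a sketch only, a minimal word-count-style mapper that matches the Text/IntWritable types the reducer expects could look like the following (the class name Map mirrors the driver's `job.setMapperClass(Map.class)` call, and the whitespace tokenization is an assumption, not part of the original example):

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: emits (word, 1) pairs so the reducer can sum counts.
public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            ctx.write(word, ONE);
        }
    }
}
```

Note that this fragment only compiles against the Hadoop libraries and runs inside a Hadoop job, not standalone.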

Finally, you must configure your database connection (don't forget to add your database connector to the classpath) and register your mapper's and reducer's input/output data types.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class Main
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "com.mysql.jdbc.Driver",              // driver class
            "jdbc:mysql://localhost:3306/testDb", // db url
            "user",                               // username
            "password");                          // password

        Job job = new Job(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(Map.class);                 // your mapper - not shown in this example
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);          // mapper's KEYOUT
        job.setMapOutputValueClass(IntWritable.class); // mapper's VALUEOUT
        job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
        job.setOutputValueClass(NullWritable.class);   // reducer's VALUEOUT
        job.setInputFormatClass(...);                  // depends on where your input comes from
        job.setOutputFormatClass(DBOutputFormat.class);

        DBInputFormat.setInput(...);                   // only needed when reading from the database

        DBOutputFormat.setOutput(
            job,
            "output",                                  // output table name
            new String[] { "name", "count" }           // table columns
        );

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
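For intuition, the table name and column list passed to DBOutputFormat.setOutput end up as a parameterized INSERT, whose `?` placeholders are then filled positionally by `DBOutputWritable.write(PreparedStatement)`. A rough sketch of how such a statement is built (the exact SQL Hadoop generates may differ slightly in spacing and terminators):

```java
// Sketch of the positional column-to-placeholder mapping behind DBOutputFormat.
public class InsertSketch {
    static String buildInsert(String table, String[] columns) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table).append(" (");
        sql.append(String.join(", ", columns));
        sql.append(") VALUES (");
        for (int i = 0; i < columns.length; i++) {
            sql.append(i == 0 ? "?" : ", ?");  // one placeholder per column
        }
        sql.append(")");
        return sql.toString();
    }

    public static void main(String[] args) {
        // Same table name and columns as in DBOutputFormat.setOutput above.
        System.out.println(buildInsert("output", new String[] { "name", "count" }));
        // → INSERT INTO output (name, count) VALUES (?, ?)
    }
}
```

This is why the order of `ps.setString(1, name)` / `ps.setInt(2, count)` in DBOutputWritable must match the order of the column array: parameter 1 binds to `name`, parameter 2 to `count`.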