2013-02-06

I am writing my own Pig Store class. I don't want to store the output in a file; instead I plan to send it to a third-party data store (the API calls are not in place yet), but I'm hitting a "Hadoop Pig output directory not set" error.

Note: I am running this on Cloudera's VirtualBox image.

I wrote my Java class (see below), built it into mystore.jar, and use it from the following id.pig script:

store B INTO 'mylocation' USING MyStore('mynewlocation') 

When I run this script with Pig, I see the following error: ERROR 6000: Output location validation failed for: 'file://home/cloudera/test/id.out More info to follow: Output directory not set.

org.apache.pig.impl.plan.VisitorException: ERROR 6000: 
at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95) 

Please help!

-------------------- MyStore.java ----------------------

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

public class MyStore extends StoreFunc { 
    protected RecordWriter writer = null; 
    private String location = null; 


    public MyStore() { 
     location = null; 
    } 

    public MyStore(String location) { 
     this.location = location; 
    } 

    @Override 
    public OutputFormat getOutputFormat() throws IOException { 
     return new MyStoreOutputFormat(location); 
    } 

    @Override 
    public void prepareToWrite(RecordWriter writer) throws IOException { 
     this.writer = writer; 
    } 

    @Override 
    public void putNext(Tuple tuple) throws IOException { 
     //write tuple to location 

     try { 
      writer.write(null, tuple.toString()); 
     } catch (InterruptedException e) {   
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void setStoreLocation(String location, Job job) throws IOException { 
     if (location != null) 
      this.location = location; 
    } 

} 

-------------------- MyStoreOutputFormat.java ----------------------

import java.io.DataOutputStream; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.pig.data.Tuple; 

public class MyStoreOutputFormat extends 
     TextOutputFormat<WritableComparable, Tuple> { 
    private String location = null; 

    public MyStoreOutputFormat(String location) { 

     this.location = location; 
    } 

    @Override 
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
      TaskAttemptContext job) throws IOException, InterruptedException { 

     Configuration conf = job.getConfiguration(); 

     String extension = location; 
     Path file = getDefaultWorkFile(job, extension);  
     FileSystem fs = file.getFileSystem(conf); 

     FSDataOutputStream fileOut = fs.create(file, false); 

     return new MyStoreRecordWriter(fileOut); 
    } 

    protected static class MyStoreRecordWriter extends 
      RecordWriter<WritableComparable, Tuple> { 

     DataOutputStream out = null; 

     public MyStoreRecordWriter(DataOutputStream out) { 
      this.out = out; 
     } 

     @Override 
     public void close(TaskAttemptContext taskContext) throws IOException, 
       InterruptedException { 
      // close the location 
     } 

     @Override 
     public void write(WritableComparable key, Tuple value) 
       throws IOException, InterruptedException { 

      // write the data to location 
      if (out != null) { 
       out.writeChars(value.toString()); // will be calling API later. let me first dump to the location! 
      } 
     } 

    } 
} 

Am I missing something?

Please help. I need this urgently. Thanks! –

Answer

First, I think you should store the location value in the job configuration rather than in an instance variable.

The instance variable 'location' is set when setStoreLocation is called while the job is being planned, but the getOutputFormat call may not happen until the execution phase, by which time the location variable may no longer be set (a new instance of your class may have been created).
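This lifecycle issue can be illustrated without Hadoop at all. The sketch below uses hypothetical names (a plain Map stands in for the job configuration that Hadoop ships to every task) to show why state kept in an instance field is invisible to a freshly created instance, while state kept in the configuration survives:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a StoreFunc. "jobConf" plays the role of the
// job configuration, which Hadoop serializes and ships to every task.
class LocationDemo {
    String location;                                       // instance field: per-object only
    static Map<String, String> jobConf = new HashMap<>();  // stands in for the job configuration

    void setStoreLocation(String loc) {
        this.location = loc;              // visible only to THIS instance
        jobConf.put("mylocation", loc);   // visible to any later instance
    }

    public static void main(String[] args) {
        LocationDemo frontend = new LocationDemo();  // instance used at plan time
        frontend.setStoreLocation("/data/out");

        LocationDemo backend = new LocationDemo();   // fresh instance, as created on a task
        System.out.println(backend.location);            // null: the field did not survive
        System.out.println(jobConf.get("mylocation"));   // /data/out: the conf value survived
    }
}
```

The same pattern is why PigStorage writes everything it needs into the job configuration instead of relying on fields set in its constructor.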

If you look at the source for PigStorage.setStoreLocation, you should notice that they store the location in the job configuration (second line):

@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    job.getConfiguration().set("mapred.textoutputformat.separator", ""); 
    FileOutputFormat.setOutputPath(job, new Path(location)); 

    if("true".equals(job.getConfiguration().get("output.compression.enabled"))) { 
     FileOutputFormat.setCompressOutput(job, true); 
     String codec = job.getConfiguration().get("output.compression.codec"); 
     try { 
      FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(codec)); 
     } catch (ClassNotFoundException e) { 
      throw new RuntimeException("Class not found: " + codec); 
     } 
    } else { 
     // This makes it so that storing to a directory ending with ".gz" or ".bz2" works. 
     setCompression(new Path(location), job); 
    } 
} 

So I think you should store the location in the job configuration:

@Override 
public void setStoreLocation(String location, Job job) throws IOException { 
    if (location != null) 
     job.getConfiguration().set("mylocation", location); 
} 

Your custom output format can then extract it in the getRecordWriter method:

@Override 
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
     TaskAttemptContext job) throws IOException, InterruptedException { 

    Configuration conf = job.getConfiguration(); 

    String extension = conf.get("mylocation"); 
    Path file = getDefaultWorkFile(job, extension);  
    FileSystem fs = file.getFileSystem(conf); 

    FSDataOutputStream fileOut = fs.create(file, false); 

    return new MyStoreRecordWriter(fileOut); 
} 

Finally (and probably the actual cause of the error you're seeing), your output format extends TextOutputFormat, and your record writer uses the getDefaultWorkFile method. This method needs to know where on HDFS the file should be written, but you never call FileOutputFormat.setOutputPath(job, new Path(location)); in your setStoreLocation method (see the PigStorage.setStoreLocation method I pasted earlier). So the error occurs because Pig doesn't know where to create the default work file.

Thanks Chris. I had missed the "FileOutputFormat.setOutputPath(job, new Path(location));" call. I've changed my code based on your input. –