2014-07-11 55 views
0

假设我有以下的JSON:猪的新手,如何将JSON转换为另一个JSON,并使用猪的关键值对子集?

{ 
"state":"VA", 
"fruit":[ 
{"name":"Bannana", 
"color":"Yellow", 
"cost":1.6 
}, 
{"name":"Apple", 
"color":"Red" 
"cost":1.4 
} 
]} 

在猪,我怎么变换上述以下:

{ 
"state":"VA", 
"fruit":[ 
{"name":"Bannana",, 
"cost":1.6 
}, 
{"name":"Apple", 
"cost":1.4 
} 
]} 

我已经试过:

A = #load file 
B = FOREACH A GENERATE 
state, 
fruit.name, 
fruit.cost; 

和如下:

A = #load file 
B = FOREACH A GENERATE 
state, 
fruit as (m:bag{FruitInfo.(tuple(name:string, cost:double))}); 

似乎不管我做什么我一直在获取嵌套数组。我正在尝试做什么?我选择了猪的数据转换能力。请注意,数据使用AvroStorage加载。

+0

看看这里:http://pig.apache.org/docs/r0 .12.0/basic.html – aelor

+0

尝试使用UDF。我已将工作代码发布为答案。任何限制不使用UDF? –

回答

0

我怀疑它是否可以在没有UDF的情况下完成。

的Java UDF:

package com.example.exp1; 

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; 

public class ReformatBag extends EvalFunc<DataBag> { 

    @Override 
    public DataBag exec(Tuple input) throws IOException { 

     DataBag returnBag = BagFactory.getInstance().newDefaultBag(); 
     if (input == null || input.size() == 0 || input.get(0) == null) 
      return null; 
     try { 
      // Get the bag 
      DataBag bag = DataType.toBag(input.get(0)); 
      // Iterate throughout the bag 
      Iterator it = bag.iterator(); 
      while (it.hasNext()) { 
       // assign the current 
       Tuple t = (Tuple) it.next(); 
       // Create a new Tuple of size 2 where the refactor the data 
       Tuple reformatTuple = TupleFactory.getInstance().newTuple(2); 
       // Add 1st field (name) 
       reformatTuple.set(0, t.get(0)); 
       // Add 3rd field (price) 
       reformatTuple.set(1, t.get(2)); 
       //add to the bag. Continue iterating. 
       returnBag.add(reformatTuple); 
      } 
     } catch (Exception e) { 
      throw new IOException("Caught exception processing input row ", e); 
     } 
     return returnBag; 
    } 

    public Schema outputSchema(Schema input) { 
     try { 
      Schema outputSchema = new Schema(); 
      Schema bagSchema = new Schema(); 
      // Tuple schema which holds name and cost schema 
      Schema innerTuple = new Schema(); 
      innerTuple.add(new FieldSchema("name", DataType.CHARARRAY)); 
      innerTuple.add(new FieldSchema("cost", DataType.FLOAT)); 

      // Add the tuple schema holding name & cost to the bag schema 
      bagSchema.add(new FieldSchema("t1", innerTuple, DataType.TUPLE)); 
      // Return type of the UDF 
      outputSchema.add(new FieldSchema("fruit", bagSchema, DataType.BAG)); 
      return outputSchema; 
     } catch (Exception e) { 
      return null; 
     } 
    } 

} 

猪代码:

REGISTER /path/to/jar/exp1-1.0-SNAPSHOT.jar; 

input_data = LOAD 'text.json' 
     USING JsonLoader('state:chararray,fruit:{(name:chararray,color:chararray,cost:float)}'); 

reformat_data = FOREACH input_data 
     GENERATE state,com.example.exp1.ReformatBag(fruit); 
STORE 
     reformat_data 
     INTO 'output.json' 
     USING JsonStorage(); 

输出:

{"state":"VA","fruit":[{"name":"Bannana","cost":1.6},{"name":"Apple","cost":1.4}]} 
相关问题