2012-09-24 84 views
2

只需简单地通过发送连接字段作为简化键,便可轻松地通过单个键连接数据集。 但是,通过几个键来连接记录,其中至少有一个相同的记录对我来说并不那么容易。如何实现OR加入hadoop(烫伤/级联)

比如我的日志,我想通过用户参数组他们,我想(ip地址,的sessionId,visitorCockies)将这些

所以LOG1应的log 2进行分组,如果log1.ip ==的log 2。 ip或者log1.session = log2.session或者log1.cockie = log2.coockie。也许有可能创建组合密钥或一些概率方法,如minHash ...

这可能吗?

回答

0

问题是MapReduce连接通常是通过给某些字段上匹配的记录使用相同的reduce键来实现的,以便它们被发送到同一个reducer。因此,任何解决这个问题的方法都会有些破绽,但这是可能的...

下面是我会推荐的:对于每个输入记录,生成三个副本,每个副本都带有一个新的“键”字段以来自它的字段为前缀。因此,例如,说你有以下输入:

(ip=1.2.3.4, session=ABC, cookie=123) 
(ip=3.4.5.6, session=DEF, cookie=456) 

,那么你会产生

(ip=1.2.3.4, session=ABC, cookie=123, key=ip_1.2.3.4) 
(ip=1.2.3.4, session=ABC, cookie=123, key=session_ABC) 
(ip=1.2.3.4, session=ABC, cookie=123, key=cookie_123) 
(ip=3.4.5.6, session=DEF, cookie=456, key=ip_3.4.5.6) 
(ip=3.4.5.6, session=DEF, cookie=456, key=session_DEF) 
(ip=3.4.5.6, session=DEF, cookie=456, key=cookie_456) 

然后你可以简单地在这个新的领域组。

我并不太熟悉烫伤/级联(尽管我一直想要了解更多关于它的内容),但这肯定会符合Hadoop中通常如何进行连接。

+0

这样,我会得到3个不同的重叠群(每个键repectivelly相等),所以我需要一种方法将它们合并到单个组 – yura

+0

@yura如果左记录可以与多个正确的记录,然后加入通常是加入会让他们没有参与(如此重复的价值)。原因是合并会导致未定义的元组大小(表格宽度),您可能会得到1,2或3个正确的记录。因此这个解决方案是正确的,(但缺乏细节和基本的实现;)。 – samthebest

0

按照上面的Joe所述创建单独的连接后,您需要删除重复项。如果您在“OR-join”中使用的所有字段中的值相同,则数据中的两个元组是重复的。因此,如果您之后对代表所有相关字段的键进行自然连接,则会将所有重复项组合在一起。因此,你可以用一个单独的元组来替换它们。我们来看一个例子:假设你已经有字段(A,B,C,D)的元组,并且你感兴趣的字段是A,B和C.你首先要做equi-join分别在A,B和C上。对于每一个,你都会自己加入初始元组流。用(A0,B0,C0,D0)表示第一个流,用(A1,B1,C1,D1)表示第二个流。结果将是元组(A0,B0,C0,D0,A1,B1,C1,D1)。对于每个元组,您都可以创建一个元组(A0A1B0B1C0C1,A0,B0,C0,D0,A1,B1,C1,D1),这样所有的副本都将在后续的缩减器中组合在一起。对于每个组,只返回其中一个包含的元组。

0

你能描述更多关于“通过几个键加入记录”吗?

如果您知道工作流中可以连接特定键的点,最好的方法可能是定义一个具有多个连接的流,而不是尝试操纵复杂的数据结构来解析N个键一步。

这里是一个示例应用程序,它展示了如何处理不同类型的连接在级联:https://github.com/Cascading/CoPA

0

级联,我结束了创建,如果内部的任何条件的输出或是真实的,其检查过滤器。级联滤波器输出可以选择使用的True/False值。

0

提示:使用类型别名,让您的烫码好的阅读

注意0:该解决方案是特别好的,因为它总是会仅有1 mapred工作,即使有多个键,加入。

注1:假设每个管道没有重复的键,否则你必须使'键也有一个索引,它来自哪个日志,而mapTo将是一个flatMapTo并且有点复杂。

注意2:为了简单起见,这将放弃连接字段,让他们你需要一个很大的丑陋元组(ip1,ip2,session1,session2,...等)。如果你真的想要,我可以写出一个例子来保持它们。

注3:如果你真的想合并重复的值,你可以用每个logEntry1和logEntry2的GROUPBY按照此,产生logEntryList,然后猫(如在评论中提及这是不正常的加入)。这将创建2个更多的mapred作业。

type String2 = (String, String) 
type String3 = (String, String, String) 

def addKey(log: Pipe): Pipe = log.flatMap[String3, String](('ip, 'session, 'cookie) -> 'key)(
    _.productIterator.toList.zipWithIndex.map { 
    case (key: String, index: Int) => index.toString + key 
    } 
) 

(addKey(log1) ++ addKey(log2)).groupBy('key)(_.toList[String]('logEntry -> 'group)) 
.mapTo[Iterable[String], String2]('group -> ('logEntry1, 'logEntry2))(list => (list.head, list.last))