Iterating through rows of a DataFrame and setting values in Spark

2017-07-17

I'd like to preface this question by saying that I'm a Spark noob (I only started reading a book on it four days ago). Still, I'm trying to port something I originally wrote in Python with the help of the Pandas library, so that I can take advantage of the cluster we just created. The data in the pandas DataFrame df looks like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |                  NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |                  NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |                  NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  NaN |
+---------------------+-----------+-------+-------------+----------------------+

In Python, I wrote:

unique_users = df.User.unique().tolist()
for j in range(0, len(unique_users)):
    user = unique_users[j]
    access_events_for_user = df[df.User == user].copy()
    indexes_for_access_events = access_events_for_user.index
    applications_used = dict()
    for i in range(0, len(indexes_for_access_events)):
        current_access_event_index = int(indexes_for_access_events[i])
        current_access_event_ts = df.loc[current_access_event_index].TimeStamp
        application = df.loc[current_access_event_index].Application
        if i == 0:
            # First event for this user: fall back to the 30-day default (in seconds)
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[application] = current_access_event_ts
            continue
        if application in applications_used:
            time_since = (current_access_event_ts -
                          applications_used[application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
        else:
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
        applications_used[application] = current_access_event_ts

It spits out something like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |              2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |              2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |              2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  180 |
+---------------------+-----------+-------+-------------+----------------------+

Basically, I'm trying to get the time since the user last accessed the application. If it's the first time the user has accessed that application, I set it to a default of 30 days. We can partition the data by customer and sort it by timestamp so that events are in order. I'm just not sure how to do this without calling collect() in Spark, as in the answer here, which defeats the purpose of Spark. Is this even possible?

Answers


This is approaching the limit of what is comfortable with the DataFrame API. Others may suggest a way to do it with DataFrames, but personally I think the RDD API is better suited to this. Here's an example to give you an idea of how to structure the algorithm in Spark:

from datetime import datetime, timedelta

data = [(datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1'),
        (datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2'),
        (datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2'),
        (datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1')]

# sc is the SparkContext (available by default in the pyspark shell)
rdd = sc.parallelize(data)

def toTimeSince(row):
    cust_user_app, timestamps = row
    timestamps = sorted(timestamps)
    # First access for this (customer, user, app): 30-day default
    result = [(timestamps[0], *cust_user_app, timedelta(30))]
    previous_timestamp = timestamps[0]
    for timestamp in timestamps[1:]:
        result.append((timestamp, *cust_user_app, timestamp - previous_timestamp))
        previous_timestamp = timestamp
    return result

(rdd
 .map(lambda row: (row[1:], [row[0]]))  # ((customer, user, app), [timestamp])
 .reduceByKey(lambda a, b: a + b)       # ((customer, user, app), list_of_timestamps)
 .flatMap(toTimeSince)                  # (timestamp, customer, user, app, time_since_previous)
 .collect())

Result:

[(datetime.datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2', datetime.timedelta(30)), 
(datetime.datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2', datetime.timedelta(30)), 
(datetime.datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1', datetime.timedelta(30)), 
(datetime.datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1', datetime.timedelta(0, 180))] 

The key points are:

  • The algorithm as you describe it is not inherently suited to Spark: there are strong dependencies between rows (each row has to be computed by comparison with another row), which is hard to parallelize.
  • My suggestion uses Spark to aggregate the list of timestamps for records with the same customer, user, and application. After that, it's easy to sort the timestamps for each customer-user-application combination and expand them back out into the dataset you want (one possible way of doing that is sketched below).
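
For example, here is one possible way to bring the expanded records back into a DataFrame. This is my own sketch, not part of the original answer: it reuses the rdd and toTimeSince defined above, assumes an active SparkSession so that RDD.toDF is available, and converts the timedelta into whole seconds to match the pandas output.

result_rdd = (rdd
    .map(lambda row: (row[1:], [row[0]]))
    .reduceByKey(lambda a, b: a + b)
    .flatMap(toTimeSince)
    # Convert the trailing timedelta into whole seconds
    .map(lambda r: (r[0], r[1], r[2], r[3], int(r[4].total_seconds()))))

result_df = result_rdd.toDF(
    ['TimeStamp', 'Customer', 'User', 'Application', 'TimeSinceApplication'])
result_df.show()

Because the final step is toDF rather than collect(), the data stays distributed, which addresses the concern about collect() in the question (the collect() in the example above is only there to print the result).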

Great solution. @timchap, could you please read my solution and tell me whether or not it's correct?


You can probably do this with window functions in pyspark, with the window partitioned by user and application. You assign a rank within each window; if the rank is 1, set your default value, otherwise compute current timestamp minus previous timestamp. I think that's what you're trying to do.

In SQL terms you would use a PARTITION BY clause; to do the same in pyspark you have to use a Window. Hope this solves your problem. A bit too lazy to write the code, sorry.
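
Since the code was left as an exercise, here is a minimal sketch of the window-function approach described above. It is only an illustration, not the answerer's code: the column names, the Customer/User/Application partitioning, and the 2592000-second (30-day) default are taken from the question, and df is assumed to already be a Spark DataFrame with a timestamp-typed TimeStamp column.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per (customer, user, application), ordered by event time
w = Window.partitionBy('Customer', 'User', 'Application').orderBy('TimeStamp')

# Timestamp of the previous access to the same application by the same user
prev_ts = F.lag('TimeStamp').over(w)

result = df.withColumn(
    'TimeSinceApplication',
    F.when(prev_ts.isNull(), F.lit(2592000))  # first access: 30-day default
     .otherwise(F.unix_timestamp('TimeStamp') - F.unix_timestamp(prev_ts)))

result.show()

Everything here runs as DataFrame transformations, so no collect() is needed.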


I'm not asking anyone to hand me a solution, but a code snippet would be very helpful.