Iterating through rows of a DataFrame and setting a value in Spark

I want to preface this question by saying that I am a Spark noob (I only started reading a book four days ago). Nevertheless, I am trying to port something I wrote in Python with the help of the pandas library, so that I can take advantage of the cluster we just created. The data in the pandas DataFrame df
looks like this:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2 | NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2 | NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1 | NaN |
+---------------------+-----------+-------+-------------+----------------------+
In Python, I wrote:
unique_users = df.User.unique().tolist()
for j in range(0, len(unique_users)):
    user = unique_users[j]
    access_events_for_user = df[df.User == user].copy()
    indexes_for_access_events = access_events_for_user.index
    applications_used = dict()
    for i in range(0, len(indexes_for_access_events)):
        current_access_event_index = int(indexes_for_access_events[i])
        current_access_event_ts = df.loc[current_access_event_index].TimeStamp
        application = df.loc[current_access_event_index].Application
        if i == 0:
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[application] = current_access_event_ts
            continue
        if application in applications_used:
            time_since = (current_access_event_ts -
                          applications_used[application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
            applications_used[application] = current_access_event_ts
        else:
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[application] = current_access_event_ts
It spits out something like this:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | 2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2 | 2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2 | 2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1 | 180 |
+---------------------+-----------+-------+-------------+----------------------+
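As an aside, the same per-(User, Application) delta can be computed in pandas without the explicit loop, using groupby with diff. This is my own loop-free sketch, not the original code; the helper name add_time_since_application is made up:

```python
# Loop-free pandas sketch of the computation above, using groupby + diff.
# Assumes TimeStamp is already a datetime column; the helper name is mine.
import pandas as pd

def add_time_since_application(df, default_seconds=2592000):
    """Seconds since the same user last used the same application;
    a first access defaults to 30 days (2592000 seconds)."""
    df = df.sort_values("TimeStamp").copy()
    # Difference to the previous row within each (User, Application) group.
    deltas = df.groupby(["User", "Application"])["TimeStamp"].diff()
    df["TimeSinceApplication"] = deltas.dt.total_seconds().fillna(default_seconds)
    return df
```

On the four example rows above, this produces the same 2592000/180 values as the loop.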
Basically, I am trying to get the time since the user last accessed the application. If this is the first time the user has accessed that application, I set it to a default value of 30 days. We can partition the data by customer and sort it by timestamp so that the events are in order. I am just not sure how to do this in Spark without calling collect()
, as in the answer here, which defeats the purpose of Spark. Is this even possible?