12
在使用F#代理之后,我尝试使用它们进行映射缩减。使用F#代理映射Reduce
我使用的基本结构是:
- 地图上级主管排队了所有的工作,其状态做,从地图工人
- 减少监事接收工作请求做同样的事情作为地图监督员减少工作
- 一堆地图和减少映射和减少的工作人员,如果一个工作失败,它会将其发送回相应的主管进行重新处理。
我想知道的问题是:
- 这是否任何意义相比,更传统的(但很漂亮的)地图降低像(http://tomasp.net/blog/fsharp- parallel-aggregate.aspx)使用PSeq?
- 我实施地图的方式和减少工人看起来难看还有没有更好的办法?
- 看来我可以创建一个1000 000的地图工和1000 0000的工作人员减少大声笑,我应该如何选择这些数字,越多越好?
非常感谢,
type Agent<'T> = MailboxProcessor<'T>
//This is the response the supervisor
//gives to the worker request for work
type 'work SupervisorResponse =
| Work of 'work //a piece of work
| NoWork//no work left to do
//This is the message to the supervisor
type 'work WorkMsg =
| ToDo of 'work //piles up work in the Supervisor queue
| WorkReq of AsyncReplyChannel<SupervisorResponse<'work>> //'
//The supervisor agent can be interacted with
type AgentOperation =
| Stop //stop the agent
| Status //yield the current status of supervisor
type 'work SupervisorMsg =
| WorkRel of 'work WorkMsg
| Operation of AgentOperation
//Supervises Map and Reduce workers
module AgentSupervisor=
let getNew (name:string) =
new Agent<SupervisorMsg<'work>>(fun inbox -> //'
let rec loop state = async {
let! msg = inbox.Receive()
match msg with
| WorkRel(m) ->
match m with
| ToDo(work) ->
let newState = work:state
return! loop newState
| WorkReq(replyChannel) ->
match state with
| [] ->
replyChannel.Reply(NoWork)
return! loop []
| [item] ->
replyChannel.Reply(Work(item))
return! loop []
| (item::remaining) ->
replyChannel.Reply(Work(item))
return! loop remaining
| Operation(op) ->
match op with
| Status ->
Console.WriteLine(name+" current Work Queue "+
string (state.Length))
return! loop state
| Stop ->
Console.WriteLine("Stoppped SuperVisor Agent "+name)
return()
}
loop [])
let stop (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Stop))
let status (agent:Agent<SupervisorMsg<'work>>) =agent.Post(Operation(Status))
//Code for the workers
type 'success WorkOutcome =
| Success of 'success
| Fail
type WorkerMsg =
| Start
| Stop
| Continue
module AgentWorker =
type WorkerSupervisors<'reduce,'work> =
{ Map:Agent<SupervisorMsg<'work>> ; Reduce:Agent<SupervisorMsg<'reduce>> }
let stop (agent:Agent<WorkerMsg>) = agent.Post(Stop)
let start (agent:Agent<WorkerMsg>) = agent.Start()
agent.Post(Start)
let getNewMapWorker(map, supervisors:WorkerSupervisors<'reduce,'work> ) =
new Agent<WorkerMsg>(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Start -> inbox.Post(Continue)
return! loop()
| Continue ->
let! supervisorOrder =
supervisors.Map.PostAndAsyncReply(
fun replyChannel ->
WorkRel(WorkReq(replyChannel)))
match supervisorOrder with
| Work(work) ->
let! res = map work
match res with
| Success(toReduce) ->
supervisors.Reduce
.Post(WorkRel(ToDo(toReduce)))
| Fail ->
Console.WriteLine("Map Fail")
supervisors.Map
.Post(WorkRel(ToDo(work)))
inbox.Post(Continue)
| NoWork ->
inbox.Post(Continue)
return! loop()
| Stop ->
Console.WriteLine("Map worker stopped")
return()
}
loop() )
let getNewReduceWorker(reduce,reduceSupervisor:Agent<SupervisorMsg<'work>>)=//'
new Agent<WorkerMsg>(fun inbox ->
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Start -> inbox.Post(Continue)
return! loop()
| Continue ->
let! supervisorOrder =
reduceSupervisor.PostAndAsyncReply(fun replyChannel ->
WorkRel(WorkReq(replyChannel)))
match supervisorOrder with
| Work(work) ->
let! res = reduce work
match res with
| Success(toReduce) -> inbox.Post(Continue)
| Fail ->
Console.WriteLine("ReduceFail")
reduceSupervisor.Post(WorkRel(ToDo(work)))
inbox.Post(Continue)
| NoWork -> inbox.Post(Continue)
return! loop()
|Stop ->Console.WriteLine("Reduce worker stopped"); return()
}
loop())
open AgentWorker
type MapReduce<'work,'reduce>(numberMap:int ,
numberReduce: int,
toProcess:'work list,
map:'work->Async<'reduce WorkOutcome>,
reduce:'reduce-> Async<unit WorkOutcome>) =
let mapSupervisor= AgentSupervisor.getNew("MapSupervisor")
let reduceSupervisor = AgentSupervisor.getNew("ReduceSupervisor")
let workerSupervisors = {Map = mapSupervisor ; Reduce = reduceSupervisor }
let mapWorkers =
[for i in 1..numberMap ->
AgentWorker.getNewMapWorker(map,workerSupervisors) ]
let reduceWorkers =
[for i in 1..numberReduce ->
AgentWorker.getNewReduceWorker(reduce,workerSupervisors.Reduce) ]
member this.Start() =
//Post work to do
toProcess
|>List.iter(fun elem -> mapSupervisor.Post(WorkRel(ToDo(elem))))
//Start supervisors
mapSupervisor.Start()
reduceSupervisor.Start()
//start workers
List.iter(fun mapper -> mapper |>start) mapWorkers
List.iter(fun reducer ->reducer|>start) reduceWorkers
member this.Status() = (mapSupervisor|>AgentSupervisor.status)
(reduceSupervisor|>AgentSupervisor.status)
member this.Stop() =
List.map2(fun mapper reducer ->
mapper |>stop; reducer|>stop) mapWorkers reduceWorkers
//Run some tests
let map = function (n:int64) -> async{ return Success(n) }
let reduce = function (toto: int64) -> async{ return Success() }
let mp = MapReduce<int64,int64>(1,1,[for i in 1L..1000000L->i],map,reduce)
mp.Start()
mp.Status()
mp.Stop()
仅供参考,我不是在读一个包含超过120行(格式不正确)代码的问题。 – Brian 2010-12-13 18:58:17
@布莱恩,对不起,我试图重新格式化一些东西,但仍然有颜色问题,这使得它非常丑陋。注意我并不是真的希望任何人阅读我的代码的所有120行,我只是把它放在他们的情况下,以便它可以清除我的问题。谢谢 – jlezard 2010-12-13 20:39:02
我做了一些编辑来清理它。特别是,它使用较少的水平和垂直空白(不需要向右滚动,不需要多行空白行)。另外,请注意//使用//作为避免SO错误着色多行的方法 – Brian 2010-12-13 20:55:10