2012-08-29 57 views
9

我正在使用Clojure应用程序从Web API访问数据。我会发出很多请求,并且很多请求会导致更多的请求,所以我希望将请求URL保留在队列中,以便在后续下载之间保留60秒。Clojure中的工作队列

this blog post我把这个在一起:

(def queue-delay (* 1000 60)) ; one minute 

(defn offer! 
    [q x] 
    (.offerLast q x) 
    q) 

(defn take! 
    [q] 
    (.takeFirst q)) 

(def my-queue (java.util.concurrent.LinkedBlockingDeque.)) 

(defn- process-queue-item 
    [item] 
    (println ">> " item) ; this would be replaced by downloading `item` 
    (Thread/sleep queue-delay)) 

如果我包括(future (process-queue-item (take! my-queue)))在我的代码,然后在某个地方的REPL我可以(offer! my-queue "something"),我看到了立即打印“>>东西”。到现在为止还挺好!但是我需要队列持续整个时间我的程序处于活动状态。我刚才提到的(future ...)电话会在队列中有一个项目退出队列时提供,但我希望能够持续观看队列,并在有可用时呼叫process-queue-item。另外,与通常的Clojure对并发的爱相反,我想确保一次只有一个请求被执行,并且我的程序等待60秒来完成每个后续请求。

我认为this Stack Overflow question是相关的,但我不知道如何适应它做我想做的。如何连续轮询我的队列并确保一次只运行一个请求?

+0

为什么你想连续轮询,但只发送每60秒?每60秒轮询一次会完成同样的事情吗? – mamboking

+0

@maboking几乎,是的。该方法唯一的缺点是将第一个项目添加到队列中:如果程序需要5秒钟才能确定第一个请求URL将会是什么,然后它将在那里坐下55秒直到队列被检查。无论如何,该计划将会非常长时间运行,但我想这不是太多问题。 – bdesham

+0

您是否在避免任务调度程序?例如,这个,https://github.com/zcaudate/cronj(还有一个其他库在该回购的自述文件中) – georgek

回答

0

我结束了滚动我自己的小型图书馆,我叫simple-queue。你可以阅读关于GitHub的完整文档,但这里是完整的源代码。 我不会保持这个答案更新,所以如果你想使用这个库,请从GitHub获取源代码。

(ns com.github.bdesham.simple-queue) 

(defn new-queue 
    "Creates a new queue. Each trigger from the timer will cause the function f 
    to be invoked with the next item from the queue. The queue begins processing 
    immediately, which in practice means that the first item to be added to the 
    queue is processed immediately." 
    [f & opts] 
    (let [options (into {:delaytime 1} 
         (select-keys (apply hash-map opts) [:delaytime])), 
     delaytime (:delaytime options), 
     queue {:queue (java.util.concurrent.LinkedBlockingDeque.)}, 
     task (proxy [java.util.TimerTask] [] 
       (run [] 
       (let [item (.takeFirst (:queue queue)), 
         value (:value item), 
         prom (:promise item)] 
        (if prom 
        (deliver prom (f value)) 
        (f value))))), 
     timer (java.util.Timer.)] 
    (.schedule timer task 0 (int (* 1000 delaytime))) 
    (assoc queue :timer timer))) 

(defn cancel 
    "Permanently stops execution of the queue. If a task is already executing 
    then it proceeds unharmed." 
    [queue] 
    (.cancel (:timer queue))) 

(defn process 
    "Adds an item to the queue, blocking until it has been processed. Returns 
    (f item)." 
    [queue item] 
    (let [prom (promise)] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise prom}) 
    @prom)) 

(defn add 
    "Adds an item to the queue and returns immediately. The value of (f item) is 
    discarded, so presumably f has side effects if you're using this." 
    [queue item] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise nil})) 

使用这个队列,返回值的示例:

(def url-queue (q/new-queue slurp :delaytime 30)) 
(def github (q/process url-queue "https://github.com")) 
(def google (q/process url-queue "http://www.google.com")) 

q/process到的调用将阻塞,所以会有两个def语句之间有30秒的延迟。

纯粹使用此队列的副作用的例子:

(defn cache-url 
    [{url :url, filename :filename}] 
    (spit (java.io.File. filename) 
     (slurp url))) 

(def url-queue (q/new-queue cache-url :delaytime 30)) 
(q/add url-queue {:url "https://github.com", 
        :filename "github.html"}) ; returns immediately 
(q/add url-queue {:url "https://google.com", 
        :filename "google.html"}) ; returns immediately 

现在呼吁立即q/add回报。

2

这是来自a project I did for fun的代码片段。这并不完美,但可以让你了解我如何解决“等待第一件产品55秒”的问题。它基本上是通过承诺循环的,使用期货来立即处理或直到承诺“变为”可用。

(defn ^:private process 
    [queues] 
    (loop [[q & qs :as q+qs] queues p (atom true)] 
    (when-not (Thread/interrupted) 
     (if (or 
      (< (count (:promises @work-manager)) (:max-workers @work-manager)) 
      @p) ; blocks until a worker is available 
     (if-let [job (dequeue q)] 
      (let [f (future-call #(process-job job))] 
      (recur queues (request-promise-from-work-manager))) 
      (do 
      (Thread/sleep 5000) 
      (recur (if (nil? qs) queues qs) p))) 
     (recur q+qs (request-promise-from-work-manager)))))) 

也许你可以做类似的事情?代码不是很好,可能需要重新编写才能使用lazy-seq,但那只是我尚未得到的练习!

0

这很可能是疯了,但你总是可以使用这样的函数来创建一个减慢的懒惰序列:

(defn slow-seq [delay-ms coll] 
    "Creates a lazy sequence with delays between each element" 
    (lazy-seq 
    (if-let [s (seq coll)] 
     (do 
      (Thread/sleep delay-ms) 
      (cons (first s) 
       (slow-seq delay-ms (rest s))))))) 

这基本上将确保每一个函数调用之间的延迟。

你可以像下面这样使用它,提供毫秒的延迟:

(doseq [i (slow-seq 500 (range 10))] 
    (println (rand-int 10)) 

或者你也可以把你的函数调用序列里面的东西,如:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10)))) 

显然,在上述两种情况下,您都可以用您用来执行/触发下载的任何代码替换(rand-int 10)

+0

如果我正在阅读这个权利,在运行'slow-seq'之前''coll'的所有元素都必须知道,对吧?我想要一些可以让你动态地添加项目而没有问题的东西。具体而言,如果一个API调用的结果是我需要再次调用API,该函数是否允许将第二个调用放在队列中? – bdesham