我正在使用Clojure应用程序从web API访问数据。我将发出许多请求,其中许多请求将导致发出更多请求,因此我希望将请求URL放在一个队列中,以便在后续下载之间留出60秒。
在this blog post之后,我把这个放在一起:
(def queue-delay (* 1000 60)) ; one minute
(defn offer!
[q x]
(.offerLast q x)
q)
(defn take!
[q]
(.takeFirst q))
(def my-queue (java.util.concurrent.LinkedBlockingDeque.))
(defn- process-queue-item
[item]
(println ">> " item) ; this would be replaced by downloading `item`
(Thread/sleep queue-delay))如果我在代码中的某处包含了一个东西,那么在REPL中,我可以(offer! my-queue "something"),我会立即看到打印出来的">> (future (process-queue-item (take! my-queue)))“。到目前一切尚好!但我需要队列在我的程序处于活动状态的整个时间内持续。我刚才提到的(future ...)调用一旦可用就从队列中拉出一项,但我想要的是持续监视队列并在有可用的情况下调用process-queue-item的东西。
此外,与通常的Clojure对并发性的喜爱相反,我希望确保一次只发出一个请求,并且我的程序等待60秒来发出后续的每个请求。
我认为this Stack Overflow question是相关的,但我不确定如何调整它来做我想做的事情。如何持续轮询我的队列,并确保一次只运行一个请求?
发布于 2012-09-12 01:02:23
我最终推出了自己的小型库,我将其命名为simple-queue。您可以在GitHub上阅读完整的文档,但这里是完整的源代码。我不会让这个答案保持更新,所以如果你想使用这个库,请从GitHub获取源代码。
(ns com.github.bdesham.simple-queue)
(defn new-queue
"Creates a new queue. Each trigger from the timer will cause the function f
to be invoked with the next item from the queue. The queue begins processing
immediately, which in practice means that the first item to be added to the
queue is processed immediately."
[f & opts]
(let [options (into {:delaytime 1}
(select-keys (apply hash-map opts) [:delaytime])),
delaytime (:delaytime options),
queue {:queue (java.util.concurrent.LinkedBlockingDeque.)},
task (proxy [java.util.TimerTask] []
(run []
(let [item (.takeFirst (:queue queue)),
value (:value item),
prom (:promise item)]
(if prom
(deliver prom (f value))
(f value))))),
timer (java.util.Timer.)]
(.schedule timer task 0 (int (* 1000 delaytime)))
(assoc queue :timer timer)))
(defn cancel
"Permanently stops execution of the queue. If a task is already executing
then it proceeds unharmed."
[queue]
(.cancel (:timer queue)))
(defn process
"Adds an item to the queue, blocking until it has been processed. Returns
(f item)."
[queue item]
(let [prom (promise)]
(.offerLast (:queue queue)
{:value item,
:promise prom})
@prom))
(defn add
"Adds an item to the queue and returns immediately. The value of (f item) is
discarded, so presumably f has side effects if you're using this."
[queue item]
(.offerLast (:queue queue)
{:value item,
:promise nil}))使用此队列返回值的示例:
(def url-queue (q/new-queue slurp :delaytime 30))
(def github (q/process url-queue "https://github.com"))
(def google (q/process url-queue "http://www.google.com"))对q/process的调用将被阻塞,因此两个def语句之间将有30秒的延迟。
一个纯粹为了副作用而使用此队列的示例:
(defn cache-url
[{url :url, filename :filename}]
(spit (java.io.File. filename)
(slurp url)))
(def url-queue (q/new-queue cache-url :delaytime 30))
(q/add url-queue {:url "https://github.com",
:filename "github.html"}) ; returns immediately
(q/add url-queue {:url "https://google.com",
:filename "google.html"}) ; returns immediately现在,对q/add的调用立即返回。
发布于 2012-08-30 00:20:24
下面是来自a project I did for fun的代码片段。它并不完美,但可以让您了解我是如何解决“等待55秒完成第一项”的问题的。它基本上在承诺中循环,使用期货立即处理事情,或者直到承诺“变得可用”。
(defn ^:private process
[queues]
(loop [[q & qs :as q+qs] queues p (atom true)]
(when-not (Thread/interrupted)
(if (or
(< (count (:promises @work-manager)) (:max-workers @work-manager))
@p) ; blocks until a worker is available
(if-let [job (dequeue q)]
(let [f (future-call #(process-job job))]
(recur queues (request-promise-from-work-manager)))
(do
(Thread/sleep 5000)
(recur (if (nil? qs) queues qs) p)))
(recur q+qs (request-promise-from-work-manager))))))也许你可以做些类似的事情?代码不是很好,可能需要重新编写才能使用lazy-seq,但这只是我还没有完成的练习!
发布于 2012-09-04 15:52:03
这很可能是疯狂的,但你总是可以使用这样的函数来创建一个放慢速度的懒惰序列:
(defn slow-seq [delay-ms coll]
"Creates a lazy sequence with delays between each element"
(lazy-seq
(if-let [s (seq coll)]
(do
(Thread/sleep delay-ms)
(cons (first s)
(slow-seq delay-ms (rest s)))))))这基本上将确保每个函数调用之间的延迟。
您可以将其与以下内容一起使用,提供以毫秒为单位的延迟:
(doseq [i (slow-seq 500 (range 10))]
(println (rand-int 10))或者,您也可以将您的函数调用放在序列中,如下所示:
(take 10 (slow-seq 500 (repeatedly #(rand-int 10))))显然,在上述两种情况下,您都可以用您用来执行/触发下载的任何代码替换(rand-int 10)。
https://stackoverflow.com/questions/12179697
复制相似问题