Downloading a bunch of files in parallel using Clojure Agents

I suddenly needed to download around 3000 files from the Internet. I had the urls in a sequence and I was thinking about a nice way to download the files in parallel.

The idea of using Clojure Agents came to mind naturally, and I was thinking about writing an Agent-based HTTP client in Clojure. I asked around on the Clojure IRC channel and the very helpful Stuart Sierra pointed me towards clojure.contrib.http.agent.

Indeed, c.c.http.agent seemed to be exactly what I had in my mind :)

The API seemed to be straightforward enough and I got cracking immediately. I came up with something like this –

;;; downloader.clj -- Parallel Downloader -*- Clojure -*-
;;; Time-stamp: "2009-10-06 13:38:57 ghoseb"
;;; Author: Baishampayan Ghose 
 
(ns downloader
  (:require [clojure.contrib.http.agent :as h]
            [clojure.contrib.duck-streams :as d]))
 
;; A vector of vectors containing the file name and the URL
(def url-data [["file1" "http://some.domain/file1.xml"]
               ["file2" "http://some.domain/file2.xml"]
               ; Many many more :)
               ])
 
(defn download
  "Download the data in the given URL using HTTP Agents
   Args:
     file-name - The file name to save the data in
     url - The URL to fetch
  "
  [file-name url]
  (h/http-agent url
                :handler (fn [agnt]
                           (let [fname file-name]  ; File name in a closure
                             (with-open [w (d/writer fname)]
                               (d/copy (h/stream agnt) w))))))
 
(defn download-all
  "Download all the URLs
   Args:
     url-data - A vector of vectors containing the file name and the url
  "
  [url-data]
  (doseq [[file-name url] url-data]
    (download file-name url)))
 
(download-all url-data)

This looked fine and worked with a small set of URLs. But when I ran it on the full-blown set, the server bailed out because of too many concurrent requests. The reason is that http.agent uses send-off to dispatch actions to the agents, and send-off runs on a thread pool that grows as needed, so it can end up very large.
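To see the effect in isolation, here is a minimal sketch that uses only clojure.core. It dispatches a number of slow actions with send-off and records how many were running at the same time. The names in-flight, peak, slow-action and peak-concurrency are made up for this illustration, and the sleep is only a stand-in for a real HTTP request.

```clojure
(def in-flight (atom 0)) ; number of actions running right now
(def peak (atom 0))      ; highest value in-flight has reached

(defn slow-action
  "Hypothetical stand-in for an HTTP request: it only sleeps."
  [state]
  (let [n (swap! in-flight inc)]
    (swap! peak max n)   ; remember the highest concurrency seen
    (Thread/sleep 200)
    (swap! in-flight dec)
    state))

(defn peak-concurrency
  "Dispatch n slow actions with send-off, wait for all of them,
   and return the peak number that ran at the same time."
  [n]
  (reset! in-flight 0)
  (reset! peak 0)
  (let [agnts (mapv (fn [_] (send-off (agent nil) slow-action))
                    (range n))]
    (apply await agnts)
    @peak))
```

Calling (peak-concurrency 50) should report a peak close to 50 on most machines, since each send-off action gets its own thread. Swapping send-off for send would cap the peak at the size of the fixed pool instead. A standalone script should call (shutdown-agents) at the end so the JVM can exit promptly.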

Surely I needed to somehow make sure that only a limited number of files are downloaded in parallel and start downloading more when those are done.

To achieve that, I did this –

(def partitioned-data (partition 15 url-data)) ;; 15 being the max parallel downloads
 
(defn download-all2
  "Download all the files, step by step
   Args:
     p-url-data - Partitioned url data
  "
  [p-url-data]
  (doseq [url-data p-url-data]
    (let [agnts (map #(download (first %) (second %)) url-data)]
      (apply await agnts)))) ; Wait till the agents finish
 
(download-all2 partitioned-data)

What did I just do? I simply partitioned the data set by the number of parallel downloads I wanted to do, and then modified the download-all function to take the partitioned data, dispatch agents on one partition and wait for them to finish, and then move on to the next partition.
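The same batch-and-wait pattern can be sketched without clojure.contrib, which may not be available on newer Clojure versions. The sketch below is an illustration under assumptions, not the code above: fake-download is a hypothetical stand-in for the real HTTP fetch, plain agents replace http-agent, and partition-all is used so that a final short batch is still processed.

```clojure
(defn fake-download
  "Hypothetical stand-in for a real download: sleeps briefly and
   returns a map describing what would have been fetched."
  [file-name url]
  (Thread/sleep 10)
  {:file file-name :url url})

(defn download-in-batches
  "Process url-data in batches of batch-size. Every entry in a batch
   gets its own agent; the next batch starts only after await returns
   for the current one. Returns a vector of results in input order."
  [batch-size url-data]
  (reduce (fn [results batch]
            (let [agnts (mapv (fn [[file-name url]]
                                (send-off (agent nil)
                                          (fn [_] (fake-download file-name url))))
                              batch)]
              (apply await agnts)              ; block until this batch is done
              (into results (map deref agnts))))
          []
          (partition-all batch-size url-data)))
```

With seven entries and a batch size of three, this runs batches of three, three and one, and returns seven results. As with any agent-based script, call (shutdown-agents) once everything has finished.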

Simple, yet very beautiful.


4 comments

  1. Nice, I could have used exactly such a download tool just this week. Thanks for sharing.

    One thing that immediately hit me upon seeing “partition” is that your code does download files of the same partition in parallel, but (apply await agnts) will force all downloads to complete before the next partition is even considered.

    Tom Faulhaber showed how to use fill-queue in a doseq to process HTTP requests in a queue without blocking newly incoming requests. You might be able to adapt this pattern to your needs:

    http://infolace.blogspot.com/2009/08/simple-webhooks-with-clojure-and-ring.html

    Oh, and have a happy 2010!

  2. Daniel,
    Thanks for stopping by :) I agree with you. I could have used fill-queue for this and then started downloading a new file as soon as one finished, instead of waiting for the whole bunch to be done.

    I will probably post a new version which uses fill-queue instead.

    And Happy New Year to you too!

  3. Something to watch out for, by example. Evaluate (partition 3 (range 7)) and you will see that it is ((0 1 2) (3 4 5)). Notice the missing element 6.

    Your example partitions the urls into blocks of 15. If the number of urls to be downloaded is not evenly divisible by 15 you will miss some.

  4. If you don’t want to accidentally drop any elements, use partition-all (in Clojure 1.2) instead of partition.

    (partition-all 3 (range 7)) ==> ((0 1 2) (3 4 5) (6))

    Also, in Clojure 1.2, partition can take additional args to specify the step and pad so this will work:

    (partition 3 3 nil (range 7)) ==> ((0 1 2) (3 4 5) (6))
