VizReader's distributed word index.

After having used VizReader as a single local application for a while, it quickly became clear that the full word index mapping words to articles - needed to enable word searches - was growing at an alarming rate. Something had to be done, so I started looking into the ext functionality. It is overkill for managing a simple distributed index though; the id function, however, is a good fit.

Currently the remotes all reside on the same machine, so the speedup comes from querying a bunch of smaller files in parallel, as opposed to having a single process go through one big file. Had this been done "properly", with the remotes on different machines, the speedup would probably be much bigger, especially if said machines were located in the same data center.

Before we start looking at the actual code, let's first list what happens from start to finish:

  1. When an article is imported, all words are counted. The count, the numbers resulting from calling id on the article and on the word object, and the article's date are all sent to and saved in a remote. Which remote to send to is inferred from the article's aid number (see the sketch after this list), which is not to be confused with the result returned by the id function; aid is created explicitly as an auto-incremented key.
  2. When a search is performed, all remotes are queried at the same time. Since we are storing the article's date in the index too, each remote can return results sorted by date. The logic responsible for querying the remotes then stores the first result from each remote in a hash that is continuously sorted by date, in order to link only the newest articles from the parallel query; more on that later.
  3. Building the result sent for display in the GUI is now just a simple matter of calling id on our list of numbers in order to fetch the real articles on disk.
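
The mapping from an article's aid to a remote is not shown in VizReader's code; a minimal sketch of one way it could work, where the name remoteNum and the global *Remotes (the number of remotes) are my assumptions:

(de remoteNum (A)
   (inc (% (; A aid) *Remotes)))   # hypothetical: maps aid to remote 1 .. *Remotes

Since aid auto-increments, consecutive articles spread evenly over the remotes.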


(class +Aword +Entity)
(rel word (+Need +Key +String))   # the word itself, a unique key

(class +Article +Entity)
(rel aid       (+Key +Number))    # explicit, auto-incremented key
(rel title     (+String))
(rel htmlUrl   (+Key +String))
(rel body      (+Blob))
(rel picoStamp (+Ref +Number))    # the article's date as a number

. . .

(dm setArticleWords> (A Ws)
   (let Idx (idx> (; A words))              # tree mapping words to counts
      (put> A 'common                       # store the top words the remote returns
         (eval> '+Agent A 'setArticleWords  # execute on A's remote
            (lit
               (make
                  (for Wstr Ws
                     (unless (common?> This Wstr)   # skip common words
                        (link
                           (list
                              (id A)                               # the article's id number
                              (id (request '(+Aword) 'word Wstr))  # the word's id number
                              (val (car (idx 'Idx (lowc Wstr))))   # the word's count
                              (; A picoStamp)))))))))))            # the article's date


The +Agent class that is responsible for communicating with the remotes will call setArticleWords on the remote inferred from the article A in question; that's why A is the first argument to the eval> method. Ws is a list of all words in the article. Idx is an index tree mapping words to their counts; this tree has been generated earlier and is now simply rebuilt. We loop through the words and filter out the common ones. Each element in the list we then send to the remote is generated by calling id on the article and on the word, getting the count from the index tree and the date from the article.
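
The idx> method itself is not shown in this article; a minimal sketch of how such a count tree could be built with idx, written here as a plain function with an assumed name and body:

(de mkIdx (Ws)   # hypothetical, not VizReader's actual idx>
   (let Tree NIL
      (for Wstr Ws
         (let Key (lowc Wstr)
            (if (idx 'Tree Key T)   # word already in the tree?
               (inc (car @))        # yes: increment its count
               (set Key 1))))       # no: first occurrence
      Tree))

Each node holds a transient symbol whose value is the count, which is why setArticleWords> can fetch it with (val (car (idx 'Idx (lowc Wstr)))).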

I will not explain the eval> method of my +Agent class, since it contains a lot of logic that has nothing to do with what this article is about. The main thing happening there is simply using pr to send (setArticleWords Lst) to the remote, which in turn uses eval to execute it.
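
Stripped of that logic, the core could look something like this sketch; getSock>, assumed to return an open socket to the remote inferred from A, is not part of the article's code:

(dm eval> (A Fun Lst)
   (let Sock (getSock> This A)        # hypothetical helper
      (out Sock (pr (list Fun Lst)))  # the remote rd's this and evals it
      (in Sock (rd))))                # read back whatever the remote prints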

(class +WordCount +Entity)
(rel article   (+Ref +Number))                  # the article's id number on the master
(rel word      (+Aux +Ref +Number) (article))   # the word's id number on the master
(rel count     (+Number))                       # occurrences of the word in the article
(rel picoStamp (+Ref +Number))                  # the article's date

(de setArticleWords (Lst)
   (dbSync)
   (let Res
      (mapcar id                           # id numbers of the 20 most common words
         (tail 20
            (by '((Wc)(; Wc count)) sort   # sort ascending by count
               (make
                  (for Wc Lst              # one +WordCount per list element
                     (link
                        (request
                           '(+WordCount)
                           'article (car Wc)
                           'word (cadr Wc)
                           'count (caddr Wc)
                           'picoStamp (last Wc))))))))
      (commit 'upd)
      (pr Res)))                           # return them to the master


The above is the remote ER and the function we just called. Note that in addition to storing the words here, we are also returning the 20 most common words for local storage, yet again using id, but this time on the remote instead.
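
For illustration, the shapes of the data involved, with all concrete numbers made up:

# Lst: ((7 42 3 1273872000) (7 43 12 1273872000) ...)
#      each element is (ArticleId WordId Count Date), the
#      numbers coming from the master's id calls
# Res: the id numbers of the 20 most common +WordCount objects,
#      stored under the article's 'common property on the master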

(dm getAsByWd> (W Start)
   (mapcar '((A)(id (db: +Article) A))   # numbers back to local article objects
      (extArticles> '+Agent Start 25 'getArticles (lit (id W)))))

. . .

(dm rd1> (Sock)
   (or
      (in Sock (rd))   # the next item from the remote
      (nil              # EOF: close the socket and return NIL
         (close Sock))))

(dm extArticles> (Start Count . @)
   (let (End (+ Start Count) Socks (getSocks> This))
      (for S Socks                          # broadcast the query to all remotes
         (out S (pr (peel> '+Gh (rest)))))
      (let Q                                # hash keyed by socket, seeded with
         (new '(+Hash)                      # each remote's first result
            (extract
               '((S)
                  (let A (rd1> This S)
                     (when A (list S A))))
               Socks))
         (tail Count
            (make
               (until (empty?> Q)
                  (let Cur (car (sort> Q T 'cdadr))   # entry with the newest date
                     (link (caadr Cur))               # link that article's number
                     (let A (rd1> This (car Cur))     # refill from the same remote
                        (if A
                           (set> Q (car Cur) A)
                           (del> Q (car Cur))))       # that remote is done
                     (when (<= End (length (made)))   # collected enough
                        (empty> Q)))))))))


Here we query the remotes for all articles containing a certain word. The first result set will use a Start value of 1; the Count will always be 25. We use id to get the word's number for the remote, and then id again to fetch the local articles for the result we send to the GUI. In extArticles> we first figure out when to stop by adding Start and Count, in our case 26. We then get all the sockets to the remotes and send the code for execution by way of pr. Next we build a hash using our sockets as keys, with the article info in the values. Then we keep reading from the sockets until the hash is empty, which happens when all the remotes have finished sending articles or when we have reached our goal, in this case fetching 25 articles. The hash is repeatedly sorted by date and the newest article is linked; the spot that article came from is then filled by the next article from the remote in question, and so on until we meet one of the end conditions.
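
The +Hash class is not part of PicoLisp itself; a minimal sketch of the operations used above, where entries are assumed to have the form (Socket (ArticleNum . Date)) and the whole implementation is my guess:

(class +Hash)
# list - association list of (Key Value) entries

(dm T (Lst)
   (=: list Lst))

(dm set> (Key Val)
   (if (assoc Key (: list))
      (set (cdr @) Val)                 # replace the existing value
      (push (:: list) (list Key Val))))

(dm del> (Key)
   (=: list (delete (assoc Key (: list)) (: list))))

(dm empty?> ()
   (not (: list)))

(dm empty> ()
   (=: list NIL))

(dm sort> (Flg Fun)   # sort entries by (Fun Entry), reversed if Flg
   (=: list
      (if Flg
         (flip (by Fun sort (: list)))
         (by Fun sort (: list)))))

With entries of that shape, cdadr extracts the date and caadr the article number, matching the calls in extArticles> above.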

(de go ()
. . .
   (rollback)
   (task (port (+ *IdxNum 4040))   # each remote listens on its own port
      (let? Sock (accept @)
         (unless (fork)            # a child process serves the connection
            (in Sock
               (while (rd)         # read expressions from the master
                  (sync)
                  (out Sock
                     (eval @))))   # evaluate them; replies go to the socket
            (bye))
         (close Sock)))
   (forked))

(de getArticles (W)
   (let Goal
      (goal
         (quote
            @Word W   # the word's id number, as sent by the master
            @Date (cons (- (stamp> '+Gh) (* 6 31 86400)) (stamp> '+Gh))   # roughly the last half year
            (select (@Wcs)
               ((picoStamp +WordCount @Date) (word +WordCount @Word))
               (same @Word @Wcs word)
               (range @Date @Wcs picoStamp))))
      (do 25   # the master never wants more than 25
         (NIL (prove Goal))
         (bind @
            (pr (cons (; @Wcs article) (; @Wcs picoStamp)))   # send (ArticleNum . Date)
            (unless (flush) (bye)))))
   (bye))


The above is all on the remote. Note that the go function is built to be able to receive repeated pr calls from the local/master, of which there is no example yet in VizReader; because of that, we need to finish getArticles with a bye in order to stop execution. The getArticles function will repeatedly pr all articles that are newer than half a year and that contain the word in question, stopping after having printed 25 of them. It wouldn't make sense to print more, since the receiving end only wants a maximum of 25 anyway.
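
For illustration, this is roughly how a single remote could be exercised by hand, assuming remote number 1 listens on port 4041 (cf. (+ *IdxNum 4040) above) and 123 is some word's id number; both values are made up:

(let? Sock (connect "localhost" 4041)
   (out Sock (pr (list 'getArticles 123)))
   (in Sock
      (while (rd)       # one (ArticleNum . Date) pair at a time
         (println @)))
   (close Sock))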

http://picolisp.com/wiki/?distributedwordindex

15may10    hsarvell