単純な場合の Skip Graphs アルゴリズムを実装してみた

分散などは考えずに、単純な場合の Skip Graphs アルゴリズムを実装してみた。(僕の理解やコードが間違っている可能性があるので注意)
対象は James Aspnes と Gauri Shah の論文 "Skip graphs" に書かれているアルゴリズムである。


実装したのは

  • Level 0, Level 1 の単純2階層の Skip Graph (= membership vector は 0 と 1 の 2種類しかない)
    • 実装してみて分かったが階層を増やすのはとても簡単だ
  • node の insert/search/range-search をサポート
    • search が書ければ range-search は簡単


また理解の助けになるよう

  • 検索経路が取得できるようにした
  • テストコードをまじめに書いた

動作例

node20 (key=20, value="$20") のノードを開始点として key = 5 のノードを検索する。
検索経路は level 1 で 20 => 13 => 6、level 0 で 6 => 5 となる。

;; Search for key 5 starting from node20.  `found` receives the matching node
;; and `path` the list of (level . key) pairs visited, ending with 'found.
(let-values (([found path] (node-search node20 5)))
    (test-true found)
    (test-equal '((1 . 20) (1 . 13) (1 . 6) (0 . 6) (0 . 5) found) path)
    (test-equal "$5" (node-value found)))

Skip Graph のコード

doubly linked list の実装も含まれているので少し長いが、アルゴリズム自体は短い。

(library (skip graph)
  (export make-skip-graph skip-graph-add! skip-graph-level0
          make-node node-key node-value node-append! skip-graph-level0-key->list skip-graph-level1-key->list
          node-next node-prev node-membership node-search node-range-search)
  (import (rnrs)
          (mosh)
          (mosh control))

;; Skip graph container: stores the head node of each linked list.
;;   level0  - head of the level-0 list
;;   level10 - head of the level-1 list for nodes whose membership is 0
;;   level11 - head of the level-1 list for every other node
;; (make-skip-graph) takes no arguments; all three heads start out as #f.
(define-record-type (skip-graph make-skip-graph skip-graph?)
  (fields
   (mutable level0 skip-graph-level0 skip-graph-level0-set!)
   (mutable level10 skip-graph-level10 skip-graph-level10-set!)
   (mutable level11 skip-graph-level11 skip-graph-level11-set!))
  (protocol
   (lambda (new)
     (lambda ()
       (new #f #f #f)))))

;; Walk the linked list of a single LEVEL from START toward KEY.
;;
;; The walk goes left (via node-prev) when START's key is greater than KEY
;; and right (via node-next) otherwise.  It stops when the current node's key
;; equals KEY, when there is no further neighbour, or when the neighbour's key
;; lies beyond KEY in the walking direction.
;;
;; Returns three values:
;;   found? - #t when a node with key KEY was reached, #f otherwise
;;   node   - the node the walk stopped on
;;   path   - ACCUM-PATH with a (level . key) pair consed on for every visited
;;            node (most recent first), plus the symbol 'found on a hit
(define (node-search-with-level level start key accum-path)
  ;; Record a visit to NODE at the front of PATH.
  (define (visit node path)
    (cons (cons level (node-key node)) path))
  ;; STEP picks the neighbour to move to; OVERSHOOTS? tells whether a
  ;; neighbour's key is already past KEY.
  (define (walk step overshoots?)
    (let loop ([current start]
               [path (visit start accum-path)])
      (if (= (node-key current) key)
          (values #t current (cons 'found path))
          (let ([neighbor (step level current)])
            (if (or (not neighbor)
                    (overshoots? (node-key neighbor) key))
                (values #f current path)
                (loop neighbor (visit neighbor path)))))))
  (if (> (node-key start) key)
      (walk node-prev <)    ; search to left
      (walk node-next >)))  ; search to right

;; Search for KEY beginning at node START.
;;
;; The search starts on level 1 and, when that level does not contain KEY,
;; continues on level 0 from the node where the level-1 walk stopped.
;;
;; Returns two values: the node whose key is KEY (or #f when no such node was
;; reached) and the visited path in visiting order, as produced by
;; node-search-with-level.
(define (node-search start key)
  (let descend ([level 1]   ; start search on level 1
                [from start]
                [path '()])
    (if (< level 0)
        (values #f (reverse path))
        (call-with-values
            (lambda () (node-search-with-level level from key path))
          (lambda (found? node new-path)
            (if found?
                (values node (reverse new-path))
                (descend (- level 1) node new-path)))))))

;; Collect every node whose key lies in the inclusive range [KEY1, KEY2],
;; beginning the lookup at node START.
;;
;; KEY1 must be <= KEY2 (checked with assert).
;;
;; Returns two values:
;;   nodes - the matching nodes in ascending key order ('() when none match)
;;   path  - the path visited while locating KEY1, in visiting order; it ends
;;           with 'found only when a node with key KEY1 exists
;;
;; KEY1 does not have to be a stored key, and the range may extend past the
;; last node: the level-0 walk stops at the end of the list instead of
;; dereferencing #f.
(define (node-range-search start key1 key2)
  (assert (<= key1 key2))
  ;; Locate the node nearest to KEY1, descending from level 1 to level 0
  ;; exactly as node-search does, but keep the node even when KEY1 is absent.
  (let descend ([level 1]
                [from start]
                [path '()])
    (let-values (([found? nearest accum-path]
                  (node-search-with-level level from key1 path)))
      (if (and (not found?) (> level 0))
          (descend (- level 1) nearest accum-path)
          ;; A rightward walk that missed stops on the last node below KEY1,
          ;; so the first candidate is its level-0 successor (possibly #f).
          ;; A leftward walk that missed stops on the first node above KEY1.
          (let collect ([node (if (< (node-key nearest) key1)
                                  (node-next 0 nearest)
                                  nearest)]
                        [ret '()])
            ;; always walk on level 0; NODE is #f past the tail
            (if (and node (<= (node-key node) key2))
                (collect (node-next 0 node) (cons node ret))
                (values (reverse ret) (reverse accum-path))))))))

;; Return the nodes of the LEVEL list as a Scheme list, starting at ROOT and
;; following node-next until it yields #f.  A ROOT of #f gives '().
(define (node->list level root)
  (do ([cursor root (node-next level cursor)]
       [acc '() (cons cursor acc)])
      ((not cursor) (reverse acc))))

;; Return the keys of the LEVEL list that starts at ROOT, in list order.
(define (node-key->list level root)
  (let ([nodes (node->list level root)])
    (map node-key nodes)))

;; Return the keys of the level-0 list of skip graph SG, in list order.
(define (skip-graph-level0-key->list sg)
  (let ([head (skip-graph-level0 sg)])
    (node-key->list 0 head)))

;; Return a two-element list holding the key lists of both level-1 lists of
;; SG: first the list headed by level10, then the one headed by level11.
(define (skip-graph-level1-key->list sg)
  (map (lambda (head) (node-key->list 1 head))
       (list (skip-graph-level10 sg)
             (skip-graph-level11 sg))))

;; Add NODE to skip graph SG.
;;
;; NODE is always inserted into the level-0 list.  On level 1 it goes into the
;; list headed by level10 when its membership is 0 and into the list headed by
;; level11 otherwise.  Each stored head is replaced by the head that
;; node-insert! returns, or by NODE itself when the list was still empty.
(define (skip-graph-add! sg node)
  ;; Head of the LEVEL list after NODE has been added to the list at HEAD.
  (define (head-after-insert level head)
    (if head
        (node-insert! level head node)
        node))
  (skip-graph-level0-set! sg (head-after-insert 0 (skip-graph-level0 sg)))
  (if (zero? (node-membership node))
      (skip-graph-level10-set! sg (head-after-insert 1 (skip-graph-level10 sg)))
      (skip-graph-level11-set! sg (head-after-insert 1 (skip-graph-level11 sg)))))

;; Library-level state read and toggled by gen-membership.
(define membership 0)

;; Return 0 and 1 alternately on successive calls, beginning with 0.
;; Each call flips the `membership` variable as a side effect.
(define (gen-membership)
  (let ([current (if (zero? membership) 0 1)])
    (set! membership (- 1 current))
    current))

;; A skip graph node.
;;   key, value - supplied by the caller of (make-node key value)
;;   membership - taken from gen-membership when the node is created
;;   prev*      - two-slot vector of predecessor links, indexed by level
;;   next*      - two-slot vector of successor links, indexed by level
;; Both link vectors start out filled with #f.
(define-record-type (node make-node node?)
  (fields
   (immutable key node-key)
   (immutable value node-value)
   (immutable membership node-membership)
   (mutable prev* node-prev* node-prev*-set!)
   (mutable next* node-next* node-next*-set!))
  (protocol
   (lambda (new)
     (lambda (key value)
       (let* ([membership (gen-membership)]
              [prev-links (make-vector 2 #f)]
              [next-links (make-vector 2 #f)])
         (new key value membership prev-links next-links))))))

;; Successor of NODE on LEVEL, or #f when that link is unset.
(define (node-next level node)
  (let ([links (node-next* node)])
    (vector-ref links level)))

;; Predecessor of NODE on LEVEL, or #f when that link is unset.
(define (node-prev level node)
  (let ([links (node-prev* node)])
    (vector-ref links level)))

;; Make SUCCESSOR the next node of NODE on LEVEL (one direction only).
(define (node-next-set! level node successor)
  (let ([links (node-next* node)])
    (vector-set! links level successor)))

;; Make PREDECESSOR the previous node of NODE on LEVEL (one direction only).
(define (node-prev-set! level node predecessor)
  (let ([links (node-prev* node)])
    (vector-set! links level predecessor)))

;; Link LEFT and RIGHT as neighbours on LEVEL: RIGHT becomes LEFT's next node
;; and LEFT becomes RIGHT's previous node.
(define (node-append! level left right)
  (node-next-set! level left right)
  (node-prev-set! level right left))

;; Insert NODE into the key-ordered LEVEL list whose head is ROOT.
;;
;; NODE is placed in front of the first node whose key is strictly greater
;; than NODE's key, or at the tail when there is none.
;;
;; Returns the head of the list after the insertion: NODE itself when it sorts
;; before ROOT, ROOT otherwise.
(define (node-insert! level root node)
  (define new-key (node-key node))
  ;; #t when NODE sorts strictly before OTHER.
  (define (precedes? other)
    (< new-key (node-key other)))
  (if (precedes? root)
      (begin
        (node-append! level node root)
        ;; root is changed
        node)
      (let scan ([current root])
        (let ([successor (node-next level current)])
          (cond
           [(and successor (not (precedes? successor)))
            (scan successor)]
           [else
            ;; Splice NODE in right after CURRENT; SUCCESSOR is #f at the tail.
            (node-append! level current node)
            (when successor
              (node-append! level node successor))
            root])))))
)

テストコード

(import (rnrs)
        (skip graph)
        (mosh test))

;; node: accessors return what make-node was given; the membership check
;; expects this to be the first node created since the library was loaded.
(let ([n (make-node "key1" "value1")])
  (test-equal "key1" (node-key n))
  (test-equal "value1" (node-value n))
  (test-eq 0 (node-membership n)))

;; node append!
;; let* (not let) is required here: the expected membership values depend on
;; node1 being created before node2, and let leaves the evaluation order of
;; its init expressions unspecified.
(let* ([level 0]
       [node1 (make-node "key1" "value1")]
       [node2 (make-node "key2" "value2")])
  (node-append! level node1 node2)
  (test-eq 1 (node-membership node1))
  (test-eq 0 (node-membership node2))
  (test-eq node2 (node-next level node1))
  (test-eq node1 (node-prev level node2)))

;; skip graph.
;; let* (not let) is required here: memberships are handed out alternately in
;; creation order, and the level-1 expectations below depend on that order.
;; let leaves the evaluation order of its init expressions unspecified.
(let* ([sg (make-skip-graph)]
       [node13 (make-node 13 "$13")]
       [node30 (make-node 30 "$30")]
       [node20 (make-node 20 "$20")]
       [node5 (make-node 5 "$5")]
       [node40 (make-node 40 "$40")]
       [node2 (make-node 2 "$2")]
       [node6 (make-node 6 "$6")])
  (test-equal '() (skip-graph-level0-key->list sg))

  ;; After each insertion, check the level-0 list and both level-1 lists.
  (skip-graph-add! sg node13)
  (test-equal '(13) (skip-graph-level0-key->list sg))
  (test-equal '(() (13)) (skip-graph-level1-key->list sg))

  (skip-graph-add! sg node30)
  (test-equal '(13 30) (skip-graph-level0-key->list sg))
  (test-equal '((30) (13)) (skip-graph-level1-key->list sg))

  (skip-graph-add! sg node20)
  (test-equal '(13 20 30) (skip-graph-level0-key->list sg))
  (test-equal '((30) (13 20)) (skip-graph-level1-key->list sg))

  (skip-graph-add! sg node5)
  (test-equal '(5 13 20 30) (skip-graph-level0-key->list sg))
  (test-equal '((5 30) (13 20)) (skip-graph-level1-key->list sg))

  (skip-graph-add! sg node40)
  (test-equal '(5 13 20 30 40) (skip-graph-level0-key->list sg))
  (test-equal '((5 30) (13 20 40)) (skip-graph-level1-key->list sg))

  (skip-graph-add! sg node2)
  (test-equal '(2 5 13 20 30 40) (skip-graph-level0-key->list sg))
  (test-equal '((2 5 30) (13 20 40)) (skip-graph-level1-key->list sg))

  (skip-graph-add! sg node6)
  (test-equal '(2 5 6 13 20 30 40) (skip-graph-level0-key->list sg))
  (test-equal '((2 5 30) (6 13 20 40)) (skip-graph-level1-key->list sg))

  ;; start node is node30, search to left on level 1
  (let-values (([found path] (node-search node30 5)))
    (test-true found)
    (test-equal '((1 . 30) (1 . 5) found) path)
    (test-equal "$5" (node-value found)))

  ;; start node is node2, search to right on level 1
  (let-values (([found path] (node-search node2 5)))
    (test-true found)
    (test-equal '((1 . 2) (1 . 5) found) path)
    (test-equal "$5" (node-value found)))

  ;; start node is node20, search to left on level 0
  (let-values (([found path] (node-search node20 5)))
    (test-true found)
    (test-equal '((1 . 20) (1 . 13) (1 . 6) (0 . 6) (0 . 5) found) path)
    (test-equal "$5" (node-value found)))

  ;; start node is node40, search to left on level 0
  (let-values (([found path] (node-search node40 5)))
    (test-true found)
    (test-equal '((1 . 40) (1 . 20) (1 . 13) (1 . 6) (0 . 6)  (0 . 5) found) path)
    (test-equal "$5" (node-value found)))

  ;; start node is node2, search to right on level 1, then on level 0
  (let-values (([found path] (node-search node2 40)))
    (test-true found)
    (test-equal '((1 . 2) (1 . 5) (1 . 30) (0 . 30) (0 . 40) found) path)
    (test-equal "$40" (node-value found)))

  ;; not found
  (let-values (([found path] (node-search node40 4)))
    (test-equal '((1 . 40) (1 . 20) (1 . 13) (1 . 6) (0 . 6) (0 . 5)) path)
    (test-false found))

  ;; not found
  (let-values (([found path] (node-search node40 1000)))
    (test-equal '((1 . 40) (0 . 40)) path)
    (test-false found))

  ;; range search
  (let-values (([found path] (node-range-search node40 13 25)))
    (test-equal '((13 . "$13") (20 . "$20")) (map (lambda (node) (cons (node-key node) (node-value node))) found)))

  (let-values (([found path] (node-range-search node2 13 25)))
    (test-equal '((13 . "$13") (20 . "$20")) (map (lambda (node) (cons (node-key node) (node-value node))) found))))

;; Report the results once every test above has run.
(test-results)