Monday, November 03, 2014

Near Duplicate Detection using MinHashing and Solr


Introduction


Recently there was a discussion on detecting near duplicate documents on LinkedIn (login required). Someone suggested using NGrams, which prompted me to suggest using More Like This (MLT) queries on Solr using shingles for terms (the general idea of querying with shingles is explained here).

Turns out that this was a bit naive. For one thing, the Solr ShingleFilter works differently than you would expect. For example, for the phrase "lipstick on a pig", you would expect the 3-grams to be the set of tokens {"lipstick on a", "on a pig"}. With Solr's ShingleFilter, for minShingleSize=3, maxShingleSize=3 and outputUnigrams=true, you get {"lipstick|lipstick on a", "on|on a pig"}, i.e., synonym tokens with the unigram as anchor and the n-gram(s) as synonyms. If you set outputUnigrams=false, no shingles are produced at all because there is no anchor for the synonym terms. Further, since MLT works by matching tokens found by analyzing a query document against tokens found in index documents, the only way to implement my original suggestion would have been a custom ShingleFilter.
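
To make the behavior above concrete, a ShingleFilter configuration with those settings would look something like this (a hypothetical field type, shown only for illustration and not used for the application later in this post):

<fieldType name="text_shingle" class="solr.TextField" positionIncrementGap="100">
  <analyzer>
    <tokenizer class="solr.WhitespaceTokenizerFactory"/>
    <filter class="solr.LowerCaseFilterFactory"/>
    <filter class="solr.ShingleFilterFactory" minShingleSize="3" 
        maxShingleSize="3" outputUnigrams="true"/>
  </analyzer>
</fieldType>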

While I've built custom Solr components in the past, in hindsight I think it's generally a bad idea, for two reasons. First, the Lucene and Solr projects move quite fast and APIs change frequently. Second, custom extension points are often regarded as "expert", and there is less concern for backward compatibility in these APIs than in the user-centric ones. I guess the expectation is that people doing customizations are responsible for staying on top of API changes as they occur. Maybe that's true in general, but for me it's a potential annoyance I have to deal with each time I upgrade.

In any case, I figured out a user-centric approach to do this. Instead of analyzing the content into shingles (n-grams) on the Solr side, we decompose the content into shingles ourselves at index time and store them in a multi-valued string field (no tokenization beyond the shingling). When looking for near duplicates, we search with a boolean OR query of the shingles built from the query document. This returns a ranked list of documents in which the topmost document shares the most shingles with the query document, the next one fewer, and so on.

This works well, except when the number of shingles becomes too large and hits the limit on boolean query clauses (1024 by default). To get around this limitation, I implemented MinHashing, a technique I learned in the Mining of Massive Datasets (MMDS) course on Coursera. MinHashing creates an approximate representation of the document with a fixed number of hashes. Not only does it solve the boolean query limit issue, it also results in a smaller memory footprint.

This post describes both approaches - shingling and MinHashing - to find near duplicates in the Restaurant Dataset, which consists of restaurant names, addresses, telephone numbers and genres for 533 restaurants from Fodor's and 331 from Zagat's restaurant guides. Thanks to this post on the Knime blog for the reference to the Restaurant Dataset.

MinHashing Overview


It took me a while to understand MinHashing. One of the reasons I took the MMDS course was that I was unable to understand MinHashing from the MMDS book. The lectures helped, but I still didn't understand how to implement it. I owe my understanding of that final step to my classmates in the course. I describe my understanding below, mostly as an attempt to pay it forward - hopefully it helps you too if you are struggling with it.

Consider the three strings s1, s2 and s3 below, which need to be tested for near duplicates. It is pretty obvious that s1 and s2 are close but s3 is completely different. We compare the pairs (s1,s1), (s1,s2) and (s1,s3) as examples of exact duplicates, near duplicates and non-duplicates respectively.

s1 = "Art's Deli 12224 Ventura Blvd. Studio City"
s2 = "Art's Delicatessen 12224 Ventura Blvd. Studio City"
s3 = "Hotel Bel-Air 701 Stone Canyon Rd. Bel Air"

(s1,s1) (s1,s2) (s1,s3)
1.000   0.778   0.000   J(s,t)
1.000   0.333   0.000   J(shingle(s), shingle(t))
1.000   0.333   0.000   J(hash(shingle(s)), hash(shingle(t)))
1.000   0.290   0.000   J(minhash(shingle(s)), minhash(shingle(t)))

The first comparison is the Jaccard similarity between each pair, treating each (normalized) sentence as a set of words. For near-duplicate detection, it is customary to consider the Jaccard similarity between shingles instead. This is what the second comparison does - it computes the Jaccard similarity between sets of 3-grams rather than sets of individual words. Rather than the actual 3-grams, we could just as well store integer hashes of them - this saves space, and as the third comparison shows, there is no loss of precision.
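
As a quick sanity check of the 0.778 in the first row, here is the word-level Jaccard computation for (s1,s2) in Scala, after the same normalization (lowercasing, punctuation stripping) that is used later in this post:

val w1 = "art s deli 12224 ventura blvd studio city".split(" ").toSet
val w2 = "art s delicatessen 12224 ventura blvd studio city".split(" ").toSet
// 7 words in common out of 9 distinct words = 0.778
val jaccard = 1.0F * w1.intersect(w2).size / w1.union(w2).size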

The MinHash is the minimum of the hashes of the shingles we computed above. So now a document could be represented by this single MinHash instead of by its set of shingles (or set of hashes). However, a single hash is not a good enough representation of the document, so we correct for that by representing a document with a set of MinHashes. Each MinHash in the set is generated by XOR-ing the hashes of the shingles with a random number and then taking the minimum. This set of MinHashes is referred to as the signature of the document. In order for signatures to be comparable across documents, all documents must use the same set of random numbers for the XOR-ing. The last row above represents the Jaccard similarity between the signatures of the three pairs of documents - as you can see, there is a slight loss of precision going from the shingles (and hashes of shingles) to the corresponding MinHash signatures.
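
Here is a minimal standalone sketch of that signature computation (the hash function, seed and mask generation mirror the ShingleProcessor class shown later, but this particular method is just for illustration):

import scala.util.Random
import scala.util.hashing.MurmurHash3

// each random mask plays the role of a different hash function; for each
// mask we keep the minimum of the masked shingle hashes
def signature(shingles: Set[String], numHashes: Int): Array[Int] = {
  val random = new Random(42L)                        // same seed for every document
  val masks = Array.fill(numHashes)(random.nextInt()) // shared random numbers
  val hashes = shingles.map(s => MurmurHash3.stringHash(s, 42))
  masks.map(mask => hashes.map(_ ^ mask).min)
}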

For the math behind this, refer to the MMDS book or course. For a slightly more complete explanation, refer to this post. I did not end up implementing the banding for Locality Sensitive Hashing (LSH) because I wasn't getting good results on my corpus (my strings are too short) - instead I matched against the full signature as described in this post.

Data Preprocessing


The restaurant dataset has 112 matching pairs (exact and near duplicates) between the Zagat's and Fodor's lists. According to the README file, the telephone numbers in the data make this task almost trivial, so I preprocessed the data to remove the telephone number and genre. Additionally, I added a line number to each line so I could list potential matches concisely as a pair of numbers (and also because I hope to reuse some of this code for my Programming Assignment in the MMDS class). Here's the script to do this.

import os
import re

# strip the phone number (and everything after it) from each line, then
# write out the remaining name and address prefixed with a running counter
def extract_addr(fin, fout, phone_pattern):
  global cnt
  for line in fin:
    line = line.strip()
    m = re.search(phone_pattern, line)
    addr = None
    if m is None:
      addr = line
    else:
      start = m.start()
      addr = line[0:start].strip()
    fout.write("%d %s\n" % (cnt, addr))
    cnt += 1

orig_dir = "original"
proc_dir = "data"

input_files = ["fodors.txt", "zagats.txt"]
phone_patterns = [r"\d{3}\/", r"\d{3}-"]
fout = open(os.path.join(proc_dir, "addresses.txt"), 'wb')
cnt = 1
for i in range(0, 2):
  input_file = input_files[i]
  phone_pattern = phone_patterns[i]
  fin = open(os.path.join(orig_dir, input_file), 'rb')
  extract_addr(fin, fout, phone_pattern)
  fin.close()
fout.close()

This results in a single file containing name and address entries from both guides that looks like this:

1 Adriano's Ristorante 2930 Beverly Glen Circle Los Angeles
2 Arnie Morton's of Chicago 435 S. La Cienega Blvd. Los Angeles
3 Art's Delicatessen 12224 Ventura Blvd. Studio City
4 Barney Greengrass 9570 Wilshire Blvd. Beverly Hills
5 Beaurivage 26025 Pacific Coast Hwy. Malibu
...
860 Ti Couz 3108 16th St. San Francisco
861 Trio Cafe 1870 Fillmore St. San Francisco
862 Tu Lan 8 Sixth St. San Francisco
863 Vicolo Pizzeria 201 Ivy St. San Francisco
864 Wa-Ha-Ka Oaxaca Mexican Grill 2141 Polk St. San Francisco

Solr Setup


I needed to set up the following fields in my Solr schema.xml file to support the application. The content field was already there, but I made it single-valued. The md5_hash field stores the MD5 hash of the content - useful for detecting exact duplicates without having to use a (potentially expensive) distance measure - more on that later. The num_words and first_word fields are useful for limiting the scope of the search - for example, we use first_word as a filter so that we only compare the source address with target addresses whose first word is the same. The content_ng field is a multi-valued field that contains the shingles for the content - this may be prohibitively large for real documents. Finally, the content_sg field is another multi-valued field that stores a fixed number of MinHash values representing the document's signature.

<field name="content" type="text_general" indexed="false" stored="true" multiValued="false"/>
   <field name="md5_hash" type="string" indexed="true" stored="true"/>
   <field name="num_words" type="int" indexed="true" stored="true" />
   <field name="first_word" type="string" indexed="true" stored="true"/>
   <field name="content_ng" type="string" indexed="true" stored="true" 
       multiValued="true"/>
   <field name="content_sg" type="string" indexed="true" stored="true" 
       multiValued="true"/>

Code


Like most search applications, the functionality is split between an Indexer, which parses the input and writes it out to the index, and a Searcher, which reads from the index. The Indexer and Searcher need to co-operate for best results; this is normally taken care of by the Analyzer (part of Solr). In our case, since we are handling everything on the client, we have a common class that does all the work of normalizing, shingling and MinHashing - this class is called from both the Indexer and the Searcher.

// Source: src/main/scala/com/mycompany/solr4extras/neardups/ShingleProcessor.scala
package com.mycompany.solr4extras.neardups

import scala.math.abs
import scala.util.Random
import scala.util.hashing.MurmurHash3

import com.aliasi.tokenizer.RegExTokenizerFactory
import com.aliasi.tokenizer.TokenNGramTokenizerFactory

class ShingleProcessor(size: Int) {

  // everything in \\p{Punct} except "-", other exclusions may
  // be added as they are discovered
  val Puncts = """[!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~]""".r

  val factory = new TokenNGramTokenizerFactory(
    new RegExTokenizerFactory("\\S+"), size, size)

  def normalize(str: String): String = 
    Puncts.replaceAllIn(str.toLowerCase, " ")
      .replaceAll("\\s+", " ")
  
  def words(str: String): Array[String] = normalize(str).split(" ")
  
  def firstWord(str: String): String = words(str).head
  
  def numWords(str: String): Int = words(str).size
  
  def ngrams(str: String): Array[String] = {
    if (numWords(str) < size) Array(str)
    else {
      val normStr = normalize(str)
      val tokenizer = factory.tokenizer(
        normStr.toCharArray(), 0, normStr.length())
      tokenizer.tokenize().toArray
    }
  }

  def hash(str: String): Int = MurmurHash3.stringHash(str, seed=42)
  
  // a MinHash is the smallest of the (masked) shingle hashes
  def minHash(hashes: Array[Int]): Int = hashes.min
  
  // the signature is a fixed-size array of MinHashes, one per random mask;
  // the seed is fixed so that every document is XOR-ed with the same masks
  def signatures(size: Int, str: String): Array[Int] = {
    Random.setSeed(42L)
    Array.fill(size)(Random.nextInt).map(mask => 
      minHash(ngrams(str).map(shingle => hash(shingle) ^ mask)))
  }
  
  def jaccard(set1: Set[String], set2: Set[String]): Float = 
    1.0F * set1.intersect(set2).size / set1.union(set2).size
}

The Indexer parses each input line, computes the fields described above, and writes them out to the index. There is not much to explain here; what you see is (mostly) what you get.

// Source: src/main/scala/com/mycompany/solr4extras/neardups/Indexer.scala
package com.mycompany.solr4extras.neardups

import java.io.File
import java.util.concurrent.atomic.AtomicInteger

import scala.io.Source

import org.apache.commons.codec.digest.DigestUtils
import org.apache.solr.client.solrj.impl.HttpSolrServer
import org.apache.solr.common.SolrInputDocument

class Indexer(ngramSize: Int, sigSize: Int) {

  val SolrUrl = "http://localhost:8983/solr/collection1"
  val InputFile = new File("/path/to/addresses.txt")

  def buildIndex(): Unit = {
    val solr = new HttpSolrServer(SolrUrl)
    val processor = new ShingleProcessor(ngramSize)
    Source.fromFile(InputFile).getLines
      .foreach(line => {
        val splitAt = line.indexOf(' ')
        val id = line.substring(0, splitAt).toInt
        val data = processor.normalize(line.substring(splitAt + 1))
        if (id % 100 == 0) {
          Console.println("Processing record: %s".format(id))
          solr.commit()
        }
        val doc = new SolrInputDocument()
        doc.addField("id", id.toString)
        doc.addField("content", data)
        // to quickly check for exact match
        doc.addField("md5_hash", DigestUtils.md5Hex(data))
        // to limit scope of search
        doc.addField("num_words", processor.numWords(data))
        doc.addField("first_word", processor.firstWord(data))
        // compute shingles
        processor.ngrams(data).foreach(ngram => 
          doc.addField("content_ng", ngram))
        // compute the minhash signature matrix
        processor.signatures(sigSize, data).foreach(sig => 
          doc.addField("content_sg", sig.toString))
        solr.add(doc)
      })
      Console.println("Cleaning up... complete")
      solr.commit()
  }
}

The Searcher is a bit more complicated. The idea is that we want all pairs (or larger groups) of address documents that are duplicates or near duplicates, so we need to iterate through the documents in the index and find near duplicates for each one. We protect against double counting by only looking for neighbors whose id is greater than that of the query document. In addition, we limit the scope of the search to documents that share the same first word as the query document. Thus we reduce an all-pairs comparison of complexity O(n^2) to an O(n) process (assuming a search is essentially a lookup, i.e., O(1)).
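
For illustration, the shingle query built for record 3 above ("art s delicatessen 12224 ventura blvd studio city") would look something like this; the MinHash variant has the same shape, with content_sg values in place of the shingles:

q  = content_ng:"art s delicatessen" OR content_ng:"s delicatessen 12224" OR
     content_ng:"delicatessen 12224 ventura" OR content_ng:"12224 ventura blvd" OR
     content_ng:"ventura blvd studio" OR content_ng:"blvd studio city"
fq = first_word:"art"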

In order to provide "good" duplicates, we restrict the neighbors of each address to those within a word-level edit distance of 2. The editDistance() method implements a word version of the Levenshtein distance - for example, "art s deli 12224 ventura blvd studio city" and "art s delicatessen 12224 ventura blvd studio city" differ by a single substitution (deli vs. delicatessen), so their edit distance is 1. Running editDistance() is more expensive than a straight match, so for exact matches (edit distance of 0) we just match on the MD5 hash; once that fails, we fall back to computing edit distances until the edit distance threshold is reached.

To generate a file of duplicate and near-duplicate pairs, we invoke the reportNearDups method of the Searcher (a usage sketch follows the listing below).

// Source: src/main/scala/com/mycompany/solr4extras/neardups/Searcher.scala
package com.mycompany.solr4extras.neardups

import java.io.File
import java.io.FileWriter
import java.io.PrintWriter

import scala.collection.JavaConversions._
import scala.math.min

import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.SolrRequest.METHOD
import org.apache.solr.client.solrj.impl.HttpSolrServer

class Searcher(ngramSize: Int, sigSize: Int) {

  val SolrUrl = "http://localhost:8983/solr/collection1"

  val processor = new ShingleProcessor(ngramSize)
  val solr = new HttpSolrServer(SolrUrl)

  def getById(id: Int): Document = {
    val idquery = new SolrQuery()
    idquery.setQuery("id:" + id)
    idquery.setRows(1)
    idquery.setFields("id", "content", "md5_hash")
    val idresp = solr.query(idquery, METHOD.POST)
    val iddoc = idresp.getResults().head
    Document(iddoc.getFieldValue("id").asInstanceOf[String],
      iddoc.getFieldValue("content").asInstanceOf[String],
      iddoc.getFieldValue("md5_hash").asInstanceOf[String],
      0, 0.0F)
  }
  
  def findNearDupsByShingles(docId: Int, maxDist: Int): List[Document] = {
    val doc = getById(docId)
    val queryString = processor.ngrams(doc.content)
      .map(ngram => "content_ng:\"" + ngram + "\"")
      .mkString(" OR ")
    nearDupSearch(docId, maxDist, queryString)
  }
  
  def findNearDupsByMinhash(docId: Int, maxDist: Int): List[Document] = {
    val doc = getById(docId)
    val queryString = processor.signatures(sigSize, doc.content)
      .map(sig => "content_sg:\"" + sig + "\"")
      .mkString(" OR ")
    nearDupSearch(docId, maxDist, queryString)
  }
  
  def nearDupSearch(docId: Int, maxDist: Int, q: String): List[Document] = {
    val sourceDoc = getById(docId)
    val query = new SolrQuery()
    // build shingle query
    query.setQuery(q)
    query.setFilterQueries("first_word:\"" + 
      processor.firstWord(sourceDoc.content) + "\"")
    query.setRows(10)
    query.setFields("id", "content", "md5_hash", "score")
    val resp = solr.query(query, METHOD.POST)
    var prevEditDist = 0
    resp.getResults().map(ndoc => {
      val hash = ndoc.getFieldValue("md5_hash").asInstanceOf[String]
      val content = ndoc.getFieldValue("content").asInstanceOf[String]
      val editDist = if (hash == sourceDoc.md5hash) 0 
        else if (prevEditDist <= maxDist)
          editDistance(processor.words(sourceDoc.content), 
                       processor.words(content))
        else Int.MaxValue
      prevEditDist = editDist
      new Document(ndoc.getFieldValue("id").asInstanceOf[String], 
        content, hash, editDist,
        ndoc.getFieldValue("score").asInstanceOf[Float])
    })
    .filter(doc => doc.id.toInt > sourceDoc.id.toInt)
    .filter(doc => doc.editDist <= maxDist)
    .toList
  }
  
  def reportNearDups(outfile: File, startDocId: Int, 
      endDocId: Int, editDist: Int, useShingles: Boolean): Unit = {
    val writer = new PrintWriter(new FileWriter(outfile), true)
    (startDocId until endDocId).foreach(docId => {
      val sdoc = getById(docId)
      val neighborDocs = if (useShingles) 
        findNearDupsByShingles(docId, editDist)
        else findNearDupsByMinhash(docId, editDist)
      if (neighborDocs.size > 0) {
     Console.println("%s: %s".format(docId, sdoc.content))
        neighborDocs.foreach(ndoc => {
          writer.println("%s\t%s\t%d"
            .format(docId, ndoc.id, ndoc.editDist))
          Console.println("%s: %s (%d)"
            .format(ndoc.id, ndoc.content, ndoc.editDist))
        })
        Console.println("==")
      }
    })
    writer.flush()
    writer.close()
  }
  
  // adapted from rosettacode.org/wiki/Levenshtein_distance#Scala
  def editDistance(words1: Array[String], words2: Array[String]): Int = {
    val len1 = words1.length
    val len2 = words2.length
    val distances = Array.ofDim[Int](len1+1, len2+1)
    for (i <- 0 to len1;
         j <- 0 to len2) {
      if (j == 0) distances(i)(j) = i
      else if (i == 0) distances(i)(j) = j
      else distances(i)(j) = 0
    }
    for (i <- 1 to len1;
         j <- 1 to len2) {
      distances(i)(j) = if (words1(i-1).equals(words2(j-1))) 
          distances(i-1)(j-1)
        else minimum(
          distances(i-1)(j) + 1,  // deletion
          distances(i)(j-1) + 1,  // insertion
          distances(i-1)(j-1) + 1 // substitution
        )
    }
    distances(len1)(len2)
  }

  def minimum(i1: Int, i2: Int, i3: Int): Int = min(min(i1, i2), i3)
}

case class Document(id: String, 
                    content: String, 
                    md5hash: String, 
                    editDist: Int, 
                    score: Float)
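
Here is a minimal usage sketch (a hypothetical driver, not part of the classes above) that builds the index and then invokes reportNearDups with the parameters used for the results below - 3-word shingles, 20 MinHash values per signature, an edit distance threshold of 2, and document ids 1 through 864:

// Source: hypothetical driver, for illustration only
package com.mycompany.solr4extras.neardups

import java.io.File

object NearDupsRunner extends App {

  val indexer = new Indexer(ngramSize = 3, sigSize = 20)
  indexer.buildIndex()

  val searcher = new Searcher(ngramSize = 3, sigSize = 20)
  // pairs found via the shingle query (useShingles = true)
  searcher.reportNearDups(new File("shingle_pairs.txt"), 1, 865, 2, true)
  // pairs found via the minhash signature query (useShingles = false)
  searcher.reportNearDups(new File("minhash_pairs.txt"), 1, 865, 2, false)
}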

Results


Of the 112 pairs of potential matches, the shingle approach with 3-grams and an edit distance threshold of 2 found 65 pairs, and the MinHash approach with signatures of 20 MinHash values per document and the same threshold found 64 pairs. Here are some interesting matches. The number before the colon is the id of the address, and the number in parentheses is the edit distance of the neighbor document from the query document.

3: art s delicatessen 12224 ventura blvd studio city
536: art s deli 12224 ventura blvd studio city (1)

43: l orangerie 903 n la cienega blvd los angeles
579: l orangerie 903 n la cienega blvd w hollywood (2)

44: le chardonnay 8284 melrose ave los angeles
584: le chardonnay los angeles 8284 melrose ave los angeles (2)

60: pinot bistro 12969 ventura blvd los angeles
605: pinot bistro 12969 ventura blvd studio city (2)

84: 21 club 21 w 52nd st new york
625: 21 club 21 w 52nd st new york city (1)

...

That's all I have for today. All the code described above (plus unit tests) can be found in my GitHub project. MinHashing is something I had been trying to get my head around for a while, and this exercise helped me understand it. Hopefully it helped you too.
