|
3 | 3 |
|
4 | 4 | package crawlercommons.urlfrontier.service.rocksdb; |
5 | 5 |
|
| 6 | +import com.google.common.util.concurrent.Striped; |
6 | 7 | import com.google.protobuf.InvalidProtocolBufferException; |
7 | 8 | import crawlercommons.urlfrontier.CrawlID; |
8 | 9 | import crawlercommons.urlfrontier.Urlfrontier.AckMessage.Status; |
|
32 | 33 | import java.util.NoSuchElementException; |
33 | 34 | import java.util.Set; |
34 | 35 | import java.util.concurrent.ConcurrentHashMap; |
| 36 | +import java.util.concurrent.locks.Lock; |
35 | 37 | import org.apache.commons.lang3.StringUtils; |
36 | 38 | import org.rocksdb.BlockBasedTableConfig; |
37 | 39 | import org.rocksdb.BloomFilter; |
@@ -66,9 +68,11 @@ public class RocksDBService extends AbstractFrontierService { |
66 | 68 |
|
67 | 69 | private Statistics statistics; |
68 | 70 |
|
| 71 | + private static final Striped<Lock> STRIPED_LOCKS = Striped.lock(128); // 128 stripes |
| 72 | + |
69 | 73 | // no explicit config |
70 | 74 | public RocksDBService(String host, int port) { |
71 | | - this(new HashMap<String, String>(), host, port); |
| 75 | + this(new HashMap<>(), host, port); |
72 | 76 | } |
73 | 77 |
|
74 | 78 | private final ConcurrentHashMap<QueueWithinCrawl, QueueWithinCrawl> queuesBeingDeleted = |
@@ -390,14 +394,15 @@ protected Status putURLItem(final URLItem value) { |
390 | 394 | return Status.SKIPPED; |
391 | 395 | } |
392 | 396 |
|
393 | | - // make it intern so that all threads accessing this method |
394 | | - // share the same instance of the String, this way we can synchronize |
395 | | - // on it and make sure that 2 threads working on the same URL won't |
| 397 | + final String existenceKeyString = (qk.toString() + "_" + url); |
| 398 | + // Synchronize on existence key (avoid interning String to reduce mem usage) |
| 399 | + // Make sure that 2 threads working on the same URL won't |
396 | 400 | // both be considered non-existant |
397 | | - final String existenceKeyString = (qk.toString() + "_" + url).intern(); |
398 | | - final byte[] existenceKey = existenceKeyString.getBytes(StandardCharsets.UTF_8); |
| 401 | + final Lock existenceLock = lockFor(existenceKeyString); |
| 402 | + existenceLock.lock(); |
399 | 403 |
|
400 | | - synchronized (existenceKeyString) { |
| 404 | + try { |
| 405 | + final byte[] existenceKey = existenceKeyString.getBytes(StandardCharsets.UTF_8); |
401 | 406 |
|
402 | 407 | // is this URL already known? |
403 | 408 | try (WriteBatch writeBatch = new WriteBatch(); |
@@ -468,6 +473,8 @@ protected Status putURLItem(final URLItem value) { |
468 | 473 | LOG.error("RocksDB exception", e); |
469 | 474 | return Status.FAIL; |
470 | 475 | } |
| 476 | + } finally { |
| 477 | + existenceLock.unlock(); |
471 | 478 | } |
472 | 479 |
|
473 | 480 | return Status.OK; |
@@ -1004,4 +1011,8 @@ public void close() { |
1004 | 1011 | this.rocksIterator.close(); |
1005 | 1012 | } |
1006 | 1013 | } |
| 1014 | + |
| 1015 | + private static Lock lockFor(String compositeKey) { |
| 1016 | + return STRIPED_LOCKS.get(compositeKey); |
| 1017 | + } |
1007 | 1018 | } |
0 commit comments