Skip to content

Commit 2f8d09d

Browse files
committed
feat: resolve graphsync from store if present
1 parent 0addfe8 commit 2f8d09d

File tree

5 files changed

+224
-8
lines changed

5 files changed

+224
-8
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
"aegir": "^36.2.3",
1818
"blockstore-core": "^1.0.5",
1919
"ipfs-unixfs-importer": "^9.0.6",
20+
"it-pair": "^1.0.0",
21+
"libp2p-interfaces": "^2.0.1",
2022
"prettier": "^2.6.1"
2123
},
2224
"dependencies": {

src/async-loader.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ interface Resolvable {
1515

1616
export type BlockNotifyFn = (block: Block<any>) => void;
1717

18+
export type WaitNotifyFn = (cid: CID) => void;
19+
1820
// AsyncLoader waits for a block to be anounced if it is not available in the blockstore
1921
export class AsyncLoader implements LinkLoader {
2022
store: Blockstore;
2123
// notify callback everytime a new block is loaded
2224
tracker?: BlockNotifyFn;
25+
// notify callback when the loader is waiting for a bock
26+
waitNotify?: WaitNotifyFn;
2327
// pending are block that have been pushed but not yet loaded
2428
pending: Map<string, Block<any>> = new Map();
2529
// loaded is a set of string CIDs for content that was loaded.
@@ -30,9 +34,18 @@ export class AsyncLoader implements LinkLoader {
3034

3135
reifiers: KnownReifiers = {};
3236

33-
constructor(store: Blockstore, tracker?: BlockNotifyFn) {
37+
notifyWaiting = false;
38+
39+
constructor(
40+
store: Blockstore,
41+
tracker?: BlockNotifyFn,
42+
waitNotify?: WaitNotifyFn
43+
) {
3444
this.store = store;
3545
this.tracker = tracker;
46+
if (waitNotify) {
47+
this.setWaitNotify(waitNotify);
48+
}
3649
}
3750
async load(cid: CID): Promise<Block<any>> {
3851
const k = cid.toString();
@@ -52,6 +65,8 @@ export class AsyncLoader implements LinkLoader {
5265
this.loaded.add(k);
5366
}
5467
}
68+
// waitForBlock will queue up a promise that will get resolved once a block
69+
// is pushed to the loader.
5570
async waitForBlock(cid: CID): Promise<Block<any>> {
5671
const block = this.pending.get(cid.toString());
5772
if (block) {
@@ -61,6 +76,10 @@ export class AsyncLoader implements LinkLoader {
6176
return blockFromStore(cid, this.store);
6277
}
6378

79+
if (this.waitNotify && this.notifyWaiting) {
80+
this.waitNotify(cid);
81+
}
82+
6483
return new Promise((resolve, reject) => {
6584
this.pullQueue.set(
6685
cid.toString(),
@@ -79,6 +98,7 @@ export class AsyncLoader implements LinkLoader {
7998
this.pending.set(k, block);
8099
}
81100
}
101+
// move a pending block to the blockstore.
82102
flush(blk: Block<any>) {
83103
if (!this.loaded.has(blk.cid.toString())) {
84104
this.tracker?.(blk);
@@ -94,4 +114,9 @@ export class AsyncLoader implements LinkLoader {
94114
close() {
95115
this.pending = new Map();
96116
}
117+
118+
setWaitNotify(cb: WaitNotifyFn) {
119+
this.waitNotify = cb;
120+
this.notifyWaiting = true;
121+
}
97122
}

src/graphsync.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,15 @@ export class Request extends EventEmitter {
127127
this.loader = new AsyncLoader(blocks, this.incomingBlockHook.bind(this));
128128
}
129129

130-
async open(peer: PeerId, extensions?: {[key: string]: any}) {
131-
const {stream} = await this.dialer.dialProtocol(peer, PROTOCOL);
132-
await pipe(
133-
[newRequest(this.id, this.root, this.selector, extensions)],
134-
stream
135-
);
130+
open(peer: PeerId, extensions?: {[key: string]: any}) {
131+
this.loader.setWaitNotify(async () => {
132+
this.loader.notifyWaiting = false;
133+
const {stream} = await this.dialer.dialProtocol(peer, PROTOCOL);
134+
await pipe(
135+
[newRequest(this.id, this.root, this.selector, extensions)],
136+
stream
137+
);
138+
});
136139
}
137140

138141
async drain() {

test/mock-libp2p.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import type {HandlerProps, MuxedStream, Connection} from "libp2p";
2+
import {Connection as Conn} from "libp2p-interfaces/src/connection";
3+
import PeerId from "peer-id";
4+
import {Multiaddr} from "multiaddr";
5+
import {EventEmitter} from "events";
6+
// @ts-ignore
7+
import pair from "it-pair";
8+
import type BufferList from "bl/BufferList";
9+
import drain from "it-drain";
10+
11+
class MockAddressBook {
12+
addrs: {[key: string]: Multiaddr[]} = {};
13+
14+
add(pid: PeerId, addrs: Multiaddr[]) {
15+
this.addrs[pid.toString()] = addrs;
16+
return this;
17+
}
18+
}
19+
20+
export class MockLibp2p {
21+
streamId = 0;
22+
handlers: {[key: string]: (props: HandlerProps) => void} = {};
23+
24+
peerId: PeerId;
25+
connectionManager = new EventEmitter();
26+
peerStore = {
27+
addressBook: new MockAddressBook(),
28+
};
29+
30+
sources: {[key: string]: AsyncIterable<BufferList>} = {};
31+
32+
constructor(peerId: PeerId) {
33+
this.peerId = peerId;
34+
}
35+
36+
handle(protocol: string, handler: (props: HandlerProps) => void) {
37+
this.handlers[protocol] = handler;
38+
}
39+
40+
unhandle(protocol: string | string[]) {
41+
const protos = Array.isArray(protocol) ? protocol : [protocol];
42+
protos.forEach((p) => {
43+
delete this.handlers[p];
44+
});
45+
}
46+
47+
async dial(
48+
peer: string | PeerId | Multiaddr,
49+
options?: any
50+
): Promise<Connection> {
51+
const localAddr = new Multiaddr("/ip4/127.0.0.1/tcp/8080");
52+
const remoteAddr = new Multiaddr("/ip4/127.0.0.1/tcp/8081");
53+
54+
const [localPeer, remotePeer] = [
55+
PeerId.createFromB58String(
56+
"12D3KooWSoLzampfxc4t3sy9z7yq1Cgzbi7zGXpV7nvt5hfeKUhR"
57+
),
58+
PeerId.createFromB58String(
59+
"12D3KooWSoLzampfxc4t3sy9z7yq1Cgzbi7zGXpV7nvt5hfeKRhU"
60+
),
61+
];
62+
const openStreams: MuxedStream[] = [];
63+
let streamId = 0;
64+
65+
return new Conn({
66+
localPeer: localPeer,
67+
remotePeer: remotePeer,
68+
localAddr,
69+
remoteAddr,
70+
stat: {
71+
timeline: {
72+
open: Date.now() - 10,
73+
upgraded: Date.now(),
74+
},
75+
direction: "outbound",
76+
encryption: "/noise",
77+
multiplexer: "/mplex/6.7.0",
78+
},
79+
newStream: async (protocols) => {
80+
const id = streamId++;
81+
const stream = pair();
82+
83+
stream.close = () => stream.sink([]);
84+
stream.id = id;
85+
86+
openStreams.push(stream);
87+
88+
return {
89+
stream,
90+
protocol: protocols[0],
91+
};
92+
},
93+
close: async () => {},
94+
getStreams: () => openStreams,
95+
});
96+
}
97+
98+
async dialProtocol(
99+
peer: PeerId,
100+
protocols: string[] | string,
101+
options?: any
102+
): Promise<{stream: MuxedStream; protocol: string}> {
103+
const id = "" + this.streamId++;
104+
const stream: MuxedStream =
105+
id in this.sources
106+
? {
107+
source: this.sources[id],
108+
sink: drain,
109+
}
110+
: pair();
111+
stream.close = () => {};
112+
stream.id = id;
113+
114+
const conn = {
115+
stream,
116+
protocol: typeof protocols === "string" ? protocols : protocols[0],
117+
};
118+
if (id in this.sources) {
119+
// @ts-ignore
120+
this.handlers[conn.protocol]({stream, connection: conn});
121+
}
122+
return conn;
123+
}
124+
}

test/resolver.spec.ts

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import {expect} from "aegir/utils/chai.js";
2-
import {unixfsPathSelector} from "../src/resolver";
2+
import {resolve, unixfsPathSelector} from "../src/resolver";
3+
import {MemoryBlockstore} from "blockstore-core/memory";
4+
import {MockLibp2p} from "./mock-libp2p";
5+
import PeerId from "peer-id";
6+
import {multiaddr} from "multiaddr";
7+
import {GraphSync} from "../src/graphsync";
8+
import {importer} from "ipfs-unixfs-importer";
9+
import {concat as concatUint8Arrays} from "uint8arrays/concat";
310

411
describe("resolver", () => {
512
it("parse a unixfs path", () => {
@@ -9,4 +16,59 @@ describe("resolver", () => {
916
)
1017
).to.not.throw();
1118
});
19+
20+
it("resolves a unixfs directory from the store", async () => {
21+
const blocks = new MemoryBlockstore();
22+
23+
const first = new Uint8Array(5 * 256);
24+
const second = new Uint8Array(3 * 256);
25+
26+
// chunk and dagify it then get the root cid
27+
let cid;
28+
for await (const chunk of importer(
29+
[
30+
{path: "first", content: first},
31+
{path: "second", content: second},
32+
],
33+
blocks,
34+
{
35+
cidVersion: 1,
36+
maxChunkSize: 256,
37+
rawLeaves: true,
38+
wrapWithDirectory: true,
39+
}
40+
)) {
41+
if (chunk.path === "") {
42+
cid = chunk.cid;
43+
}
44+
}
45+
46+
const libp2p = new MockLibp2p(
47+
PeerId.createFromB58String(
48+
"12D3KooWSoLzampfxc4t3sy9z7yq1Cgzbi7zGXpV7nvt5hfeKUhR"
49+
)
50+
);
51+
const exchange = new GraphSync(libp2p, blocks);
52+
53+
if (!cid) {
54+
throw new Error("failed to import DAG");
55+
}
56+
const content = resolve(
57+
cid.toString() + "/first",
58+
multiaddr(
59+
"/ip4/127.0.0.1/tcp/41505/ws/p2p/12D3KooWSWERLeRUwpGrigog1Aa3riz9zBSShBPqdMcqYsPs7Bfw"
60+
),
61+
exchange
62+
);
63+
const iterator = content[Symbol.asyncIterator]();
64+
let {value, done} = await iterator.next();
65+
let buf = value;
66+
while (!done) {
67+
({value, done} = await iterator.next());
68+
if (value) {
69+
buf = concatUint8Arrays([buf, value], buf.length + value.length);
70+
}
71+
}
72+
expect(buf).to.deep.equal(first);
73+
});
1274
});

0 commit comments

Comments
 (0)