Skip to content

Commit 461a56d

Browse files
committed
feat: graphsync provider
1 parent 2f8d09d commit 461a56d

File tree

8 files changed

+312
-91
lines changed

8 files changed

+312
-91
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"@types/bl": "^5.0.2",
1515
"@types/mime": "^2.0.3",
1616
"@types/uuid": "^8.3.4",
17+
"@types/varint": "^6.0.0",
1718
"aegir": "^36.2.3",
1819
"blockstore-core": "^1.0.5",
1920
"ipfs-unixfs-importer": "^9.0.6",
@@ -39,6 +40,7 @@
3940
"peer-id": "^0.16.0",
4041
"uint8arrays": "^3.0.0",
4142
"uuid": "^8.3.2",
43+
"varint": "^6.0.0",
4244
"varint-decoder": "^1.0.0"
4345
}
4446
}

src/graphsync.ts

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
PROTOCOL,
1818
GraphSyncBlock,
1919
decodeBlock,
20+
GraphSyncRequest,
2021
GraphSyncResponse,
2122
decodeMessage,
2223
newRequest,
@@ -28,6 +29,7 @@ import type {
2829
NodeReifier,
2930
} from "./traversal";
3031
import {Node, walkBlocks, parseContext, unixfsReifier} from "./traversal";
32+
import {responseBuilder} from "./response-builder";
3133

3234
export class GraphSync {
3335
started = false;
@@ -83,6 +85,10 @@ export class GraphSync {
8385
req.incomingResponseHook(resp);
8486
}
8587
}
88+
async _handleRequest(peer: PeerId, req: GraphSyncRequest) {
89+
const {stream} = await this.network.dialProtocol(peer, PROTOCOL);
90+
await pipe(responseBuilder(req, this.blocks), lp.encode(), stream);
91+
}
8692
async _handler(props: HandlerProps) {
8793
const source = props.stream.source as AsyncIterable<BufferList>;
8894
for await (const chunk of lp.decode()(source)) {
@@ -96,6 +102,11 @@ export class GraphSync {
96102
if (msg.rsp) {
97103
msg.rsp.forEach((resp) => this._handleResponse(resp));
98104
}
105+
if (msg.req) {
106+
msg.req.forEach((req) =>
107+
this._handleRequest(props.connection.remotePeer, req)
108+
);
109+
}
99110
}
100111
}
101112
}
@@ -127,14 +138,21 @@ export class Request extends EventEmitter {
127138
this.loader = new AsyncLoader(blocks, this.incomingBlockHook.bind(this));
128139
}
129140

130-
open(peer: PeerId, extensions?: {[key: string]: any}) {
131-
this.loader.setWaitNotify(async () => {
132-
this.loader.notifyWaiting = false;
133-
const {stream} = await this.dialer.dialProtocol(peer, PROTOCOL);
134-
await pipe(
135-
[newRequest(this.id, this.root, this.selector, extensions)],
136-
stream
137-
);
141+
open(peer: PeerId, extensions?: {[key: string]: any}): Promise<void> {
142+
return new Promise((resolve, reject) => {
143+
this.loader.setWaitNotify(async () => {
144+
this.loader.notifyWaiting = false;
145+
try {
146+
const {stream} = await this.dialer.dialProtocol(peer, PROTOCOL);
147+
pipe(
148+
[newRequest(this.id, this.root, this.selector, extensions)],
149+
stream
150+
);
151+
resolve();
152+
} catch (e) {
153+
reject(e);
154+
}
155+
});
138156
});
139157
}
140158

src/messages.ts

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import {Buffer} from "buffer";
44
import {parse as uuidParse} from "uuid";
55
import * as dagCBOR from "@ipld/dag-cbor";
66
import type BufferList from "bl/BufferList";
7-
// @ts-ignore (no types)
8-
import vd from "varint-decoder";
7+
import * as varint from "varint";
98
import lp from "it-length-prefixed";
109
import {SelectorNode, decoderFor} from "./traversal";
1110

@@ -72,7 +71,7 @@ export const statuses = {
7271
[ResponseStatusCode.RequestCancelled]: "RequestCancelled",
7372
};
7473

75-
enum GraphSyncLinkAction {
74+
export enum GraphSyncLinkAction {
7675
Present = "p",
7776
DuplicateNotSent = "d",
7877
Missing = "m",
@@ -86,7 +85,7 @@ type GraphSyncPriority = number;
8685

8786
type GraphSyncMetadatum = [CID, GraphSyncLinkAction];
8887

89-
type GraphSyncMetadata = GraphSyncMetadatum[];
88+
export type GraphSyncMetadata = GraphSyncMetadatum[];
9089

9190
enum GraphSyncRequestType {
9291
New = "n",
@@ -114,13 +113,13 @@ export type GraphSyncResponse = {
114113
ext?: GraphSyncExtensions;
115114
};
116115

117-
type GraphSyncMessage = {
116+
export type GraphSyncMessage = {
118117
req?: GraphSyncRequest[];
119118
rsp?: GraphSyncResponse[];
120119
blk?: GraphSyncBlock[];
121120
};
122121

123-
type GraphSyncMessageRoot = {
122+
export type GraphSyncMessageRoot = {
124123
gs2: GraphSyncMessage;
125124
};
126125

@@ -130,20 +129,21 @@ export function newRequest(
130129
sel: SelectorNode,
131130
ext?: GraphSyncExtensions
132131
): BufferList {
132+
const req: GraphSyncRequest = {
133+
id: uuidParse(id) as Uint8Array,
134+
type: GraphSyncRequestType.New,
135+
pri: 0,
136+
root,
137+
sel,
138+
};
139+
if (ext) {
140+
req.ext = ext;
141+
}
133142
return lp.encode.single(
134143
new Buffer(
135-
dagCBOR.encode({
144+
dagCBOR.encode<GraphSyncMessageRoot>({
136145
gs2: {
137-
requests: [
138-
{
139-
id: uuidParse(id),
140-
type: GraphSyncRequestType.New,
141-
pri: 0,
142-
root,
143-
sel,
144-
ext,
145-
},
146-
],
146+
req: [req],
147147
},
148148
})
149149
)
@@ -158,10 +158,12 @@ export async function decodeBlock(
158158
block: GraphSyncBlock,
159159
hashers: {[key: number]: hasher.MultihashHasher<any>}
160160
): Promise<Block<any>> {
161-
const values = vd(block[0]);
162-
const cidVersion = values[0];
163-
const multicodec = values[1];
164-
const multihash = values[2];
161+
let offset = 0;
162+
const cidVersion = varint.decode(block[0], offset) as any;
163+
offset += varint.decode.bytes;
164+
const multicodec = varint.decode(block[0], offset);
165+
offset += varint.decode.bytes;
166+
const multihash = varint.decode(block[0], offset);
165167
const hasher = hashers[multihash];
166168
if (!hasher) {
167169
throw new Error("Unsuported hasher");

src/resolver.ts

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
allSelector,
77
Node,
88
Kind,
9+
LinkLoader,
910
parseContext,
1011
walkBlocks,
1112
SelectorNode,
@@ -65,50 +66,16 @@ export function getPeerID(addr: Multiaddr): PeerId {
6566
return PeerId.createFromB58String(parts[idx]);
6667
}
6768

69+
// Iterate an IPLD traversal and resolve UnixFS blocks
6870
export async function* resolve(
69-
path: string,
70-
provider: Multiaddr,
71-
exchange: GraphSync
72-
): AsyncIterable<any> {
73-
const {root, selector: sel} = unixfsPathSelector(path);
74-
const pid = getPeerID(provider);
75-
exchange.network.peerStore.addressBook.add(pid, [provider]);
76-
const request = exchange.request(root, sel);
77-
const id = Date.now();
78-
const voucher = {
79-
ID: id,
80-
PayloadCID: root,
81-
Params: {
82-
Selector: sel,
83-
PieceCID: null,
84-
PricePerByte: new Uint8Array(),
85-
PaymentInterval: 1000,
86-
PaymentIntervalIncrease: 0,
87-
UnsealPrice: new Uint8Array(),
88-
},
89-
};
90-
request.open(pid, {
91-
[EXTENSION]: {
92-
IsRq: true,
93-
Request: {
94-
BCid: root,
95-
Type: 0,
96-
Pull: true,
97-
Paus: false,
98-
Part: false,
99-
Stor: sel,
100-
Vouch: voucher,
101-
VTyp: "RetrievalDealProposal/1",
102-
XferID: id,
103-
RestartChannel: ["", "", 0],
104-
},
105-
Response: null,
106-
},
107-
});
71+
root: CID,
72+
selector: SelectorNode,
73+
loader: LinkLoader
74+
): AsyncIterable<Uint8Array> {
10875
for await (const blk of walkBlocks(
10976
new Node(root),
110-
parseContext().parseSelector(sel),
111-
request
77+
parseContext().parseSelector(selector),
78+
loader
11279
)) {
11380
// if not cbor or dagpb just return the bytes
11481
switch (blk.cid.code) {
@@ -133,8 +100,6 @@ export async function* resolve(
133100
// ignore
134101
}
135102
}
136-
// tell the loader we're done receiving blocks for this traversal
137-
request.close();
138103
}
139104

140105
type FetchInit = {
@@ -145,7 +110,44 @@ type FetchInit = {
145110

146111
export async function fetch(url: string, init: FetchInit): Promise<Response> {
147112
const {headers, exchange, provider} = init;
148-
const content = resolve(url, provider, exchange);
113+
114+
const {root, selector: sel} = unixfsPathSelector(url);
115+
const pid = getPeerID(provider);
116+
exchange.network.peerStore.addressBook.add(pid, [provider]);
117+
const request = exchange.request(root, sel);
118+
const id = Date.now();
119+
const voucher = {
120+
ID: id,
121+
PayloadCID: root,
122+
Params: {
123+
Selector: sel,
124+
PieceCID: null,
125+
PricePerByte: new Uint8Array(),
126+
PaymentInterval: 1000,
127+
PaymentIntervalIncrease: 0,
128+
UnsealPrice: new Uint8Array(),
129+
},
130+
};
131+
request.open(pid, {
132+
[EXTENSION]: {
133+
IsRq: true,
134+
Request: {
135+
BCid: root,
136+
Type: 0,
137+
Pull: true,
138+
Paus: false,
139+
Part: false,
140+
Stor: sel,
141+
Vouch: voucher,
142+
VTyp: "RetrievalDealProposal/1",
143+
XferID: id,
144+
RestartChannel: ["", "", 0],
145+
},
146+
Response: null,
147+
},
148+
});
149+
150+
const content = resolve(root, sel, request);
149151
const iterator = content[Symbol.asyncIterator]();
150152

151153
try {
@@ -171,6 +173,7 @@ export async function fetch(url: string, init: FetchInit): Promise<Response> {
171173
console.log(e);
172174
writer.abort((e as Error).message);
173175
}
176+
request.close();
174177
}
175178
write();
176179
return new Response(readable, {

0 commit comments

Comments
 (0)