Feat: implement parallel fetch w/ HTTP 304 (#43)

This commit is contained in:
Sukka
2024-10-09 20:49:36 +08:00
committed by GitHub
parent 07d3fdf05b
commit 2e48a72a3c
10 changed files with 273 additions and 111 deletions

View File

@@ -9,6 +9,7 @@ import { performance } from 'node:perf_hooks';
import fs from 'node:fs';
import { stringHash } from './string-hash';
import { defaultRequestInit, fetchWithRetry } from './fetch-retry';
import { Custom304NotModifiedError, CustomAbortError, CustomNoETagFallbackError, fetchAssets, sleepWithAbort } from './fetch-assets';
const enum CacheStatus {
Hit = 'hit',
@@ -211,14 +212,10 @@ export class Cache<S = string> {
url: string,
extraCacheKey: string,
fn: (resp: Response) => Promise<T>,
opt: Omit<CacheApplyOption<T, S>, 'ttl' | 'incrementTtlWhenHit'>,
opt: Omit<CacheApplyOption<T, S>, 'incrementTtlWhenHit'>,
requestInit?: RequestInit
) {
const { temporaryBypass } = opt;
const ttl = TTL.ONE_WEEK_STATIC;
if (temporaryBypass) {
if (opt.temporaryBypass) {
return fn(await fetchWithRetry(url, requestInit ?? defaultRequestInit));
}
@@ -226,22 +223,34 @@ export class Cache<S = string> {
const etagKey = baseKey + '$etag';
const cachedKey = baseKey + '$cached';
const onMiss = (resp: Response) => {
console.log(picocolors.yellow('[cache] miss'), url, picocolors.gray(`ttl: ${TTL.humanReadable(ttl)}`));
const etag = this.get(etagKey);
const onMiss = (resp: Response) => {
const serializer = 'serializer' in opt ? opt.serializer : identity as any;
const etag = resp.headers.get('etag');
if (!etag) {
console.log(picocolors.red('[cache] no etag'), picocolors.gray(url));
return fn(resp);
}
const promise = fn(resp);
return promise.then((value) => {
this.set(etagKey, etag, ttl);
this.set(cachedKey, serializer(value), ttl);
if (resp.headers.has('ETag')) {
let serverETag = resp.headers.get('ETag')!;
// Workaround: someonewhocares.org serves ETags with a spurious '-gzip' suffix
if (url.includes('someonewhocares.org')) {
serverETag = serverETag.replace('-gzip', '');
}
console.log(picocolors.yellow('[cache] miss'), url, { cachedETag: etag, serverETag });
this.set(etagKey, serverETag, TTL.ONE_WEEK_STATIC);
this.set(cachedKey, serializer(value), TTL.ONE_WEEK_STATIC);
return value;
}
this.del(etagKey);
console.log(picocolors.red('[cache] no etag'), picocolors.gray(url));
if (opt.ttl) {
this.set(cachedKey, serializer(value), opt.ttl);
}
return value;
});
};
@@ -251,7 +260,6 @@ export class Cache<S = string> {
return onMiss(await fetchWithRetry(url, requestInit ?? defaultRequestInit));
}
const etag = this.get(etagKey);
const resp = await fetchWithRetry(
url,
{
@@ -265,17 +273,154 @@ export class Cache<S = string> {
}
);
if (resp.status !== 304) {
// Only miss if previously an ETag was present and the server responded with a 304
if (resp.headers.has('ETag') && resp.status !== 304) {
return onMiss(resp);
}
console.log(picocolors.green('[cache] http 304'), picocolors.gray(url));
this.updateTtl(cachedKey, ttl);
console.log(picocolors.green(`[cache] ${resp.status === 304 ? 'http 304' : 'cache hit'}`), picocolors.gray(url));
this.updateTtl(cachedKey, TTL.ONE_WEEK_STATIC);
const deserializer = 'deserializer' in opt ? opt.deserializer : identity as any;
return deserializer(cached);
}
async applyWithHttp304AndMirrors<T>(
primaryUrl: string,
mirrorUrls: string[],
extraCacheKey: string,
fn: (resp: string) => Promise<T> | T,
opt: Omit<CacheApplyOption<T, S>, 'incrementTtlWhenHit'>
): Promise<T> {
if (opt.temporaryBypass) {
return fn(await fetchAssets(primaryUrl, mirrorUrls));
}
if (mirrorUrls.length === 0) {
return this.applyWithHttp304(primaryUrl, extraCacheKey, async (resp) => fn(await resp.text()), opt);
}
const baseKey = primaryUrl + '$' + extraCacheKey;
const getETagKey = (url: string) => baseKey + '$' + url + '$etag';
const cachedKey = baseKey + '$cached';
const controller = new AbortController();
const previouslyCached = this.get(cachedKey);
const primaryETag = this.get(getETagKey(primaryUrl));
const fetchMainPromise = fetchWithRetry(
primaryUrl,
{
signal: controller.signal,
...defaultRequestInit,
headers: (typeof primaryETag === 'string' && primaryETag.length > 0)
? mergeHeaders(
defaultRequestInit.headers,
{ 'If-None-Match': primaryETag }
)
: defaultRequestInit.headers
}
).then(r => {
if (r.headers.has('etag')) {
this.set(getETagKey(primaryUrl), r.headers.get('etag')!, TTL.ONE_WEEK_STATIC);
// If we do not have a cached value, we ignore 304
if (r.status === 304 && previouslyCached != null) {
controller.abort();
throw new Custom304NotModifiedError(primaryUrl);
}
} else if (!primaryETag && previouslyCached) {
throw new CustomNoETagFallbackError(previouslyCached as string);
}
return r.text();
}).then(text => {
controller.abort();
return text;
});
const createFetchFallbackPromise = async (url: string, index: number) => {
// Most assets can be downloaded within 250ms. To avoid wasting bandwidth, we wait ~300ms (staggered by mirror index) before downloading from the fallback URL.
try {
await sleepWithAbort(300 + (index + 1) * 10, controller.signal);
} catch {
console.log(picocolors.gray('[fetch cancelled early]'), picocolors.gray(url));
throw new CustomAbortError();
}
if (controller.signal.aborted) {
console.log(picocolors.gray('[fetch cancelled]'), picocolors.gray(url));
throw new CustomAbortError();
}
const etag = this.get(getETagKey(url));
const res = await fetchWithRetry(
url,
{
signal: controller.signal,
...defaultRequestInit,
headers: (typeof etag === 'string' && etag.length > 0)
? mergeHeaders(
defaultRequestInit.headers,
{ 'If-None-Match': etag }
)
: defaultRequestInit.headers
}
);
if (res.headers.has('etag')) {
this.set(getETagKey(url), res.headers.get('etag')!, TTL.ONE_WEEK_STATIC);
// If we do not have a cached value, we ignore 304
if (res.status === 304 && previouslyCached != null) {
controller.abort();
throw new Custom304NotModifiedError(url);
}
} else if (!primaryETag && previouslyCached) {
controller.abort();
throw new CustomNoETagFallbackError(previouslyCached as string);
}
const text = await res.text();
controller.abort();
return text;
};
try {
const text = await Promise.any([
fetchMainPromise,
...mirrorUrls.map(createFetchFallbackPromise)
]);
console.log(picocolors.yellow('[cache] miss'), primaryUrl);
const serializer = 'serializer' in opt ? opt.serializer : identity as any;
const value = await fn(text);
this.set(cachedKey, serializer(value), opt.ttl ?? TTL.ONE_WEEK_STATIC);
return value;
} catch (e) {
if (e instanceof AggregateError) {
const deserializer = 'deserializer' in opt ? opt.deserializer : identity as any;
for (const error of e.errors) {
if (error instanceof Custom304NotModifiedError) {
console.log(picocolors.green('[cache] http 304'), picocolors.gray(primaryUrl));
this.updateTtl(cachedKey, TTL.ONE_WEEK_STATIC);
return deserializer(previouslyCached);
}
if (error instanceof CustomNoETagFallbackError) {
console.log(picocolors.green('[cache] hit'), picocolors.gray(primaryUrl));
return deserializer(error.data);
}
}
}
console.log(`Download Rule for [${primaryUrl}] failed`);
throw e;
}
}
destroy() {
this.db.close();
}

View File

@@ -3,12 +3,30 @@ import { defaultRequestInit, fetchWithRetry } from './fetch-retry';
import { setTimeout } from 'node:timers/promises';
// eslint-disable-next-line sukka/unicorn/custom-error-definition -- typescript is better
class CustomAbortError extends Error {
export class CustomAbortError extends Error {
public readonly name = 'AbortError';
public readonly digest = 'AbortError';
}
const sleepWithAbort = (ms: number, signal: AbortSignal) => new Promise<void>((resolve, reject) => {
export class Custom304NotModifiedError extends Error {
public readonly name = 'Custom304NotModifiedError';
public readonly digest = 'Custom304NotModifiedError';
constructor(public readonly url: string) {
super('304 Not Modified');
}
}
export class CustomNoETagFallbackError extends Error {
public readonly name = 'CustomNoETagFallbackError';
public readonly digest = 'CustomNoETagFallbackError';
constructor(public readonly data: string) {
super('No ETag Fallback');
}
}
export const sleepWithAbort = (ms: number, signal: AbortSignal) => new Promise<void>((resolve, reject) => {
if (signal.aborted) {
reject(signal.reason as Error);
return;
@@ -34,7 +52,7 @@ export async function fetchAssets(url: string, fallbackUrls: string[] | readonly
const createFetchFallbackPromise = async (url: string, index: number) => {
// Most assets can be downloaded within 250ms. To avoid wasting bandwidth, we will wait for 500ms before downloading from the fallback URL.
try {
await sleepWithAbort(500 + (index + 1) * 20, controller.signal);
await sleepWithAbort(500 + (index + 1) * 10, controller.signal);
} catch {
console.log(picocolors.gray('[fetch cancelled early]'), picocolors.gray(url));
throw new CustomAbortError();

View File

@@ -2,9 +2,9 @@ import retry from 'async-retry';
import picocolors from 'picocolors';
import { setTimeout } from 'node:timers/promises';
import { setGlobalDispatcher, Agent } from 'undici';
import { setGlobalDispatcher, EnvHttpProxyAgent } from 'undici';
setGlobalDispatcher(new Agent({ allowH2: true }));
setGlobalDispatcher(new EnvHttpProxyAgent({ allowH2: true }));
function isClientError(err: unknown): err is NodeJS.ErrnoException {
if (!err || typeof err !== 'object') return false;

View File

@@ -1,13 +1,11 @@
// @ts-check
import { fetchRemoteTextByLine } from './fetch-text-by-line';
import { NetworkFilter } from '@cliqz/adblocker';
import { processLine } from './process-line';
import tldts from 'tldts-experimental';
import picocolors from 'picocolors';
import { normalizeDomain } from './normalize-domain';
import { fetchAssets } from './fetch-assets';
import { deserializeArray, fsFetchCache, serializeArray, createCacheKey } from './cache-filesystem';
import { deserializeArray, fsFetchCache, serializeArray, getFileContentHash } from './cache-filesystem';
import type { Span } from '../trace';
import createKeywordFilter from './aho-corasick';
import { looseTldtsOpt } from '../constants/loose-tldts-opt';
@@ -43,33 +41,24 @@ const domainListLineCb = (l: string, set: string[], includeAllSubDomain: boolean
set.push(includeAllSubDomain ? `.${line}` : line);
};
const cacheKey = createCacheKey(__filename);
export function processDomainLists(
span: Span,
domainListsUrl: string, mirrors: string[] | null, includeAllSubDomain = false,
ttl: number | null = null, extraCacheKey: (input: string) => string = identity
) {
return span.traceChild(`process domainlist: ${domainListsUrl}`).traceAsyncFn((childSpan) => fsFetchCache.apply(
extraCacheKey(cacheKey(domainListsUrl)),
async () => {
return span.traceChild(`process domainlist: ${domainListsUrl}`).traceAsyncFn((childSpan) => fsFetchCache.applyWithHttp304AndMirrors<string[]>(
domainListsUrl,
mirrors ?? [],
extraCacheKey(getFileContentHash(__filename)),
(text) => {
const domainSets: string[] = [];
const filterRules = text.split('\n');
if (mirrors == null || mirrors.length === 0) {
for await (const l of await fetchRemoteTextByLine(domainListsUrl)) {
domainListLineCb(l, domainSets, includeAllSubDomain, domainListsUrl);
childSpan.traceChild('parse domain list').traceSyncFn(() => {
for (let i = 0, len = filterRules.length; i < len; i++) {
domainListLineCb(filterRules[i], domainSets, includeAllSubDomain, domainListsUrl);
}
} else {
const filterRules = await childSpan
.traceChild('download domain list')
.traceAsyncFn(() => fetchAssets(domainListsUrl, mirrors).then(text => text.split('\n')));
childSpan.traceChild('parse domain list').traceSyncFn(() => {
for (let i = 0, len = filterRules.length; i < len; i++) {
domainListLineCb(filterRules[i], domainSets, includeAllSubDomain, domainListsUrl);
}
});
}
});
return domainSets;
},
@@ -109,26 +98,20 @@ export function processHosts(
hostsUrl: string, mirrors: string[] | null, includeAllSubDomain = false,
ttl: number | null = null, extraCacheKey: (input: string) => string = identity
) {
return span.traceChild(`processhosts: ${hostsUrl}`).traceAsyncFn((childSpan) => fsFetchCache.apply(
extraCacheKey(cacheKey(hostsUrl)),
async () => {
return span.traceChild(`processhosts: ${hostsUrl}`).traceAsyncFn((childSpan) => fsFetchCache.applyWithHttp304AndMirrors<string[]>(
hostsUrl,
mirrors ?? [],
extraCacheKey(getFileContentHash(__filename)),
(text) => {
const domainSets: string[] = [];
if (mirrors == null || mirrors.length === 0) {
for await (const l of await fetchRemoteTextByLine(hostsUrl)) {
hostsLineCb(l, domainSets, includeAllSubDomain, hostsUrl);
}
} else {
const filterRules = await childSpan
.traceChild('download hosts')
.traceAsyncFn(() => fetchAssets(hostsUrl, mirrors).then(text => text.split('\n')));
const filterRules = text.split('\n');
childSpan.traceChild('parse hosts').traceSyncFn(() => {
for (let i = 0, len = filterRules.length; i < len; i++) {
hostsLineCb(filterRules[i], domainSets, includeAllSubDomain, hostsUrl);
}
});
}
childSpan.traceChild('parse hosts').traceSyncFn(() => {
for (let i = 0, len = filterRules.length; i < len; i++) {
hostsLineCb(filterRules[i], domainSets, includeAllSubDomain, hostsUrl);
}
});
return domainSets;
},
@@ -155,13 +138,15 @@ export { type ParseType };
export async function processFilterRules(
parentSpan: Span,
filterRulesUrl: string,
fallbackUrls?: readonly string[] | null,
fallbackUrls?: string[] | null,
ttl: number | null = null,
allowThirdParty = false
): Promise<{ white: string[], black: string[], foundDebugDomain: boolean }> {
const [white, black, warningMessages] = await parentSpan.traceChild(`process filter rules: ${filterRulesUrl}`).traceAsyncFn((span) => fsFetchCache.apply<Readonly<[ white: string[], black: string[], warningMessages: string[] ]>>(
cacheKey(filterRulesUrl),
async () => {
const [white, black, warningMessages] = await parentSpan.traceChild(`process filter rules: ${filterRulesUrl}`).traceAsyncFn((span) => fsFetchCache.applyWithHttp304AndMirrors<Readonly<[ white: string[], black: string[], warningMessages: string[] ]>>(
filterRulesUrl,
fallbackUrls ?? [],
getFileContentHash(__filename),
(text) => {
const whitelistDomainSets = new Set<string>();
const blacklistDomainSets = new Set<string>();
@@ -221,20 +206,13 @@ export async function processFilterRules(
}
};
if (!fallbackUrls || fallbackUrls.length === 0) {
for await (const line of await fetchRemoteTextByLine(filterRulesUrl)) {
// don't trim here
lineCb(line);
}
} else {
const filterRules = await span.traceChild('download adguard filter').traceAsyncFn(() => fetchAssets(filterRulesUrl, fallbackUrls).then(text => text.split('\n')));
const filterRules = text.split('\n');
span.traceChild('parse adguard filter').traceSyncFn(() => {
for (let i = 0, len = filterRules.length; i < len; i++) {
lineCb(filterRules[i]);
}
});
}
span.traceChild('parse adguard filter').traceSyncFn(() => {
for (let i = 0, len = filterRules.length; i < len; i++) {
lineCb(filterRules[i]);
}
});
return [
Array.from(whitelistDomainSets),