Revert undici.request and use fetch again

This commit is contained in:
SukkaW 2024-10-15 22:34:53 +08:00
parent dc981af967
commit c5ee4bad53
6 changed files with 90 additions and 54 deletions

View File

@ -8,12 +8,13 @@ import { fastStringArrayJoin, identity, mergeHeaders } from './misc';
import { performance } from 'node:perf_hooks'; import { performance } from 'node:perf_hooks';
import fs from 'node:fs'; import fs from 'node:fs';
import { stringHash } from './string-hash'; import { stringHash } from './string-hash';
import { defaultRequestInit, requestWithLog, UndiciResponseError } from './fetch-retry'; import { defaultRequestInit, fetchWithLog, ResponseError } from './fetch-retry';
import type { UndiciResponseData } from './fetch-retry'; // import type { UndiciResponseData } from './fetch-retry';
import { Custom304NotModifiedError, CustomAbortError, CustomNoETagFallbackError, fetchAssetsWith304, sleepWithAbort } from './fetch-assets'; import { Custom304NotModifiedError, CustomAbortError, CustomNoETagFallbackError, fetchAssetsWithout304, sleepWithAbort } from './fetch-assets';
import type { HeadersInit } from 'undici'; import type { Response } from 'undici';
import type { IncomingHttpHeaders } from 'undici/types/header'; import type { IncomingHttpHeaders } from 'undici/types/header';
import { Headers } from 'undici';
const enum CacheStatus { const enum CacheStatus {
Hit = 'hit', Hit = 'hit',
@ -70,14 +71,18 @@ export const TTL = {
TWO_WEEKS: () => randomInt(10, 14) * ONE_DAY TWO_WEEKS: () => randomInt(10, 14) * ONE_DAY
}; };
function ensureETag(headers: IncomingHttpHeaders) { function ensureETag(headers: IncomingHttpHeaders | Headers) {
if (headers instanceof Headers && headers.has('etag')) {
return headers.get('etag');
}
if ('etag' in headers && typeof headers.etag === 'string' && headers.etag.length > 0) { if ('etag' in headers && typeof headers.etag === 'string' && headers.etag.length > 0) {
return headers.etag; return headers.etag;
} }
if ('ETag' in headers && typeof headers.ETag === 'string' && headers.ETag.length > 0) { if ('ETag' in headers && typeof headers.ETag === 'string' && headers.ETag.length > 0) {
return headers.ETag; return headers.ETag;
} }
return ''; return null;
} }
export class Cache<S = string> { export class Cache<S = string> {
@ -225,12 +230,12 @@ export class Cache<S = string> {
async applyWithHttp304<T>( async applyWithHttp304<T>(
url: string, url: string,
extraCacheKey: string, extraCacheKey: string,
fn: (resp: UndiciResponseData) => Promise<T>, fn: (resp: Response) => Promise<T>,
opt: Omit<CacheApplyOption<T, S>, 'incrementTtlWhenHit'> opt: Omit<CacheApplyOption<T, S>, 'incrementTtlWhenHit'>
// requestInit?: RequestInit // requestInit?: RequestInit
): Promise<T> { ): Promise<T> {
if (opt.temporaryBypass) { if (opt.temporaryBypass) {
return fn(await requestWithLog(url)); return fn(await fetchWithLog(url));
} }
const baseKey = url + '$' + extraCacheKey; const baseKey = url + '$' + extraCacheKey;
@ -239,7 +244,7 @@ export class Cache<S = string> {
const etag = this.get(etagKey); const etag = this.get(etagKey);
const onMiss = async (resp: UndiciResponseData) => { const onMiss = async (resp: Response) => {
const serializer = 'serializer' in opt ? opt.serializer : identity as any; const serializer = 'serializer' in opt ? opt.serializer : identity as any;
const value = await fn(resp); const value = await fn(resp);
@ -251,7 +256,7 @@ export class Cache<S = string> {
serverETag = serverETag.replace('-gzip', ''); serverETag = serverETag.replace('-gzip', '');
} }
console.log(picocolors.yellow('[cache] miss'), url, { status: resp.statusCode, cachedETag: etag, serverETag }); console.log(picocolors.yellow('[cache] miss'), url, { status: resp.status, cachedETag: etag, serverETag });
this.set(etagKey, serverETag, TTL.ONE_WEEK_STATIC); this.set(etagKey, serverETag, TTL.ONE_WEEK_STATIC);
this.set(cachedKey, serializer(value), TTL.ONE_WEEK_STATIC); this.set(cachedKey, serializer(value), TTL.ONE_WEEK_STATIC);
@ -269,24 +274,25 @@ export class Cache<S = string> {
const cached = this.get(cachedKey); const cached = this.get(cachedKey);
if (cached == null) { if (cached == null) {
return onMiss(await requestWithLog(url)); return onMiss(await fetchWithLog(url));
} }
const resp = await requestWithLog( const resp = await fetchWithLog(
url, url,
{ {
...defaultRequestInit,
headers: (typeof etag === 'string' && etag.length > 0) headers: (typeof etag === 'string' && etag.length > 0)
? { 'If-None-Match': etag } ? mergeHeaders<Record<string, string>>(defaultRequestInit.headers, { 'If-None-Match': etag })
: {} : defaultRequestInit.headers
} }
); );
// Only miss if previously a ETag was present and the server responded with a 304 // Only miss if previously a ETag was present and the server responded with a 304
if (!ensureETag(resp.headers) && resp.statusCode !== 304) { if (!ensureETag(resp.headers) && resp.status !== 304) {
return onMiss(resp); return onMiss(resp);
} }
console.log(picocolors.green(`[cache] ${resp.statusCode === 304 ? 'http 304' : 'cache hit'}`), picocolors.gray(url)); console.log(picocolors.green(`[cache] ${resp.status === 304 ? 'http 304' : 'cache hit'}`), picocolors.gray(url));
this.updateTtl(cachedKey, TTL.ONE_WEEK_STATIC); this.updateTtl(cachedKey, TTL.ONE_WEEK_STATIC);
const deserializer = 'deserializer' in opt ? opt.deserializer : identity as any; const deserializer = 'deserializer' in opt ? opt.deserializer : identity as any;
@ -301,16 +307,17 @@ export class Cache<S = string> {
opt: Omit<CacheApplyOption<T, S>, 'incrementTtlWhenHit'> opt: Omit<CacheApplyOption<T, S>, 'incrementTtlWhenHit'>
): Promise<T> { ): Promise<T> {
if (opt.temporaryBypass) { if (opt.temporaryBypass) {
return fn(await fetchAssetsWith304(primaryUrl, mirrorUrls)); return fn(await fetchAssetsWithout304(primaryUrl, mirrorUrls));
} }
if (mirrorUrls.length === 0) { if (mirrorUrls.length === 0) {
return this.applyWithHttp304(primaryUrl, extraCacheKey, async (resp) => fn(await resp.body.text()), opt); return this.applyWithHttp304(primaryUrl, extraCacheKey, async (resp) => fn(await resp.text()), opt);
} }
const baseKey = primaryUrl + '$' + extraCacheKey; const baseKey = primaryUrl + '$' + extraCacheKey;
const getETagKey = (url: string) => baseKey + '$' + url + '$etag'; const getETagKey = (url: string) => baseKey + '$' + url + '$etag';
const cachedKey = baseKey + '$cached'; const cachedKey = baseKey + '$cached';
const controller = new AbortController(); const controller = new AbortController();
const previouslyCached = this.get(cachedKey); const previouslyCached = this.get(cachedKey);
@ -331,16 +338,13 @@ export class Cache<S = string> {
} }
const etag = this.get(getETagKey(url)); const etag = this.get(getETagKey(url));
const res = await requestWithLog( const res = await fetchWithLog(
url, url,
{ {
signal: controller.signal, signal: controller.signal,
...defaultRequestInit, ...defaultRequestInit,
headers: (typeof etag === 'string' && etag.length > 0) headers: (typeof etag === 'string' && etag.length > 0 && typeof previouslyCached === 'string' && previouslyCached.length > 1)
? mergeHeaders<HeadersInit>( ? mergeHeaders<Record<string, string>>(defaultRequestInit.headers, { 'If-None-Match': etag })
{ 'If-None-Match': etag },
defaultRequestInit.headers
)
: defaultRequestInit.headers : defaultRequestInit.headers
} }
); );
@ -350,7 +354,7 @@ export class Cache<S = string> {
this.set(getETagKey(url), serverETag, TTL.ONE_WEEK_STATIC); this.set(getETagKey(url), serverETag, TTL.ONE_WEEK_STATIC);
} }
// If we do not have a cached value, we ignore 304 // If we do not have a cached value, we ignore 304
if (res.statusCode === 304 && typeof previouslyCached === 'string' && previouslyCached.length > 1) { if (res.status === 304 && typeof previouslyCached === 'string' && previouslyCached.length > 1) {
const err = new Custom304NotModifiedError(url, previouslyCached); const err = new Custom304NotModifiedError(url, previouslyCached);
controller.abort(err); controller.abort(err);
throw err; throw err;
@ -363,10 +367,10 @@ export class Cache<S = string> {
// either no etag and not cached // either no etag and not cached
// or has etag but not 304 // or has etag but not 304
const text = await res.body.text(); const text = await res.text();
if (text.length < 2) { if (text.length < 2) {
throw new UndiciResponseError(res, url); throw new ResponseError(res, url, 'empty response');
} }
controller.abort(); controller.abort();
@ -391,8 +395,6 @@ export class Cache<S = string> {
if (e && typeof e === 'object' && 'errors' in e && Array.isArray(e.errors)) { if (e && typeof e === 'object' && 'errors' in e && Array.isArray(e.errors)) {
const deserializer = 'deserializer' in opt ? opt.deserializer : identity as any; const deserializer = 'deserializer' in opt ? opt.deserializer : identity as any;
console.log(e.errors);
for (let i = 0, len = e.errors.length; i < len; i++) { for (let i = 0, len = e.errors.length; i < len; i++) {
const error = e.errors[i]; const error = e.errors[i];
if ('name' in error && (error.name === 'CustomAbortError' || error.name === 'AbortError')) { if ('name' in error && (error.name === 'CustomAbortError' || error.name === 'AbortError')) {
@ -409,6 +411,8 @@ export class Cache<S = string> {
return deserializer(error.data); return deserializer(error.data);
} }
} }
console.log(picocolors.red('[fetch error]'), picocolors.gray(error.url), error);
} }
} }

View File

@ -4,7 +4,7 @@ import { createMemoizedPromise } from './memo-promise';
export const getPublicSuffixListTextPromise = createMemoizedPromise(() => fsFetchCache.applyWithHttp304<string[]>( export const getPublicSuffixListTextPromise = createMemoizedPromise(() => fsFetchCache.applyWithHttp304<string[]>(
'https://publicsuffix.org/list/public_suffix_list.dat', 'https://publicsuffix.org/list/public_suffix_list.dat',
getFileContentHash(__filename), getFileContentHash(__filename),
(r) => r.body.text().then(text => text.split('\n')), (r) => r.text().then(text => text.split('\n')),
{ {
// https://github.com/publicsuffix/list/blob/master/.github/workflows/tld-update.yml // https://github.com/publicsuffix/list/blob/master/.github/workflows/tld-update.yml
// Though the action runs every 24 hours, the IANA list is updated every 7 days. // Though the action runs every 24 hours, the IANA list is updated every 7 days.

View File

@ -1,5 +1,5 @@
import picocolors from 'picocolors'; import picocolors from 'picocolors';
import { defaultRequestInit, requestWithLog, UndiciResponseError } from './fetch-retry'; import { defaultRequestInit, fetchWithLog, ResponseError } from './fetch-retry';
import { setTimeout } from 'node:timers/promises'; import { setTimeout } from 'node:timers/promises';
// eslint-disable-next-line sukka/unicorn/custom-error-definition -- typescript is better // eslint-disable-next-line sukka/unicorn/custom-error-definition -- typescript is better
@ -42,7 +42,7 @@ export function sleepWithAbort(ms: number, signal: AbortSignal) {
}); });
} }
export async function fetchAssetsWith304(url: string, fallbackUrls: string[] | readonly string[]) { export async function fetchAssetsWithout304(url: string, fallbackUrls: string[] | readonly string[]) {
const controller = new AbortController(); const controller = new AbortController();
const createFetchFallbackPromise = async (url: string, index: number) => { const createFetchFallbackPromise = async (url: string, index: number) => {
@ -59,11 +59,11 @@ export async function fetchAssetsWith304(url: string, fallbackUrls: string[] | r
console.log(picocolors.gray('[fetch cancelled]'), picocolors.gray(url)); console.log(picocolors.gray('[fetch cancelled]'), picocolors.gray(url));
throw new CustomAbortError(); throw new CustomAbortError();
} }
const res = await requestWithLog(url, { signal: controller.signal, ...defaultRequestInit }); const res = await fetchWithLog(url, { signal: controller.signal, ...defaultRequestInit });
const text = await res.body.text(); const text = await res.text();
if (text.length < 2) { if (text.length < 2) {
throw new UndiciResponseError(res, url); throw new ResponseError(res, url, 'empty response w/o 304');
} }
controller.abort(); controller.abort();

View File

@ -6,18 +6,20 @@ import undici, {
} from 'undici'; } from 'undici';
import type { import type {
Dispatcher Dispatcher,
RequestInit,
Response
} from 'undici'; } from 'undici';
export type UndiciResponseData<T = any> = Dispatcher.ResponseData<T>; export type UndiciResponseData = Dispatcher.ResponseData<any>;
import CacheableLookup from 'cacheable-lookup'; import CacheableLookup from 'cacheable-lookup';
import type { LookupOptions as CacheableLookupOptions } from 'cacheable-lookup'; import type { LookupOptions as CacheableLookupOptions } from 'cacheable-lookup';
import { inspect } from 'node:util';
const cacheableLookup = new CacheableLookup(); const cacheableLookup = new CacheableLookup();
const agent = new EnvHttpProxyAgent({ const agent = new EnvHttpProxyAgent({
// allowH2: true,
connect: { connect: {
lookup(hostname, opt, cb) { lookup(hostname, opt, cb) {
return cacheableLookup.lookup(hostname, opt as CacheableLookupOptions, cb); return cacheableLookup.lookup(hostname, opt as CacheableLookupOptions, cb);
@ -114,22 +116,23 @@ function calculateRetryAfterHeader(retryAfter: string) {
return new Date(retryAfter).getTime() - current; return new Date(retryAfter).getTime() - current;
} }
export class UndiciResponseError extends Error { export class ResponseError<T extends UndiciResponseData | Response> extends Error {
readonly code: number; readonly code: number;
readonly statusCode: number; readonly statusCode: number;
constructor(public readonly res: UndiciResponseData, public readonly url: string) { constructor(public readonly res: T, public readonly url: string, ...args: any[]) {
super('HTTP ' + res.statusCode); const statusCode = 'statusCode' in res ? res.statusCode : res.status;
super('HTTP ' + statusCode + ' ' + args.map(_ => inspect(_)).join(' '));
if ('captureStackTrace' in Error) { if ('captureStackTrace' in Error) {
Error.captureStackTrace(this, UndiciResponseError); Error.captureStackTrace(this, ResponseError);
} }
// eslint-disable-next-line sukka/unicorn/custom-error-definition -- deliberatly use previous name // eslint-disable-next-line sukka/unicorn/custom-error-definition -- deliberatly use previous name
this.name = this.constructor.name; this.name = this.constructor.name;
this.res = res; this.res = res;
this.code = res.statusCode; this.code = statusCode;
this.statusCode = res.statusCode; this.statusCode = statusCode;
} }
} }
@ -139,15 +142,43 @@ export const defaultRequestInit = {
} }
}; };
export async function requestWithLog(url: string, opt?: Parameters<typeof undici.request>[1]) { export async function fetchWithLog(url: string, init?: RequestInit) {
try { try {
const res = await undici.request(url, opt); const res = await undici.fetch(url, init);
if (res.statusCode >= 400) { if (res.status >= 400) {
throw new UndiciResponseError(res, url); throw new ResponseError(res, url);
} }
if (!(res.statusCode >= 200 && res.statusCode <= 299) && res.statusCode !== 304) { if (!(res.status >= 200 && res.status <= 299) && res.status !== 304) {
throw new UndiciResponseError(res, url); throw new ResponseError(res, url);
}
return res;
} catch (err: unknown) {
if (typeof err === 'object' && err !== null && 'name' in err) {
if ((
err.name === 'AbortError'
|| ('digest' in err && err.digest === 'AbortError')
)) {
console.log(picocolors.gray('[fetch abort]'), url);
}
} else {
console.log(picocolors.gray('[fetch fail]'), url, { name: (err as any).name }, err);
}
throw err;
}
}
export async function requestWithLog(url: string, opt?: Parameters<typeof undici.request>[1]) {
try {
const res = await undici.request(url, opt);
if (res.statusCode >= 400) {
throw new ResponseError(res, url);
}
if (!(res.statusCode >= 200 && res.statusCode <= 299) && res.statusCode !== 304) {
throw new ResponseError(res, url);
} }
return res; return res;

View File

@ -9,6 +9,7 @@ import { processLine } from './process-line';
import { $fetch } from './make-fetch-happen'; import { $fetch } from './make-fetch-happen';
import type { NodeFetchResponse } from './make-fetch-happen'; import type { NodeFetchResponse } from './make-fetch-happen';
import type { UndiciResponseData } from './fetch-retry'; import type { UndiciResponseData } from './fetch-retry';
import type { Response } from 'undici';
function getReadableStream(file: string | FileHandle): ReadableStream { function getReadableStream(file: string | FileHandle): ReadableStream {
if (typeof file === 'string') { if (typeof file === 'string') {
@ -22,8 +23,7 @@ export const readFileByLine: ((file: string | FileHandle) => AsyncIterable<strin
.pipeThrough(new TextDecoderStream()) .pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream()); .pipeThrough(new TextLineStream());
function ensureResponseBody<T extends NodeFetchResponse | UndiciResponseData>(resp: T): NonNullable<T['body']> { function ensureResponseBody<T extends NodeFetchResponse | UndiciResponseData | Response>(resp: T): NonNullable<T['body']> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- NodeFetchResponse['body'] is nullable
if (resp.body == null) { if (resp.body == null) {
throw new Error('Failed to fetch remote text'); throw new Error('Failed to fetch remote text');
} }
@ -33,7 +33,7 @@ function ensureResponseBody<T extends NodeFetchResponse | UndiciResponseData>(re
return resp.body; return resp.body;
} }
export const createReadlineInterfaceFromResponse: ((resp: NodeFetchResponse | UndiciResponseData) => AsyncIterable<string>) = (resp) => { export const createReadlineInterfaceFromResponse: ((resp: NodeFetchResponse | UndiciResponseData | Response) => AsyncIterable<string>) = (resp) => {
const stream = ensureResponseBody(resp); const stream = ensureResponseBody(resp);
const webStream: ReadableStream<Uint8Array> = 'getReader' in stream const webStream: ReadableStream<Uint8Array> = 'getReader' in stream

View File

@ -2,6 +2,7 @@ import { createReadlineInterfaceFromResponse } from './fetch-text-by-line';
import { parse as tldtsParse } from 'tldts'; import { parse as tldtsParse } from 'tldts';
import type { NodeFetchResponse } from './make-fetch-happen'; import type { NodeFetchResponse } from './make-fetch-happen';
import type { UndiciResponseData } from './fetch-retry'; import type { UndiciResponseData } from './fetch-retry';
import type { Response } from 'undici';
function isDomainLoose(domain: string): boolean { function isDomainLoose(domain: string): boolean {
const { isIcann, isPrivate, isIp } = tldtsParse(domain); const { isIcann, isPrivate, isIp } = tldtsParse(domain);
@ -15,7 +16,7 @@ export function extractDomainsFromFelixDnsmasq(line: string): string | null {
return null; return null;
} }
export async function parseFelixDnsmasqFromResp(resp: NodeFetchResponse | UndiciResponseData): Promise<string[]> { export async function parseFelixDnsmasqFromResp(resp: NodeFetchResponse | UndiciResponseData | Response): Promise<string[]> {
const results: string[] = []; const results: string[] = [];
for await (const line of createReadlineInterfaceFromResponse(resp)) { for await (const line of createReadlineInterfaceFromResponse(resp)) {