From f64fa201e99099c39d226566623751bbb21868b9 Mon Sep 17 00:00:00 2001 From: SukkaW Date: Tue, 26 Nov 2024 21:22:14 +0800 Subject: [PATCH] Chore: process line stream --- Build/build-cdn-download-conf.ts | 8 +--- Build/build-chn-cidr.ts | 5 +-- Build/build-reject-ip-list.ts | 2 +- Build/build-telegram-cidr.ts | 6 +-- Build/lib/aho-corasick.bench.ts | 3 +- Build/lib/fetch-text-by-line.bench.ts | 7 ++-- Build/lib/fetch-text-by-line.ts | 15 +++++--- Build/lib/parse-dnsmasq.ts | 2 +- Build/lib/process-line.ts | 53 ++++++++++++++++++++++----- Build/lib/set-add-from-array.bench.ts | 3 +- Build/lib/stable-sort-domain.bench.ts | 3 +- Build/lib/tldts.bench.ts | 3 +- 12 files changed, 67 insertions(+), 43 deletions(-) diff --git a/Build/build-cdn-download-conf.ts b/Build/build-cdn-download-conf.ts index 0fce6cd4..d65b7b44 100644 --- a/Build/build-cdn-download-conf.ts +++ b/Build/build-cdn-download-conf.ts @@ -5,18 +5,14 @@ import { task } from './trace'; import { SHARED_DESCRIPTION } from './constants/description'; import { appendArrayInPlace } from './lib/append-array-in-place'; import { SOURCE_DIR } from './constants/dir'; -import { processLine } from './lib/process-line'; import { DomainsetOutput } from './lib/create-file'; import { CRASHLYTICS_WHITELIST } from './constants/reject-data-source'; const getS3OSSDomainsPromise = (async (): Promise => { const trie = new HostnameTrie(); - for await (const line of await fetchRemoteTextByLine('https://publicsuffix.org/list/public_suffix_list.dat')) { - const tmp = processLine(line); - if (tmp) { - trie.add(tmp); - } + for await (const line of await fetchRemoteTextByLine('https://publicsuffix.org/list/public_suffix_list.dat', true)) { + trie.add(line); } /** diff --git a/Build/build-chn-cidr.ts b/Build/build-chn-cidr.ts index b08aa5cb..f68c64ff 100644 --- a/Build/build-chn-cidr.ts +++ b/Build/build-chn-cidr.ts @@ -1,5 +1,4 @@ import { fetchRemoteTextByLine } from './lib/fetch-text-by-line'; -import { processLineFromReadline } from './lib/process-line'; import { task } from './trace'; import { contains as containsCidr, exclude as excludeCidr } from 'fast-cidr-tools'; @@ -19,8 +18,8 @@ const PROBE_CHN_CIDR_V4 = [ export const getChnCidrPromise = createMemoizedPromise(cachedOnlyFail( async function getChnCidr() { const [_cidr4, cidr6] = await Promise.all([ - fetchRemoteTextByLine('https://raw.githubusercontent.com/misakaio/chnroutes2/master/chnroutes.txt').then(processLineFromReadline), - fetchRemoteTextByLine('https://gaoyifan.github.io/china-operator-ip/china6.txt').then(processLineFromReadline) + fetchRemoteTextByLine('https://raw.githubusercontent.com/misakaio/chnroutes2/master/chnroutes.txt', true).then(Array.fromAsync), + fetchRemoteTextByLine('https://gaoyifan.github.io/china-operator-ip/china6.txt', true).then(Array.fromAsync) ]); const cidr4 = excludeCidr( diff --git a/Build/build-reject-ip-list.ts b/Build/build-reject-ip-list.ts index c18f78b8..85f1ef71 100644 --- a/Build/build-reject-ip-list.ts +++ b/Build/build-reject-ip-list.ts @@ -15,7 +15,7 @@ const getBogusNxDomainIPsPromise: Promise<[ipv4: string[], ipv6: string[]]> = $f const ipv4: string[] = []; const ipv6: string[] = []; - for await (const line of createReadlineInterfaceFromResponse(resp)) { + for await (const line of createReadlineInterfaceFromResponse(resp, true)) { if (line.startsWith('bogus-nxdomain=')) { const ip = line.slice(15).trim(); if (isProbablyIpv4(ip)) { diff --git a/Build/build-telegram-cidr.ts b/Build/build-telegram-cidr.ts index bd1d07cc..56e7aa69 100644 --- a/Build/build-telegram-cidr.ts +++ b/Build/build-telegram-cidr.ts @@ -1,7 +1,6 @@ // @ts-check import { createReadlineInterfaceFromResponse } from './lib/fetch-text-by-line'; import { isProbablyIpv4, isProbablyIpv6 } from './lib/is-fast-ip'; -import { processLine } from './lib/process-line'; import { task } from './trace'; import { SHARED_DESCRIPTION } from './constants/description'; import { createMemoizedPromise } from './lib/memo-promise'; @@ -16,10 +15,7 @@ export const getTelegramCIDRPromise = createMemoizedPromise(async () => { const ipcidr: string[] = []; const ipcidr6: string[] = []; - for await (const line of createReadlineInterfaceFromResponse(resp)) { - const cidr = processLine(line); - if (!cidr) continue; - + for await (const cidr of createReadlineInterfaceFromResponse(resp, true)) { const [subnet] = cidr.split('/'); if (isProbablyIpv4(subnet)) { ipcidr.push(cidr); diff --git a/Build/lib/aho-corasick.bench.ts b/Build/lib/aho-corasick.bench.ts index f467cea9..8d9f2982 100644 --- a/Build/lib/aho-corasick.bench.ts +++ b/Build/lib/aho-corasick.bench.ts @@ -1,5 +1,4 @@ import { fetchRemoteTextByLine } from './fetch-text-by-line'; -import { processLineFromReadline } from './process-line'; import createKeywordFilter from './aho-corasick'; @@ -36,7 +35,7 @@ if (require.main === module) { (async () => { const { bench, group, run } = await import('mitata'); - const data = await processLineFromReadline(await fetchRemoteTextByLine('https://easylist.to/easylist/easylist.txt')); + const data = await Array.fromAsync(await fetchRemoteTextByLine('https://easylist.to/easylist/easylist.txt', true)); console.log({ dataLen: data.length }); const keywordsSet = [ '!', diff --git a/Build/lib/fetch-text-by-line.bench.ts b/Build/lib/fetch-text-by-line.bench.ts index c3bcfa5b..deeb1f40 100644 --- a/Build/lib/fetch-text-by-line.bench.ts +++ b/Build/lib/fetch-text-by-line.bench.ts @@ -1,4 +1,3 @@ -import { processLine, processLineFromReadline } from './process-line'; import { readFileByLine, readFileByLineLegacy } from './fetch-text-by-line'; import path from 'node:path'; import fsp from 'node:fs/promises'; @@ -10,9 +9,9 @@ const file = path.join(SOURCE_DIR, 'domainset/cdn.conf'); const { bench, group, run } = await import('mitata'); group(() => { - bench('readFileByLine', () => processLineFromReadline(readFileByLine(file))); - bench('readFileByLineLegacy', () => processLineFromReadline(readFileByLineLegacy(file))); - bench('fsp.readFile', () => fsp.readFile(file, 'utf-8').then((content) => content.split('\n').filter(processLine))); + bench('readFileByLine', () => Array.fromAsync(readFileByLine(file))); + bench('readFileByLineLegacy', () => Array.fromAsync(readFileByLineLegacy(file))); + bench('fsp.readFile', () => fsp.readFile(file, 'utf-8').then((content) => content.split('\n'))); }); run(); diff --git a/Build/lib/fetch-text-by-line.ts b/Build/lib/fetch-text-by-line.ts index cab25060..5b791536 100644 --- a/Build/lib/fetch-text-by-line.ts +++ b/Build/lib/fetch-text-by-line.ts @@ -6,7 +6,7 @@ import readline from 'node:readline'; import { TextLineStream } from './text-line-transform-stream'; import type { ReadableStream } from 'node:stream/web'; import { TextDecoderStream } from 'node:stream/web'; -import { processLine } from './process-line'; +import { processLine, ProcessLineStream } from './process-line'; import { $fetch } from './make-fetch-happen'; import type { NodeFetchResponse } from './make-fetch-happen'; import type { UndiciResponseData } from './fetch-retry'; @@ -40,7 +40,7 @@ function ensureResponseBody AsyncIterable) = (resp) => { +export const createReadlineInterfaceFromResponse: ((resp: NodeFetchResponse | UndiciResponseData | UnidiciWebResponse, processLine?: boolean) => ReadableStream) = (resp, processLine = false) => { const stream = ensureResponseBody(resp); const webStream: ReadableStream = 'getReader' in stream @@ -51,13 +51,18 @@ export const createReadlineInterfaceFromResponse: ((resp: NodeFetchResponse | Un : Readable.toWeb(new Readable().wrap(stream)) ); - return webStream + const resultStream = webStream .pipeThrough(new TextDecoderStream()) .pipeThrough(new TextLineStream()); + + if (processLine) { + return resultStream.pipeThrough(new ProcessLineStream()); + } + return resultStream; }; -export function fetchRemoteTextByLine(url: string) { - return $fetch(url).then(createReadlineInterfaceFromResponse); +export function fetchRemoteTextByLine(url: string, processLine = false): Promise> { + return $fetch(url).then(resp => createReadlineInterfaceFromResponse(resp, processLine)); } export async function readFileIntoProcessedArray(file: string /* | FileHandle */) { diff --git a/Build/lib/parse-dnsmasq.ts b/Build/lib/parse-dnsmasq.ts index 2f1111ff..ffe93891 100644 --- a/Build/lib/parse-dnsmasq.ts +++ b/Build/lib/parse-dnsmasq.ts @@ -19,7 +19,7 @@ export function extractDomainsFromFelixDnsmasq(line: string): string | null { export async function parseFelixDnsmasqFromResp(resp: NodeFetchResponse | UndiciResponseData | Response): Promise { const results: string[] = []; - for await (const line of createReadlineInterfaceFromResponse(resp)) { + for await (const line of createReadlineInterfaceFromResponse(resp, true)) { const domain = extractDomainsFromFelixDnsmasq(line); if (domain && isDomainLoose(domain)) { results.push(domain); diff --git a/Build/lib/process-line.ts b/Build/lib/process-line.ts index 76fcd617..23ee543f 100644 --- a/Build/lib/process-line.ts +++ b/Build/lib/process-line.ts @@ -1,3 +1,5 @@ +import { TransformStream } from 'node:stream/web'; + export function processLine(line: string): string | null { if (!line) { return null; @@ -11,8 +13,7 @@ export function processLine(line: string): string | null { const line_0: string = trimmed[0]; if ( - line_0 === '#' - || line_0 === ' ' + line_0 === ' ' || line_0 === '\r' || line_0 === '\n' || line_0 === '!' @@ -21,16 +22,48 @@ export function processLine(line: string): string | null { return null; } + if (line_0 === '#') { + if (trimmed[1] !== '#') { + // # Comment + return null; + } + if (trimmed[2] === '#' && trimmed[3] === '#') { + // ################## EOF ################## + return null; + } + /** + * AdGuard Filter can be: + * + * ##.class + * ##tag.class + * ###id + */ + } + return trimmed; } -export async function processLineFromReadline(rl: AsyncIterable): Promise { - const res: string[] = []; - for await (const line of rl) { - const l: string | null = processLine(line); - if (l) { - res.push(l); - } +export class ProcessLineStream extends TransformStream { + // private __buf = ''; + constructor() { + super({ + transform(l, controller) { + const line = processLine(l); + if (line) { + controller.enqueue(line); + } + } + }); } - return res; } + +// export class ProcessLineNodeStream extends Transform { +// _transform(chunk: string, encoding: BufferEncoding, callback: TransformCallback) { +// // Convert chunk to string and then to uppercase +// const upperCased = chunk.toUpperCase(); +// // Push transformed data to readable side +// this.push(upperCased); +// // Call callback when done +// callback(); +// } +// } diff --git a/Build/lib/set-add-from-array.bench.ts b/Build/lib/set-add-from-array.bench.ts index 85730de4..69e08f79 100644 --- a/Build/lib/set-add-from-array.bench.ts +++ b/Build/lib/set-add-from-array.bench.ts @@ -1,10 +1,9 @@ import { fetchRemoteTextByLine } from './fetch-text-by-line'; -import { processLineFromReadline } from './process-line'; import { bench, group, run } from 'mitata'; (async () => { - const data = await processLineFromReadline(await fetchRemoteTextByLine('https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt')); + const data = await Array.fromAsync(await fetchRemoteTextByLine('https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt', true)); group(() => { bench('setAddFromArray', () => { diff --git a/Build/lib/stable-sort-domain.bench.ts b/Build/lib/stable-sort-domain.bench.ts index 78094b6d..1bc0dde1 100644 --- a/Build/lib/stable-sort-domain.bench.ts +++ b/Build/lib/stable-sort-domain.bench.ts @@ -1,11 +1,10 @@ import { fetchRemoteTextByLine } from './fetch-text-by-line'; -import { processLineFromReadline } from './process-line'; import { sortDomains } from './stable-sort-domain'; import { bench, group, run } from 'mitata'; (async () => { - const data = await processLineFromReadline(await fetchRemoteTextByLine('https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt')); + const data = await Array.fromAsync(await fetchRemoteTextByLine('https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt', true)); group(() => { bench('sortDomains', () => sortDomains(data)); diff --git a/Build/lib/tldts.bench.ts b/Build/lib/tldts.bench.ts index 6c7b81be..7efa37ba 100644 --- a/Build/lib/tldts.bench.ts +++ b/Build/lib/tldts.bench.ts @@ -1,5 +1,4 @@ import { fetchRemoteTextByLine } from './fetch-text-by-line'; -import { processLineFromReadline } from './process-line'; import { bench, group, run } from 'mitata'; @@ -7,7 +6,7 @@ import * as tldts from 'tldts'; import * as tldtsExperimental from 'tldts-experimental'; (async () => { - const data = await processLineFromReadline(await fetchRemoteTextByLine('https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt')); + const data = await Array.fromAsync(await fetchRemoteTextByLine('https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt', true)); const tldtsOpt: Parameters[1] = { allowPrivateDomains: false,