Refactor: use TransformStream

SukkaW 2023-11-23 17:38:04 +08:00
parent 7a6107761f
commit 265df07893
5 changed files with 143 additions and 49 deletions

View File

@@ -17,8 +17,7 @@ const outputSurgeDir = path.resolve(__dirname, '../List');
 const outputClashDir = path.resolve(__dirname, '../Clash');
 
 export const buildCommon = task(__filename, async () => {
-  /** @type {Promise<unknown>[]} */
-  const promises = [];
+  const promises: Promise<unknown>[] = [];
 
   const pw = new PathScurry(sourceDir);
   for await (const entry of pw) {
@@ -50,12 +49,10 @@ if (import.meta.main) {
 }
 
 const processFile = async (sourcePath: string) => {
-  /** @type {string[]} */
-  const lines = [];
+  const lines: string[] = [];
   let title = '';
 
-  /** @type {string[]} */
-  const descriptions = [];
+  const descriptions: string[] = [];
 
   for await (const line of readFileByLine(sourcePath)) {
     if (line === MAGIC_COMMAND_SKIP) {

View File

@@ -43,7 +43,7 @@ export const downloadPreviousBuild = task(__filename, async () => {
 
   if (flag & ALL_FILES_EXISTS) {
     console.log('All files exists, skip download.');
-    return;
+    // return;
   }
 
   const extractedPath = path.join(os.tmpdir(), `sukka-surge-last-build-extracted-${Date.now()}`);
@@ -80,6 +80,7 @@ export const downloadPreviousBuild = task(__filename, async () => {
     const targetFile = Bun.file(targetPath);
     const targetFileSink = targetFile.writer();
 
+    // I don't know, but for some reason it is impossible to consume entry with AsyncIterator
     await new Promise<void>((resolve, reject) => {
       entry.on('data', (chunk) => {
         targetFileSink.write(chunk);
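
The hunk is truncated before the Promise settles. As a rough sketch of the event-driven pattern the new comment refers to (hypothetical helper name, assuming a Node-style Readable entry and Bun's FileSink; this is not the remainder of the author's code):

import type { FileSink } from 'bun';
import type { Readable } from 'node:stream';

// Hypothetical helper (illustration only): drain a Node-style readable tar
// entry into a Bun FileSink via 'data'/'end'/'error' events, since async
// iteration over the entry is reported not to work here.
async function drainEntryToSink(entry: Readable, sink: FileSink): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    entry.on('data', (chunk: Uint8Array) => {
      sink.write(chunk);
    });
    entry.once('end', resolve);
    entry.once('error', reject);
  });
  await sink.end(); // flush buffered bytes and close the underlying file
}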

View File

@@ -1,34 +1,16 @@
 import type { BunFile } from 'bun';
 import { fetchWithRetry, defaultRequestInit } from './fetch-retry';
+import { TextLineStream } from './text-line-transform-stream';
+import { PolyfillTextDecoderStream } from './text-decoder-stream';
 
-const decoder = new TextDecoder('utf-8');
-
-export async function* readFileByLine(file: string | BunFile): AsyncGenerator<string> {
+export function readFileByLine(file: string | BunFile) {
   if (typeof file === 'string') {
     file = Bun.file(file);
   }
-
-  let buf = '';
-
-  for await (const chunk of file.stream()) {
-    const chunkStr = decoder.decode(chunk).replaceAll('\r\n', '\n');
-    for (let i = 0, len = chunkStr.length; i < len; i++) {
-      const char = chunkStr[i];
-      if (char === '\n') {
-        yield buf;
-        buf = '';
-      } else {
-        buf += char;
-      }
-    }
-  }
-
-  if (buf) {
-    yield buf;
-  }
+  return file.stream().pipeThrough(new PolyfillTextDecoderStream()).pipeThrough(new TextLineStream());
 }
 
-export async function* createReadlineInterfaceFromResponse(resp: Response): AsyncGenerator<string> {
+export async function createReadlineInterfaceFromResponse(resp: Response) {
   if (!resp.body) {
     throw new Error('Failed to fetch remote text');
   }
@@ -36,26 +18,9 @@ export async function* createReadlineInterfaceFromResponse(resp: Response): AsyncGenerator<string> {
     throw new Error('Body has already been consumed.');
   }
 
-  let buf = '';
-
-  for await (const chunk of resp.body) {
-    const chunkStr = decoder.decode(chunk).replaceAll('\r\n', '\n');
-    for (let i = 0, len = chunkStr.length; i < len; i++) {
-      const char = chunkStr[i];
-      if (char === '\n') {
-        yield buf;
-        buf = '';
-      } else {
-        buf += char;
-      }
-    }
-  }
-
-  if (buf) {
-    yield buf;
-  }
+  return (resp.body as ReadableStream<Uint8Array>).pipeThrough(new PolyfillTextDecoderStream()).pipeThrough(new TextLineStream());
 }
 
-export function fetchRemoteTextAndCreateReadlineInterface(url: string | URL): Promise<AsyncGenerator<string>> {
+export function fetchRemoteTextAndCreateReadlineInterface(url: string | URL) {
   return fetchWithRetry(url, defaultRequestInit).then(res => createReadlineInterfaceFromResponse(res));
 }
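
Both helpers now return a ReadableStream<string> pipeline rather than an AsyncGenerator<string>. Existing call sites such as the for await (const line of readFileByLine(sourcePath)) loop in the first changed file above keep working, because Bun's ReadableStream is async-iterable. A minimal consumption sketch (module path and file name are assumptions, not taken from the commit):

import { readFileByLine } from './fetch-text-by-line'; // module path assumed

// In Bun, ReadableStream implements Symbol.asyncIterator, so line-by-line
// consumers are unaffected by the switch away from async generators.
for await (const line of readFileByLine('Source/domainset/example.conf')) { // file name assumed
  console.log(line);
}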

View File

@@ -0,0 +1,56 @@
// Copyright 2016 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Polyfill for TextEncoderStream and TextDecoderStream
// Modified by Sukka (https://skk.moe) to increase compatibility and performance with Bun.

export class PolyfillTextDecoderStream extends TransformStream<Uint8Array, string> {
  readonly encoding: string;
  readonly fatal: boolean;
  readonly ignoreBOM: boolean;

  constructor(
    encoding: Encoding = 'utf-8',
    {
      fatal = false,
      ignoreBOM = false,
    }: ConstructorParameters<typeof TextDecoder>[1] = {},
  ) {
    const decoder = new TextDecoder(encoding, { fatal, ignoreBOM });

    super({
      transform(chunk: Uint8Array, controller: TransformStreamDefaultController<string>) {
        const decoded = decoder.decode(chunk, { stream: true });
        if (decoded.length > 0) {
          controller.enqueue(decoded);
        }
      },
      flush(controller: TransformStreamDefaultController<string>) {
        // If {fatal: false} is in options (the default), then the final call to
        // decode() can produce extra output (usually the unicode replacement
        // character 0xFFFD). When fatal is true, this call is just used for its
        // side-effect of throwing a TypeError exception if the input is
        // incomplete.
        const output = decoder.decode();
        if (output.length > 0) {
          controller.enqueue(output);
        }
      }
    });

    this.encoding = encoding;
    this.fatal = fatal;
    this.ignoreBOM = ignoreBOM;
  }
}
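
A brief usage sketch (not part of the commit; the module path is assumed from the import added earlier in this diff):

import { PolyfillTextDecoderStream } from './text-decoder-stream'; // module path assumed

// Blob#stream() yields a ReadableStream<Uint8Array>; piping it through the
// polyfill produces a ReadableStream<string> of decoded UTF-8 chunks.
const byteStream = new Blob(['héllo ', 'wörld']).stream();
for await (const text of byteStream.pipeThrough(new PolyfillTextDecoderStream())) {
  console.log(text);
}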

View File

@@ -0,0 +1,75 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
// Modified by Sukka (https://skk.moe) to increase compatibility and performance with Bun.

interface TextLineStreamOptions {
  /** Allow splitting by solo \r */
  allowCR: boolean;
}

/** Transform a stream into a stream where each chunk is divided by a newline,
 * be it `\n` or `\r\n`. `\r` can be enabled via the `allowCR` option.
 *
 * ```ts
 * import { TextLineStream } from 'https://deno.land/std@$STD_VERSION/streams/text_line_stream.ts';
 * const res = await fetch('https://example.com');
 * const lines = res.body!
 *   .pipeThrough(new TextDecoderStream())
 *   .pipeThrough(new TextLineStream());
 * ```
 */
export class TextLineStream extends TransformStream<string, string> {
  private __allowCR: boolean;
  private __buf = '';

  constructor(options?: TextLineStreamOptions) {
    super({
      transform: (chunk, controller) => this.handle(chunk, controller),
      flush: (controller) => {
        if (this.__buf.length > 0) {
          if (
            this.__allowCR &&
            this.__buf[this.__buf.length - 1] === '\r'
          ) controller.enqueue(this.__buf.slice(0, -1));
          else controller.enqueue(this.__buf);
        }
      },
    });

    this.__allowCR = options?.allowCR ?? false;
  }

  private handle(chunk: string, controller: TransformStreamDefaultController<string>) {
    chunk = this.__buf + chunk;

    for (;;) {
      const lfIndex = chunk.indexOf('\n');

      if (this.__allowCR) {
        const crIndex = chunk.indexOf('\r');

        if (
          crIndex !== -1 && crIndex !== (chunk.length - 1) &&
          (lfIndex === -1 || (lfIndex - 1) > crIndex)
        ) {
          controller.enqueue(chunk.slice(0, crIndex));
          chunk = chunk.slice(crIndex + 1);
          continue;
        }
      }

      if (lfIndex !== -1) {
        let crOrLfIndex = lfIndex;
        if (chunk[lfIndex - 1] === '\r') {
          crOrLfIndex--;
        }
        controller.enqueue(chunk.slice(0, crOrLfIndex));
        chunk = chunk.slice(lfIndex + 1);
        continue;
      }

      break;
    }

    this.__buf = chunk;
  }
}
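
A small behavioural sketch (not from the commit; the import path follows the import added in the diff above) showing how the splitter treats \n, \r\n across chunk boundaries, and, with allowCR enabled, a lone \r:

import { TextLineStream } from './text-line-transform-stream';

// Hypothetical demo helper: feed string chunks through TextLineStream and
// collect the emitted lines.
async function splitLines(chunks: string[], allowCR = false): Promise<string[]> {
  const lineStream = new ReadableStream<string>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    }
  }).pipeThrough(new TextLineStream({ allowCR }));

  const lines: string[] = [];
  for await (const line of lineStream) lines.push(line);
  return lines;
}

console.log(await splitLines(['a\nb\r', '\nc'])); // ['a', 'b', 'c']; the '\r\n' split across chunks is joined correctly
console.log(await splitLines(['a\rb'], true));    // ['a', 'b'] once allowCR is enabled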