Chore: allow trace tool to run across workers/realms

This commit is contained in:
SukkaW
2026-04-13 16:20:29 +08:00
parent ed5d088d43
commit a4dfcaa669
4 changed files with 87 additions and 21 deletions

View File

@@ -99,6 +99,7 @@ export const buildRejectDomainSet = task(require.main === module, __filename)(as
// It is faster to add base than add others first then whitelist
rejectDomainsetOutput.addFromRuleset(readLocalRejectRulesetPromise);
rejectExtraDomainsetOutput.addFromRuleset(readLocalRejectRulesetPromise);
rejectPhisingDomainsetOutput.addFromRuleset(readLocalRejectRulesetPromise);
rejectNonIpRulesetOutput.addFromRuleset(readLocalRejectRulesetPromise);
@@ -130,7 +131,7 @@ export const buildRejectDomainSet = task(require.main === module, __filename)(as
arrayPushNonNullish(promises, domainListsExtraDownloads.map(task => task(childSpan).then(appendArrayToRejectExtraOutput)));
rejectPhisingDomainsetOutput.addFromDomainset(
span.traceChildPromise('get phishing domains', phishingWorker.getPhishingDomains())
span.traceWorkerChild('get phishing domains', rawSpan => phishingWorker.getPhishingDomains(rawSpan))
);
arrayPushNonNullish(
@@ -212,10 +213,9 @@ export const buildRejectDomainSet = task(require.main === module, __filename)(as
for (let i = 0, len = arr.length; i < len; i++) {
const line = arr[i];
if (line.startsWith('bogus-nxdomain=')) {
// bogus nxdomain needs to be blocked even after resolved
rejectIPOutput.addAnyCIDR(
line.slice(15).trim(),
false
false // bogus nxdomain needs to be blocked even after resolved
);
}
}

View File

@@ -124,10 +124,12 @@ const buildFinishedLock = path.join(ROOT_DIR, '.BUILD_FINISHED');
});
printStats(traces);
await microsoftCdnWorker.end();
await cdnDownloadWorker.end();
await telegramCidrWorker.end();
await mockAssetsWorker.end();
await Promise.all([
microsoftCdnWorker.end(),
cdnDownloadWorker.end(),
telegramCidrWorker.end(),
mockAssetsWorker.end()
]);
// Finish the build to avoid leaking timer/fetch ref
await whyIsNodeRunning();

View File

@@ -2,7 +2,8 @@ import picocolors from 'picocolors';
import { parse } from 'tldts-experimental';
import { appendArrayInPlaceCurried } from 'foxts/append-array-in-place';
import { dummySpan } from '../trace';
import { workerJob } from '../trace';
import type { RawSpan, WorkerJobResult } from '../trace';
import type { TldTsParsed } from './normalize-domain';
import { loosTldOptWithPrivateDomains } from '../constants/loose-tldts-opt';
@@ -14,18 +15,18 @@ import { processDomainListsWithPreload } from './parse-filter/domainlists';
import process from 'node:process';
export function getPhishingDomains(isDebug = false): Promise<string[]> {
return dummySpan.traceChild('get phishing domains').traceAsyncFn(async (span) => span.traceChildAsync(
'process phishing domain set',
async () => {
const downloads = [
...PHISHING_DOMAIN_LISTS_EXTRA.map(entry => processDomainListsWithPreload(...entry)),
...PHISHING_HOSTS_EXTRA.map(entry => processHostsWithPreload(...entry))
];
export function getPhishingDomains(rawSpan?: RawSpan, isDebug = false): Promise<WorkerJobResult<string[]>> {
return workerJob(rawSpan, async (childSpan) => {
const downloads = [
...PHISHING_DOMAIN_LISTS_EXTRA.map(entry => processDomainListsWithPreload(...entry)),
...PHISHING_HOSTS_EXTRA.map(entry => processHostsWithPreload(...entry))
];
const domainGroups = await Promise.all(downloads.map(task => task(childSpan)));
return childSpan.traceChildSync<string[]>('calculate and handling mass phishing domains', () => {
const domainArr: string[] = [];
const domainGroups = await Promise.all(downloads.map(task => task(dummySpan)));
domainGroups.forEach(appendArrayInPlaceCurried(domainArr));
const domainCountMap = new Map<string, number>();
@@ -117,8 +118,8 @@ export function getPhishingDomains(isDebug = false): Promise<string[]> {
}
return domainArr;
}
));
});
});
}
function calcDomainAbuseScore(subdomain: string, fullDomain: string = subdomain) {
@@ -167,5 +168,5 @@ function calcDomainAbuseScore(subdomain: string, fullDomain: string = subdomain)
}
if (!process.env.JEST_WORKER_ID && require.main === module) {
getPhishingDomains(true).catch(console.error);
getPhishingDomains(undefined, true).catch(console.error);
}

View File

@@ -24,6 +24,7 @@ export interface RawSpan {
export interface Span {
[spanTag]: true,
readonly rawSpan: RawSpan,
readonly stop: (time?: number) => void,
readonly traceChild: (name: string) => Span,
readonly traceSyncFn: <T>(fn: (span: Span) => T) => T,
@@ -32,6 +33,7 @@ export interface Span {
readonly traceChildSync: <T>(name: string, fn: (span: Span) => T) => T,
readonly traceChildAsync: <T>(name: string, fn: (span: Span) => Promise<T>) => Promise<T>,
readonly traceChildPromise: <T>(name: string, promise: Promise<T>) => Promise<T>,
readonly traceWorkerChild: <T>(name: string, factory: (rawSpan: RawSpan) => Promise<WorkerJobResult<T>>) => Promise<T>,
readonly traceResult: TraceResult
}
@@ -55,6 +57,7 @@ export function makeSpan(rawSpan: RawSpan): Span {
const span: Span = {
[spanTag]: true,
rawSpan,
stop,
traceChild,
traceSyncFn<T>(fn: (span: Span) => T) {
@@ -75,7 +78,15 @@ export function makeSpan(rawSpan: RawSpan): Span {
},
traceChildSync: <T>(name: string, fn: (span: Span) => T): T => traceChild(name).traceSyncFn(fn),
traceChildAsync: <T>(name: string, fn: (span: Span) => T | Promise<T>): Promise<T> => traceChild(name).traceAsyncFn(fn),
traceChildPromise: <T>(name: string, promise: Promise<T>): Promise<T> => traceChild(name).tracePromise(promise)
traceChildPromise: <T>(name: string, promise: Promise<T>): Promise<T> => traceChild(name).tracePromise(promise),
/**
 * Run a job under a new child span named `name`, handing the child's raw
 * span to `factory` and merging the trace it reports back into that child.
 *
 * @param name - Name of the child span created for the job.
 * @param factory - Starts the job; receives the child's `rawSpan` and must
 *   resolve to a {@link WorkerJobResult} envelope.
 * @returns The `result` field unwrapped from the envelope.
 * @throws Re-throws whatever `factory` throws or rejects with; the child span
 *   is still stopped in that case.
 */
async traceWorkerChild<T>(name: string, factory: (rawSpan: RawSpan) => Promise<WorkerJobResult<T>>): Promise<T> {
  const childSpan = traceChild(name);
  try {
    const { result, traceResult, workerTimeOrigin } = await factory(childSpan.rawSpan);
    mergeWorkerTrace(childSpan, traceResult, workerTimeOrigin);
    return result;
  } finally {
    // Always close the child span: without this a rejected job would leave
    // the span open forever.
    childSpan.stop();
  }
}
};
// eslint-disable-next-line sukka/no-redundant-variable -- self reference
@@ -172,6 +183,58 @@ export async function whyIsNodeRunning() {
// };
// };
/**
 * Build a copy of a trace tree in which every `start` and `end` timestamp,
 * at every depth, has been shifted by `offset`. The input tree is not mutated.
 *
 * @param trace - Root of the trace (sub)tree to copy.
 * @param offset - Amount added to each timestamp; may be negative.
 * @returns A new tree carrying only `name`, `start`, `end` and `children`.
 */
function adjustTraceTimestamps(trace: TraceResult, offset: number): TraceResult {
  const { name, start, end, children } = trace;

  const shiftedChildren: TraceResult[] = [];
  for (const node of children) {
    shiftedChildren.push(adjustTraceTimestamps(node, offset));
  }

  return {
    name,
    start: start + offset,
    end: end + offset,
    children: shiftedChildren
  };
}
/**
 * Append the children of a trace reported by a worker job to `parentSpan`'s
 * own trace, shifting their timestamps by the difference between the two
 * time origins.
 *
 * Presumably trace timestamps are relative to the producing realm's
 * `performance.timeOrigin` — confirm against the span implementation.
 *
 * @param parentSpan - Span whose `traceResult.children` receives the entries.
 * @param workerTraceResult - Trace reported by the job; only its children are
 *   merged, the root node itself is dropped.
 * @param workerTimeOrigin - `performance.timeOrigin` reported by the job.
 */
function mergeWorkerTrace(
  parentSpan: Span,
  workerTraceResult: TraceResult,
  workerTimeOrigin: number
): void {
  const target = parentSpan.traceResult.children;
  const incoming = workerTraceResult.children;

  // If the job ran in this realm without its span being cloned, the reported
  // trace may be the very array we are appending to. Its entries are already
  // attached, and iterating an array while pushing to it would never finish.
  if (incoming === target) {
    return;
  }

  const offset = workerTimeOrigin - performance.timeOrigin;
  for (const child of incoming) {
    target.push(adjustTraceTimestamps(child, offset));
  }
}
/**
 * The envelope that a worker function returns so the main thread can recover both the result and the trace.
 *
 * Produced by `workerJob` and unwrapped by `Span.traceWorkerChild`.
 */
export interface WorkerJobResult<T> {
/** The value the job's implementation function resolved with. */
result: T,
/** The `traceResult` of the span the job ran under, read after the job finished. */
traceResult: TraceResult,
/** `performance.timeOrigin` of the realm that produced this envelope; `mergeWorkerTrace` uses it to compute the timestamp offset. */
workerTimeOrigin: number
}
/**
 * Worker-side entry point: runs `impl` under a span and wraps its outcome in
 * a {@link WorkerJobResult}.
 *
 * Span selection:
 * - `rawSpan` given (not `null`/`undefined`): it is wrapped with
 *   {@link makeSpan} and `impl` runs under that span.
 * - `rawSpan` absent (standalone / CLI invocation): `impl` runs under a new
 *   `'worker-standalone'` child of {@link dummySpan}.
 *
 * @param rawSpan - Raw span handed over by the caller, if any.
 * @param impl - The actual job; receives the selected {@link Span}.
 * @returns The job's result, the span's `traceResult` as read after `impl`
 *   settled, and this realm's `performance.timeOrigin`.
 */
export async function workerJob<T>(
  rawSpan: RawSpan | undefined,
  impl: (span: Span) => Promise<T>
): Promise<WorkerJobResult<T>> {
  let span: Span;
  if (rawSpan != null) {
    span = makeSpan(rawSpan);
  } else {
    span = dummySpan.traceChild('worker-standalone');
  }

  const result = await impl(span);

  const envelope: WorkerJobResult<T> = {
    result,
    traceResult: span.traceResult,
    workerTimeOrigin: performance.timeOrigin
  };
  return envelope;
}
export function printTraceResult(traceResult: TraceResult) {
printTree(
traceResult,