Flows improvements (#16773)

This commit is contained in:
Nitwel
2023-01-19 14:15:12 +01:00
committed by GitHub
parent 7e43b08e53
commit f36ebb1a0b
3 changed files with 94 additions and 13 deletions

View File

@@ -5,28 +5,58 @@ import { getFlowManager } from '../../flows';
type Options = {
flow: string;
payload?: Record<string, any> | Record<string, any>[] | string | null;
iterationMode?: 'serial' | 'batch' | 'parallel';
batchSize?: number;
};
export default defineOperationApi<Options>({
id: 'trigger',
handler: async ({ flow, payload }, context) => {
handler: async ({ flow, payload, iterationMode, batchSize }, context) => {
const flowManager = getFlowManager();
const payloadObject = optionToObject(payload) ?? null;
let result: unknown | unknown[];
if (Array.isArray(payloadObject)) {
result = await Promise.all(
payloadObject.map((payload) => {
return flowManager.runOperationFlow(flow, payload, omit(context, 'data'));
})
);
} else {
result = await flowManager.runOperationFlow(flow, payloadObject, omit(context, 'data'));
if (iterationMode === 'serial') {
const result = [];
for (const payload of payloadObject) {
result.push(await flowManager.runOperationFlow(flow, payload, omit(context, 'data')));
}
return result;
}
if (iterationMode === 'batch') {
const size = batchSize ?? 10;
const result = [];
for (let i = 0; i < payloadObject.length; i += size) {
const batch = payloadObject.slice(i, i + size);
const batchResults = await Promise.all(
batch.map((payload) => {
return flowManager.runOperationFlow(flow, payload, omit(context, 'data'));
})
);
result.push(...batchResults);
}
return result;
}
if (iterationMode === 'parallel' || !iterationMode) {
return await Promise.all(
payloadObject.map((payload) => {
return flowManager.runOperationFlow(flow, payload, omit(context, 'data'));
})
);
}
}
return result;
return await flowManager.runOperationFlow(flow, payloadObject, omit(context, 'data'));
},
});