Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/module-workflow/src/client/nodes/create.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ export default class extends Instruction {
// disabled: true
// }
// },
useWorker: {
type: 'boolean',
title: '强制优先使用工作线程',
name: 'useWorker',
'x-decorator': 'FormItem',
'x-component': 'Checkbox',
// 'x-component-props': {
// disabled: true
// }
},
params: {
type: 'object',
properties: {
Expand Down
164 changes: 163 additions & 1 deletion packages/module-workflow/src/server/Plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import path from 'path';
import { Op, Transactionable } from '@tachybase/database';
import { Logger, LoggerOptions } from '@tachybase/logger';
import Application, { Plugin, PluginOptions } from '@tachybase/server';
import { Registry } from '@tachybase/utils';
import { Registry, uid } from '@tachybase/utils';

import axios, { AxiosRequestConfig } from 'axios';
import FormData from 'form-data';
import LRUCache from 'lru-cache';
import mime from 'mime-types';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remember to handle unknown MIME types.
mime-types may return false. Handle the scenario where mime.extension(contentType) is null or undefined to avoid constructing invalid filenames.

- const ext = mime.extension(contentType);
+ const ext = mime.extension(contentType) || 'bin';

Committable suggestion skipped: line range outside the PR's diff.


import initActions from './actions';
import { EXECUTION_STATUS, JOB_STATUS } from './constants';
Expand Down Expand Up @@ -576,4 +579,163 @@ export default class PluginWorkflowServer extends Plugin {
return db.sequelize.transaction();
}
}

isJSON(str) {
try {
return JSON.parse(str);
} catch (e) {
return false;
}
}
Comment on lines +583 to +589
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Check JSON parse logic.
isJSON(str) returns the parsed object on success or false on failure. The name suggests a boolean return, but actually returns parsed JSON or false. This might be confusing. Either rename it or return only booleans.

- isJSON(str) {
-   try {
-     return JSON.parse(str);
-   } catch (e) {
-     return false;
-   }
- }
+ isJSON(str) {
+   try {
+     JSON.parse(str);
+     return true;
+   } catch (e) {
+     return false;
+   }
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
isJSON(str) {
try {
return JSON.parse(str);
} catch (e) {
return false;
}
}
isJSON(str) {
try {
JSON.parse(str);
return true;
} catch (e) {
return false;
}
}


//目前可处理url,json对象,base64
async handleResource(resource: string, origin: string, token: string) {
const parseRes = this.isJSON(resource);
const config: AxiosRequestConfig<any> = {
method: 'get',
url: resource,
responseType: 'stream',
};
const form = new FormData();
if (resource.startsWith('data:')) {
const matches = resource.match(/^data:(.+);base64,(.+)$/);
if (matches) {
const contentType = matches[1];
const base64Data = matches[2];
const buffer = Buffer.from(base64Data, 'base64');
const ext = mime.extension(contentType);
const filename = `${uid()}.${ext}`;

form.append('file', buffer, {
filename,
contentType,
});
} else {
throw new Error('Invalid data URL format');

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message 'Invalid data URL format' should be more descriptive to aid debugging. Consider including the problematic URL or additional context.

}
} else if (parseRes) {
const { url: resourceUrl, params: resourceParams, headers: resourceHeaders, body: resourceBody } = parseRes;
config.url = resourceUrl;
config.params = resourceParams;
config.headers = resourceHeaders;
if (resourceHeaders['content-type'] === 'multipart/form-data') {
const formData = new FormData();
Object.entries(resourceBody).forEach(([key, value]) => {
formData.append(key, value);
});
config.data = formData;
} else {
config.data = resourceBody;
}
const response = await axios(config);
const contentType = response.headers['content-type'];
// 根据 MIME 类型获取文件扩展名
const ext = mime.extension(contentType);
const filename = `${uid()}.${ext}`;
// 创建 FormData 实例
form.append('file', response.data, {
filename,
contentType: response.headers['content-type'],
});
} else {
// 下载指定 URL 的内容
const response = await axios(config);
// 获取文件的 MIME 类型
const contentType = response.headers['content-type'];
// 根据 MIME 类型获取文件扩展名
const ext = mime.extension(contentType);
const filename = `${uid()}.${ext}`;
// 创建 FormData 实例
form.append('file', response.data, {
filename,
contentType: response.headers['content-type'],
});
}
const uploadResponse = await axios({

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that the axios request to upload attachments handles potential errors, such as network issues or server errors, to prevent unhandled promise rejections.

method: 'post',
url: origin + '/api/attachments:create',
data: form,
headers: {
...form.getHeaders(),
Authorization: 'Bearer ' + token,
},
});
return uploadResponse.data.data;
}

// 处理工作流中的附件字段
private async handleAttachmentFields(params: {
dataSourceName: string;
collectionName: string;
values: any;
origin: string;
token: string;
}) {
const { dataSourceName, collectionName, values, origin, token } = params;

const collection = this.app.dataSourceManager.dataSources
.get(dataSourceName)
.collectionManager.getCollection(collectionName);

const fields = collection.getFields();
const fieldNames = Object.keys(values);
const includesFields = fields.filter((field) => fieldNames.includes(field.options.name));

// 处理文件类型
for (const attachmentField of includesFields) {
if (attachmentField.options.interface === 'attachment') {
const urls = values[attachmentField.options.name];
if (Array.isArray(urls)) {
for (const i in urls) {
urls[i] = await this.handleResource(urls[i], origin, token);
}
} else {
const url = values[attachmentField.options.name];
values[attachmentField.options.name] = await this.handleResource(url, origin, token);
}
}
}

return collection;
}

// 工作线程插入数据
async workerWorkflowCreate(params) {
const { options, context, transaction } = params;

const collection = await this.handleAttachmentFields({
dataSourceName: params.dataSourceName,
collectionName: params.collectionName,
values: options.values,
origin: params.origin,
token: params.token,
});

const result = await collection.repository.create({
...options,
context,
transaction,
});
return result.toJSON();
}

// 工作线程刷新数据
async workerWorkflowUpdate(params) {
const { options, context, transaction } = params;

const collection = await this.handleAttachmentFields({
dataSourceName: params.dataSourceName,
collectionName: params.collectionName,
values: options.values,
origin: params.origin,
token: params.token,
});

const result = await collection.repository.update({
...options,
context,
transaction,
});
return result.length ?? result;
}
}
145 changes: 34 additions & 111 deletions packages/module-workflow/src/server/instructions/CreateInstruction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import _ from 'lodash';
import mime from 'mime-types';

import { Instruction } from '.';
import { PluginWorkflow } from '..';
import { JOB_STATUS } from '../constants';
import PluginWorkflowServer from '../Plugin';
import type Processor from '../Processor';
import type { FlowNodeModel } from '../types';
import { toJSON } from '../utils';
Expand All @@ -18,128 +20,49 @@ export class CreateInstruction extends Instruction {
const { collection, params: { appends = [], ...params } = {} } = node.config;
const [dataSourceName, collectionName] = parseCollectionName(collection);

const { repository, filterTargetKey } = this.workflow.app.dataSourceManager.dataSources
.get(dataSourceName)
.collectionManager.getCollection(collectionName);
const userId = _.get(processor.getScope(node.id), '$context.user.id', '');
const token = this.workflow.app.authManager.jwt.sign({ userId });
const options = processor.getParsedValue(params, node.id);
const origin = Gateway.getInstance().runAtLoop;

const transaction = this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction);
const app = this.workflow.app;
const plugin = app.pm.get(PluginWorkflow);

const c = this.workflow.app.dataSourceManager.dataSources
const { repository, filterTargetKey } = this.workflow.app.dataSourceManager.dataSources
.get(dataSourceName)
.collectionManager.getCollection(collectionName);
const fields = c.getFields();
const fieldNames = Object.keys(params.values);
const includesFields = fields.filter((field) => fieldNames.includes(field.options.name));

const userId = _.get(processor.getScope(node.id), '$context.user.id', '');
const token = this.workflow.app.authManager.jwt.sign({ userId });
const isJSON = (str) => {
try {
return JSON.parse(str);
} catch (e) {
return false;
}
const context = {
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
};
//目前可处理url,json对象,base64
const handleResource = async (resource) => {
const parseRes = isJSON(resource);
const config: AxiosRequestConfig<any> = {
method: 'get',
url: resource,
responseType: 'stream',
};
const form = new FormData();
if (resource.startsWith('data:')) {
const matches = resource.match(/^data:(.+);base64,(.+)$/);
if (matches) {
const contentType = matches[1];
const base64Data = matches[2];
const buffer = Buffer.from(base64Data, 'base64');
const ext = mime.extension(contentType);
const filename = `${uid()}.${ext}`;

form.append('file', buffer, {
filename,
contentType,
});
} else {
throw new Error('Invalid data URL format');
}
} else if (parseRes) {
const { url: resourceUrl, params: resourceParams, headers: resourceHeaders, body: resourceBody } = parseRes;
config.url = resourceUrl;
config.params = resourceParams;
config.headers = resourceHeaders;
if (resourceHeaders['content-type'] === 'multipart/form-data') {
const formData = new FormData();
Object.entries(resourceBody).forEach(([key, value]) => {
formData.append(key, value);
});
config.data = formData;
} else {
config.data = resourceBody;
}
const response = await axios(config);
const contentType = response.headers['content-type'];
// 根据 MIME 类型获取文件扩展名
const ext = mime.extension(contentType);
const filename = `${uid()}.${ext}`;
// 创建 FormData 实例
form.append('file', response.data, {
filename,
contentType: response.headers['content-type'],
});
} else {
// 下载指定 URL 的内容
const response = await axios(config);
// 获取文件的 MIME 类型
const contentType = response.headers['content-type'];
// 根据 MIME 类型获取文件扩展名
const ext = mime.extension(contentType);
const filename = `${uid()}.${ext}`;
// 创建 FormData 实例
form.append('file', response.data, {
filename,
contentType: response.headers['content-type'],
});
}
// 发送 multipart 请求
const origin = Gateway.getInstance().runAtLoop;
const uploadResponse = await axios({
method: 'post',
url: origin + '/api/attachments:create',
data: form,
headers: {
...form.getHeaders(),
Authorization: 'Bearer ' + token,
let created;
if (node?.config?.useWorker && !transaction && app.worker.available) {
created = await app.worker.callPluginMethod({
plugin: PluginWorkflowServer,
method: 'workerWorkflowCreate',
params: {
dataSourceName,
collectionName,
origin,
token,
options,
context,
transaction,
},
});
return uploadResponse.data.data;
};

// 处理文件类型
for (const attachmentField of includesFields) {
if (attachmentField.options.interface === 'attachment') {
const urls = options.values[attachmentField.options.name];
if (Array.isArray(urls)) {
for (const i in urls) {
urls[i] = await handleResource(urls[i]);
}
} else {
const url = options.values[attachmentField.options.name];
options.values[attachmentField.options.name] = await handleResource(url);
}
}
} else {
created = await plugin.workerWorkflowCreate({
dataSourceName,
collectionName,
origin,
token,
options,
context,
transaction,
});
}

const created = await repository.create({
...options,
context: {
stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))),
},
transaction,
});

let result = created;
if (created && appends.length) {
const includeFields = appends.reduce((set, field) => {
Expand Down
Loading
Loading