-
Notifications
You must be signed in to change notification settings - Fork 110
feat:workflow node use work thread #123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,9 +2,12 @@ import path from 'path'; | |||||||||||||||||||||||||||||||
| import { Op, Transactionable } from '@tachybase/database'; | ||||||||||||||||||||||||||||||||
| import { Logger, LoggerOptions } from '@tachybase/logger'; | ||||||||||||||||||||||||||||||||
| import Application, { Plugin, PluginOptions } from '@tachybase/server'; | ||||||||||||||||||||||||||||||||
| import { Registry } from '@tachybase/utils'; | ||||||||||||||||||||||||||||||||
| import { Registry, uid } from '@tachybase/utils'; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| import axios, { AxiosRequestConfig } from 'axios'; | ||||||||||||||||||||||||||||||||
| import FormData from 'form-data'; | ||||||||||||||||||||||||||||||||
| import LRUCache from 'lru-cache'; | ||||||||||||||||||||||||||||||||
| import mime from 'mime-types'; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| import initActions from './actions'; | ||||||||||||||||||||||||||||||||
| import { EXECUTION_STATUS, JOB_STATUS } from './constants'; | ||||||||||||||||||||||||||||||||
|
|
@@ -576,4 +579,163 @@ export default class PluginWorkflowServer extends Plugin { | |||||||||||||||||||||||||||||||
| return db.sequelize.transaction(); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| isJSON(str) { | ||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||
| return JSON.parse(str); | ||||||||||||||||||||||||||||||||
| } catch (e) { | ||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
Comment on lines
+583
to
+589
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Check JSON parse logic. - isJSON(str) {
- try {
- return JSON.parse(str);
- } catch (e) {
- return false;
- }
- }
+ isJSON(str) {
+ try {
+ JSON.parse(str);
+ return true;
+ } catch (e) {
+ return false;
+ }
+ }📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| //目前可处理url,json对象,base64 | ||||||||||||||||||||||||||||||||
| async handleResource(resource: string, origin: string, token: string) { | ||||||||||||||||||||||||||||||||
| const parseRes = this.isJSON(resource); | ||||||||||||||||||||||||||||||||
| const config: AxiosRequestConfig<any> = { | ||||||||||||||||||||||||||||||||
| method: 'get', | ||||||||||||||||||||||||||||||||
| url: resource, | ||||||||||||||||||||||||||||||||
| responseType: 'stream', | ||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
| const form = new FormData(); | ||||||||||||||||||||||||||||||||
| if (resource.startsWith('data:')) { | ||||||||||||||||||||||||||||||||
| const matches = resource.match(/^data:(.+);base64,(.+)$/); | ||||||||||||||||||||||||||||||||
| if (matches) { | ||||||||||||||||||||||||||||||||
| const contentType = matches[1]; | ||||||||||||||||||||||||||||||||
| const base64Data = matches[2]; | ||||||||||||||||||||||||||||||||
| const buffer = Buffer.from(base64Data, 'base64'); | ||||||||||||||||||||||||||||||||
| const ext = mime.extension(contentType); | ||||||||||||||||||||||||||||||||
| const filename = `${uid()}.${ext}`; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| form.append('file', buffer, { | ||||||||||||||||||||||||||||||||
| filename, | ||||||||||||||||||||||||||||||||
| contentType, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||
| throw new Error('Invalid data URL format'); | ||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error message 'Invalid data URL format' should be more descriptive to aid debugging. Consider including the problematic URL or additional context. |
||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } else if (parseRes) { | ||||||||||||||||||||||||||||||||
| const { url: resourceUrl, params: resourceParams, headers: resourceHeaders, body: resourceBody } = parseRes; | ||||||||||||||||||||||||||||||||
| config.url = resourceUrl; | ||||||||||||||||||||||||||||||||
| config.params = resourceParams; | ||||||||||||||||||||||||||||||||
| config.headers = resourceHeaders; | ||||||||||||||||||||||||||||||||
| if (resourceHeaders['content-type'] === 'multipart/form-data') { | ||||||||||||||||||||||||||||||||
| const formData = new FormData(); | ||||||||||||||||||||||||||||||||
| Object.entries(resourceBody).forEach(([key, value]) => { | ||||||||||||||||||||||||||||||||
| formData.append(key, value); | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| config.data = formData; | ||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||
| config.data = resourceBody; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| const response = await axios(config); | ||||||||||||||||||||||||||||||||
| const contentType = response.headers['content-type']; | ||||||||||||||||||||||||||||||||
| // 根据 MIME 类型获取文件扩展名 | ||||||||||||||||||||||||||||||||
| const ext = mime.extension(contentType); | ||||||||||||||||||||||||||||||||
| const filename = `${uid()}.${ext}`; | ||||||||||||||||||||||||||||||||
| // 创建 FormData 实例 | ||||||||||||||||||||||||||||||||
| form.append('file', response.data, { | ||||||||||||||||||||||||||||||||
| filename, | ||||||||||||||||||||||||||||||||
| contentType: response.headers['content-type'], | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||
| // 下载指定 URL 的内容 | ||||||||||||||||||||||||||||||||
| const response = await axios(config); | ||||||||||||||||||||||||||||||||
| // 获取文件的 MIME 类型 | ||||||||||||||||||||||||||||||||
| const contentType = response.headers['content-type']; | ||||||||||||||||||||||||||||||||
| // 根据 MIME 类型获取文件扩展名 | ||||||||||||||||||||||||||||||||
| const ext = mime.extension(contentType); | ||||||||||||||||||||||||||||||||
| const filename = `${uid()}.${ext}`; | ||||||||||||||||||||||||||||||||
| // 创建 FormData 实例 | ||||||||||||||||||||||||||||||||
| form.append('file', response.data, { | ||||||||||||||||||||||||||||||||
| filename, | ||||||||||||||||||||||||||||||||
| contentType: response.headers['content-type'], | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| const uploadResponse = await axios({ | ||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure that the |
||||||||||||||||||||||||||||||||
| method: 'post', | ||||||||||||||||||||||||||||||||
| url: origin + '/api/attachments:create', | ||||||||||||||||||||||||||||||||
| data: form, | ||||||||||||||||||||||||||||||||
| headers: { | ||||||||||||||||||||||||||||||||
| ...form.getHeaders(), | ||||||||||||||||||||||||||||||||
| Authorization: 'Bearer ' + token, | ||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| return uploadResponse.data.data; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // 处理工作流中的附件字段 | ||||||||||||||||||||||||||||||||
| private async handleAttachmentFields(params: { | ||||||||||||||||||||||||||||||||
| dataSourceName: string; | ||||||||||||||||||||||||||||||||
| collectionName: string; | ||||||||||||||||||||||||||||||||
| values: any; | ||||||||||||||||||||||||||||||||
| origin: string; | ||||||||||||||||||||||||||||||||
| token: string; | ||||||||||||||||||||||||||||||||
| }) { | ||||||||||||||||||||||||||||||||
| const { dataSourceName, collectionName, values, origin, token } = params; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const collection = this.app.dataSourceManager.dataSources | ||||||||||||||||||||||||||||||||
| .get(dataSourceName) | ||||||||||||||||||||||||||||||||
| .collectionManager.getCollection(collectionName); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const fields = collection.getFields(); | ||||||||||||||||||||||||||||||||
| const fieldNames = Object.keys(values); | ||||||||||||||||||||||||||||||||
| const includesFields = fields.filter((field) => fieldNames.includes(field.options.name)); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // 处理文件类型 | ||||||||||||||||||||||||||||||||
| for (const attachmentField of includesFields) { | ||||||||||||||||||||||||||||||||
| if (attachmentField.options.interface === 'attachment') { | ||||||||||||||||||||||||||||||||
| const urls = values[attachmentField.options.name]; | ||||||||||||||||||||||||||||||||
| if (Array.isArray(urls)) { | ||||||||||||||||||||||||||||||||
| for (const i in urls) { | ||||||||||||||||||||||||||||||||
| urls[i] = await this.handleResource(urls[i], origin, token); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||
| const url = values[attachmentField.options.name]; | ||||||||||||||||||||||||||||||||
| values[attachmentField.options.name] = await this.handleResource(url, origin, token); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| return collection; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // 工作线程插入数据 | ||||||||||||||||||||||||||||||||
| async workerWorkflowCreate(params) { | ||||||||||||||||||||||||||||||||
| const { options, context, transaction } = params; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const collection = await this.handleAttachmentFields({ | ||||||||||||||||||||||||||||||||
| dataSourceName: params.dataSourceName, | ||||||||||||||||||||||||||||||||
| collectionName: params.collectionName, | ||||||||||||||||||||||||||||||||
| values: options.values, | ||||||||||||||||||||||||||||||||
| origin: params.origin, | ||||||||||||||||||||||||||||||||
| token: params.token, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const result = await collection.repository.create({ | ||||||||||||||||||||||||||||||||
| ...options, | ||||||||||||||||||||||||||||||||
| context, | ||||||||||||||||||||||||||||||||
| transaction, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| return result.toJSON(); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // 工作线程刷新数据 | ||||||||||||||||||||||||||||||||
| async workerWorkflowUpdate(params) { | ||||||||||||||||||||||||||||||||
| const { options, context, transaction } = params; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const collection = await this.handleAttachmentFields({ | ||||||||||||||||||||||||||||||||
| dataSourceName: params.dataSourceName, | ||||||||||||||||||||||||||||||||
| collectionName: params.collectionName, | ||||||||||||||||||||||||||||||||
| values: options.values, | ||||||||||||||||||||||||||||||||
| origin: params.origin, | ||||||||||||||||||||||||||||||||
| token: params.token, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const result = await collection.repository.update({ | ||||||||||||||||||||||||||||||||
| ...options, | ||||||||||||||||||||||||||||||||
| context, | ||||||||||||||||||||||||||||||||
| transaction, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| return result.length ?? result; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remember to handle unknown MIME types.
mime-typesmay returnfalse. Handle the scenario wheremime.extension(contentType)is null or undefined to avoid constructing invalid filenames.