3 changes: 1 addition & 2 deletions servers/cu/src/domain/api/dryRun.js
@@ -126,8 +126,7 @@ export function dryRunWith (env) {
        * So we explicitly set cron to undefined, for posterity
        */
       cron: undefined,
-      needsOnlyMemory: true,
-      dryRun: true
+      needsOnlyMemory: true
     }).map((res) => {
       const cached = { age: new Date().getTime(), ctx: res }
       /**
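Note on the surrounding `.map` callback: each dry-run result is stamped with its creation time (`age`) so the cache can treat entries as stale once they exceed some maximum age. A minimal sketch of that age-stamped cache pattern, assuming a plain `Map` store and a hypothetical `MAX_AGE` constant (the CU's actual cache lives elsewhere and may evict differently):

// Minimal sketch of an age-stamped cache; the Map store and MAX_AGE are
// assumptions for illustration, not the CU's actual implementation.
const cache = new Map()
const MAX_AGE = 1000 * 60 // hypothetical one-minute cutoff

function put (processId, ctx) {
  cache.set(processId, { age: new Date().getTime(), ctx })
}

function get (processId) {
  const cached = cache.get(processId)
  if (!cached) return undefined
  if (new Date().getTime() - cached.age > MAX_AGE) {
    // entry is stale: drop it and report a miss
    cache.delete(processId)
    return undefined
  }
  return cached.ctx
}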
4 changes: 2 additions & 2 deletions servers/cu/src/domain/api/readState.js
@@ -37,7 +37,7 @@ export function readStateWith (env) {
   const loadModule = loadModuleWith(env)
   const evaluate = evaluateWith(env)

-  return ({ processId, messageId, to, ordinate, cron, needsOnlyMemory, dryRun, body }) => {
+  return ({ processId, messageId, to, ordinate, cron, needsOnlyMemory, body }) => {
     messageId = messageId || [to, ordinate, cron].filter(isNotNil).join(':') || 'latest'

     const stats = {
@@ -84,7 +84,7 @@
        * there is only one instance of the work used to resolve each Async,
        * every time, thus preventing duplication of work
        */
-      pending = of({ id: processId, messageId, to, ordinate, cron, stats, needsOnlyMemory, body, dryRun })
+      pending = of({ id: processId, messageId, to, ordinate, cron, stats, needsOnlyMemory, body })
         .chain(loadProcessMeta)
         .chain(loadProcess)
         .chain((ctx) => {
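The comment above the `pending = of({ ... })` line describes in-flight deduplication: concurrent `readState` calls for the same evaluation share one pending computation rather than each kicking off the work. A rough Promise-based sketch of the idea (the real code chains Asyncs, and the cache key shown here is illustrative):

// Rough sketch of in-flight deduplication with plain Promises; the real
// code builds an Async pipeline, and this key shape is illustrative.
const pendingReadState = new Map()

function dedupedReadState ({ processId, to }, doWork) {
  const key = [processId, to].join(',')
  if (pendingReadState.has(key)) return pendingReadState.get(key)

  const pending = doWork()
    // clear the entry once settled, so later calls start fresh work
    .finally(() => pendingReadState.delete(key))

  pendingReadState.set(key, pending)
  return pending
}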
3 changes: 1 addition & 2 deletions servers/cu/src/domain/dal.js
@@ -192,8 +192,7 @@ export const loadMessagesSchema = z.function()
       assignmentId: z.string().nullish(),
       hashChain: z.string().nullish(),
       isColdStart: z.boolean(),
-      body: z.any().optional(),
-      dryRun: z.boolean().optional()
+      body: z.any().optional()
     })
   )
   /**
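`loadMessagesSchema` is a zod function schema, so removing `dryRun` here means conforming implementations no longer receive that argument at all. A cut-down illustration of how `z.function()` constrains a DAL implementation (only a subset of the real arguments is shown, and the return shape is simplified):

import { z } from 'zod'

// Cut-down illustration; the real loadMessagesSchema declares many more
// arguments and a streaming return value.
const loadMessagesSchema = z.function()
  .args(z.object({
    isColdStart: z.boolean(),
    body: z.any().optional()
  }))
  .returns(z.promise(z.any()))

// .implement() wraps a function so its arguments are validated on call
const loadMessages = loadMessagesSchema.implement(
  async ({ isColdStart, body }) => body ?? { edges: [], isColdStart }
)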
24 changes: 1 addition & 23 deletions servers/cu/src/domain/lib/evaluate.js
@@ -192,35 +192,13 @@ export function evaluateWith (env) {
           continue
         }

-        const skip = message.Skip === 'true'
-        if (skip) logger(`Skipping message "${name}" because 'Skip' tag is set to 'true'`)
-
         prev = await Promise.resolve(prev)
           .then((prev) =>
             Promise.resolve(prev.Memory)
               /**
                * Where the actual evaluation is performed
                */
-              .then((Memory) => {
-                // console.dir({ m: 'Evaluating message', message }, {depth:null})
-                if (skip) {
-                  return {
-                    Memory,
-                    Error: undefined,
-                    Messages: [],
-                    Assignments: [],
-                    Spawns: [],
-                    Output: {
-                      data: '',
-                      prompt: '',
-                      print: false
-                    },
-                    Patches: [],
-                    GasUsed: 0
-                  }
-                }
-                return ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal })
-              })
+              .then((Memory) => ctx.evaluator({ first, noSave, name, deepHash, cron, ordinate, isAssignment, processId: ctx.id, Memory, message, AoGlobal }))
               /**
                * These values are folded,
                * so that we can potentially update the process memory cache
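With the `Skip` branch removed, every message again flows straight through `ctx.evaluator`, each evaluation's resulting `Memory` feeding the next. A stripped-down sketch of that sequential fold (names simplified; the real loop also folds stats, gas, and error handling into `prev`):

// Stripped-down sketch of threading Memory through sequential
// evaluations; evaluator stands in for ctx.evaluator.
async function evaluateAll (initialMemory, messages, evaluator) {
  let prev = { Memory: initialMemory }
  for await (const { message } of messages) {
    // each evaluation starts from the previous evaluation's Memory
    prev = await evaluator({ Memory: prev.Memory, message })
  }
  return prev
}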
3 changes: 1 addition & 2 deletions servers/cu/src/domain/lib/loadMessages.js
@@ -493,8 +493,7 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) {
         assignmentId: ctx.mostRecentAssignmentId,
         hashChain: ctx.mostRecentHashChain,
         isColdStart: ctx.isColdStart,
-        body: ctx.body,
-        dryRun: ctx.dryRun
+        body: ctx.body
       })
     )
 }
1 change: 0 additions & 1 deletion servers/cu/src/domain/model.js
@@ -370,7 +370,6 @@ export const messageSchema = z.object({
      * Whether the message is a cron generated message or not
      */
     Cron: z.boolean(),
-    Skip: z.string().default('false'),
     'Read-Only': z.boolean().default(false)
   }),
   AoGlobal: z.object({
16 changes: 8 additions & 8 deletions servers/cu/src/effects/hb/index.js
@@ -182,7 +182,6 @@ export const mapNode = (type) => pipe(
     Target: path(['Process']),
     Epoch: pipe(path(['Epoch']), parseInt),
     Nonce: pipe(path(['Slot'])(tags) ? path(['Slot']) : path(['Nonce']), parseInt),
-    Skip: pathOr('false', ['Skip']),
     Timestamp: pipe(path(['Timestamp']), parseInt),
     'Block-Height': pipe(
       /**
@@ -431,7 +430,7 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize }
    * When the currently fetched page is drained, the next page is fetched
    * dynamically
    */
-  function fetchAllPages ({ suUrl, processId, isColdStart, from, to, body, dryRun }) {
+  function fetchAllPages ({ suUrl, processId, isColdStart, from, to, body }) {
     /**
      * The HB SU 'from' and 'to' are both inclusive.
      * So when we pass from (which is the cached most recent evaluated message)
@@ -454,10 +453,9 @@
           body.edges.length === (+to - +from + 1) &&
           +body.edges[0]?.node?.assignment?.Tags?.find(t => t.name === 'Nonce' || t.name === 'Slot')?.value === +from
         if (bodyIsValid) return body
-        if (!dryRun) throw new Error('Body is not valid: would attempt to fetch from scheduler in loadMessages')
         return fetchPageDataloader.load({ suUrl, processId, from, to, pageSize })
       },
-      { maxRetries: 1, delay: 500, log: logger, name: `loadMessages(${JSON.stringify({ suUrl, processId, params: params.toString() })})` }
+      { maxRetries: 5, delay: 500, log: logger, name: `loadMessages(${JSON.stringify({ suUrl, processId, params: params.toString() })})` }
     )
   }
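Both restored fallbacks run inside `backoff`, and raising `maxRetries` from 1 back to 5 restores the more forgiving retry budget. A hedged sketch of a helper matching the call shape `backoff(fn, { maxRetries, delay, log, name })` seen above (the CU's actual helper may differ, for example by growing the delay between attempts):

// Hedged sketch of a retry helper matching the call shape seen above;
// the CU's real implementation may differ in its delay strategy.
async function backoff (fn, { maxRetries, delay, log, name }) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      if (attempt >= maxRetries) throw err
      log(`${name} failed on attempt ${attempt + 1}, retrying in ${delay}ms`)
      await new Promise((resolve) => setTimeout(resolve, delay))
    }
  }
}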

@@ -583,10 +581,10 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize }
       .then(({
         suUrl, processId, block: processBlock, owner: processOwner, tags: processTags,
         moduleId, moduleOwner, moduleTags, fromOrdinate, toOrdinate, assignmentId, hashChain,
-        isColdStart, body, dryRun
+        isColdStart, body
       }) => {
         return [
-          Readable.from(fetchAllPages({ suUrl, processId, isColdStart, from: fromOrdinate, to: toOrdinate, body, dryRun })()),
+          Readable.from(fetchAllPages({ suUrl, processId, isColdStart, from: fromOrdinate, to: toOrdinate, body })()),
           Transform.from(mapAoMessage({
             processId,
             processBlock,
Expand All @@ -607,15 +605,17 @@ export const loadMessagesWith = ({ hashChain, fetch, logger: _logger, pageSize }

export const loadMessageMetaWith = ({ fetch, logger }) => {
return async ({ suUrl, processId, messageUid, body }) => {
const params = toParams({ processId, to: messageUid, from: messageUid, pageSize: 1 })

return backoff(
() => {
const bodyIsValid = body &&
body.edges.length === 1 &&
+body.edges[0]?.node?.assignment?.Tags?.find(t => t.name === 'Nonce' || t.name === 'Slot')?.value === +messageUid
if (bodyIsValid) return body
throw new Error('Body is not valid: would attempt to fetch from scheduler in loadMessageMeta')
return fetch(`${suUrl}/~scheduler@1.0/schedule?${params.toString()}`).then(okRes)
},
{ maxRetries: 2, delay: 500, log: logger, name: `loadMessageMeta(${JSON.stringify({ suUrl, processId, messageUid })})` }
{ maxRetries: 5, delay: 500, log: logger, name: `loadMessageMeta(${JSON.stringify({ suUrl, processId, messageUid })})` }
)
.catch(async (err) => {
logger(
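Net effect of the `loadMessageMeta` change: when the caller-supplied `body` does not cover exactly the requested slot, the CU now falls back to fetching it from the SU's `~scheduler@1.0/schedule` endpoint instead of throwing. A condensed sketch of that validate-or-refetch shape, with illustrative stand-ins for the CU's own `toParams` and `okRes` helpers:

// Condensed validate-or-refetch sketch; toParams and okRes below are
// illustrative stand-ins for the CU's own helpers.
const toParams = (obj) => new URLSearchParams(
  Object.entries(obj).map(([k, v]) => [k, String(v)])
)
const okRes = (res) => {
  if (!res.ok) throw new Error(`HTTP ${res.status}`)
  return res.json()
}

async function loadMessageMeta ({ suUrl, processId, messageUid, body }) {
  const slotOf = (edge) => +edge?.node?.assignment?.Tags
    ?.find((t) => t.name === 'Nonce' || t.name === 'Slot')?.value

  // accept the provided body only if it is exactly the requested slot
  if (body && body.edges.length === 1 && slotOf(body.edges[0]) === +messageUid) {
    return body
  }

  const params = toParams({ processId, to: messageUid, from: messageUid, pageSize: 1 })
  return fetch(`${suUrl}/~scheduler@1.0/schedule?${params.toString()}`).then(okRes)
}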