Skip to content

Commit 1f07fb8

Browse files
authored
Merge pull request #1287 from permaweb/revert-1282-feat/hb-unit-gateway-fix
Revert "fix(cu): call block gateway less"
2 parents d9865ee + 5cb2f4b commit 1f07fb8

File tree

7 files changed

+28
-95
lines changed

7 files changed

+28
-95
lines changed

servers/cu/src/domain/dal.js

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,6 @@ export const loadBlocksMetaSchema = z.function()
4848
z.array(blockSchema.passthrough())
4949
))
5050

51-
export const getLatestBlockSchema = z.function()
52-
.returns(z.promise(z.number()))
53-
5451
// Process
5552

5653
export const findProcessSchema = z.function()

servers/cu/src/domain/lib/loadMessages.js

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { Transform } from 'node:stream'
22

3-
import { Rejected, Resolved, fromPromise, of } from 'hyper-async'
3+
import { Resolved, fromPromise, of } from 'hyper-async'
44
import { T, always, ascend, cond, equals, identity, ifElse, isNil, last, length, pipe, prop, reduce, uniqBy } from 'ramda'
55
import ms from 'ms'
66

77
import { mapFrom, parseTags } from '../utils.js'
8-
import { findBlocksSchema, getLatestBlockSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js'
8+
import { findBlocksSchema, loadBlocksMetaSchema, loadMessagesSchema, loadTimestampSchema, saveBlocksSchema } from '../dal.js'
99

1010
export const toSeconds = (millis) => Math.floor(millis / 1000)
1111

@@ -329,11 +329,10 @@ export function cronMessagesBetweenWith ({
329329
}
330330
}
331331

332-
function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks, getLatestBlock }) {
332+
function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
333333
findBlocks = fromPromise(findBlocksSchema.implement(findBlocks))
334334
saveBlocks = fromPromise(saveBlocksSchema.implement(saveBlocks))
335335
loadBlocksMeta = fromPromise(loadBlocksMetaSchema.implement(loadBlocksMeta))
336-
getLatestBlock = fromPromise(getLatestBlockSchema.implement(getLatestBlock))
337336

338337
return ({ min, maxTimestamp }) => {
339338
/**
@@ -349,25 +348,6 @@ function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks,
349348
.map((fromDb) => findMissingBlocksIn(fromDb, { min, maxTimestamp }))
350349
.chain((missingRange) => {
351350
if (!missingRange) return Resolved(fromDb)
352-
const latestBlocksMatch = missingRange?.min === fromDb?.[fromDb?.length - 1]?.height
353-
if (latestBlocksMatch) {
354-
logger('Latest blocks match at height %d. Checking Arweave for latest block', missingRange.min)
355-
return of()
356-
.chain(getLatestBlock)
357-
.chain((latestBlock) => {
358-
if (latestBlock === missingRange?.min) {
359-
logger('Latest block matches missing range min height %d. Bypassing GQL call', missingRange.min)
360-
return Resolved(fromDb)
361-
}
362-
logger('Latest blocks do not match (arweave: %d, db: %d). Fetching missing blocks from gateway', latestBlock, missingRange.min)
363-
return Rejected(missingRange)
364-
})
365-
}
366-
return Rejected(missingRange)
367-
})
368-
.bichain((missingRange) => {
369-
if (!missingRange) return Resolved(fromDb)
370-
logger('Loading missing blocks within range of %j', missingRange)
371351

372352
/**
373353
* Load any missing blocks within the determined range,
@@ -382,7 +362,7 @@ function reconcileBlocksWith ({ logger, loadBlocksMeta, findBlocks, saveBlocks,
382362
*/
383363
.chain((fromGateway) => saveBlocks(fromGateway).map(() => fromGateway))
384364
.map((fromGateway) => mergeBlocks(fromDb, fromGateway))
385-
}, Resolved)
365+
})
386366
})
387367
}
388368
}
@@ -519,10 +499,10 @@ function loadScheduledMessagesWith ({ loadMessages, logger }) {
519499
)
520500
}
521501

522-
function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, getLatestBlock, logger }) {
502+
function loadCronMessagesWith ({ loadTimestamp, findBlocks, loadBlocksMeta, loadTransactionData, saveBlocks, logger }) {
523503
loadTimestamp = fromPromise(loadTimestampSchema.implement(loadTimestamp))
524504

525-
const reconcileBlocks = reconcileBlocksWith({ logger, findBlocks, loadBlocksMeta, saveBlocks, getLatestBlock })
505+
const reconcileBlocks = reconcileBlocksWith({ findBlocks, loadBlocksMeta, saveBlocks })
526506

527507
return (ctx) => of(ctx)
528508
.chain(parseCrons)

servers/cu/src/effects/ao-block.js

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import pMap from 'p-map'
55
import CircuitBreaker from 'opossum'
66

77
import { blockSchema } from '../domain/model.js'
8-
import { backoff, strFromFetchError } from '../domain/utils.js'
8+
import { backoff, okRes, strFromFetchError } from '../domain/utils.js'
99
import { BLOCKS_TABLE } from './db.js'
1010

1111
const blockDocSchema = z.object({
@@ -94,15 +94,6 @@ export function findBlocksWith ({ db }) {
9494
}
9595
}
9696

97-
export function getLatestBlockWith ({ ARWEAVE_URL }) {
98-
return () => {
99-
return of(ARWEAVE_URL)
100-
.chain(fromPromise((url) => fetch(url).then(res => res.json())))
101-
.map(path(['height']))
102-
.toPromise()
103-
}
104-
}
105-
10697
/**
10798
* @typedef Env2
10899
* @property {fetch} fetch
@@ -117,7 +108,7 @@ export function getLatestBlockWith ({ ARWEAVE_URL }) {
117108
* @returns {LoadBlocksMeta}
118109
*/
119110
export function loadBlocksMetaWith ({
120-
fetch, GRAPHQL_URLS, pageSize, logger, gatewayCounter, breakerOptions = {
111+
fetch, GRAPHQL_URLS, pageSize, logger, breakerOptions = {
121112
timeout: 10000, // 10 seconds timeout
122113
errorThresholdPercentage: 50, // open circuit after 50% failures
123114
resetTimeout: 15000, // attempt to close circuit after 15 seconds
@@ -176,14 +167,7 @@ export function loadBlocksMetaWith ({
176167
},
177168
retry
178169
)
179-
.then((res) => {
180-
if (res.ok) {
181-
gatewayCounter.inc(1, { query_name: 'GetBlocks', result: 'success' })
182-
return res
183-
}
184-
gatewayCounter.inc(1, { query_name: 'GetBlocks', result: 'error' })
185-
throw res
186-
})
170+
.then(okRes)
187171
.catch(async (e) => {
188172
logger(
189173
'Error Encountered when fetching page of block metadata from gateway with minBlock \'%s\' and maxTimestamp \'%s\'',
@@ -280,8 +264,8 @@ export function loadBlocksMetaWith ({
280264

281265
return (args) =>
282266
of(args)
283-
.chain(fromPromise(({ min, maxTimestamp }) => {
284-
return fetchAllPages({ min, maxTimestamp })
267+
.chain(fromPromise(({ min, maxTimestamp }) =>
268+
fetchAllPages({ min, maxTimestamp })
285269
.then(prop('edges'))
286270
.then(pluck('node'))
287271
.then(map(block => ({
@@ -292,6 +276,6 @@ export function loadBlocksMetaWith ({
292276
*/
293277
timestamp: block.timestamp * 1000
294278
})))
295-
}))
279+
))
296280
.toPromise()
297281
}

servers/cu/src/effects/ao-process.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -828,7 +828,6 @@ export function findLatestProcessMemoryWith ({
828828
findFileCheckpointBefore,
829829
findRecordCheckpointBefore,
830830
address,
831-
gatewayCounter,
832831
queryGateway,
833832
queryCheckpointGateway,
834833
loadTransactionData,
@@ -1473,7 +1472,6 @@ export function saveCheckpointWith ({
14731472
readProcessMemoryFile,
14741473
queryCheckpointGateway,
14751474
queryGateway,
1476-
gatewayCounter,
14771475
hashWasmMemory,
14781476
buildAndSignDataItem,
14791477
uploadDataItem,

servers/cu/src/effects/arweave.js

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export function buildAndSignDataItemWith ({ WALLET, createDataItem = createData
5050
* @param {Env1} env
5151
* @returns {LoadTransactionMeta}
5252
*/
53-
export function loadTransactionMetaWith ({ gatewayCounter, fetch, GRAPHQL_URL, logger }) {
53+
export function loadTransactionMetaWith ({ fetch, GRAPHQL_URL, logger }) {
5454
// TODO: create a dataloader and use that to batch load contracts
5555

5656
const GET_PROCESSES_QUERY = `
@@ -123,15 +123,11 @@ export function loadTransactionMetaWith ({ gatewayCounter, fetch, GRAPHQL_URL, l
123123
.then(transactionConnectionSchema.parse)
124124
.then(path(['data', 'transactions', 'edges', '0', 'node']))
125125
.then((node) => {
126-
if (node) {
127-
gatewayCounter.inc(1, { query_name: 'GetTransactionMeta', result: 'success' })
128-
return node
129-
}
126+
if (node) return node
130127
logger('Transaction "%s" was not found on gateway', id)
131128
// TODO: better error handling
132129
const err = new Error(`Transaction '${id}' not found on gateway`)
133130
err.status = 404
134-
gatewayCounter.inc(1, { query_name: 'GetTransactionMeta', result: 'error' })
135131
throw err
136132
})
137133
))

servers/cu/src/effects/main.cu.js

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -162,18 +162,6 @@ export const createEffects = async (ctx) => {
162162

163163
const gauge = MetricsClient.gaugeWith({})
164164

165-
const evaluationCounter = MetricsClient.counterWith({})({
166-
name: 'ao_process_total_evaluations',
167-
description: 'The total number of evaluations on a CU',
168-
labelNames: ['stream_type', 'message_type', 'process_error']
169-
})
170-
171-
const gatewayCounter = MetricsClient.counterWith({})({
172-
name: 'ao_process_total_graphql_queries',
173-
description: 'The total number of GraphQL queries on a CU',
174-
labelNames: ['query_name', 'result']
175-
})
176-
177165
const readProcessMemoryFile = AoProcessClient.readProcessMemoryFileWith({
178166
DIR: ctx.PROCESS_MEMORY_CACHE_FILE_DIR,
179167
readFile
@@ -228,7 +216,6 @@ export const createEffects = async (ctx) => {
228216
const saveCheckpoint = AoProcessClient.saveCheckpointWith({
229217
address,
230218
readProcessMemoryFile,
231-
gatewayCounter,
232219
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
233220
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
234221
hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
@@ -294,6 +281,12 @@ export const createEffects = async (ctx) => {
294281
const loadMemoryUsage = () => process.memoryUsage()
295282
const loadProcessCacheUsage = () => wasmMemoryCache.data.loadProcessCacheUsage()
296283

284+
const evaluationCounter = MetricsClient.counterWith({})({
285+
name: 'ao_process_total_evaluations',
286+
description: 'The total number of evaluations on a CU',
287+
labelNames: ['stream_type', 'message_type', 'process_error']
288+
})
289+
297290
/**
298291
* TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens
299292
*/
@@ -306,7 +299,7 @@ export const createEffects = async (ctx) => {
306299
const BLOCK_GRAPHQL_ARRAY = ctx.GRAPHQL_URLS.length > 0 ? ctx.GRAPHQL_URLS : [ctx.GRAPHQL_URL]
307300

308301
const common = (logger) => ({
309-
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, gatewayCounter, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
302+
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
310303
loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }),
311304
isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }),
312305
findProcess: AoProcessClient.findProcessWith({ db, logger }),
@@ -318,7 +311,6 @@ export const createEffects = async (ctx) => {
318311
findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }),
319312
findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }),
320313
address,
321-
gatewayCounter,
322314
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
323315
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }),
324316
PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS,
@@ -338,14 +330,12 @@ export const createEffects = async (ctx) => {
338330
logger
339331
}),
340332
evaluationCounter,
341-
gatewayCounter,
342333
// gasCounter,
343334
saveProcess: AoProcessClient.saveProcessWith({ db, logger }),
344335
findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),
345336
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }),
346337
findBlocks: AoBlockClient.findBlocksWith({ db, logger }),
347338
saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }),
348-
getLatestBlock: AoBlockClient.getLatestBlockWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }),
349339
loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }),
350340
findModule: AoModuleClient.findModuleWith({ db, logger }),
351341
saveModule: AoModuleClient.saveModuleWith({ db, logger }),
@@ -376,7 +366,6 @@ export const createEffects = async (ctx) => {
376366
loadTimestamp: AoSuClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
377367
loadProcess: AoSuClient.loadProcessWith({ fetch: ctx.fetch, logger }),
378368
loadMessages: AoSuClient.loadMessagesWith({
379-
gatewayCounter,
380369
hashChain: (...args) => hashChainWorker.exec('hashChain', args),
381370
fetch: ctx.fetch,
382371
pageSize: 1000,

servers/cu/src/effects/main.hb.js

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,6 @@ export const createEffects = async (ctx) => {
139139

140140
const gauge = MetricsClient.gaugeWith({})
141141

142-
const evaluationCounter = MetricsClient.counterWith({})({
143-
name: 'ao_process_total_evaluations',
144-
description: 'The total number of evaluations on a CU',
145-
labelNames: ['stream_type', 'message_type', 'process_error']
146-
})
147-
148-
const gatewayCounter = MetricsClient.counterWith({})({
149-
name: 'ao_process_total_graphql_queries',
150-
description: 'The total number of GraphQL queries on a CU',
151-
labelNames: ['query_name', 'result']
152-
})
153-
154142
const readProcessMemoryFile = AoProcessClient.readProcessMemoryFileWith({
155143
DIR: ctx.PROCESS_MEMORY_CACHE_FILE_DIR,
156144
readFile
@@ -205,7 +193,6 @@ export const createEffects = async (ctx) => {
205193
const saveCheckpoint = AoProcessClient.saveCheckpointWith({
206194
address,
207195
readProcessMemoryFile,
208-
gatewayCounter,
209196
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger: ctx.logger }),
210197
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger: ctx.logger }),
211198
hashWasmMemory: WasmClient.hashWasmMemoryWith({ logger: ctx.logger }),
@@ -271,6 +258,12 @@ export const createEffects = async (ctx) => {
271258
const loadMemoryUsage = () => process.memoryUsage()
272259
const loadProcessCacheUsage = () => wasmMemoryCache.data.loadProcessCacheUsage()
273260

261+
const evaluationCounter = MetricsClient.counterWith({})({
262+
name: 'ao_process_total_evaluations',
263+
description: 'The total number of evaluations on a CU',
264+
labelNames: ['stream_type', 'message_type', 'process_error']
265+
})
266+
274267
/**
275268
* TODO: Gas can grow to a huge number. We need to make sure this doesn't crash when that happens
276269
*/
@@ -283,7 +276,7 @@ export const createEffects = async (ctx) => {
283276
const BLOCK_GRAPHQL_ARRAY = ctx.GRAPHQL_URLS.length > 0 ? ctx.GRAPHQL_URLS : [ctx.GRAPHQL_URL]
284277

285278
const common = (logger) => ({
286-
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ gatewayCounter, fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
279+
loadTransactionMeta: ArweaveClient.loadTransactionMetaWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
287280
loadTransactionData: ArweaveClient.loadTransactionDataWith({ fetch: ctx.fetch, ARWEAVE_URL: ctx.ARWEAVE_URL, logger }),
288281
isProcessOwnerSupported: AoProcessClient.isProcessOwnerSupportedWith({ ALLOW_OWNERS: ctx.ALLOW_OWNERS }),
289282
findProcess: AoProcessClient.findProcessWith({ db, logger }),
@@ -295,7 +288,6 @@ export const createEffects = async (ctx) => {
295288
findFileCheckpointBefore: AoProcessClient.findFileCheckpointBeforeWith({ db }),
296289
findRecordCheckpointBefore: AoProcessClient.findRecordCheckpointBeforeWith({ db }),
297290
address,
298-
gatewayCounter,
299291
queryGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.GRAPHQL_URL, logger }),
300292
queryCheckpointGateway: ArweaveClient.queryGatewayWith({ fetch: ctx.fetch, GRAPHQL_URL: ctx.CHECKPOINT_GRAPHQL_URL, logger }),
301293
PROCESS_IGNORE_ARWEAVE_CHECKPOINTS: ctx.PROCESS_IGNORE_ARWEAVE_CHECKPOINTS,
@@ -315,14 +307,12 @@ export const createEffects = async (ctx) => {
315307
logger
316308
}),
317309
evaluationCounter,
318-
gatewayCounter,
319310
// gasCounter,
320311
saveProcess: AoProcessClient.saveProcessWith({ db, logger }),
321312
findEvaluation: AoEvaluationClient.findEvaluationWith({ db, logger }),
322313
saveEvaluation: AoEvaluationClient.saveEvaluationWith({ db, logger }),
323314
findBlocks: AoBlockClient.findBlocksWith({ db, logger }),
324315
saveBlocks: AoBlockClient.saveBlocksWith({ db, logger }),
325-
getLatestBlock: AoBlockClient.getLatestBlockWith({ ARWEAVE_URL: ctx.ARWEAVE_URL }),
326316
loadBlocksMeta: AoBlockClient.loadBlocksMetaWith({ fetch: ctx.fetch, GRAPHQL_URLS: BLOCK_GRAPHQL_ARRAY, pageSize: 90, logger }),
327317
findModule: AoModuleClient.findModuleWith({ db, logger }),
328318
saveModule: AoModuleClient.saveModuleWith({ db, logger }),
@@ -353,7 +343,6 @@ export const createEffects = async (ctx) => {
353343
loadTimestamp: HbClient.loadTimestampWith({ fetch: ctx.fetch, logger }),
354344
loadProcess: HbClient.loadProcessWith({ fetch: ctx.fetch, logger }),
355345
loadMessages: HbClient.loadMessagesWith({
356-
gatewayCounter,
357346
hashChain: (...args) => hashChainWorker.exec('hashChain', args),
358347
fetch: ctx.fetch,
359348
pageSize: 1000,

0 commit comments

Comments
 (0)