async function* messagesStream() {
let hits = 0
let messagesProcessed = 0
let aborted = false
const startTime = new Date()
for await (const tipSetRecord of tipSetStream()) {
const seenMessages = new Set()
const selectedTipSet = tipSetRecord.tipSet
let cso
let csoStartTime
if (selectedTipSet.Cids) {
const height = selectedTipSet.Height
console.log(`${height}, ${currentHeight - height} remaining`)
for (let i = 0; i < selectedTipSet.Cids.length; i++) {
const blockCid = selectedTipSet.Cids[i]
const blockMiner = selectedTipSet.Blocks[i].Miner
const now = new Date()
if (now - startTime > maxElapsed) {
yield {
done: true,
timeout: true
}
return
}
yield {
height,
blockMiner,
i,
length: selectedTipSet.Cids.length,
startHeight: selectedHeight,
endHeight: currentHeight,
messagesProcessed,
hits
}
try {
const messages = await client.chainGetBlockMessages(blockCid)
for (const message of messages.BlsMessages) {
yield *yieldMessage(message, 'bls')
if (aborted) return
}
} catch (e) {
console.error('messages error', height, e)
}
/* Non-miners shouldn't publish deals
for (const { Message: message } of messages.SecpkMessages) {
yield *yieldMessage(message, 'secpk')
}
*/
async function *yieldMessage (message, signatureType) {
messagesProcessed++
if (message.To === 'f05' && message.Method === 4) {
console.log('JimX message', message)
const blockCidStr = blockCid['/']
const messageCidStr = message.CID['/']
if (seenMessages.has(messageCidStr)) return
hits++
seenMessages.add(messageCidStr)
if (!cso) {
// Compute state to get results
console.log('StateCompute', height)
csoStartTime = new Date()
cso = client.stateCompute(height, null, selectedTipSet.Cids)
}
const timeoutTimer = new Promise(
(resolve, reject) =>
setTimeout(() => resolve({ timeout: 1 }), apiTimeout)
)
const results = await Promise.race([timeoutTimer, cso])
const elapsed = ((new Date()) - csoStartTime) / 1000
if (results?.timeout) {
console.log('Jim timeout', elapsed )
yield {
done: true,
abort: true,
timeout: true
}
aborted = true
return
}
const trace = results.Trace.filter(({ MsgCid }) => MsgCid['/'] === messageCidStr)
console.log('StateCompute done', height, results.Trace.length, elapsed)
if (trace.length > 0 && trace[0].MsgRct.Return) {
yield {
height,
messageCid: messageCidStr,
// signatureType,
blockCid: blockCidStr,
version: message.Version,
to: message.To,
from: message.From,
nonce: message.Nonce,
value: message.Value,
gasLimit: message.GasLimit,
gasFeeCap: message.GasFeeCap,
gasPremium: message.GasPremium,
method: message.Method,
params: message.Params,
decodedDeals: decodeDeals(message.Params),
results: cbor.decode(trace[0].MsgRct.Return, 'base64')[0]
}
} else {
console.error('Missing or broken trace', height, messageCidStr)
}
}
}
}
}
}
yield { done: true }
}