async function* queryAskStream() {
const concurrency = 25
const callQueryAskStream = transform(concurrency, async miner => {
const startTime = new Date()
try {
const peerId = minerPeerIds.get(miner)
const timeoutTimer = delay => new Promise(resolve => setTimeout(resolve, delay))
const queryAsk = await Promise.race([
client.clientQueryAsk(peerId, miner),
timeoutTimer(10000)
])
const endTime = new Date()
if (!queryAsk) {
throw new Error('Timed out')
}
return {
miner,
seqNo: queryAsk.SeqNo,
askTimestamp: queryAsk.Timestamp,
price: queryAsk.Price,
verifiedPrice: queryAsk.VerifiedPrice,
minPieceSize: queryAsk.MinPieceSize,
maxPieceSize: queryAsk.MaxPieceSize,
expiry: queryAsk.Expiry,
startTime: startTime.toISOString(),
endTime: endTime.toISOString()
}
} catch (e) {
const endTime = new Date()
console.error('Query ask error', miner, e)
return {
miner,
error: e.message,
startTime: startTime.toISOString(),
endTime: endTime.toISOString()
}
}
})
const startTime = new Date()
let counter = 0
let errors = 0
for await (const ask of callQueryAskStream(selectedMinerIndexes)) {
const now = new Date()
if (ask.error) errors++
yield {
counter,
errors,
epoch: selectedEpoch,
...ask
}
counter++
if (now - startTime > maxElapsed) {
yield {
done: true,
timeout: true,
counter,
errors
}
return
}
}
}