/**
 * Looks up geo information for every address in `newIps` and streams the
 * outcome of each lookup as it becomes available.
 *
 * Every address is requested from `${geoApiBaseUrl}/geolite2/<ip>`. The
 * requests are driven by `transform`, which is handed a concurrency setting
 * of 15 (presumably the maximum number of parallel lookups - confirm against
 * the `transform` implementation).
 *
 * Yielded values:
 *  - per lookup: `{ counter, hits, errors }`, plus on success the fields
 *    `ip`, `continent`, `country`, `subdiv1`, `city`, `long`, `lat` and the
 *    raw response in `geolite2`. `counter` is incremented only after the
 *    yield, so it holds the number of lookups handled before the current one.
 *  - last value: `{ done: true, counter, hits, errors }`, with an additional
 *    `timeout: true` when more than `maxElapsed` milliseconds have passed
 *    since the generator started.
 *
 * A failed lookup (network failure, non-2xx HTTP status, unparsable body)
 * never propagates out of the generator: it is logged via `console.info`
 * and counted in `errors`.
 *
 * @yields {object} Progress / result objects as described above.
 */
async function* lookupIpsStream() {
  const concurrency = 15
  const callGeoLookupsStream = transform(concurrency, async ip => {
    try {
      const response = await fetch(`${geoApiBaseUrl}/geolite2/${ip}`)
      // fetch() resolves for HTTP error statuses too; without this check an
      // error payload would be reported as a successful lookup with all-null
      // fields and stored in `geolite2`.
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`)
      }
      const geoLookup = await response.json()
      return {
        ip,
        continent: geoLookup.continent ? geoLookup.continent.code : null,
        country: geoLookup.country ? geoLookup.country.isoCode : null,
        // Only the first subdivision is kept.
        subdiv1: geoLookup.subdivisions && geoLookup.subdivisions.length > 0 ? geoLookup.subdivisions[0].isoCode : null,
        city: geoLookup.city && geoLookup.city.names ? geoLookup.city.names.en : null,
        long: geoLookup.location ? geoLookup.location.longitude : null,
        lat: geoLookup.location ? geoLookup.location.latitude : null,
        geolite2: geoLookup
      }
    } catch (e) {
      // Guard the message access: anything can be thrown, including null.
      console.info('IP lookup error', ip, e instanceof Error ? e.message : e)
      // An object without `ip` is the marker for a failed lookup below.
      return {}
    }
  })

  const startTime = Date.now()
  let counter = 0
  let hits = 0
  let errors = 0

  for await (const geoLookup of callGeoLookupsStream([...newIps])) {
    // The time budget is checked before the received lookup is processed, so
    // on timeout that lookup is dropped and not reflected in the counters.
    if (Date.now() - startTime > maxElapsed) {
      yield {
        done: true,
        timeout: true,
        counter,
        hits,
        errors
      }
      return
    }

    if (geoLookup.ip) {
      hits++
      yield {
        counter,
        hits,
        errors,
        ...geoLookup
      }
    } else {
      errors++
      yield {
        counter,
        hits,
        errors
      }
    }
    counter++
  }

  yield {
    done: true,
    counter,
    hits,
    errors
  }
}