paper_hashtag_federation/instance_scanner/query_domains_ips.py

106 lines
3.4 KiB
Python

#!/usr/bin/env python3
import json
import asyncio
import csv
from sys import stderr
from urllib.request import urlopen, Request
from collections import namedtuple
import aiodns
from picklecache import cache_return
@cache_return
def get_instances_list(token):
    """Query the list of all instances from the instances.social directory.

    Pages through the list endpoint, following ``pagination.next_id`` of each
    response until no further page ID is given.

    NOTE(review): the ``cache_return`` decorator presumably caches the
    returned list between runs -- confirm against picklecache.

    Args:
        token: API token, sent as ``Bearer`` Authorization header.

    Returns:
        List of dictionaries, each describing an instance.

    Raises:
        KeyError: if a response carries no "instances" entry.
        urllib.error.URLError: if a request fails.
    """
    API_ENDPOINT_LIST = "https://instances.social/api/1.0/instances/list"
    instances = []
    page_args = ''
    while True:
        req = Request(
            API_ENDPOINT_LIST + page_args,
            headers={"Authorization": "Bearer " + token}, method='GET')
        # close the HTTP response deterministically instead of leaking it
        with urlopen(req) as resp:
            data = json.loads(resp.read())
        instances += data["instances"]
        # a missing, null or empty next page ID marks the last page
        next_id = (data.get("pagination") or {}).get("next_id")
        if next_id is None or next_id == '':
            return instances
        page_args = "?min_id=" + str(next_id)
def query_instance_ip(instances):
    """Look up the AAAA records of all given instances.

    Args:
        instances: iterable of dictionaries; the 'name' entry of each is
            used as the hostname to resolve.

    Returns:
        The list produced by ip_lookup() for the scheduled queries.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # newer Python versions no longer create an event loop implicitly
        # when none is current, so set one up explicitly
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    # instantiate an aiodns resolver
    resolver = aiodns.DNSResolver(loop=loop)
    # limit parallel queries for not overloading the resolver
    batch_size = asyncio.Semaphore(11)
    # create list of query coroutines, one per instance
    tasks = [
        sem_query(i['name'], 'AAAA', resolver, batch_size)
        for i in instances
    ]
    # schedule and execute asynchronous coroutine that does the actual lookups
    return loop.run_until_complete(ip_lookup(tasks))
# record of a successful lookup: the queried hostname and its addresses
DNSResult = namedtuple('DNSResult', ('hostname', 'ips'))


async def sem_query(hostname, qtype, resolver, semaphore):
    """Do a DNS lookup only if the semaphore allows it.

    Args:
        hostname: name to resolve.
        qtype: DNS record type handed to the resolver, e.g. 'AAAA'.
        resolver: object with an awaitable ``query(hostname, qtype)`` method
            whose answers expose a ``host`` attribute.
        semaphore: async context manager limiting the parallel lookups.

    Returns:
        DNSResult(hostname, ips) where ``ips`` is the list of ``host``
        values of the answers, or None if the lookup failed or returned
        no records (a message is printed in both cases).
    """
    async with semaphore:
        try:
            answers = await resolver.query(hostname, qtype)
        except (aiodns.error.DNSError, IndexError) as e:
            # skip unresolvable hostnames
            print(hostname + ":", e)
            return None
    # extract returned IP addresses from query result
    ips = [answer.host for answer in answers]
    if not ips:
        # do not include records with empty IP address
        print(hostname + ":", "empty record")
        return None
    return DNSResult(hostname=hostname, ips=ips)
async def ip_lookup(tasks):
    """Run all given awaitables concurrently and collect their results.

    Prints how many queries were gathered and drops every result that is
    None.

    Args:
        tasks: awaitables to run, e.g. the coroutines built from sem_query().

    Returns:
        List of all non-None results, in the order of ``tasks``.
    """
    gathered = await asyncio.gather(*tasks)
    print(len(gathered), "Queries gathered")
    return [outcome for outcome in gathered if outcome is not None]
def main():
    """Fetch the instance list, resolve AAAA records and write the results.

    Reads the API token from ./secret_token_instances.social.txt, dumps the
    received instance list to instances.json and writes the lookup results
    to instance_ips.csv.

    Raises:
        SystemExit: with status 1 if the token file cannot be read.
    """
    # only the token read is guarded, so that network errors raised while
    # fetching the list are not misreported as a missing API key
    try:
        with open("secret_token_instances.social.txt") as tokenfile:
            token = tokenfile.readline().strip()
    except IOError:
        print("Error: API key for instances not found in ./secret_token_instances.social.txt", file=stderr)
        # nothing can be queried without a token; stop instead of running
        # into an unbound `instances` further down
        raise SystemExit(1)
    instances = get_instances_list(token=token)
    # dump instance received data for further usage
    with open("instances.json", "wt") as instancedump:
        json.dump(instances, instancedump)
    instance_ips = query_instance_ip(instances)
    print("{} of {} instances have an IPv6 address.".format(
        len(instance_ips), len(instances)))
    # one row per resolved instance
    with open("instance_ips.csv", "w", newline='') as csvfile:
        ipwriter = csv.writer(csvfile)
        ipwriter.writerows(instance_ips)
# call main() only when the file is executed as a script, not when imported
if __name__ == "__main__":
    main()