Refactored code - fix "manual" JSON bug

This commit is contained in:
evm
2021-04-23 12:44:30 -04:00
parent d82db10046
commit 512f988d58

View File

@@ -16,80 +16,143 @@
#
# HAVE A NICE DAY.
import json
import requests
from urllib.parse import urljoin
from collections import defaultdict
from . import util
from .error import AllstarPackageError
#class for interacting with ALLSTAR repository
#can use alternate URL if you have cloned ALLSTAR locally
BASE_URL = "https://allstar.jhuapl.edu"
#Example:
#import allstar
#
#repo = allstar.AllstarRepo("armel")
#for pkg in repo.packages():
# for b in pkg.binaries():
class AllstarRepo(object):
def __init__(self, arch, base_url='https://allstar.jhuapl.edu'):
class Repo(object):
"""A Repo represents an architecture repository of the ALLSTAR dataset.
The ALLSTAR site will return JSON information about the repository and
all packages in that repository. This class is a simple wrapper that
can query the appropriate URLs and return Python objects.
Example:
r = Repo('amd64')
for pkg in r.package_list():
for binary in r.package_binaries(pkg):
process(binary)
"""
def __init__(self, arch, base_url=BASE_URL):
"""Inits Repo class for a specified architecture.
Args:
arch: Architecture to query. Valid architectures are:
'amd64', 'armel', 'i386', 'mipsel', 'ppc64el', and 's390x'
"""
self.arch = arch
self.base_url = base_url
self.rsession = requests.Session()
self.packages = {}
self.packages_by_name = {}
self.packages_by_part = defaultdict(list)
self._generate_package_list()
def _generate_package_list(self):
for part in range(1,5):
url = urljoin(self.base_url, '/repo/jessie-list-p{}-final.txt'.format(part))
for part in range(1, 5):
url = urljoin(self.base_url,
'/repo/jessie-list-p{}-final.txt'.format(part))
r = self.rsession.get(url)
for pkg in r.text.split():
self.packages[pkg] = part
self.packages_by_name[pkg] = part
self.packages_by_part[part].append(pkg)
def packages(self):
"""Get a list of all packages that are in the repo.
Returns:
A list of strings with the names of all packages.
"""
return list(self.packages_by_name.keys())
def package(self, pkg):
return Package(pkg, self.arch,
self.base_url)
def _package_part(self, pkg):
return self.packages[pkg]
class Package(object):
"""A Package represents a package in the ALLSTAR dataset.
"""
def _package_index(self, pkg):
pkg_url = urljoin(self.base_url, '/repo/p{}/{}/{}/'.format(self._package_part(pkg),
self.arch, pkg))
index_url = urljoin(pkg_url, 'index.json')
return self.rsession.get(index_url).json()
def __init__(self, name, arch, base_url=BASE_URL):
self.name = name
self.arch = arch
self.part = util.package_part(self.name)
self.base = urljoin(base_url,
f'/repo/p{self.part}/{self.arch}/{self.name}/')
def package_list(self):
return list(self.packages.keys())
self.rsession = requests.Session()
index_url = urljoin(self.base, 'index.json')
resp = self.rsession.get(index_url)
if resp.status_code == 404:
raise AllstarPackageError(f'No such package: {self.name}')
index_json = resp.text
try:
self.index = json.loads(index_json)
except json.JSONDecodeError:
index_json = self._fix_index(index_json)
self.index = json.loads(index_json)
def package_binaries(self, pkg):
binaries = []
index = self._package_index(pkg)
self.documentation = self.index['documentation']
self.binaries = self.index['binaries']
for i in range(0, len(index['binaries'])):
f = index['binaries'][i]['file']
binary_url = urljoin(self.base_url,
'/repo/p{}/{}/{}/{}'.format(self._package_part(pkg),
self.arch, pkg, f))
r = self.rsession.get(binary_url)
binaries.append({'name': index['binaries'][i]['name'],
def _fix_index(self, index):
"""Deal with buggy json generation from ALLSTAR
where having multiple "manual" entries was improperly handled.
Have to change multiple "manual" entries to a json list
"""
index_offset = 0
while True:
mans_start = index.find('"manual": ', index_offset)
if mans_start == -1:
break
mans_start = mans_start + len('"manual": ')
# Need to skip the '"' that's at start
mans_end = index.find('"', mans_start + 1)
if mans_end == -1:
break
# Want the '"' at the end
mans_end = mans_end + 1
mans = index[mans_start:mans_end]
fixed_mans = '",\n"'.join(mans.split('\n'))
index = f'{index[:mans_start]} [ {fixed_mans} ]\n {index[mans_end:]}'
index_offset = mans_end
return index
def has_binaries(self):
return len(self.binaries) > 0
def get_binaries(self):
ret = []
for b in self.binaries:
name = b['name']
url = urljoin(self.base, f'{name}')
r = self.rsession.get(url)
ret.append({'name': name,
'content': r.content})
return ret
return binaries
def package_gimples(self, pkg):
gimples = []
index = self._package_index(pkg)
for i in range(0, len(index['binaries'])):
for j in range(0, len(index['binaries'][i]['units'])):
if('gimple' in index['binaries'][i]['units'][j]):
g = index['binaries'][i]['units'][j]['gimple']
gimple_url = urljoin(self.base_url,
'/repo/p{}/{}/{}/{}'.format(self._package_part(pkg),
self.arch,
pkg,
g))
r = self.rsession.get(gimple_url)
gimples.append({'name': index['binaries'][i]['units'][j]['name'],
def get_gimples(self):
ret = []
for b in self.binaries:
for u in b['units']:
if 'gimple' in u:
name = u['gimple']
url = urljoin(self.base, name)
r = self.rsession.get(url)
ret.append({'name': name,
'content': r.content})
return gimples
return ret