#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (c) 2012 Sofian Brabez <sbz@FreeBSD.org> # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # # $FreeBSD$ # # MAINTAINER= sbz@FreeBSD.org import argparse import codecs import os import re import ssl import sys if sys.version_info.major == 3: import urllib.request as urllib2 else: import urllib2 """ FreeBSD getpatch handles Gnats and Bugzilla patch attachments """ def create_ssl_context(cafile): if os.path.exists(cafile): return ssl.create_default_context(cafile=cafile) else: return ssl._create_unverified_context() class GetPatch(object): def __init__(self, pr, category='ports'): self.pr = pr self.category = category self.patchs = [] self.url = "" self.patch = "" self.output_stdout = False self.default_locale = sys.getdefaultencoding() self.ssl_context = create_ssl_context('/usr/local/etc/ssl/cert.pem') def fetch(self, *largs, **kwargs): raise NotImplementedError() def write(self, filename, data): if filename.endswith(('.patch', '.txt')): filename = "{}.diff".format(filename[:filename.rindex('.')]) f = codecs.open(filename, encoding=self.default_locale, mode='w') f.write(data.decode(self.default_locale)) f.close() self.out("[+] {} created".format(filename)) def get(self, only_last=False, output_stdout=False): self.output_stdout = output_stdout self.fetch(self.pr, category=self.category) if len(self.patchs) == 0: self.out("[-] No patch found") sys.exit(os.EX_UNAVAILABLE) if only_last: self.patchs = [self.patchs.pop()] for patch in self.patchs: url = patch['url'] p = patch['name'] data = urllib2.urlopen(url, context=self.ssl_context).read() if self.output_stdout: sys.stdout.write(data.decode(self.default_locale)) else: self.write(p, data) def add_patch(self, url, name): self.patchs.append({'url': url, 'name': name}) def out(self, s): if not self.output_stdout: print(s) class GnatsGetPatch(GetPatch): URL_BASE = 'https://www.freebsd.org/cgi' URL = '{}/query-pr.cgi?pr='.format(URL_BASE) REGEX = r'<b>Download <a href="([^"]*)">([^<]*)</a>' def __init__(self, pr, category): GetPatch.__init__(self, pr, category) def fetch(self, *largs, **kwargs): category = kwargs['category'] target = ("{}/{}".format(category, self.pr), "{}".format(self.pr))[category == ''] self.out("[+] Fetching patch for pr {}".format(target)) pattern = re.compile(self.REGEX) u = urllib2.urlopen("{}{}".format(self.URL, target), context=self.ssl_context) data = u.read() if data is None: self.out("[-] No patch found") sys.exit(os.EX_UNAVAILABLE) for patchs in re.findall(pattern, str(data)): self.add_patch(patchs[0], patchs[1]) class BzGetPatch(GetPatch): URL_BASE = 'https://bugs.freebsd.org/bugzilla/' URL_SHOW = '{}/show_bug.cgi?id='.format(URL_BASE) REGEX_ATTACHMENTS_TABLE = r'<table id="attachment_table">(.*?)</table>' REGEX_ATTACHMENT_TR = r'(<tr id="a\d+"[^<]+>.*?</tr>)' REGEX_URL = r'<a href="([^<]+)">Details</a>' REGEX = r'<div class="details">([^ ]+) \(text/plain(?:; charset=[-\w]+)?\)' def __init__(self, pr, category): GetPatch.__init__(self, pr, category) def _get_patch_name(self, url): data = urllib2.urlopen(url, context=self.ssl_context).read() match = re.search(self.REGEX, str(data)) if match is None: return None return match.group(1) def _get_patch_url(self, data): for url in re.findall(self.REGEX_URL, str(data)): url = '{}{}'.format(self.URL_BASE, url) file_name = self._get_patch_name(url) if file_name is None: msg = "[-] Could not determine the patch file name in {}." \ "Skipping." self.out(msg.format(url)) continue download_url = url[:url.find('&')] return download_url, file_name def _get_patch_urls(self, data): patch_urls = {} match = re.search(self.REGEX_ATTACHMENTS_TABLE, str(data), re.DOTALL) if match is None: return patch_urls table = match.group(1) for tr in re.findall(self.REGEX_ATTACHMENT_TR, str(data), re.DOTALL): if (tr.find('bz_tr_obsolete') >= 0): continue download_url, file_name = self._get_patch_url(tr) patch_urls[download_url] = file_name return patch_urls def fetch(self, *largs, **kwargs): category = kwargs['category'] target = ("{}/{}".format(category, self.pr), "{}".format(self.pr))[category == ''] self.out("[+] Fetching patch for pr {}".format(target)) u = urllib2.urlopen("{}{}".format(self.URL_SHOW, self.pr), context=self.ssl_context) data = u.read() if data is None: self.out("[-] No patch found") sys.exit(os.EX_UNAVAILABLE) patch_urls = self._get_patch_urls(data) if not patch_urls: self.out("[-] No patch found") sys.exit(os.EX_UNAVAILABLE) for url, file_name in patch_urls.items(): self.add_patch(url, file_name) def main(): parser = argparse.ArgumentParser( description='Gets patch attachments from a Bug Tracking System' ) parser.add_argument('pr', metavar='pr', type=str, nargs=1, help='Pr id number') parser.add_argument('--mode', type=str, choices=['gnats', 'bz'], default='bz', help='available modes to retrieve patch') parser.add_argument('--last', action='store_true', help='only retrieve the latest iteration of a patch') parser.add_argument('--stdout', action='store_true', help='dump patch on stdout') if len(sys.argv) == 1: parser.print_help() sys.exit(os.EX_USAGE) args = parser.parse_args() category = "" pr = str(args.pr[0]) if pr and '/' in pr: category, pr = pr.split('/') Clazz = globals()['%sGetPatch' % args.mode.capitalize()] gp = Clazz(pr, category) gp.get(only_last=args.last, output_stdout=args.stdout) return os.EX_OK if __name__ == '__main__': sys.exit(main())