1# SPDX-License-Identifier: GPL-2.0+
2# Copyright 2022 Google LLC
# Copyright (C) 2022 Weidmüller Interface GmbH & Co. KG
4# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
5#
6"""Base class for all bintools
7
8This defines the common functionality for all bintools, including running
9the tool, checking its version and fetching it if needed.
10"""
11
12import collections
13import glob
14import importlib
15import multiprocessing
16import os
17import shutil
18import tempfile
19import urllib.error
20
21from u_boot_pylib import command
22from u_boot_pylib import terminal
23from u_boot_pylib import tools
24from u_boot_pylib import tout
25
# Directory containing this module; the 'btool' directory holding the bintool
# implementations is looked up relative to it
BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))

# Format string for one line of the bintool listing (name, version,
# description, path); see also the header printed by list_all()
FORMAT = '%-16.16s %-12.12s %-26.26s %s'

# Bintool modules imported so far, keyed by module name, so that each module
# is imported only once
modules = {}

# Possible ways of fetching a tool. FETCH_COUNT is the number of ways, not a
# method in its own right
FETCH_ANY = 0
FETCH_BIN = 1
FETCH_BUILD = 2
FETCH_COUNT = 3

# Human-readable name of each fetch method
FETCH_NAMES = {
    FETCH_ANY: 'any method',
    FETCH_BIN: 'binary download',
    FETCH_BUILD: 'build from source',
}

# Status of tool fetching. STATUS_COUNT is the number of status values
FETCHED = 0
FAIL = 1
PRESENT = 2
STATUS_COUNT = 3
45
46class Bintool:
47    """Tool which operates on binaries to help produce entry contents
48
49    This is the base class for all bintools
50    """
51    # List of bintools to regard as missing
52    missing_list = []
53
    # Directory to store tools. Note that this is set up by set_tool_dir(),
    # which must be called before this class is used.
56    tooldir = ''
57
58    def __init__(self, name, desc, version_regex=None, version_args='-V'):
59        self.name = name
60        self.desc = desc
61        self.version_regex = version_regex
62        self.version_args = version_args
63
64    @staticmethod
65    def find_bintool_class(btype):
66        """Look up the bintool class for bintool
67
68        Args:
69            byte: Bintool to use, e.g. 'mkimage'
70
71        Returns:
72            The bintool class object if found, else a tuple:
73                module name that could not be found
74                exception received
75        """
76        # Convert something like 'u-boot' to 'u_boot' since we are only
77        # interested in the type.
78        module_name = btype.replace('-', '_')
79        module = modules.get(module_name)
80        class_name = f'Bintool{module_name}'
81
82        # Import the module if we have not already done so
83        if not module:
84            try:
85                module = importlib.import_module('binman.btool.' + module_name)
86            except ImportError as exc:
87                try:
88                    # Deal with classes which must be renamed due to conflicts
89                    # with Python libraries
90                    module = importlib.import_module('binman.btool.btool_' +
91                                                     module_name)
92                except ImportError:
93                    return module_name, exc
94            modules[module_name] = module
95
96        # Look up the expected class name
97        return getattr(module, class_name)
98
99    @staticmethod
100    def create(name):
101        """Create a new bintool object
102
103        Args:
104            name (str): Bintool to create, e.g. 'mkimage'
105
106        Returns:
107            A new object of the correct type (a subclass of Binutil)
108        """
109        cls = Bintool.find_bintool_class(name)
110        if isinstance(cls, tuple):
111            raise ValueError("Cannot import bintool module '%s': %s" % cls)
112
113        # Call its constructor to get the object we want.
114        obj = cls(name)
115        return obj
116
117    @classmethod
118    def set_tool_dir(cls, pathname):
119        """Set the path to use to store and find tools"""
120        cls.tooldir = pathname
121
122    def show(self):
123        """Show a line of information about a bintool"""
124        if self.is_present():
125            version = self.version()
126        else:
127            version = '-'
128        print(FORMAT % (self.name, version, self.desc,
129                        self.get_path() or '(not found)'))
130
131    @classmethod
132    def set_missing_list(cls, missing_list):
133        cls.missing_list = missing_list or []
134
135    @staticmethod
136    def get_tool_list(include_testing=False):
137        """Get a list of the known tools
138
139        Returns:
140            list of str: names of all tools known to binman
141        """
142        files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*'))
143        names = [os.path.splitext(os.path.basename(fname))[0]
144                 for fname in files]
145        names = [name for name in names if name[0] != '_']
146        names = [name[6:] if name.startswith('btool_') else name
147                 for name in names]
148        if include_testing:
149            names.append('_testing')
150        return sorted(names)
151
152    @staticmethod
153    def list_all():
154        """List all the bintools known to binman"""
155        names = Bintool.get_tool_list()
156        print(FORMAT % ('Name', 'Version', 'Description', 'Path'))
157        print(FORMAT % ('-' * 15,'-' * 11, '-' * 25, '-' * 30))
158        for name in names:
159            btool = Bintool.create(name)
160            btool.show()
161
162    def is_present(self):
163        """Check if a bintool is available on the system
164
165        Returns:
166            bool: True if available, False if not
167        """
168        if self.name in self.missing_list:
169            return False
170        return bool(self.get_path())
171
172    def get_path(self):
173        """Get the path of a bintool
174
175        Returns:
176            str: Path to the tool, if available, else None
177        """
178        return tools.tool_find(self.name)
179
180    def fetch_tool(self, method, col, skip_present):
181        """Fetch a single tool
182
183        Args:
184            method (FETCH_...): Method to use
185            col (terminal.Color): Color terminal object
186            skip_present (boo;): Skip fetching if it is already present
187
188        Returns:
189            int: Result of fetch either FETCHED, FAIL, PRESENT
190        """
191        def try_fetch(meth):
192            res = None
193            try:
194                res = self.fetch(meth)
195            except urllib.error.URLError as uerr:
196                message = uerr.reason
197                print(col.build(col.RED, f'- {message}'))
198
199            except ValueError as exc:
200                print(f'Exception: {exc}')
201            return res
202
203        if skip_present and self.is_present():
204            return PRESENT
205        print(col.build(col.YELLOW, 'Fetch: %s' % self.name))
206        if method == FETCH_ANY:
207            for try_method in range(1, FETCH_COUNT):
208                print(f'- trying method: {FETCH_NAMES[try_method]}')
209                result = try_fetch(try_method)
210                if result:
211                    break
212        else:
213            result = try_fetch(method)
214        if not result:
215            return FAIL
216        if result is not True:
217            fname, tmpdir = result
218            dest = os.path.join(self.tooldir, self.name)
219            os.makedirs(self.tooldir, exist_ok=True)
220            print(f"- writing to '{dest}'")
221            shutil.move(fname, dest)
222            if tmpdir:
223                shutil.rmtree(tmpdir)
224        return FETCHED
225
226    @staticmethod
227    def fetch_tools(method, names_to_fetch):
228        """Fetch bintools from a suitable place
229
230        This fetches or builds the requested bintools so that they can be used
231        by binman
232
233        Args:
234            names_to_fetch (list of str): names of bintools to fetch
235
236        Returns:
237            True on success, False on failure
238        """
239        def show_status(color, prompt, names):
240            print(col.build(
241                color, f'{prompt}:%s{len(names):2}: %s' %
242                (' ' * (16 - len(prompt)), ' '.join(names))))
243
244        col = terminal.Color()
245        skip_present = False
246        name_list = names_to_fetch
247        if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']:
248            name_list = Bintool.get_tool_list()
249            if names_to_fetch[0] == 'missing':
250                skip_present = True
251            print(col.build(col.YELLOW,
252                            'Fetching tools:      %s' % ' '.join(name_list)))
253        status = collections.defaultdict(list)
254        for name in name_list:
255            btool = Bintool.create(name)
256            result = btool.fetch_tool(method, col, skip_present)
257            status[result].append(name)
258            if result == FAIL:
259                if method == FETCH_ANY:
260                    print('- failed to fetch with all methods')
261                else:
262                    print(f"- method '{FETCH_NAMES[method]}' is not supported")
263
264        if len(name_list) > 1:
265            if skip_present:
266                show_status(col.GREEN, 'Already present', status[PRESENT])
267            show_status(col.GREEN, 'Tools fetched', status[FETCHED])
268            if status[FAIL]:
269                show_status(col.RED, 'Failures', status[FAIL])
270        return not status[FAIL]
271
272    def run_cmd_result(self, *args, binary=False, raise_on_error=True):
273        """Run the bintool using command-line arguments
274
275        Args:
276            args (list of str): Arguments to provide, in addition to the bintool
277                name
278            binary (bool): True to return output as bytes instead of str
279            raise_on_error (bool): True to raise a ValueError exception if the
280                tool returns a non-zero return code
281
282        Returns:
283            CommandResult: Resulting output from the bintool, or None if the
284                tool is not present
285        """
286        if self.name in self.missing_list:
287            return None
288        name = os.path.expanduser(self.name)  # Expand paths containing ~
289        all_args = (name,) + args
290        env = tools.get_env_with_path()
291        tout.debug(f"bintool: {' '.join(all_args)}")
292        result = command.run_pipe(
293            [all_args], capture=True, capture_stderr=True, env=env,
294            raise_on_error=False, binary=binary)
295
296        if result.return_code:
297            # Return None if the tool was not found. In this case there is no
298            # output from the tool and it does not appear on the path. We still
299            # try to run it (as above) since RunPipe() allows faking the tool's
300            # output
301            if not any([result.stdout, result.stderr, tools.tool_find(name)]):
302                tout.info(f"bintool '{name}' not found")
303                return None
304            if raise_on_error:
305                tout.info(f"bintool '{name}' failed")
306                raise ValueError("Error %d running '%s': %s" %
307                                (result.return_code, ' '.join(all_args),
308                                result.stderr or result.stdout))
309        if result.stdout:
310            tout.debug(result.stdout)
311        if result.stderr:
312            tout.debug(result.stderr)
313        return result
314
315    def run_cmd(self, *args, binary=False):
316        """Run the bintool using command-line arguments
317
318        Args:
319            args (list of str): Arguments to provide, in addition to the bintool
320                name
321            binary (bool): True to return output as bytes instead of str
322
323        Returns:
324            str or bytes: Resulting stdout from the bintool
325        """
326        result = self.run_cmd_result(*args, binary=binary)
327        if result:
328            return result.stdout
329
330    @classmethod
331    def build_from_git(cls, git_repo, make_targets, bintool_path, flags=None):
332        """Build a bintool from a git repo
333
334        This clones the repo in a temporary directory, builds it with 'make',
335        then returns the filename of the resulting executable bintool
336
337        Args:
338            git_repo (str): URL of git repo
339            make_targets (list of str): List of targets to pass to 'make' to build
340                the tool
341            bintool_path (str): Relative path of the tool in the repo, after
342                build is complete
343            flags (list of str): Flags or variables to pass to make, or None
344
345        Returns:
346            tuple:
347                str: Filename of fetched file to copy to a suitable directory
348                str: Name of temp directory to remove, or None
349            or None on error
350        """
351        tmpdir = tempfile.mkdtemp(prefix='binmanf.')
352        print(f"- clone git repo '{git_repo}' to '{tmpdir}'")
353        tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir)
354        for target in make_targets:
355            print(f"- build target '{target}'")
356            cmd = ['make', '-C', tmpdir, '-j', f'{multiprocessing.cpu_count()}',
357                   target]
358            if flags:
359                cmd += flags
360            tools.run(*cmd)
361
362        fname = os.path.join(tmpdir, bintool_path)
363        if not os.path.exists(fname):
364            print(f"- File '{fname}' was not produced")
365            return None
366        return fname, tmpdir
367
368    @classmethod
369    def fetch_from_url(cls, url):
370        """Fetch a bintool from a URL
371
372        Args:
373            url (str): URL to fetch from
374
375        Returns:
376            tuple:
377                str: Filename of fetched file to copy to a suitable directory
378                str: Name of temp directory to remove, or None
379        """
380        fname, tmpdir = tools.download(url)
381        tools.run('chmod', 'a+x', fname)
382        return fname, tmpdir
383
384    @classmethod
385    def fetch_from_drive(cls, drive_id):
386        """Fetch a bintool from Google drive
387
388        Args:
389            drive_id (str): ID of file to fetch. For a URL of the form
390            'https://drive.google.com/file/d/xxx/view?usp=sharing' the value
391            passed here should be 'xxx'
392
393        Returns:
394            tuple:
395                str: Filename of fetched file to copy to a suitable directory
396                str: Name of temp directory to remove, or None
397        """
398        url = f'https://drive.google.com/uc?export=download&id={drive_id}'
399        return cls.fetch_from_url(url)
400
401    @classmethod
402    def apt_install(cls, package):
403        """Install a bintool using the 'apt' tool
404
405        This requires use of servo so may request a password
406
407        Args:
408            package (str): Name of package to install
409
410        Returns:
411            True, assuming it completes without error
412        """
413        args = ['sudo', 'apt', 'install', '-y', package]
414        print('- %s' % ' '.join(args))
415        tools.run(*args)
416        return True
417
418    @staticmethod
419    def WriteDocs(modules, test_missing=None):
420        """Write out documentation about the various bintools to stdout
421
422        Args:
423            modules: List of modules to include
424            test_missing: Used for testing. This is a module to report
425                as missing
426        """
427        print('''.. SPDX-License-Identifier: GPL-2.0+
428
429Binman bintool Documentation
430============================
431
432This file describes the bintools (binary tools) supported by binman. Bintools
433are binman's name for external executables that it runs to generate or process
434binaries. It is fairly easy to create new bintools. Just add a new file to the
435'btool' directory. You can use existing bintools as examples.
436
437
438''')
439        modules = sorted(modules)
440        missing = []
441        for name in modules:
442            module = Bintool.find_bintool_class(name)
443            docs = getattr(module, '__doc__')
444            if test_missing == name:
445                docs = None
446            if docs:
447                lines = docs.splitlines()
448                first_line = lines[0]
449                rest = [line[4:] for line in lines[1:]]
450                hdr = 'Bintool: %s: %s' % (name, first_line)
451                print(hdr)
452                print('-' * len(hdr))
453                print('\n'.join(rest))
454                print()
455                print()
456            else:
457                missing.append(name)
458
459        if missing:
460            raise ValueError('Documentation is missing for modules: %s' %
461                             ', '.join(missing))
462
463    # pylint: disable=W0613
464    def fetch(self, method):
465        """Fetch handler for a bintool
466
467        This should be implemented by the base class
468
469        Args:
470            method (FETCH_...): Method to use
471
472        Returns:
473            tuple:
474                str: Filename of fetched file to copy to a suitable directory
475                str: Name of temp directory to remove, or None
476            or True if the file was fetched and already installed
477            or None if no fetch() implementation is available
478
479        Raises:
480            Valuerror: Fetching could not be completed
481        """
482        print(f"No method to fetch bintool '{self.name}'")
483        return False
484
485    def version(self):
486        """Version handler for a bintool
487
488        Returns:
489            str: Version string for this bintool
490        """
491        if self.version_regex is None:
492            return 'unknown'
493
494        import re
495
496        result = self.run_cmd_result(self.version_args)
497        out = result.stdout.strip()
498        if not out:
499            out = result.stderr.strip()
500        if not out:
501            return 'unknown'
502
503        m_version = re.search(self.version_regex, out)
504        return m_version.group(1) if m_version else out
505
506
class BintoolPacker(Bintool):
    """Tool which compresses / decompresses entry contents

    This is a bintool base class for compression / decompression packers

    Properties:
        name: Name of packer tool
        compression: Compression type (COMPRESS_...), value of 'name' property
            if none
        compress_args: List of positional args provided to tool for compress,
            ['--compress'] if none
        decompress_args: List of positional args provided to tool for
            decompress, ['--decompress'] if none
        fetch_package: Name of the package installed using apt, value of
            'name' property if none
        version_regex: Regular expression to extract the version from tool
            version output, '(v[0-9.]+)' if none
    """
    def __init__(self, name, compression=None, compress_args=None,
                 decompress_args=None, fetch_package=None,
                 version_regex=r'(v[0-9.]+)', version_args='-V'):
        # The description is built from the compression type, falling back
        # to the tool name
        desc = '%s compression' % (compression or name)
        super().__init__(name, desc, version_regex, version_args)
        self.compress_args = (['--compress'] if compress_args is None
                              else compress_args)
        self.decompress_args = (['--decompress'] if decompress_args is None
                                else decompress_args)
        self.fetch_package = name if fetch_package is None else fetch_package

    def _run_on_data(self, indata, prefix, tool_args):
        """Write data to a temporary file and run the tool on that file

        Args:
            indata (bytes): Data to pass to the tool
            prefix (str): Prefix for the name of the temporary file
            tool_args (list of str): Arguments to place before '--stdout'
                and the temporary filename

        Returns:
            bytes: stdout from the tool, as returned by run_cmd()
        """
        with tempfile.NamedTemporaryFile(prefix=prefix,
                                         dir=tools.get_output_dir()) as tmp:
            tools.write_file(tmp.name, indata)
            args = tool_args + ['--stdout', tmp.name]
            return self.run_cmd(*args, binary=True)

    def compress(self, indata):
        """Compress data

        Args:
            indata (bytes): Data to compress

        Returns:
            bytes: Compressed data
        """
        return self._run_on_data(indata, 'comp.tmp', self.compress_args)

    def decompress(self, indata):
        """Decompress data

        Args:
            indata (bytes): Data to decompress

        Returns:
            bytes: Decompressed data
        """
        return self._run_on_data(indata, 'decomp.tmp', self.decompress_args)

    def fetch(self, method):
        """Fetch handler

        This installs the package named by fetch_package using the apt
        utility.

        Args:
            method (FETCH_...): Method to use

        Returns:
            True if the file was fetched and now installed, None if a method
            other than FETCH_BIN was requested

        Raises:
            ValueError: Fetching could not be completed
        """
        if method == FETCH_BIN:
            return self.apt_install(self.fetch_package)
        return None