1# SPDX-License-Identifier: GPL-2.0+ 2# Copyright 2022 Google LLC 3# Copyright (C) 2022 Weidm��ller Interface GmbH & Co. KG 4# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com> 5# 6"""Base class for all bintools 7 8This defines the common functionality for all bintools, including running 9the tool, checking its version and fetching it if needed. 10""" 11 12import collections 13import glob 14import importlib 15import multiprocessing 16import os 17import shutil 18import tempfile 19import urllib.error 20 21from u_boot_pylib import command 22from u_boot_pylib import terminal 23from u_boot_pylib import tools 24from u_boot_pylib import tout 25 26BINMAN_DIR = os.path.dirname(os.path.realpath(__file__)) 27 28# Format string for listing bintools, see also the header in list_all() 29FORMAT = '%-16.16s %-12.12s %-26.26s %s' 30 31# List of known modules, to avoid importing the module multiple times 32modules = {} 33 34# Possible ways of fetching a tool (FETCH_COUNT is number of ways) 35FETCH_ANY, FETCH_BIN, FETCH_BUILD, FETCH_COUNT = range(4) 36 37FETCH_NAMES = { 38 FETCH_ANY: 'any method', 39 FETCH_BIN: 'binary download', 40 FETCH_BUILD: 'build from source' 41 } 42 43# Status of tool fetching 44FETCHED, FAIL, PRESENT, STATUS_COUNT = range(4) 45 46class Bintool: 47 """Tool which operates on binaries to help produce entry contents 48 49 This is the base class for all bintools 50 """ 51 # List of bintools to regard as missing 52 missing_list = [] 53 54 # Directory to store tools. Note that this set up by set_tool_dir() which 55 # must be called before this class is used. 56 tooldir = '' 57 58 def __init__(self, name, desc, version_regex=None, version_args='-V'): 59 self.name = name 60 self.desc = desc 61 self.version_regex = version_regex 62 self.version_args = version_args 63 64 @staticmethod 65 def find_bintool_class(btype): 66 """Look up the bintool class for bintool 67 68 Args: 69 byte: Bintool to use, e.g. 'mkimage' 70 71 Returns: 72 The bintool class object if found, else a tuple: 73 module name that could not be found 74 exception received 75 """ 76 # Convert something like 'u-boot' to 'u_boot' since we are only 77 # interested in the type. 78 module_name = btype.replace('-', '_') 79 module = modules.get(module_name) 80 class_name = f'Bintool{module_name}' 81 82 # Import the module if we have not already done so 83 if not module: 84 try: 85 module = importlib.import_module('binman.btool.' + module_name) 86 except ImportError as exc: 87 try: 88 # Deal with classes which must be renamed due to conflicts 89 # with Python libraries 90 module = importlib.import_module('binman.btool.btool_' + 91 module_name) 92 except ImportError: 93 return module_name, exc 94 modules[module_name] = module 95 96 # Look up the expected class name 97 return getattr(module, class_name) 98 99 @staticmethod 100 def create(name): 101 """Create a new bintool object 102 103 Args: 104 name (str): Bintool to create, e.g. 'mkimage' 105 106 Returns: 107 A new object of the correct type (a subclass of Binutil) 108 """ 109 cls = Bintool.find_bintool_class(name) 110 if isinstance(cls, tuple): 111 raise ValueError("Cannot import bintool module '%s': %s" % cls) 112 113 # Call its constructor to get the object we want. 114 obj = cls(name) 115 return obj 116 117 @classmethod 118 def set_tool_dir(cls, pathname): 119 """Set the path to use to store and find tools""" 120 cls.tooldir = pathname 121 122 def show(self): 123 """Show a line of information about a bintool""" 124 if self.is_present(): 125 version = self.version() 126 else: 127 version = '-' 128 print(FORMAT % (self.name, version, self.desc, 129 self.get_path() or '(not found)')) 130 131 @classmethod 132 def set_missing_list(cls, missing_list): 133 cls.missing_list = missing_list or [] 134 135 @staticmethod 136 def get_tool_list(include_testing=False): 137 """Get a list of the known tools 138 139 Returns: 140 list of str: names of all tools known to binman 141 """ 142 files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*')) 143 names = [os.path.splitext(os.path.basename(fname))[0] 144 for fname in files] 145 names = [name for name in names if name[0] != '_'] 146 names = [name[6:] if name.startswith('btool_') else name 147 for name in names] 148 if include_testing: 149 names.append('_testing') 150 return sorted(names) 151 152 @staticmethod 153 def list_all(): 154 """List all the bintools known to binman""" 155 names = Bintool.get_tool_list() 156 print(FORMAT % ('Name', 'Version', 'Description', 'Path')) 157 print(FORMAT % ('-' * 15,'-' * 11, '-' * 25, '-' * 30)) 158 for name in names: 159 btool = Bintool.create(name) 160 btool.show() 161 162 def is_present(self): 163 """Check if a bintool is available on the system 164 165 Returns: 166 bool: True if available, False if not 167 """ 168 if self.name in self.missing_list: 169 return False 170 return bool(self.get_path()) 171 172 def get_path(self): 173 """Get the path of a bintool 174 175 Returns: 176 str: Path to the tool, if available, else None 177 """ 178 return tools.tool_find(self.name) 179 180 def fetch_tool(self, method, col, skip_present): 181 """Fetch a single tool 182 183 Args: 184 method (FETCH_...): Method to use 185 col (terminal.Color): Color terminal object 186 skip_present (boo;): Skip fetching if it is already present 187 188 Returns: 189 int: Result of fetch either FETCHED, FAIL, PRESENT 190 """ 191 def try_fetch(meth): 192 res = None 193 try: 194 res = self.fetch(meth) 195 except urllib.error.URLError as uerr: 196 message = uerr.reason 197 print(col.build(col.RED, f'- {message}')) 198 199 except ValueError as exc: 200 print(f'Exception: {exc}') 201 return res 202 203 if skip_present and self.is_present(): 204 return PRESENT 205 print(col.build(col.YELLOW, 'Fetch: %s' % self.name)) 206 if method == FETCH_ANY: 207 for try_method in range(1, FETCH_COUNT): 208 print(f'- trying method: {FETCH_NAMES[try_method]}') 209 result = try_fetch(try_method) 210 if result: 211 break 212 else: 213 result = try_fetch(method) 214 if not result: 215 return FAIL 216 if result is not True: 217 fname, tmpdir = result 218 dest = os.path.join(self.tooldir, self.name) 219 os.makedirs(self.tooldir, exist_ok=True) 220 print(f"- writing to '{dest}'") 221 shutil.move(fname, dest) 222 if tmpdir: 223 shutil.rmtree(tmpdir) 224 return FETCHED 225 226 @staticmethod 227 def fetch_tools(method, names_to_fetch): 228 """Fetch bintools from a suitable place 229 230 This fetches or builds the requested bintools so that they can be used 231 by binman 232 233 Args: 234 names_to_fetch (list of str): names of bintools to fetch 235 236 Returns: 237 True on success, False on failure 238 """ 239 def show_status(color, prompt, names): 240 print(col.build( 241 color, f'{prompt}:%s{len(names):2}: %s' % 242 (' ' * (16 - len(prompt)), ' '.join(names)))) 243 244 col = terminal.Color() 245 skip_present = False 246 name_list = names_to_fetch 247 if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']: 248 name_list = Bintool.get_tool_list() 249 if names_to_fetch[0] == 'missing': 250 skip_present = True 251 print(col.build(col.YELLOW, 252 'Fetching tools: %s' % ' '.join(name_list))) 253 status = collections.defaultdict(list) 254 for name in name_list: 255 btool = Bintool.create(name) 256 result = btool.fetch_tool(method, col, skip_present) 257 status[result].append(name) 258 if result == FAIL: 259 if method == FETCH_ANY: 260 print('- failed to fetch with all methods') 261 else: 262 print(f"- method '{FETCH_NAMES[method]}' is not supported") 263 264 if len(name_list) > 1: 265 if skip_present: 266 show_status(col.GREEN, 'Already present', status[PRESENT]) 267 show_status(col.GREEN, 'Tools fetched', status[FETCHED]) 268 if status[FAIL]: 269 show_status(col.RED, 'Failures', status[FAIL]) 270 return not status[FAIL] 271 272 def run_cmd_result(self, *args, binary=False, raise_on_error=True): 273 """Run the bintool using command-line arguments 274 275 Args: 276 args (list of str): Arguments to provide, in addition to the bintool 277 name 278 binary (bool): True to return output as bytes instead of str 279 raise_on_error (bool): True to raise a ValueError exception if the 280 tool returns a non-zero return code 281 282 Returns: 283 CommandResult: Resulting output from the bintool, or None if the 284 tool is not present 285 """ 286 if self.name in self.missing_list: 287 return None 288 name = os.path.expanduser(self.name) # Expand paths containing ~ 289 all_args = (name,) + args 290 env = tools.get_env_with_path() 291 tout.debug(f"bintool: {' '.join(all_args)}") 292 result = command.run_pipe( 293 [all_args], capture=True, capture_stderr=True, env=env, 294 raise_on_error=False, binary=binary) 295 296 if result.return_code: 297 # Return None if the tool was not found. In this case there is no 298 # output from the tool and it does not appear on the path. We still 299 # try to run it (as above) since RunPipe() allows faking the tool's 300 # output 301 if not any([result.stdout, result.stderr, tools.tool_find(name)]): 302 tout.info(f"bintool '{name}' not found") 303 return None 304 if raise_on_error: 305 tout.info(f"bintool '{name}' failed") 306 raise ValueError("Error %d running '%s': %s" % 307 (result.return_code, ' '.join(all_args), 308 result.stderr or result.stdout)) 309 if result.stdout: 310 tout.debug(result.stdout) 311 if result.stderr: 312 tout.debug(result.stderr) 313 return result 314 315 def run_cmd(self, *args, binary=False): 316 """Run the bintool using command-line arguments 317 318 Args: 319 args (list of str): Arguments to provide, in addition to the bintool 320 name 321 binary (bool): True to return output as bytes instead of str 322 323 Returns: 324 str or bytes: Resulting stdout from the bintool 325 """ 326 result = self.run_cmd_result(*args, binary=binary) 327 if result: 328 return result.stdout 329 330 @classmethod 331 def build_from_git(cls, git_repo, make_targets, bintool_path, flags=None): 332 """Build a bintool from a git repo 333 334 This clones the repo in a temporary directory, builds it with 'make', 335 then returns the filename of the resulting executable bintool 336 337 Args: 338 git_repo (str): URL of git repo 339 make_targets (list of str): List of targets to pass to 'make' to build 340 the tool 341 bintool_path (str): Relative path of the tool in the repo, after 342 build is complete 343 flags (list of str): Flags or variables to pass to make, or None 344 345 Returns: 346 tuple: 347 str: Filename of fetched file to copy to a suitable directory 348 str: Name of temp directory to remove, or None 349 or None on error 350 """ 351 tmpdir = tempfile.mkdtemp(prefix='binmanf.') 352 print(f"- clone git repo '{git_repo}' to '{tmpdir}'") 353 tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir) 354 for target in make_targets: 355 print(f"- build target '{target}'") 356 cmd = ['make', '-C', tmpdir, '-j', f'{multiprocessing.cpu_count()}', 357 target] 358 if flags: 359 cmd += flags 360 tools.run(*cmd) 361 362 fname = os.path.join(tmpdir, bintool_path) 363 if not os.path.exists(fname): 364 print(f"- File '{fname}' was not produced") 365 return None 366 return fname, tmpdir 367 368 @classmethod 369 def fetch_from_url(cls, url): 370 """Fetch a bintool from a URL 371 372 Args: 373 url (str): URL to fetch from 374 375 Returns: 376 tuple: 377 str: Filename of fetched file to copy to a suitable directory 378 str: Name of temp directory to remove, or None 379 """ 380 fname, tmpdir = tools.download(url) 381 tools.run('chmod', 'a+x', fname) 382 return fname, tmpdir 383 384 @classmethod 385 def fetch_from_drive(cls, drive_id): 386 """Fetch a bintool from Google drive 387 388 Args: 389 drive_id (str): ID of file to fetch. For a URL of the form 390 'https://drive.google.com/file/d/xxx/view?usp=sharing' the value 391 passed here should be 'xxx' 392 393 Returns: 394 tuple: 395 str: Filename of fetched file to copy to a suitable directory 396 str: Name of temp directory to remove, or None 397 """ 398 url = f'https://drive.google.com/uc?export=download&id={drive_id}' 399 return cls.fetch_from_url(url) 400 401 @classmethod 402 def apt_install(cls, package): 403 """Install a bintool using the 'apt' tool 404 405 This requires use of servo so may request a password 406 407 Args: 408 package (str): Name of package to install 409 410 Returns: 411 True, assuming it completes without error 412 """ 413 args = ['sudo', 'apt', 'install', '-y', package] 414 print('- %s' % ' '.join(args)) 415 tools.run(*args) 416 return True 417 418 @staticmethod 419 def WriteDocs(modules, test_missing=None): 420 """Write out documentation about the various bintools to stdout 421 422 Args: 423 modules: List of modules to include 424 test_missing: Used for testing. This is a module to report 425 as missing 426 """ 427 print('''.. SPDX-License-Identifier: GPL-2.0+ 428 429Binman bintool Documentation 430============================ 431 432This file describes the bintools (binary tools) supported by binman. Bintools 433are binman's name for external executables that it runs to generate or process 434binaries. It is fairly easy to create new bintools. Just add a new file to the 435'btool' directory. You can use existing bintools as examples. 436 437 438''') 439 modules = sorted(modules) 440 missing = [] 441 for name in modules: 442 module = Bintool.find_bintool_class(name) 443 docs = getattr(module, '__doc__') 444 if test_missing == name: 445 docs = None 446 if docs: 447 lines = docs.splitlines() 448 first_line = lines[0] 449 rest = [line[4:] for line in lines[1:]] 450 hdr = 'Bintool: %s: %s' % (name, first_line) 451 print(hdr) 452 print('-' * len(hdr)) 453 print('\n'.join(rest)) 454 print() 455 print() 456 else: 457 missing.append(name) 458 459 if missing: 460 raise ValueError('Documentation is missing for modules: %s' % 461 ', '.join(missing)) 462 463 # pylint: disable=W0613 464 def fetch(self, method): 465 """Fetch handler for a bintool 466 467 This should be implemented by the base class 468 469 Args: 470 method (FETCH_...): Method to use 471 472 Returns: 473 tuple: 474 str: Filename of fetched file to copy to a suitable directory 475 str: Name of temp directory to remove, or None 476 or True if the file was fetched and already installed 477 or None if no fetch() implementation is available 478 479 Raises: 480 Valuerror: Fetching could not be completed 481 """ 482 print(f"No method to fetch bintool '{self.name}'") 483 return False 484 485 def version(self): 486 """Version handler for a bintool 487 488 Returns: 489 str: Version string for this bintool 490 """ 491 if self.version_regex is None: 492 return 'unknown' 493 494 import re 495 496 result = self.run_cmd_result(self.version_args) 497 out = result.stdout.strip() 498 if not out: 499 out = result.stderr.strip() 500 if not out: 501 return 'unknown' 502 503 m_version = re.search(self.version_regex, out) 504 return m_version.group(1) if m_version else out 505 506 507class BintoolPacker(Bintool): 508 """Tool which compression / decompression entry contents 509 510 This is a bintools base class for compression / decompression packer 511 512 Properties: 513 name: Name of packer tool 514 compression: Compression type (COMPRESS_...), value of 'name' property 515 if none 516 compress_args: List of positional args provided to tool for compress, 517 ['--compress'] if none 518 decompress_args: List of positional args provided to tool for 519 decompress, ['--decompress'] if none 520 fetch_package: Name of the tool installed using the apt, value of 'name' 521 property if none 522 version_regex: Regular expressions to extract the version from tool 523 version output, '(v[0-9.]+)' if none 524 """ 525 def __init__(self, name, compression=None, compress_args=None, 526 decompress_args=None, fetch_package=None, 527 version_regex=r'(v[0-9.]+)', version_args='-V'): 528 desc = '%s compression' % (compression if compression else name) 529 super().__init__(name, desc, version_regex, version_args) 530 if compress_args is None: 531 compress_args = ['--compress'] 532 self.compress_args = compress_args 533 if decompress_args is None: 534 decompress_args = ['--decompress'] 535 self.decompress_args = decompress_args 536 if fetch_package is None: 537 fetch_package = name 538 self.fetch_package = fetch_package 539 540 def compress(self, indata): 541 """Compress data 542 543 Args: 544 indata (bytes): Data to compress 545 546 Returns: 547 bytes: Compressed data 548 """ 549 with tempfile.NamedTemporaryFile(prefix='comp.tmp', 550 dir=tools.get_output_dir()) as tmp: 551 tools.write_file(tmp.name, indata) 552 args = self.compress_args + ['--stdout', tmp.name] 553 return self.run_cmd(*args, binary=True) 554 555 def decompress(self, indata): 556 """Decompress data 557 558 Args: 559 indata (bytes): Data to decompress 560 561 Returns: 562 bytes: Decompressed data 563 """ 564 with tempfile.NamedTemporaryFile(prefix='decomp.tmp', 565 dir=tools.get_output_dir()) as inf: 566 tools.write_file(inf.name, indata) 567 args = self.decompress_args + ['--stdout', inf.name] 568 return self.run_cmd(*args, binary=True) 569 570 def fetch(self, method): 571 """Fetch handler 572 573 This installs the gzip package using the apt utility. 574 575 Args: 576 method (FETCH_...): Method to use 577 578 Returns: 579 True if the file was fetched and now installed, None if a method 580 other than FETCH_BIN was requested 581 582 Raises: 583 Valuerror: Fetching could not be completed 584 """ 585 if method != FETCH_BIN: 586 return None 587 return self.apt_install(self.fetch_package) 588