#! /usr/bin/python -tt
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
#
# Copyright Red Hat Inc. 2010
#
# Author: James Antill
#
# Examples:
#
# yum updater
# yum updater check

import os

import yum
import yum.config
from yum import logginglevels
from yum.plugins import TYPE_INTERACTIVE
from yum.constants import TS_INSTALL_STATES

requires_api_version = '2.1'
plugin_type = (TYPE_INTERACTIVE,)

# Values read from the plugin's [main] section; filled in by config_hook().
myconf = {}
# Per-section settings keyed by section id; filled in by config_hook().
mygrps = {}


class CliError(yum.Errors.YumBaseError):
    """
    Command line interface related Exception.
    """

    def __init__(self, args=''):
        yum.Errors.YumBaseError.__init__(self)
        self.args = args


# "Borrowed" from yumcommands.py
def yumcommands_checkRootUID(base):
    """
    Verify that the program is being run by the root user.

    @param base: a YumBase object.
    @raise CliError: if base.conf.uid is not 0.
    """
    if base.conf.uid != 0:
        base.logger.critical('You need to be root to perform this command.')
        raise CliError


def yumcommands_checkGPGKey(base):
    """
    Verify that GPG keys are available when GPG checking is enabled.

    Nothing happens when base.gpgKeyCheck() succeeds.  Otherwise every
    enabled repo is inspected and the first one that has gpgcheck turned on
    but no gpgkey configured aborts the command.

    @param base: a YumBase object.
    @raise CliError: if such a repo is found.
    """
    if base.gpgKeyCheck():
        return

    for repo in base.repos.listEnabled():
        if repo.gpgcheck != 'false' and repo.gpgkey == '':
            msg = """
You have enabled checking of packages via GPG keys. This is a good thing.
However, you do not have any GPG public keys installed. You need to download
the keys for packages you wish to install and install them.
You can do that by running the command:
    rpm --import public.gpg.key


Alternatively you can specify the url to the key you would like to use
for a repository in the 'gpgkey' option in a repository section and yum
will install it for you.

For more information contact your distribution or package provider.
"""
            base.logger.critical(msg)
            raise CliError


def yumcommands_checkEnabledRepo(base, possible_local_files=()):
    """
    Verify that there is at least one enabled repo.

    The check also passes when one of possible_local_files names an existing
    ".rpm" file.

    @param base: a YumBase object.
    @param possible_local_files: iterable of file names given on the command
                                 line (the default is an immutable empty
                                 tuple rather than a shared mutable list).
    @raise CliError: if there is neither an enabled repo nor a local rpm.
    """
    if base.repos.listEnabled():
        return

    for lfile in possible_local_files:
        if lfile.endswith(".rpm") and os.path.exists(lfile):
            return

    msg = ('There are no enabled repos.\n'
           ' Run "yum repolist all" to see the repos you have.\n'
           ' You can enable repos with yum-config-manager --enable ')
    base.logger.critical(msg)
    raise CliError


# Copy and paste from .update()
def _cp_base_obsolete(self, obsoleting, installed):
    """
    Mark one obsoleting package in the transaction.

    @param self: a YumBase object (this is a free function, not a method).
    @param obsoleting: pkgtup of the package doing the obsoleting.
    @param installed: pkgtup of the installed package being obsoleted.
    @return: a one element list holding the new transaction member, or an
             empty list when the obsoleting package object is not available.
    """
    obsoleting_pkg = self.getPackageObject(obsoleting, allow_missing=True)
    if obsoleting_pkg is None:
        return []

    # Prefer the replacement returned by _test_loop(), when there is one.
    topkg = self._test_loop(obsoleting_pkg, self._pkg2obspkg)
    if topkg is not None:
        obsoleting_pkg = topkg

    installed_pkg = self.getInstalledPackageObject(installed)
    txmbr = self.tsInfo.addObsoleting(obsoleting_pkg, installed_pkg)
    self.tsInfo.addObsoleted(installed_pkg, obsoleting_pkg)
    return [txmbr]


def _cp_base_update(self, new, old):
    """
    Mark one update in the transaction.

    @param self: a YumBase object (this is a free function, not a method).
    @param new: pkgtup of the updating package.
    @param old: pkgtup of the installed package being updated.
    @return: whatever self.update(po=...) returns, or an empty list when the
             old package is already obsoleted in the transaction or the new
             package object is not available.
    """
    if self.tsInfo.isObsoleted(pkgtup=old):
        self.verbose_logger.log(
            logginglevels.DEBUG_2,
            ('Not Updating Package that is already obsoleted: '
             '%s.%s %s:%s-%s') % old)
        return []

    new = self.getPackageObject(new, allow_missing=True)
    if new is None:
        return []
    return self.update(po=new)


def _sorted_list_grps():
    """
    Return the groups, in the order they should be used.

    Groups are bucketed by their 'priority' value and returned from the
    highest priority value to the lowest; groups whose priority is below
    myconf['priority'] are left out.
    """
    pri_grps = {}
    for grp in mygrps.values():
        pri_grps.setdefault(grp['priority'], []).append(grp)

    ret = []
    for pri in sorted(pri_grps, reverse=True):
        if pri < myconf['priority']:
            continue
        ret.extend(pri_grps[pri])
    return ret


class UpdaterCommand:
    """ The "updater" command: update calls driven by section priority. """

    def getNames(self):
        """ Return the command names this object answers to. """
        return ['updater']

    def getUsage(self):
        """ Return the usage string for the command arguments. """
        return "[section]"

    def getSummary(self):
        """ Return the one line description of the command. """
        return "Does update calls, based on priority configuration"

    def doCheck(self, base, basecmd, extcmds):
        """
        Run the pre-flight checks; each one raises CliError on failure.

        @param base: a YumBase object.
        @param basecmd: the command name (unused here).
        @param extcmds: the command arguments (unused here).
        """
        yumcommands_checkRootUID(base)
        yumcommands_checkGPGKey(base)
        yumcommands_checkEnabledRepo(base)

    def doCommand(self, base, basecmd, extcmds):
        """
        Mark the next batch of obsoletes/updates in the transaction.

        Configured groups are processed in priority order, then one pass
        handles everything no group dealt with.  A leading "check" argument
        switches to report-only mode; any remaining arguments restrict
        processing to the groups with those ids.

        @param base: a YumBase object.
        @param basecmd: the command name, used in the result messages.
        @param extcmds: list of command arguments.
        @return: (code, [message]) where code is 100 in check mode when
                 something was marked, 2 when something was marked, and 0
                 when nothing was marked.
        """
        def _calc_ret():
            ''' No goto, so meh... '''
            if not tx_return:
                return 0, [basecmd + ' done']

            if not check:
                return 2, [basecmd + ' marked %d package(s)' % len(tx_return)]

            msg = ' Updater will next update %d package(s):'
            pos = set()
            for txmbr in tx_return:
                if txmbr.output_state in TS_INSTALL_STATES:
                    pos.add(txmbr.po)
            base.verbose_logger.info(msg, len(pos))
            for po in sorted(pos):
                base.verbose_logger.info(' %s', po)
            return 100, [msg]

        def _sk_name(pkgtups):
            ''' Just return the first name from two pkgtups. '''
            return pkgtups[0][0]

        check = False
        if extcmds and extcmds[0] == 'check':
            extcmds = extcmds[1:]
            check = True
        extcmds = set(extcmds)

        updates = sorted(base.up.getUpdatesTuples(), key=_sk_name)
        # With obsoletes processing disabled there is simply nothing to
        # iterate over; the name must still be bound for the loops below.
        if base.conf.obsoletes:
            obsoletes = sorted(base.up.getObsoletesTuples(newest=1),
                               key=_sk_name)
        else:
            obsoletes = []

        tx_return = []
        # Once a size limit is hit this holds the package name that reached
        # it; the (name sorted) loops then only finish off that name.
        last_name = None
        # Names already handled by some group.  Kept across groups so that
        # the catch-all passes at the end work when no group was processed
        # and never mark a package a second time.
        obs_done = set()
        up_done = set()

        for grp in _sorted_list_grps():
            if extcmds and grp['id'] not in extcmds:
                continue

            pnames = set(grp['packages'])

            for (obsoleting, installed) in obsoletes:
                if last_name is not None and last_name != obsoleting[0]:
                    break

                if installed[0] in pnames or obsoleting[0] in pnames:
                    tx_return.extend(
                        _cp_base_obsolete(base, obsoleting, installed))
                    obs_done.add(obsoleting[0])
                    if grp['size'] and len(tx_return) >= grp['size']:
                        last_name = obsoleting[0]

            if last_name is not None:
                return _calc_ret()

            for (new, old) in updates:
                assert new[0] == old[0]
                if last_name is not None and last_name != new[0]:
                    break

                if new[0] in pnames:
                    tx_return.extend(_cp_base_update(base, new, old))
                    up_done.add(new[0])
                    if grp['size'] and len(tx_return) >= grp['size']:
                        last_name = new[0]

            # Stop after this group if it hit its size limit, or if it marked
            # anything and is not configured to continue to the next group.
            if last_name is not None or (tx_return and not grp['continue']):
                return _calc_ret()

        # Catch-all passes: everything no group handled, limited by the
        # size configured in [main].
        for (obsoleting, installed) in obsoletes:
            if obsoleting[0] in obs_done:
                continue
            if last_name is not None and last_name != obsoleting[0]:
                break

            tx_return.extend(_cp_base_obsolete(base, obsoleting, installed))
            if myconf['size'] and len(tx_return) >= myconf['size']:
                last_name = obsoleting[0]

        if last_name is not None:
            return _calc_ret()

        for (new, old) in updates:
            assert new[0] == old[0]
            if new[0] in up_done:
                continue
            if last_name is not None and last_name != new[0]:
                break

            tx_return.extend(_cp_base_update(base, new, old))
            if myconf['size'] and len(tx_return) >= myconf['size']:
                last_name = new[0]

        return _calc_ret()


def config_hook(conduit):
    """
    Read the plugin configuration and register the "updater" command.

    The [main] options 'sections', 'priority', 'continue' and 'size' are
    stored in myconf.  For every section listed in 'sections' an entry is
    stored in mygrps holding its 'id', 'packages', 'priority', 'continue'
    and 'size'; the last three default to the [main] values.

    @param conduit: the plugin conduit passed in by yum.
    """
    c = yum.config

    key = 'sections'
    myconf[key] = c.getOption(conduit._conf, 'main', key, c.ListOption())
    for key, defv in (('priority', 99),):
        myconf[key] = conduit.confInt('main', key, default=defv)
    for key, defv in (('continue', False),):
        myconf[key] = conduit.confBool('main', key, default=defv)
    # Spellings accepted as zero by the 'size' options.
    noz = set(["0", ""])
    for key, defv in (('size', 0),):
        myconf[key] = c.getOption(conduit._conf, 'main', key,
                                  c.PositiveIntOption(defv, names_of_0=noz))

    for grp in myconf['sections']:
        mygrps[grp] = {}
        mygrps[grp]['id'] = grp
        key = 'packages'
        mygrps[grp][key] = c.getOption(conduit._conf, grp, key,
                                       c.ListOption())
        key = 'priority'
        mygrps[grp][key] = conduit.confInt(grp, key, default=myconf[key])
        key = 'continue'
        mygrps[grp][key] = conduit.confBool(grp, key, default=myconf[key])
        key = 'size'
        mygrps[grp][key] = c.getOption(conduit._conf, grp, key,
                                       c.PositiveIntOption(myconf[key],
                                                           names_of_0=noz))

    conduit.registerCommand(UpdaterCommand())