Files
multiball/multiball/__main__.py
Cassowary 226dc5a6ee 0.7.1 style fixes
Fixes potential bugs with printing things. Just really cleans up the style printing.
2025-10-20 09:38:31 -07:00

495 lines
18 KiB
Python

#
# Multiball - Run things on a buncha servers interactively.
#
#
import argparse
import datetime
import fnmatch
import logging
import os
import pprint
import re
import shlex
import shutil
import sys
import traceback
from itertools import islice, zip_longest
from pathlib import Path
from typing import Iterable, Optional, Sequence, Union
import prompt_toolkit
import yaml
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.history import FileHistory, InMemoryHistory
from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.styles import Style
from . import __version__
from .fabtools import HostSet
from .tools import (load_bare_server_list, load_config,
load_sshconfig_server_list)
_print = print
from .style import print_style as print
from .style import STYLES
class MultiballCommandException(BaseException):
pass
class MBCommandAbort(MultiballCommandException):
pass
class MBCommandExit(MultiballCommandException):
pass
class MBCommandRun(MultiballCommandException):
pass
class MBExit(MultiballCommandException):
pass
# Dictionary of top-level commands and help strings (and eventually we'll note what kind of argumest so we can construct
# heirarchical completion from them)
COMMANDS = {
'help': (("/help", "help", "?"), "This help"),
'targethosts': (('/targethosts', '/targethosts', '/targets', '/hosts'), "Show the current list of target hosts."),
'clear': (('/clear',), "Clear target host list."),
'host': (('/host', '/add', '/target'), "Set the target host list. /host clears the list first. Supports wildcards."),
'remove': (('/remove', '/removehost', '/removetarget'), "Remove hosts from target host list. Supports wildcards."),
'all': (('/all',), "Reset the target host list to all known hosts."),
'allhosts': (('/allhosts', '/alltargets'), "Show list of all known hosts."),
'exit': (('/exit', 'exit'), "Exit."),
'environment': (('/environment', '/env', '/set'), "Set (or print) environment to send to remote host before running commands."),
'clearenv': (('/clearenv',), "Clear environment (entire or single variable)"),
'safety': (('/safety', 'safety'), "Turn on safety or turn off with `off`."),
'safe': (('/safe', ), "Run this command as if it were safe even if safety is on."),
'confirm': (('/confirm',), "[scripting] Prompt for confirmation"),
'echo': (('/echo', ), "[scripting] print string"),
'arguments': (('/arguments',), "[scripting] Abort if no arguments are specified."),
# fixme add interpolation
'unsafe': (('/unsafe', ), "[scripting] Abort command if safety is on."),
'save': (('/save', ),"Save last run log to file."),
'debug': (('/////debug', ), "[devel] enter PDB"),
# 'history': (('/history', '/h'), "Show the last 10 commands in the history."),
# 'filteralias': (('/filteralias',), "Create a filter alias which allows quick filtering."),
# 'undo': (('/undo',), "Undo last host or environment command."),
# 'alias': (('/alias', ), "Create an alias interactively.")
## Fixme unambiguate the above commands which list things into this command so things are more consistent
# 'list': (('/list',), "List various things (environment, allhosts, hosts, targets)")
# '!': (('!', '/local'), "Run a local command and capture its output.")
}
class Multiball:
def __init__(self, configFile: Optional[Path] = None):
# load configuration
self.handlers = {
'environment': self.command_environment,
'confirm': self.command_confirm,
'arguments': self.command_arguments,
'unsafe': self.command_unsafe,
'echo': self.command_echo,
'safety': self.command_safety,
'safe': self.command_safe,
'clearenv': self.command_clearenv,
'save': self.command_save,
}
configdict = load_config(Path("multiball.cfg"))
self.allhosts: set = set()
self.history: Union[InMemoryHistory, FileHistory] = InMemoryHistory()
self.environment = {}
self.safety = True
self.ssh_config = None
self.last_run = {}
if config := configdict.get('config', {}):
if 'history' in config:
self.history = FileHistory(config['history'])
if sshconfigs := config.get('sshconfigs', ''):
for sc in sshconfigs.split(','):
if self.ssh_config is None:
self.ssh_config = Path(sc).expanduser()
newhosts = load_sshconfig_server_list(
Path(sc).expanduser())
print(f"Loaded {len(newhosts)} host(s) from {sc}")
self.allhosts.update(newhosts)
if bareconfigs := config.get('hostlists'):
for sc in bareconfigs.split(','):
newhosts = load_bare_server_list(Path(sc).expanduser())
print(f"Loaded {len(newhosts)} host(s) from {sc}")
self.allhosts.update(newhosts)
else:
self.ssh_config = Path("~/.ssh/ssh_config").expanduser()
print(
f"Warning: no [config] section in {configFile}. Using fallbacks.", style="error")
self.allhosts = load_sshconfig_server_list(self.ssh_config)
# setup state and environment
self.targethosts = set(self.allhosts)
self._is_scripting = False
self._script_frame_arguments: Sequence[str] = []
self.session: PromptSession = PromptSession(history=self.history)
self.aliases = {}
# make aliases
for k in configdict:
if k.startswith('cmd.'):
# print(f"command alias: {k}")
alias = k.split('.', 1)[1]
self.aliases[alias] = list(configdict[k])
# call startup commands
if startup := configdict.get('startup', []):
# fixme execute startup and handle exceptions correctly
for cmd in startup:
self._run_command(cmd)
def command_safe(self, command, args):
try:
self.safety = False
self.do_cmd(args)
finally:
self.safety = True
def command_safety(self, command, args):
"""
Toggle the safety.
"""
if args.lower().startswith("off"):
self.safety = False
print("** Safety OFF. Now unsafe!", style="unsafe")
else:
self.safety = True
print("** Safety ON.", style="safe")
def command_echo(self, command, args):
"""
Echo string
fixme put string interpolation here so that state variables can be printed
"""
print(f"{args}")
def command_environment(self, command, args):
"""
Set an environment variable in the environment object, or print out the environment vithout any arguments.
"""
if (args):
if '=' in args:
cmd = [x.strip() for x in args.split('=', 1)]
key = cmd[0]
if (len(cmd) > 1) and (cmd[1]):
self.environment[key] = cmd[1]
else:
if cmd[0] in self.environment:
del self.environment[key]
else:
for a in args.split():
if a in self.environment:
print(f"{a}={self.environment[a]}")
else:
print(f"{a} not in environment.")
else:
for k, v in self.environment.items():
print(f"{k}={v}")
def command_confirm(self, command, args):
"""
Prompt user with a yes/no question. Default responses result in a CommandAbort exception.
"""
result = prompt_toolkit.prompt([("class:promptyn",
"<" + args + " [Yys/Nn]>"),
("class:message",
" ")], style=STYLES)
if result and (result[0] in 'Yys'):
return True
else:
print("(no)")
raise MBCommandAbort
def command_arguments(self, command, args):
"""
Rasiess CommandAbort if the scriptenv doesn't contain arguments.
FIXME make a command parser in here that constrains the scriptenv arguments to a specific set
or format, and populates the environment with the variables so they can be placed in other commands.
"""
if not self._is_scripting or not self._script_frame_arguments:
if not self._is_scripting:
print("does nothing outside of script", style="alert")
else:
print("arguments required", style="error")
raise MBCommandAbort
return
def command_unsafe(self, command, args):
"""
Raises CommandAbort if safty is on.
"""
if not self._is_scripting or self.safety:
if not self._is_scripting:
print("does nothing outside of script", style="alert")
else:
print("cannot run in safety mode", style="error")
raise MBCommandAbort
return
def command_clearenv(self, command, args):
print("cleared environment")
self.environment = dict()
def command_save(self, command, args):
# FIXME: We need to make aliases store multiple entries, and save should take the whole alias not just
# the last command.
if (not self.last_run):
print("Nothing to log.", style="alert")
return
tstamp = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%Z")
outfilename = f"{tstamp}.multiball.log"
if (args):
outfilename = args[0]
if (not outfilename.endswith(".log")):
outfilename = outfilename + ".log"
print(f"Writing {outfilename} for command `{self.last_run['cmd']}`.", style="alert")
with (open(outfilename, 'w')) as out:
out.writelines([f"command: {self.last_run['cmd']}\n",
"-------------------------------------------\n"
""])
out.write(self.last_run['results'].getvalue())
out.write("\n")
def _print_targetlist(self, tlist=None, keyword='Targeting'):
if not tlist:
tlist = self.targethosts
print(f"{keyword} {len(self.targethosts)} hosts:")
print(' '.join(tlist))
def _print_help(self):
for item in COMMANDS.values():
m = ', '.join(item[0]) + ' - ' + item[1]
print(f"{m}", style="bright")
print("Other entries run command on targets.")
def _run_command(self, command: str):
if not command or command.split()[0] in COMMANDS['targethosts'][0]:
self._print_targetlist()
return
# fixme we might be able to use pattern matching to look up the command abstract and make this whole section
# more readable
commands = command.split(maxsplit=1)
abst = 'REMOTECOMMAND' # default is run a remote command
for a, c in COMMANDS.items():
if commands[0] in c[0]:
abst = a
command = commands[0]
if (len(commands) > 1):
cargs = ' '.join(commands[1:])
else:
cargs = ''
if abst == 'exit':
raise MBExit
elif abst == 'help':
self._print_help()
elif abst == 'debug':
print("==== DEBUGGING ====")
__import__("pdb").set_trace()
elif abst in self.handlers:
self.handlers[abst](command[0], cargs)
elif abst == 'clear':
print("cleared targets")
self.targethosts = set()
elif abst == 'host':
if len(commands) > 1:
targethostlist = set()
# iterate potential targethosts and match with globber
for phost in cargs.split():
if ('?' in phost) or ('*' in phost) or ('[' in phost):
# glob
for h in self.allhosts:
if fnmatch.fnmatch(h, phost):
targethostlist.add(h)
else:
targethostlist.add(phost)
if command in ('/host', '/target'):
print("note: clearing previous selection")
self.targethosts = targethostlist
else:
self.targethosts.update(targethostlist)
self._print_targetlist()
elif abst == 'remove':
if len(commands) > 1:
targethostlist = set()
# iterate potential targethosts and match with globber
for phost in cargs.split():
if ('?' in phost) or ('*' in phost) or ('[' in phost):
# glob
for h in self.allhosts:
if fnmatch.fnmatch(h, phost):
targethostlist.add(h)
else:
targethostlist.add(phost)
self.targethosts = self.targethosts - targethostlist
self._print_targetlist()
elif abst == 'all':
self.targethosts = self.allhosts
self._print_targetlist()
elif abst == 'allhosts':
self._print_targetlist(self.allhosts, "all known")
elif command in self.aliases:
print(f"running alias `{command}`")
self._run_alias(command, cargs)
else:
raise MBCommandRun
def _run_alias(self, command, cargs):
self._is_scripting = True
self._script_frame_arguments = (cargs, shlex.split(cargs))
try:
for cline in self.aliases[command]:
self._is_scripting = True
# fixme more extensive variable filling
if '{@}' in cline:
cline = cline.replace('{@}', cargs.strip())
try:
self.do_cmd(cline)
except MBCommandAbort:
# end the alias on abort
return
finally:
self._is_scripting = False
self._script_frame_arguments = None
def _run_remote_command(self, command):
remotecmd = []
for k, v in self.environment.items():
remotecmd.append(f"export {k}='{v}'")
remotecmd.append(command)
h = HostSet(self.targethosts, self.ssh_config)
h.run(';'.join(remotecmd))
self.last_run = {
'cmd': remotecmd,
'results': h.results
}
def do_cmd(self, command):
try:
self._run_command(command)
except MBCommandRun:
# remote command!
if self.safety:
print(f"would run `{command}` on targethosts", style="bright")
print("Safety is ON. `/safety off` to turn off.", style="alert")
else:
print(f"running `{command}` on targethosts", style="bright")
self._run_remote_command(command)
def cmd_prompt(self):
self._is_scripting = False
allcommands = []
for item in COMMANDS.values():
allcommands.extend(item[0])
allcommands.extend(list(self.aliases.keys()))
completer = WordCompleter(list(self.allhosts) + allcommands)
if self.safety:
prompt = "!"
pclass = "class:safe"
else:
prompt = '$'
pclass = "class:unsafe"
command = self.session.prompt([("class:count", f'{len(self.targethosts)}'),
("class:prompt", '->'),
(pclass, prompt),
("class:default", " ")], completer=completer, style=STYLES)
try:
self.do_cmd(command)
except MBCommandAbort:
# Nothing of importance happens here.
...
def parse_args(argv):
if (len(argv) > 1):
parser = argparse.ArgumentParser(
prog='multiball',
description='Run commands on a number of hosts.',
epilog='With no arguments enter interactive mode.',
)
parser.add_argument('--confirm', action='store_true', help='Skip confirmation step, use with caution.')
parser.add_argument('-t', '--target', type=str, nargs='?', action='append', help='Select one or more targets.')
parser.add_argument('-a', '--all', action='store_true', help='Select default target list.')
parser.add_argument('-c', '--command', type=str, nargs='+', action='append', help='Command to run.')
res = parser.parse_args(argv)
return res
else:
return None
def multiball(argv):
args = parse_args(argv)
## Command mode
if (args):
if (not args.all and (len(args.target) <= 0)):
print('Need either --all or at least one --target')
return 1
if (len(args.command) <= 0):
print('Need at least one --command')
return 1
print(f"Multiball {__version__}.", style="alert")
mb = Multiball()
if (args.all):
mb.targethosts = mb.allhosts
else:
mb.targethosts = args.target
for command in args.command:
mb._print_targetlist()
command = ' '.join(command)
if (not args.confirm):
try:
mb.command_confirm('', args=f"Run `{command}`?")
except MBCommandAbort:
continue
mb._run_remote_command(command)
return 0
## Interactive mode
print(f"Welcome to Multiball {__version__}. type /help for help.", style="alert")
mb = Multiball()
while (True):
# loop on prompt
try:
mb.cmd_prompt()
except MBExit:
return
def main():
try:
sys.exit(multiball(sys.argv[1:]))
except EOFError:
sys.exit(0)
except Exception as inst:
# fixme log exception
print(f"Exception! {inst}", style="error")
print(traceback.format_exc())
sys.exit(-1)
if __name__ == "__main__":
main()