Colorizing Start!

- Add styles and colors to most output.
- Update TODO list
- Add ability to interrupt "waiting" condition with Control-C
This commit is contained in:
2025-10-04 17:41:01 -07:00
parent 99f5bff2d8
commit 614ea8b964
5 changed files with 108 additions and 50 deletions

View File

@ -1,4 +1,8 @@
TODO TODO
- fix handling of control-c
- redo the command loop to make some fuckin sense why do we use eceptions to do modal stuff
- make styles optional
- expose styles in config file or personal config
- add local variable support that can be interpolated into commands - add local variable support that can be interpolated into commands
- keep track of previous command outputs for last command, and any previous command with target list and command line - keep track of previous command outputs for last command, and any previous command with target list and command line
- Allow any given command to be tagged with a label, and the outputs would be stored in the output dictionary under that name (for scripting); - Allow any given command to be tagged with a label, and the outputs would be stored in the output dictionary under that name (for scripting);

View File

@ -1 +1 @@
__version__ = "0.5.0" __version__ = "0.6.0"

View File

@ -11,24 +11,27 @@ import logging
import os import os
import pprint import pprint
import re import re
import shutil
import shlex import shlex
import shutil
import sys import sys
import traceback import traceback
from itertools import islice, zip_longest from itertools import islice, zip_longest
from pathlib import Path from pathlib import Path
from typing import Iterable, Optional, Sequence, Union from typing import Iterable, Optional, Sequence, Union
import yaml
import prompt_toolkit import prompt_toolkit
import yaml
from prompt_toolkit.completion import WordCompleter from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.history import InMemoryHistory, FileHistory from prompt_toolkit.history import FileHistory, InMemoryHistory
from prompt_toolkit.shortcuts import PromptSession from prompt_toolkit.shortcuts import PromptSession
from prompt_toolkit.styles import Style
from . import __version__ from . import __version__
from .fabtools import HostSet from .fabtools import HostSet
from .tools import load_sshconfig_server_list, load_bare_server_list, load_config from .tools import (load_bare_server_list, load_config,
load_sshconfig_server_list)
from .style import print, STYLES
class MultiballCommandException(BaseException): class MultiballCommandException(BaseException):
pass pass
@ -113,19 +116,19 @@ class Multiball:
self.ssh_config = Path(sc).expanduser() self.ssh_config = Path(sc).expanduser()
newhosts = load_sshconfig_server_list( newhosts = load_sshconfig_server_list(
Path(sc).expanduser()) Path(sc).expanduser())
print(f"Loaded {len(newhosts)} from {sc}") print(f"<message>Loaded {len(newhosts)} from {sc}</message>")
self.allhosts.update(newhosts) self.allhosts.update(newhosts)
if bareconfigs := config.get('hostlists'): if bareconfigs := config.get('hostlists'):
for sc in bareconfigs.split(','): for sc in bareconfigs.split(','):
newhosts = load_bare_server_list(Path(sc).expanduser()) newhosts = load_bare_server_list(Path(sc).expanduser())
print(f"Loaded {len(newhosts)} from {sc}") print(f"<message>Loaded {len(newhosts)} from {sc}</message>")
self.allhosts.update(newhosts) self.allhosts.update(newhosts)
else: else:
self.ssh_config = Path("~/.ssh/ssh_config").expanduser() self.ssh_config = Path("~/.ssh/ssh_config").expanduser()
print( print(
f"Warning: no [config] section in {configFile}. Using fallbacks.") f"<error>Warning: no [config] section in {configFile}. Using fallbacks.</error>")
self.allhosts = load_sshconfig_server_list(self.ssh_config) self.allhosts = load_sshconfig_server_list(self.ssh_config)
# setup state and environment # setup state and environment
@ -161,17 +164,17 @@ class Multiball:
""" """
if args.lower().startswith("off"): if args.lower().startswith("off"):
self.safety = False self.safety = False
print("** Safety OFF. Now unsafe.") print("<unsafe>** Safety OFF. Now unsafe.</unsafe>")
else: else:
self.safety = True self.safety = True
print("** Safety ON.") print("<safe>** Safety ON.</safe>")
def command_echo(self, command, args): def command_echo(self, command, args):
""" """
Echo string Echo string
fixme put string interpolation here so that state variables can be printed fixme put string interpolation here so that state variables can be printed
""" """
print(args) print(f"<message>{args}</message>")
def command_environment(self, command, args): def command_environment(self, command, args):
""" """
@ -200,12 +203,14 @@ class Multiball:
""" """
Prompt user with a yes/no question. Default responses result in a CommandAbort exception. Prompt user with a yes/no question. Default responses result in a CommandAbort exception.
""" """
result = prompt_toolkit.prompt( result = prompt_toolkit.prompt([("class:promptyn",
"<" + args + " [Yys/Nn] > ") "<" + args + " [Yys/Nn]>"),
("class:message",
" ")], style=STYLES)
if result and (result[0] in 'Yys'): if result and (result[0] in 'Yys'):
return True return True
else: else:
print("(no)") print("<message>(no)</message>")
raise MBCommandAbort raise MBCommandAbort
def command_arguments(self, command, args): def command_arguments(self, command, args):
@ -217,9 +222,9 @@ class Multiball:
""" """
if not self._is_scripting or not self._script_frame_arguments: if not self._is_scripting or not self._script_frame_arguments:
if not self._is_scripting: if not self._is_scripting:
print("does nothing outside of script") print("<alert>does nothing outside of script</alert>")
else: else:
print("arguments required") print("<error>arguments required</error>")
raise MBCommandAbort raise MBCommandAbort
return return
@ -230,9 +235,9 @@ class Multiball:
""" """
if not self._is_scripting or self.safety: if not self._is_scripting or self.safety:
if not self._is_scripting: if not self._is_scripting:
print("does nothing outside of script") print("<alert>does nothing outside of script</alert>")
else: else:
print("cannot run in safety mode") print("<error>cannot run in safety mode</error>")
raise MBCommandAbort raise MBCommandAbort
return return
@ -245,7 +250,7 @@ class Multiball:
# FIXME: We need to make aliases store multiple entries, and save should take the whole alias not just # FIXME: We need to make aliases store multiple entries, and save should take the whole alias not just
# the last command. # the last command.
if (not self.last_run): if (not self.last_run):
print("Nothing to log.") print("<alert>Nothing to log.</alert>")
return return
tstamp = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%Z") tstamp = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%Z")
outfilename = f"{tstamp}.multiball.log" outfilename = f"{tstamp}.multiball.log"
@ -254,7 +259,7 @@ class Multiball:
if (not outfilename.endswith(".log")): if (not outfilename.endswith(".log")):
outfilename = outfilename + ".log" outfilename = outfilename + ".log"
print(f"Writing {outfilename} for command `{self.last_run['cmd']}`.") print(f"<alert>Writing {outfilename} for command `{self.last_run['cmd']}`.</alert>")
with (open(outfilename, 'w')) as out: with (open(outfilename, 'w')) as out:
out.writelines([f"command: {self.last_run['cmd']}\n", out.writelines([f"command: {self.last_run['cmd']}\n",
"-------------------------------------------\n" "-------------------------------------------\n"
@ -265,12 +270,13 @@ class Multiball:
def _print_targetlist(self, tlist=None, keyword='Targeting'): def _print_targetlist(self, tlist=None, keyword='Targeting'):
if not tlist: if not tlist:
tlist = self.targethosts tlist = self.targethosts
print(f"{keyword} {len(self.targethosts)} hosts:") print(f"<message>{keyword} {len(self.targethosts)} hosts:</message>")
print(' '.join(tlist)) print(' '.join(tlist))
def _print_help(self): def _print_help(self):
for item in COMMANDS.values(): for item in COMMANDS.values():
print(', '.join(item[0]) + ' - ' + item[1]) m = ', '.join(item[0]) + ' - ' + item[1]
print(f"<bright>{m}</bright>")
print("Other entries run command on targets.") print("Other entries run command on targets.")
def _run_command(self, command: str): def _run_command(self, command: str):
@ -382,10 +388,10 @@ class Multiball:
except MBCommandRun: except MBCommandRun:
# remote command! # remote command!
if self.safety: if self.safety:
print(f"would run `{command}` on targethosts") print(f"<bright>would run `{command}` on targethosts</bright>")
print("Safety is ON. `/safety off` to turn off.") print("<alert>Safety is ON. `/safety off` to turn off.</alert>")
else: else:
print(f"running `{command}` on targethosts") print(f"<bright>running `{command}` on targethosts</bright>")
self._run_remote_command(command) self._run_remote_command(command)
def cmd_prompt(self): def cmd_prompt(self):
@ -397,10 +403,14 @@ class Multiball:
completer = WordCompleter(list(self.allhosts) + allcommands) completer = WordCompleter(list(self.allhosts) + allcommands)
if self.safety: if self.safety:
prompt = "!" prompt = "!"
pclass = "class:safe"
else: else:
prompt = '$' prompt = '$'
command = self.session.prompt( pclass = "class:unsafe"
f'{len(self.targethosts)} -> {prompt} ', completer=completer) command = self.session.prompt([("class:count", f'{len(self.targethosts)}'),
("class:prompt", '->'),
(pclass, prompt),
("class:default", " ")], completer=completer, style=STYLES)
try: try:
self.do_cmd(command) self.do_cmd(command)
except MBCommandAbort: except MBCommandAbort:
@ -409,7 +419,7 @@ class Multiball:
def multiball(*args, **kwargs): def multiball(*args, **kwargs):
print(f"Welcome to Multiball {__version__}. type /help for help.") print(f"<alert>Welcome to Multiball {__version__}. type /help for help.</alert>")
mb = Multiball() mb = Multiball()
# loop on prompt # loop on prompt
while (True): while (True):
@ -426,7 +436,7 @@ def main():
sys.exit(0) sys.exit(0)
except Exception as inst: except Exception as inst:
# fixme log exception # fixme log exception
print(f"Exception! {inst}") print(f"<error>Exception! {inst}</error")
print(traceback.format_exc()) print(traceback.format_exc())
sys.exit(-1) sys.exit(-1)

View File

@ -9,6 +9,7 @@ import paramiko
from fabric2 import Config, Connection, SerialGroup, ThreadingGroup from fabric2 import Config, Connection, SerialGroup, ThreadingGroup
from tqdm import tqdm from tqdm import tqdm
from .style import print_style
class RunResult(Enum): class RunResult(Enum):
Success = 0 Success = 0
@ -20,6 +21,7 @@ class RunResult(Enum):
def thread_run(connection, command, result_lock, result_queue): def thread_run(connection, command, result_lock, result_queue):
runresult = RunResult.Success runresult = RunResult.Success
try: try:
res = connection.open()
res = connection.run(command, warn=True, hide=True) res = connection.run(command, warn=True, hide=True)
if (res.exited != 0): if (res.exited != 0):
runresult = RunResult.CommandFailure runresult = RunResult.CommandFailure
@ -32,15 +34,22 @@ def thread_run(connection, command, result_lock, result_queue):
except Exception as inst: except Exception as inst:
res = f"Unknown error for host: {inst}" res = f"Unknown error for host: {inst}"
runresult = RunResult.Error runresult = RunResult.Error
finally:
connection.close()
with result_lock: with result_lock:
result_queue.append((connection, res, runresult)) result_queue.append((connection, res, runresult))
# A set of hosts we can target with a series of commands and also can collect output of each command # A set of hosts we can target with a series of commands and also can collect output of each command
class HostSet: class HostSet:
def _lprint(self, *args): def _lprint(self, *args, **kwargs):
s = " ".join([str(a) for a in args]) + '\n' s = " ".join([str(a) for a in args]) + '\n'
self.results.write(s) self.results.write(s)
if ("style" in kwargs):
print_style(s.rstrip(), style=kwargs["style"])
else:
print(*args) print(*args)
def __init__(self, hostlist: list, ssh_config_path: Path=None): def __init__(self, hostlist: list, ssh_config_path: Path=None):
@ -62,11 +71,13 @@ class HostSet:
for connection in self.connections: for connection in self.connections:
t = Thread(target=thread_run, args=[connection, command, reslock, resq]) t = Thread(target=thread_run, args=[connection, command, reslock, resq])
t.host = connection.original_host t.host = connection.original_host
t.connection = connection
t.start() t.start()
threads.append(t) threads.append(t)
# display status about threads, and join them when they finish # display status about threads, and join them when they finish
while (True): while (True):
try:
nt = [] nt = []
for i in threads: for i in threads:
if not i.is_alive(): if not i.is_alive():
@ -81,8 +92,15 @@ class HostSet:
else: else:
if ((time.time() - tupdate) > 10.0): if ((time.time() - tupdate) > 10.0):
tupdate = time.time() tupdate = time.time()
print("Still waiting on ", [thread.host for thread in threads]) m = "Still waiting on " + str([thread.host for thread in threads])
print_style(m, style="alert")
# FIXME have the thread killed if it's still waiting after $TIMEOUT # FIXME have the thread killed if it's still waiting after $TIMEOUT
except KeyboardInterrupt as inst:
# cancel remaining servers
print_style("(break)", style="error")
for thread in threads:
thread.connection.close()
prog.close() prog.close()
# Gather up results by output # Gather up results by output
@ -109,10 +127,10 @@ class HostSet:
self._lprint('-----> [{}]'.format(' '.join(connection.original_host for connection in connections))) self._lprint('-----> [{}]'.format(' '.join(connection.original_host for connection in connections)))
self._lprint(result) self._lprint(result)
self._lprint('#####> RUN SUMMARY <#####') self._lprint('#####> RUN SUMMARY <#####', style="alert")
if (success): if (success):
self._lprint(' Succeeded: ', ', '.join(connection.original_host for connection in success)) self._lprint(' Succeeded: ', ', '.join(connection.original_host for connection in success), style="good")
if (commandfail): if (commandfail):
self._lprint(' Command fail: ', ', '.join(connection.original_host for connection in commandfail)) self._lprint(' Command fail: ', ', '.join(connection.original_host for connection in commandfail), style="warning")
if (error): if (error):
self._lprint('Failed with Error: ', ','.join(connection.original_host for connection in error)) self._lprint('Failed with Error: ', ','.join(connection.original_host for connection in error), style="error")

26
multiball/style.py Normal file
View File

@ -0,0 +1,26 @@
from prompt_toolkit import print_formatted_text, HTML
from prompt_toolkit.styles import Style
from prompt_toolkit.formatted_text import FormattedText
STYLES = Style.from_dict({
"safe": "#012 bg:#00ff88",
"unsafe": "#012 bg:#ff2200",
"message": "#666699 bg:#000",
"count": "#6666ff bg:#000",
"prompt": "#777700 bg:#000",
"alert": "bold #f0f bg:#000",
"error": "bold #f00 bg:#000",
"warning": "bold #f50 bg:#000",
"good": "bold #090 bg:#000",
"promptyn": "bold #000 bg:#ff0",
"default": "#880 bg:#000",
"bright": "#ff0 bg:#000",
})
def print(message):
print_formatted_text(HTML(f"<default>{message}</default>"), style=STYLES)
def print_style(message, style):
print_formatted_text(FormattedText([(f"class:{style}", message)]), style=STYLES)