aboutsummaryrefslogtreecommitdiff
path: root/bin/modules/handler.py
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-02-29 02:29:14 +0100
committerPaul Oliver <contact@pauloliver.dev>2024-02-29 02:29:14 +0100
commit2250b4db92bd272dbb1fd717eb791e293c17e37a (patch)
tree0bde7ed8a2ba8b2a04da629b0317568dea2a6b3c /bin/modules/handler.py
parentde427d319c699b8bed7ed73289b3698f13ac3acc (diff)
Store python modules on './bin/modules/' subdirectory. [#37]
Diffstat (limited to 'bin/modules/handler.py')
-rw-r--r--bin/modules/handler.py435
1 files changed, 435 insertions, 0 deletions
diff --git a/bin/modules/handler.py b/bin/modules/handler.py
new file mode 100644
index 0000000..3325391
--- /dev/null
+++ b/bin/modules/handler.py
@@ -0,0 +1,435 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+file: handler.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered the 'controller' part of the Salis simulator.
+It receives and parses all user input via keyboard and console commands. It
+also takes care of genome compilation (via genome files located on the
+'genomes' directory).
+
+An user may open the Salis console by pressing the 'c' key while in a running
+session. A nice quirk is the possibility to run python commands from within the
+Salis console. As an example, to get the memory size, an user could type:
+
+>>> exec output = self.__sim.lib.sal_mem_get_size()
+
+Note that 'output' denotes a storage variable that will get printed on the
+console response. This ability gives an user a whole lot of power, and should
+be used with care.
+"""
+
+import curses
+import os
+import time
+
+
+class Handler:
+ KEY_ESCAPE = 27
+ CYCLE_TIMEOUT = 0.1
+
+ def __init__(self, sim):
+ """ Handler constructor. Simply link this class to the main simulation
+ class and printer class and create symbol dictionary.
+ """
+ self.__sim = sim
+ self.__printer = sim.printer
+ self.__inst_dict = self.__get_inst_dict()
+ self.__min_commands = [
+ ord("M"),
+ ord(" "),
+ curses.KEY_RESIZE,
+ self.KEY_ESCAPE,
+ ]
+ self.console_history = []
+
+ # Set short delay for ESCAPE key (which is used to exit the simulator).
+ os.environ.setdefault("ESCDELAY", "25")
+
+ def process_cmd(self, cmd):
+ """ Process incoming commands from curses. Commands are received via
+ ncurses' getch() function, thus, they must be transformed into their
+ character representations with 'ord()'.
+ """
+ # If in minimal mode, only listen to a subset of commands.
+ if self.__sim.minimal and cmd not in self.__min_commands:
+ return
+
+ if cmd == self.KEY_ESCAPE:
+ self.__on_quit([None], save=True)
+ elif cmd == ord("M"):
+ self.__printer.screen.clear()
+ self.__sim.minimal = not self.__sim.minimal
+ elif cmd == ord(" "):
+ self.__sim.toggle_state()
+ elif cmd == curses.KEY_LEFT:
+ self.__printer.flip_page(-1)
+ elif cmd == curses.KEY_RIGHT:
+ self.__printer.flip_page(1)
+ elif cmd == curses.KEY_DOWN:
+ self.__printer.scroll_main(-1)
+ elif cmd == curses.KEY_UP:
+ self.__printer.scroll_main(1)
+ elif cmd == curses.KEY_RESIZE:
+ self.__printer.on_resize()
+ elif cmd == ord("X"):
+ self.__printer.toggle_hex()
+ elif cmd == ord("x"):
+ self.__printer.world.zoom_out()
+ elif cmd == ord("z"):
+ self.__printer.world.zoom_in()
+ elif cmd == ord("a"):
+ self.__printer.world.pan_left()
+ self.__printer.proc_scroll_left()
+ elif cmd == ord("d"):
+ self.__printer.world.pan_right()
+ self.__printer.proc_scroll_right()
+ elif cmd == ord("s"):
+ self.__printer.world.pan_down()
+ self.__printer.proc_scroll_down()
+ elif cmd == ord("w"):
+ self.__printer.world.pan_up()
+ self.__printer.proc_scroll_up()
+ elif cmd == ord("S"):
+ self.__printer.world.pan_down(fast=True)
+ self.__printer.proc_scroll_down(fast=True)
+ elif cmd == ord("W"):
+ self.__printer.world.pan_up(fast=True)
+ self.__printer.proc_scroll_up(fast=True)
+ elif cmd == ord("Q"):
+ self.__printer.world.pan_reset()
+ self.__printer.proc_scroll_vertical_reset()
+ elif cmd == ord("A"):
+ self.__printer.world.pan_reset()
+ self.__printer.proc_scroll_horizontal_reset()
+ elif cmd == ord("o"):
+ self.__printer.proc_select_prev()
+ elif cmd == ord("p"):
+ self.__printer.proc_select_next()
+ elif cmd == ord("f"):
+ self.__printer.proc_select_first()
+ elif cmd == ord("l"):
+ self.__printer.proc_select_last()
+ elif cmd == ord("k"):
+ self.__printer.proc_scroll_to_selected()
+ elif cmd == ord("g"):
+ self.__printer.proc_toggle_gene_view()
+ elif cmd == ord("i"):
+ self.__printer.world.toggle_ip_view()
+ elif cmd == ord("\n"):
+ self.__printer.run_cursor()
+ elif cmd == ord("c"):
+ self.__printer.run_console()
+ else:
+ # Check for numeric input. Number keys [1 to 0] cycle the
+ # simulation [2 ** ((n - 1) % 10] times.
+ try:
+ if chr(cmd).isdigit():
+ factor = int(chr(cmd))
+ factor = int(2 ** ((factor - 1) % 10))
+ self.__cycle_sim(factor)
+ except ValueError:
+ pass
+
+ def handle_console(self, command_raw):
+ """ Process console commands. We parse and check for input errors. Any
+ python exception messages are redirected to the console-response
+ window.
+ """
+ if command_raw:
+ command = command_raw.split()
+
+ try:
+ # Handle both python and self-thrown exceptions.
+ if command[0] in ["q", "quit"]:
+ self.__on_quit(command, save=True)
+ elif command[0] in ["q!", "quit!"]:
+ self.__on_quit(command, save=False)
+ elif command[0] in ["i", "input"]:
+ self.__on_input(command)
+ elif command[0] in ["c", "compile"]:
+ self.__on_compile(command)
+ elif command[0] in ["n", "new"]:
+ self.__on_new(command)
+ elif command[0] in ["k", "kill"]:
+ self.__on_kill(command)
+ elif command[0] in ["e", "exec"]:
+ self.__on_exec(command)
+ elif command[0] in ["s", "scroll"]:
+ self.__on_scroll(command)
+ elif command[0] in ["p", "process"]:
+ self.__on_proc_select(command)
+ elif command[0] in ["r", "rename"]:
+ self.__on_rename(command)
+ elif command[0] in ["save"]:
+ self.__on_save(command)
+ elif command[0] in ["a", "auto"]:
+ self.__on_set_autosave(command)
+ else:
+ # Raise if a non-existing command has been given.
+ self.__raise("Invalid command: '{}'".format(command[0]))
+ except BaseException as exep:
+ # We parse and redirect python exceptions to the error
+ # console-window.
+ message = str(exep).strip()
+ message = message[0].upper() + message[1:]
+ self.__printer.show_console_error(message)
+ finally:
+ # Store command on console history.
+ self.console_history.append(command_raw.strip())
+
+
+ ###############################
+ # Private methods
+ ###############################
+
+ def __raise(self, message):
+ """ Generic exception thrower. Throws a 'RuntimeError' initialized with
+ the given message.
+ """
+ raise RuntimeError("ERROR: {}".format(message))
+
+ def __respond(self, message):
+ """ Generic console responder. Throws a 'RuntimeError' initialized with
+ the given message.
+ """
+ raise RuntimeError(message)
+
+ def __cycle_sim(self, factor):
+ """ Simply cycle Salis 'factor' number of times. Do not cycle for more
+ than a given amount of time.
+ """
+ time_max = time.time() + self.CYCLE_TIMEOUT
+
+ for _ in range(factor):
+ self.__sim.lib.sal_main_cycle()
+ self.__sim.check_autosave()
+
+ if time.time() > time_max:
+ break
+
+ def __get_inst_dict(self):
+ """ Transform the instruction list of the printer module into a
+ dictionary that's more useful for genome compilation. Instruction
+ symbols are keys, values are the actual byte representation.
+ """
+ inst_dict = {}
+
+ for i, inst in enumerate(self.__printer.inst_list):
+ inst_dict[inst[1]] = i
+
+ return inst_dict
+
+ def __on_quit(self, command, save):
+ """ Exit simulation. We can choose whether to save the simulation into
+ a save file or not.
+ """
+ if len(command) > 1:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ if save:
+ self.__sim.lib.sal_main_save(
+ self.__sim.save_file_path.encode("utf-8")
+ )
+
+ self.__sim.exit()
+
+ def __write_genome(self, genome, address_list):
+ """ Write genome stream into a given list of memory addresses. All
+ addresses must be valid or an exception is thrown.
+ """
+ # All addresses we will write to must be valid.
+ for base_addr in address_list:
+ address = int(base_addr, 0)
+
+ for _ in range(len(genome)):
+ if not self.__sim.lib.sal_mem_is_address_valid(address):
+ self.__raise("Address '{}' is invalid".format(address))
+
+ address += 1
+
+ # All looks well! Let's compile the genome into memory.
+ for base_addr in address_list:
+ address = int(base_addr, 0)
+
+ for symbol in genome:
+ self.__sim.lib.sal_mem_set_inst(
+ address, self.__inst_dict[symbol]
+ )
+ address += 1
+
+ def __on_input(self, command):
+ """ Compile organism from user typed input. Compilation can only occur
+ on valid memory addresses. An exception will be thrown when trying to
+ write into non-valid address or when input stream is invalid.
+ """
+ if len(command) < 3:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # All characters in file must be actual instruction symbols.
+ for character in command[1]:
+ if character not in self.__inst_dict:
+ self.__raise("Invalid symbol '{}' found on stream".format(
+ character
+ ))
+
+ # All looks well, Let's write the genome into memory.
+ self.__write_genome(command[1], command[2:])
+
+ def __on_compile(self, command):
+ """ Compile organism from source genome file. Genomes must be placed on
+ the './genomes' directory. Compilation can only occur on valid memory
+ addresses. An exception will be thrown when trying to write into
+ non-valid address or when genome file is invalid.
+ """
+ if len(command) < 3:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Open genome file for compilation.
+ gen_file = os.path.join(self.__sim.path, "genomes", command[1])
+
+ with open(gen_file, "r") as f:
+ genome = f.read().strip()
+
+ # Entire genome must be written on a single line.
+ if "\n" in genome:
+ self.__raise("Newline detected on '{}'".format(gen_file))
+
+ # All characters in file must be actual instruction symbols.
+ for character in genome:
+ if character not in self.__inst_dict:
+ self.__raise("Invalid symbol '{}' found on '{}'".format(
+ character, gen_file
+ ))
+
+ # All looks well, Let's write the genome into memory.
+ self.__write_genome(genome, command[2:])
+
+ def __on_new(self, command):
+ """ Instantiate new organism of given size on given address. These
+ memory areas must be free and valid or an exception is thrown.
+ """
+ if len(command) < 3:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Check that all addresses we will allocate are free and valid.
+ for base_addr in command[2:]:
+ address = int(base_addr, 0)
+
+ for _ in range(int(command[1])):
+ if not self.__sim.lib.sal_mem_is_address_valid(address):
+ self.__raise("Address '{}' is invalid".format(address))
+ elif self.__sim.lib.sal_mem_is_allocated(address):
+ self.__raise("Address '{}' is allocated".format(address))
+
+ address += 1
+
+ # All looks well! Let's instantiate our new organism.
+ for base_addr in command[2:]:
+ address = int(base_addr, 0)
+ size = int(command[1], 0)
+ self.__sim.lib.sal_proc_create(address, size)
+
+ def __on_kill(self, command):
+ """ Kill organism on bottom of reaper queue.
+ """
+ if len(command) > 1:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Call proc kill function only if there's any organisms to kill.
+ if not self.__sim.lib.sal_proc_get_count():
+ self.__raise("No organisms currently alive")
+ else:
+ self.__sim.lib.sal_proc_kill()
+
+ def __on_exec(self, command):
+ """ Allow a user to execute a python command from within the console.
+ Using this is very hack-ish, and not recommended unless you're certain
+ of what you're doing!
+ """
+ if len(command) < 2:
+ self.__raise(
+ "'{}' must be followed by an executable string".format(
+ command[0]
+ )
+ )
+
+ # User may query any simulation variable or status and the console will
+ # respond. For example, to query memory size or order, type one of the
+ # following:
+ #
+ # >>> exec output = self.__sim.lib.sal_mem_get_size()
+ # >>> exec output = self.__sim.lib.sal_mem_get_order()
+ #
+ output = {}
+ exec(" ".join(command[1:]), locals(), output)
+
+ if output:
+ self.__respond("EXEC RESPONDS: {}".format(str(output)))
+
+ def __on_scroll(self, command):
+ """ We can scroll to a specific process (on PROCESS view) or to a
+ specific world address (on WORLD view) via the console.
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ target = int(command[1], 0)
+
+ # If on PROCESS page, scroll to given process.
+ if self.__printer.current_page == "PROCESS":
+ if target < self.__sim.lib.sal_proc_get_capacity():
+ self.__printer.proc_scroll_to(target)
+ else:
+ self.__raise("No process with ID '{}' found".format(target))
+ elif self.__printer.current_page == "WORLD":
+ if self.__sim.lib.sal_mem_is_address_valid(target):
+ self.__printer.world.scroll_to(target)
+ else:
+ self.__raise("Address '{}' is invalid".format(address))
+ else:
+ self.__raise("'{}' must be called on PROCESS or WORLD page".format(
+ command[0])
+ )
+
+ def __on_proc_select(self, command):
+ """ Select a specific process (on PROCESS or WORLD page).
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ target = int(command[1], 0)
+
+ # If on PROCESS page, scroll to given process.
+ if target < self.__sim.lib.sal_proc_get_capacity():
+ self.__printer.proc_select_by_id(target)
+ else:
+ self.__raise("No process with ID '{}' found".format(target))
+
+ def __on_rename(self, command):
+ """ Set a new simulation name. Future auto-saved files will use this
+ name as prefix.
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ self.__sim.rename(command[1])
+
+ def __on_save(self, command):
+ """ Save simulation on its current state.
+ """
+ if len(command) != 1:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ self.__sim.lib.sal_main_save(self.__sim.save_file_path.encode("utf-8"))
+
+ def __on_set_autosave(self, command):
+ """ Set the simulation's auto save interval. Provide any integer
+ between 0 and (2**32 - 1). If zero is provided, auto saving will be
+ disabled.
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ self.__sim.set_autosave(int(command[1], 0))