From 2250b4db92bd272dbb1fd717eb791e293c17e37a Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Thu, 29 Feb 2024 02:29:14 +0100 Subject: Store python modules on './bin/modules/' subdirectory. [#37] --- .gitignore | 1 + bin/handler.py | 435 ------------------------ bin/modules/__init__.py | 0 bin/modules/handler.py | 435 ++++++++++++++++++++++++ bin/modules/printer.py | 858 ++++++++++++++++++++++++++++++++++++++++++++++++ bin/modules/world.py | 279 ++++++++++++++++ bin/printer.py | 858 ------------------------------------------------ bin/salis.py | 4 +- bin/world.py | 279 ---------------- 9 files changed, 1575 insertions(+), 1574 deletions(-) delete mode 100644 bin/handler.py create mode 100644 bin/modules/__init__.py create mode 100644 bin/modules/handler.py create mode 100644 bin/modules/printer.py create mode 100644 bin/modules/world.py delete mode 100644 bin/printer.py delete mode 100644 bin/world.py diff --git a/.gitignore b/.gitignore index b5cad49..a344b50 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ bin/__pycache__/* +bin/modules/__pycache__/* bin/common/pipe bin/error.log bin/lib/libsalis-*.so diff --git a/bin/handler.py b/bin/handler.py deleted file mode 100644 index 3325391..0000000 --- a/bin/handler.py +++ /dev/null @@ -1,435 +0,0 @@ -""" SALIS: Viewer/controller for the SALIS simulator. - -file: handler.py -Author: Paul Oliver -Email: paul.t.oliver.design@gmail.com - -This module should be considered the 'controller' part of the Salis simulator. -It receives and parses all user input via keyboard and console commands. It -also takes care of genome compilation (via genome files located on the -'genomes' directory). - -An user may open the Salis console by pressing the 'c' key while in a running -session. A nice quirk is the possibility to run python commands from within the -Salis console. As an example, to get the memory size, an user could type: - ->>> exec output = self.__sim.lib.sal_mem_get_size() - -Note that 'output' denotes a storage variable that will get printed on the -console response. This ability gives an user a whole lot of power, and should -be used with care. -""" - -import curses -import os -import time - - -class Handler: - KEY_ESCAPE = 27 - CYCLE_TIMEOUT = 0.1 - - def __init__(self, sim): - """ Handler constructor. Simply link this class to the main simulation - class and printer class and create symbol dictionary. - """ - self.__sim = sim - self.__printer = sim.printer - self.__inst_dict = self.__get_inst_dict() - self.__min_commands = [ - ord("M"), - ord(" "), - curses.KEY_RESIZE, - self.KEY_ESCAPE, - ] - self.console_history = [] - - # Set short delay for ESCAPE key (which is used to exit the simulator). - os.environ.setdefault("ESCDELAY", "25") - - def process_cmd(self, cmd): - """ Process incoming commands from curses. Commands are received via - ncurses' getch() function, thus, they must be transformed into their - character representations with 'ord()'. - """ - # If in minimal mode, only listen to a subset of commands. - if self.__sim.minimal and cmd not in self.__min_commands: - return - - if cmd == self.KEY_ESCAPE: - self.__on_quit([None], save=True) - elif cmd == ord("M"): - self.__printer.screen.clear() - self.__sim.minimal = not self.__sim.minimal - elif cmd == ord(" "): - self.__sim.toggle_state() - elif cmd == curses.KEY_LEFT: - self.__printer.flip_page(-1) - elif cmd == curses.KEY_RIGHT: - self.__printer.flip_page(1) - elif cmd == curses.KEY_DOWN: - self.__printer.scroll_main(-1) - elif cmd == curses.KEY_UP: - self.__printer.scroll_main(1) - elif cmd == curses.KEY_RESIZE: - self.__printer.on_resize() - elif cmd == ord("X"): - self.__printer.toggle_hex() - elif cmd == ord("x"): - self.__printer.world.zoom_out() - elif cmd == ord("z"): - self.__printer.world.zoom_in() - elif cmd == ord("a"): - self.__printer.world.pan_left() - self.__printer.proc_scroll_left() - elif cmd == ord("d"): - self.__printer.world.pan_right() - self.__printer.proc_scroll_right() - elif cmd == ord("s"): - self.__printer.world.pan_down() - self.__printer.proc_scroll_down() - elif cmd == ord("w"): - self.__printer.world.pan_up() - self.__printer.proc_scroll_up() - elif cmd == ord("S"): - self.__printer.world.pan_down(fast=True) - self.__printer.proc_scroll_down(fast=True) - elif cmd == ord("W"): - self.__printer.world.pan_up(fast=True) - self.__printer.proc_scroll_up(fast=True) - elif cmd == ord("Q"): - self.__printer.world.pan_reset() - self.__printer.proc_scroll_vertical_reset() - elif cmd == ord("A"): - self.__printer.world.pan_reset() - self.__printer.proc_scroll_horizontal_reset() - elif cmd == ord("o"): - self.__printer.proc_select_prev() - elif cmd == ord("p"): - self.__printer.proc_select_next() - elif cmd == ord("f"): - self.__printer.proc_select_first() - elif cmd == ord("l"): - self.__printer.proc_select_last() - elif cmd == ord("k"): - self.__printer.proc_scroll_to_selected() - elif cmd == ord("g"): - self.__printer.proc_toggle_gene_view() - elif cmd == ord("i"): - self.__printer.world.toggle_ip_view() - elif cmd == ord("\n"): - self.__printer.run_cursor() - elif cmd == ord("c"): - self.__printer.run_console() - else: - # Check for numeric input. Number keys [1 to 0] cycle the - # simulation [2 ** ((n - 1) % 10] times. - try: - if chr(cmd).isdigit(): - factor = int(chr(cmd)) - factor = int(2 ** ((factor - 1) % 10)) - self.__cycle_sim(factor) - except ValueError: - pass - - def handle_console(self, command_raw): - """ Process console commands. We parse and check for input errors. Any - python exception messages are redirected to the console-response - window. - """ - if command_raw: - command = command_raw.split() - - try: - # Handle both python and self-thrown exceptions. - if command[0] in ["q", "quit"]: - self.__on_quit(command, save=True) - elif command[0] in ["q!", "quit!"]: - self.__on_quit(command, save=False) - elif command[0] in ["i", "input"]: - self.__on_input(command) - elif command[0] in ["c", "compile"]: - self.__on_compile(command) - elif command[0] in ["n", "new"]: - self.__on_new(command) - elif command[0] in ["k", "kill"]: - self.__on_kill(command) - elif command[0] in ["e", "exec"]: - self.__on_exec(command) - elif command[0] in ["s", "scroll"]: - self.__on_scroll(command) - elif command[0] in ["p", "process"]: - self.__on_proc_select(command) - elif command[0] in ["r", "rename"]: - self.__on_rename(command) - elif command[0] in ["save"]: - self.__on_save(command) - elif command[0] in ["a", "auto"]: - self.__on_set_autosave(command) - else: - # Raise if a non-existing command has been given. - self.__raise("Invalid command: '{}'".format(command[0])) - except BaseException as exep: - # We parse and redirect python exceptions to the error - # console-window. - message = str(exep).strip() - message = message[0].upper() + message[1:] - self.__printer.show_console_error(message) - finally: - # Store command on console history. - self.console_history.append(command_raw.strip()) - - - ############################### - # Private methods - ############################### - - def __raise(self, message): - """ Generic exception thrower. Throws a 'RuntimeError' initialized with - the given message. - """ - raise RuntimeError("ERROR: {}".format(message)) - - def __respond(self, message): - """ Generic console responder. Throws a 'RuntimeError' initialized with - the given message. - """ - raise RuntimeError(message) - - def __cycle_sim(self, factor): - """ Simply cycle Salis 'factor' number of times. Do not cycle for more - than a given amount of time. - """ - time_max = time.time() + self.CYCLE_TIMEOUT - - for _ in range(factor): - self.__sim.lib.sal_main_cycle() - self.__sim.check_autosave() - - if time.time() > time_max: - break - - def __get_inst_dict(self): - """ Transform the instruction list of the printer module into a - dictionary that's more useful for genome compilation. Instruction - symbols are keys, values are the actual byte representation. - """ - inst_dict = {} - - for i, inst in enumerate(self.__printer.inst_list): - inst_dict[inst[1]] = i - - return inst_dict - - def __on_quit(self, command, save): - """ Exit simulation. We can choose whether to save the simulation into - a save file or not. - """ - if len(command) > 1: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - if save: - self.__sim.lib.sal_main_save( - self.__sim.save_file_path.encode("utf-8") - ) - - self.__sim.exit() - - def __write_genome(self, genome, address_list): - """ Write genome stream into a given list of memory addresses. All - addresses must be valid or an exception is thrown. - """ - # All addresses we will write to must be valid. - for base_addr in address_list: - address = int(base_addr, 0) - - for _ in range(len(genome)): - if not self.__sim.lib.sal_mem_is_address_valid(address): - self.__raise("Address '{}' is invalid".format(address)) - - address += 1 - - # All looks well! Let's compile the genome into memory. - for base_addr in address_list: - address = int(base_addr, 0) - - for symbol in genome: - self.__sim.lib.sal_mem_set_inst( - address, self.__inst_dict[symbol] - ) - address += 1 - - def __on_input(self, command): - """ Compile organism from user typed input. Compilation can only occur - on valid memory addresses. An exception will be thrown when trying to - write into non-valid address or when input stream is invalid. - """ - if len(command) < 3: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - # All characters in file must be actual instruction symbols. - for character in command[1]: - if character not in self.__inst_dict: - self.__raise("Invalid symbol '{}' found on stream".format( - character - )) - - # All looks well, Let's write the genome into memory. - self.__write_genome(command[1], command[2:]) - - def __on_compile(self, command): - """ Compile organism from source genome file. Genomes must be placed on - the './genomes' directory. Compilation can only occur on valid memory - addresses. An exception will be thrown when trying to write into - non-valid address or when genome file is invalid. - """ - if len(command) < 3: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - # Open genome file for compilation. - gen_file = os.path.join(self.__sim.path, "genomes", command[1]) - - with open(gen_file, "r") as f: - genome = f.read().strip() - - # Entire genome must be written on a single line. - if "\n" in genome: - self.__raise("Newline detected on '{}'".format(gen_file)) - - # All characters in file must be actual instruction symbols. - for character in genome: - if character not in self.__inst_dict: - self.__raise("Invalid symbol '{}' found on '{}'".format( - character, gen_file - )) - - # All looks well, Let's write the genome into memory. - self.__write_genome(genome, command[2:]) - - def __on_new(self, command): - """ Instantiate new organism of given size on given address. These - memory areas must be free and valid or an exception is thrown. - """ - if len(command) < 3: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - # Check that all addresses we will allocate are free and valid. - for base_addr in command[2:]: - address = int(base_addr, 0) - - for _ in range(int(command[1])): - if not self.__sim.lib.sal_mem_is_address_valid(address): - self.__raise("Address '{}' is invalid".format(address)) - elif self.__sim.lib.sal_mem_is_allocated(address): - self.__raise("Address '{}' is allocated".format(address)) - - address += 1 - - # All looks well! Let's instantiate our new organism. - for base_addr in command[2:]: - address = int(base_addr, 0) - size = int(command[1], 0) - self.__sim.lib.sal_proc_create(address, size) - - def __on_kill(self, command): - """ Kill organism on bottom of reaper queue. - """ - if len(command) > 1: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - # Call proc kill function only if there's any organisms to kill. - if not self.__sim.lib.sal_proc_get_count(): - self.__raise("No organisms currently alive") - else: - self.__sim.lib.sal_proc_kill() - - def __on_exec(self, command): - """ Allow a user to execute a python command from within the console. - Using this is very hack-ish, and not recommended unless you're certain - of what you're doing! - """ - if len(command) < 2: - self.__raise( - "'{}' must be followed by an executable string".format( - command[0] - ) - ) - - # User may query any simulation variable or status and the console will - # respond. For example, to query memory size or order, type one of the - # following: - # - # >>> exec output = self.__sim.lib.sal_mem_get_size() - # >>> exec output = self.__sim.lib.sal_mem_get_order() - # - output = {} - exec(" ".join(command[1:]), locals(), output) - - if output: - self.__respond("EXEC RESPONDS: {}".format(str(output))) - - def __on_scroll(self, command): - """ We can scroll to a specific process (on PROCESS view) or to a - specific world address (on WORLD view) via the console. - """ - if len(command) != 2: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - target = int(command[1], 0) - - # If on PROCESS page, scroll to given process. - if self.__printer.current_page == "PROCESS": - if target < self.__sim.lib.sal_proc_get_capacity(): - self.__printer.proc_scroll_to(target) - else: - self.__raise("No process with ID '{}' found".format(target)) - elif self.__printer.current_page == "WORLD": - if self.__sim.lib.sal_mem_is_address_valid(target): - self.__printer.world.scroll_to(target) - else: - self.__raise("Address '{}' is invalid".format(address)) - else: - self.__raise("'{}' must be called on PROCESS or WORLD page".format( - command[0]) - ) - - def __on_proc_select(self, command): - """ Select a specific process (on PROCESS or WORLD page). - """ - if len(command) != 2: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - target = int(command[1], 0) - - # If on PROCESS page, scroll to given process. - if target < self.__sim.lib.sal_proc_get_capacity(): - self.__printer.proc_select_by_id(target) - else: - self.__raise("No process with ID '{}' found".format(target)) - - def __on_rename(self, command): - """ Set a new simulation name. Future auto-saved files will use this - name as prefix. - """ - if len(command) != 2: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - self.__sim.rename(command[1]) - - def __on_save(self, command): - """ Save simulation on its current state. - """ - if len(command) != 1: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - self.__sim.lib.sal_main_save(self.__sim.save_file_path.encode("utf-8")) - - def __on_set_autosave(self, command): - """ Set the simulation's auto save interval. Provide any integer - between 0 and (2**32 - 1). If zero is provided, auto saving will be - disabled. - """ - if len(command) != 2: - self.__raise("Invalid parameters for '{}'".format(command[0])) - - self.__sim.set_autosave(int(command[1], 0)) diff --git a/bin/modules/__init__.py b/bin/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bin/modules/handler.py b/bin/modules/handler.py new file mode 100644 index 0000000..3325391 --- /dev/null +++ b/bin/modules/handler.py @@ -0,0 +1,435 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +file: handler.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered the 'controller' part of the Salis simulator. +It receives and parses all user input via keyboard and console commands. It +also takes care of genome compilation (via genome files located on the +'genomes' directory). + +An user may open the Salis console by pressing the 'c' key while in a running +session. A nice quirk is the possibility to run python commands from within the +Salis console. As an example, to get the memory size, an user could type: + +>>> exec output = self.__sim.lib.sal_mem_get_size() + +Note that 'output' denotes a storage variable that will get printed on the +console response. This ability gives an user a whole lot of power, and should +be used with care. +""" + +import curses +import os +import time + + +class Handler: + KEY_ESCAPE = 27 + CYCLE_TIMEOUT = 0.1 + + def __init__(self, sim): + """ Handler constructor. Simply link this class to the main simulation + class and printer class and create symbol dictionary. + """ + self.__sim = sim + self.__printer = sim.printer + self.__inst_dict = self.__get_inst_dict() + self.__min_commands = [ + ord("M"), + ord(" "), + curses.KEY_RESIZE, + self.KEY_ESCAPE, + ] + self.console_history = [] + + # Set short delay for ESCAPE key (which is used to exit the simulator). + os.environ.setdefault("ESCDELAY", "25") + + def process_cmd(self, cmd): + """ Process incoming commands from curses. Commands are received via + ncurses' getch() function, thus, they must be transformed into their + character representations with 'ord()'. + """ + # If in minimal mode, only listen to a subset of commands. + if self.__sim.minimal and cmd not in self.__min_commands: + return + + if cmd == self.KEY_ESCAPE: + self.__on_quit([None], save=True) + elif cmd == ord("M"): + self.__printer.screen.clear() + self.__sim.minimal = not self.__sim.minimal + elif cmd == ord(" "): + self.__sim.toggle_state() + elif cmd == curses.KEY_LEFT: + self.__printer.flip_page(-1) + elif cmd == curses.KEY_RIGHT: + self.__printer.flip_page(1) + elif cmd == curses.KEY_DOWN: + self.__printer.scroll_main(-1) + elif cmd == curses.KEY_UP: + self.__printer.scroll_main(1) + elif cmd == curses.KEY_RESIZE: + self.__printer.on_resize() + elif cmd == ord("X"): + self.__printer.toggle_hex() + elif cmd == ord("x"): + self.__printer.world.zoom_out() + elif cmd == ord("z"): + self.__printer.world.zoom_in() + elif cmd == ord("a"): + self.__printer.world.pan_left() + self.__printer.proc_scroll_left() + elif cmd == ord("d"): + self.__printer.world.pan_right() + self.__printer.proc_scroll_right() + elif cmd == ord("s"): + self.__printer.world.pan_down() + self.__printer.proc_scroll_down() + elif cmd == ord("w"): + self.__printer.world.pan_up() + self.__printer.proc_scroll_up() + elif cmd == ord("S"): + self.__printer.world.pan_down(fast=True) + self.__printer.proc_scroll_down(fast=True) + elif cmd == ord("W"): + self.__printer.world.pan_up(fast=True) + self.__printer.proc_scroll_up(fast=True) + elif cmd == ord("Q"): + self.__printer.world.pan_reset() + self.__printer.proc_scroll_vertical_reset() + elif cmd == ord("A"): + self.__printer.world.pan_reset() + self.__printer.proc_scroll_horizontal_reset() + elif cmd == ord("o"): + self.__printer.proc_select_prev() + elif cmd == ord("p"): + self.__printer.proc_select_next() + elif cmd == ord("f"): + self.__printer.proc_select_first() + elif cmd == ord("l"): + self.__printer.proc_select_last() + elif cmd == ord("k"): + self.__printer.proc_scroll_to_selected() + elif cmd == ord("g"): + self.__printer.proc_toggle_gene_view() + elif cmd == ord("i"): + self.__printer.world.toggle_ip_view() + elif cmd == ord("\n"): + self.__printer.run_cursor() + elif cmd == ord("c"): + self.__printer.run_console() + else: + # Check for numeric input. Number keys [1 to 0] cycle the + # simulation [2 ** ((n - 1) % 10] times. + try: + if chr(cmd).isdigit(): + factor = int(chr(cmd)) + factor = int(2 ** ((factor - 1) % 10)) + self.__cycle_sim(factor) + except ValueError: + pass + + def handle_console(self, command_raw): + """ Process console commands. We parse and check for input errors. Any + python exception messages are redirected to the console-response + window. + """ + if command_raw: + command = command_raw.split() + + try: + # Handle both python and self-thrown exceptions. + if command[0] in ["q", "quit"]: + self.__on_quit(command, save=True) + elif command[0] in ["q!", "quit!"]: + self.__on_quit(command, save=False) + elif command[0] in ["i", "input"]: + self.__on_input(command) + elif command[0] in ["c", "compile"]: + self.__on_compile(command) + elif command[0] in ["n", "new"]: + self.__on_new(command) + elif command[0] in ["k", "kill"]: + self.__on_kill(command) + elif command[0] in ["e", "exec"]: + self.__on_exec(command) + elif command[0] in ["s", "scroll"]: + self.__on_scroll(command) + elif command[0] in ["p", "process"]: + self.__on_proc_select(command) + elif command[0] in ["r", "rename"]: + self.__on_rename(command) + elif command[0] in ["save"]: + self.__on_save(command) + elif command[0] in ["a", "auto"]: + self.__on_set_autosave(command) + else: + # Raise if a non-existing command has been given. + self.__raise("Invalid command: '{}'".format(command[0])) + except BaseException as exep: + # We parse and redirect python exceptions to the error + # console-window. + message = str(exep).strip() + message = message[0].upper() + message[1:] + self.__printer.show_console_error(message) + finally: + # Store command on console history. + self.console_history.append(command_raw.strip()) + + + ############################### + # Private methods + ############################### + + def __raise(self, message): + """ Generic exception thrower. Throws a 'RuntimeError' initialized with + the given message. + """ + raise RuntimeError("ERROR: {}".format(message)) + + def __respond(self, message): + """ Generic console responder. Throws a 'RuntimeError' initialized with + the given message. + """ + raise RuntimeError(message) + + def __cycle_sim(self, factor): + """ Simply cycle Salis 'factor' number of times. Do not cycle for more + than a given amount of time. + """ + time_max = time.time() + self.CYCLE_TIMEOUT + + for _ in range(factor): + self.__sim.lib.sal_main_cycle() + self.__sim.check_autosave() + + if time.time() > time_max: + break + + def __get_inst_dict(self): + """ Transform the instruction list of the printer module into a + dictionary that's more useful for genome compilation. Instruction + symbols are keys, values are the actual byte representation. + """ + inst_dict = {} + + for i, inst in enumerate(self.__printer.inst_list): + inst_dict[inst[1]] = i + + return inst_dict + + def __on_quit(self, command, save): + """ Exit simulation. We can choose whether to save the simulation into + a save file or not. + """ + if len(command) > 1: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + if save: + self.__sim.lib.sal_main_save( + self.__sim.save_file_path.encode("utf-8") + ) + + self.__sim.exit() + + def __write_genome(self, genome, address_list): + """ Write genome stream into a given list of memory addresses. All + addresses must be valid or an exception is thrown. + """ + # All addresses we will write to must be valid. + for base_addr in address_list: + address = int(base_addr, 0) + + for _ in range(len(genome)): + if not self.__sim.lib.sal_mem_is_address_valid(address): + self.__raise("Address '{}' is invalid".format(address)) + + address += 1 + + # All looks well! Let's compile the genome into memory. + for base_addr in address_list: + address = int(base_addr, 0) + + for symbol in genome: + self.__sim.lib.sal_mem_set_inst( + address, self.__inst_dict[symbol] + ) + address += 1 + + def __on_input(self, command): + """ Compile organism from user typed input. Compilation can only occur + on valid memory addresses. An exception will be thrown when trying to + write into non-valid address or when input stream is invalid. + """ + if len(command) < 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + # All characters in file must be actual instruction symbols. + for character in command[1]: + if character not in self.__inst_dict: + self.__raise("Invalid symbol '{}' found on stream".format( + character + )) + + # All looks well, Let's write the genome into memory. + self.__write_genome(command[1], command[2:]) + + def __on_compile(self, command): + """ Compile organism from source genome file. Genomes must be placed on + the './genomes' directory. Compilation can only occur on valid memory + addresses. An exception will be thrown when trying to write into + non-valid address or when genome file is invalid. + """ + if len(command) < 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + # Open genome file for compilation. + gen_file = os.path.join(self.__sim.path, "genomes", command[1]) + + with open(gen_file, "r") as f: + genome = f.read().strip() + + # Entire genome must be written on a single line. + if "\n" in genome: + self.__raise("Newline detected on '{}'".format(gen_file)) + + # All characters in file must be actual instruction symbols. + for character in genome: + if character not in self.__inst_dict: + self.__raise("Invalid symbol '{}' found on '{}'".format( + character, gen_file + )) + + # All looks well, Let's write the genome into memory. + self.__write_genome(genome, command[2:]) + + def __on_new(self, command): + """ Instantiate new organism of given size on given address. These + memory areas must be free and valid or an exception is thrown. + """ + if len(command) < 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + # Check that all addresses we will allocate are free and valid. + for base_addr in command[2:]: + address = int(base_addr, 0) + + for _ in range(int(command[1])): + if not self.__sim.lib.sal_mem_is_address_valid(address): + self.__raise("Address '{}' is invalid".format(address)) + elif self.__sim.lib.sal_mem_is_allocated(address): + self.__raise("Address '{}' is allocated".format(address)) + + address += 1 + + # All looks well! Let's instantiate our new organism. + for base_addr in command[2:]: + address = int(base_addr, 0) + size = int(command[1], 0) + self.__sim.lib.sal_proc_create(address, size) + + def __on_kill(self, command): + """ Kill organism on bottom of reaper queue. + """ + if len(command) > 1: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + # Call proc kill function only if there's any organisms to kill. + if not self.__sim.lib.sal_proc_get_count(): + self.__raise("No organisms currently alive") + else: + self.__sim.lib.sal_proc_kill() + + def __on_exec(self, command): + """ Allow a user to execute a python command from within the console. + Using this is very hack-ish, and not recommended unless you're certain + of what you're doing! + """ + if len(command) < 2: + self.__raise( + "'{}' must be followed by an executable string".format( + command[0] + ) + ) + + # User may query any simulation variable or status and the console will + # respond. For example, to query memory size or order, type one of the + # following: + # + # >>> exec output = self.__sim.lib.sal_mem_get_size() + # >>> exec output = self.__sim.lib.sal_mem_get_order() + # + output = {} + exec(" ".join(command[1:]), locals(), output) + + if output: + self.__respond("EXEC RESPONDS: {}".format(str(output))) + + def __on_scroll(self, command): + """ We can scroll to a specific process (on PROCESS view) or to a + specific world address (on WORLD view) via the console. + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + target = int(command[1], 0) + + # If on PROCESS page, scroll to given process. + if self.__printer.current_page == "PROCESS": + if target < self.__sim.lib.sal_proc_get_capacity(): + self.__printer.proc_scroll_to(target) + else: + self.__raise("No process with ID '{}' found".format(target)) + elif self.__printer.current_page == "WORLD": + if self.__sim.lib.sal_mem_is_address_valid(target): + self.__printer.world.scroll_to(target) + else: + self.__raise("Address '{}' is invalid".format(address)) + else: + self.__raise("'{}' must be called on PROCESS or WORLD page".format( + command[0]) + ) + + def __on_proc_select(self, command): + """ Select a specific process (on PROCESS or WORLD page). + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + target = int(command[1], 0) + + # If on PROCESS page, scroll to given process. + if target < self.__sim.lib.sal_proc_get_capacity(): + self.__printer.proc_select_by_id(target) + else: + self.__raise("No process with ID '{}' found".format(target)) + + def __on_rename(self, command): + """ Set a new simulation name. Future auto-saved files will use this + name as prefix. + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + self.__sim.rename(command[1]) + + def __on_save(self, command): + """ Save simulation on its current state. + """ + if len(command) != 1: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + self.__sim.lib.sal_main_save(self.__sim.save_file_path.encode("utf-8")) + + def __on_set_autosave(self, command): + """ Set the simulation's auto save interval. Provide any integer + between 0 and (2**32 - 1). If zero is provided, auto saving will be + disabled. + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + self.__sim.set_autosave(int(command[1], 0)) diff --git a/bin/modules/printer.py b/bin/modules/printer.py new file mode 100644 index 0000000..51a5b33 --- /dev/null +++ b/bin/modules/printer.py @@ -0,0 +1,858 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: printer.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered the 'view' part of the Salis simulator. It +takes care of displaying the simulator's state in a nicely formatted, intuitive +format. It makes use of the curses library for terminal handling. +""" + +import curses +import curses.textpad +import os +import time + +from collections import OrderedDict +from ctypes import c_uint8, c_uint32, cast, POINTER +from modules.handler import Handler +from modules.world import World + + +class Printer: + ESCAPE_KEY = 27 + + def __init__(self, sim): + """ Printer constructor. It takes care of starting up curses, defining + the data pages and setting the printer on its initial state. + """ + self.__sim = sim + self.__color_pair_count = 0 + + # Initialize curses screen, instruction and proc-element list before + # other private elements that depend on them. + self.screen = self.__get_screen() + self.inst_list = self.__get_inst_list() + self.proc_elements = self.__get_proc_elements() + + # We can now initialize all other privates. + self.__main = self.__get_main() + self.__pages = self.__get_pages() + self.__minimal = self.__get_minimal() + self.__main_scroll = 0 + self.__proc_element_scroll = 0 + self.__proc_gene_scroll = 0 + self.__proc_gene_view = False + self.__curs_y = 0 + self.__curs_x = 0 + self.__print_hex = False + self.size = self.screen.getmaxyx() + self.current_page = "MEMORY" + self.selected_proc = 0 + self.selected_proc_data = (c_uint32 * len(self.proc_elements))() + self.proc_list_scroll = 0 + self.world = World(self, self.__sim) + + def __del__(self): + """ Printer destructor exits curses. + """ + curses.endwin() + + def get_color_pair(self, fg, bg=-1): + """ We use this method to set new color pairs, keeping track of the + number of pairs already set. We return the new color pair ID. + """ + self.__color_pair_count += 1 + curses.init_pair(self.__color_pair_count, fg, bg) + return self.__color_pair_count + + def get_cmd(self): + """ This returns the pressed key from the curses handler. It's called + during the simulation's main loop. Flushing input is important when in + non-blocking mode. + """ + ch = self.screen.getch() + curses.flushinp() + return ch + + def set_nodelay(self, nodelay): + """ Toggles between blocking and non-blocking mode on curses. + """ + self.screen.nodelay(nodelay) + + def toggle_hex(self): + """ Toggle between decimal or hexadecimal printing of all simulation + state elements. + """ + self.__print_hex = not self.__print_hex + + def on_resize(self): + """ Called whenever the terminal window gets resized. + """ + self.size = self.screen.getmaxyx() + self.scroll_main() + self.world.zoom_reset() + + def flip_page(self, offset): + """ Change data page by given offset (i.e. '1' for next page or '-1' + for previous one). + """ + pidx = list(self.__pages.keys()).index(self.current_page) + pidx = (pidx + offset) % len(self.__pages) + self.current_page = list(self.__pages.keys())[pidx] + self.scroll_main() + + def scroll_main(self, offset=0): + """ Scrolling is allowed whenever the current page does not fit inside + the terminal window. This method gets called, with no offset, under + certain situations, like changing pages, just to make sure the screen + gets cleared and at least some of the data is always scrolled into + view. + """ + self.screen.clear() + len_main = len(self.__main) + len_page = len(self.__pages[self.current_page]) + max_scroll = (len_main + len_page + 5) - self.size[0] + self.__main_scroll += offset + self.__main_scroll = max(0, min(self.__main_scroll, max_scroll)) + + def proc_scroll_left(self): + """ Scroll process data elements or genomes (on PROCESS view) to the + left. + """ + if self.current_page == "PROCESS": + if self.__proc_gene_view: + self.__proc_gene_scroll -= 1 + self.__proc_gene_scroll = max(0, self.__proc_gene_scroll) + else: + self.__proc_element_scroll -= 1 + self.__proc_element_scroll = max(0, self.__proc_element_scroll) + + def proc_scroll_right(self): + """ Scroll process data elements or genomes (on PROCESS view) to the + right. + """ + if self.current_page == "PROCESS": + if self.__proc_gene_view: + self.__proc_gene_scroll += 1 + else: + self.__proc_element_scroll += 1 + max_scroll = len(self.proc_elements) - 1 + self.__proc_element_scroll = min( + max_scroll, self.__proc_element_scroll + ) + + def proc_scroll_down(self, fast=False): + """ Scroll process data table (on PROCESS view) up. + """ + if self.current_page == "PROCESS": + if fast: + len_page = len(self.__main) + len(self.__pages["PROCESS"]) + 6 + scroll = max(0, self.size[0] - len_page) + else: + scroll = 1 + + self.proc_list_scroll = max(0, self.proc_list_scroll - scroll) + + def proc_scroll_up(self, fast=False): + """ Scroll process data table (on PROCESS view) down. + """ + if self.current_page == "PROCESS": + if fast: + len_page = len(self.__main) + len(self.__pages["PROCESS"]) + 6 + scroll = max(0, self.size[0] - len_page) + else: + scroll = 1 + + self.proc_list_scroll = min( + self.__sim.lib.sal_proc_get_capacity() - 1, + self.proc_list_scroll + scroll + ) + + def proc_scroll_to(self, proc_id): + """ Scroll process data table (on PROCESS view) to a specific position. + """ + if self.current_page == "PROCESS": + if proc_id < self.__sim.lib.sal_proc_get_capacity(): + self.proc_list_scroll = proc_id + else: + raise RuntimeError("Error: scrolling to invalid process") + + def proc_scroll_vertical_reset(self): + """ Scroll process data table (on PROCESS view) back to top. + """ + if self.current_page == "PROCESS": + self.proc_list_scroll = 0 + + def proc_scroll_horizontal_reset(self): + """ Scroll process data or genome table (on PROCESS view) back to the + left. + """ + if self.current_page == "PROCESS": + if self.__proc_gene_view: + self.__proc_gene_scroll = 0 + else: + self.__proc_element_scroll = 0 + + def proc_select_prev(self): + """ Select previous process. + """ + if self.current_page in ["PROCESS", "WORLD"]: + self.selected_proc -= 1 + self.selected_proc %= self.__sim.lib.sal_proc_get_capacity() + + def proc_select_next(self): + """ Select next process. + """ + if self.current_page in ["PROCESS", "WORLD"]: + self.selected_proc += 1 + self.selected_proc %= self.__sim.lib.sal_proc_get_capacity() + + def proc_select_first(self): + """ Select first process on reaper queue. + """ + if self.current_page in ["PROCESS", "WORLD"]: + if self.__sim.lib.sal_proc_get_count(): + self.selected_proc = self.__sim.lib.sal_proc_get_first() + + def proc_select_last(self): + """ Select last process on reaper queue. + """ + if self.current_page in ["PROCESS", "WORLD"]: + if self.__sim.lib.sal_proc_get_count(): + self.selected_proc = self.__sim.lib.sal_proc_get_last() + + def proc_select_by_id(self, proc_id): + """ Select process from given ID. + """ + if proc_id < self.__sim.lib.sal_proc_get_capacity(): + self.selected_proc = proc_id + else: + raise RuntimeError("Error: attempting to select non-existing proc") + + def proc_scroll_to_selected(self): + """ Scroll WORLD or PROCESS page so that selected process becomes + visible. + """ + if self.current_page == "PROCESS": + self.proc_list_scroll = self.selected_proc + elif self.current_page == "WORLD": + if not self.__sim.lib.sal_proc_is_free(self.selected_proc): + index = self.proc_elements.index("mb1a") + address = self.selected_proc_data[index] + self.world.scroll_to(address) + + def proc_toggle_gene_view(self): + """ Toggle between data element or genome view on PROCESS page. + """ + if self.current_page == "PROCESS": + self.__proc_gene_view = not self.__proc_gene_view + + def run_cursor(self): + """ We can toggle a visible cursor on WORLD view to aid us in selecting + processes. + """ + if self.current_page == "WORLD" and self.size[1] > World.PADDING: + curses.curs_set(True) + + while True: + self.__curs_y = max(0, min(self.__curs_y, self.size[0] - 1)) + self.__curs_x = max(World.PADDING, min( + self.__curs_x, self.size[1] - 1 + )) + self.screen.move(self.__curs_y, self.__curs_x) + cmd = self.screen.getch() + + if cmd in [ord("c"), curses.KEY_RESIZE, self.ESCAPE_KEY]: + self.on_resize() + break + elif cmd == curses.KEY_LEFT: + self.__curs_x -= 1 + elif cmd == curses.KEY_RIGHT: + self.__curs_x += 1 + elif cmd == curses.KEY_DOWN: + self.__curs_y += 1 + elif cmd == curses.KEY_UP: + self.__curs_y -= 1 + elif cmd == ord("\n"): + self.__proc_select_by_cursor() + break + + curses.curs_set(False) + + def run_console(self): + """ Run the Salis console. You can use the console to control all main + aspects of the simulation, like compiling genomes into memory, creating + or killing organisms, setting auto-save interval, among other stuff. + """ + # Print a pythonic prompt. + self.__print_line(self.size[0] - 1, ">>> ", scroll=False) + self.screen.refresh() + + # Create the console child window. We turn it into a Textbox object in + # order to allow line-editing and extract output easily. + console = curses.newwin(1, self.size[1] - 5, self.size[0] - 1, 5) + textbox = curses.textpad.Textbox(console, insert_mode=True) + textbox.stripspaces = True + + # Grab a copy of the console history and instantiate a pointer to the + # last element. + history = self.__sim.handler.console_history + [""] + pointer = len(history) - 1 + + # Nested method reinserts recorded commands from history into console. + def access_history(cmd): + nonlocal pointer + + if pointer == len(history) - 1: + history[-1] = console.instr().strip() + + if cmd == "up" and pointer != 0: + pointer -= 1 + elif cmd == "down" and pointer < len(history) - 1: + pointer += 1 + + console.clear() + console.addstr(0, 0, history[pointer]) + console.refresh() + + # Declare custom validator to control special commands. + def validator(cmd): + EXIT = 7 + + if cmd in [curses.KEY_RESIZE, self.ESCAPE_KEY]: + console.clear() + return EXIT + # Provide general code for back-space key, in case it's not + # correctly defined. + elif cmd in [127, curses.KEY_BACKSPACE]: + return curses.KEY_BACKSPACE + elif cmd == curses.KEY_UP: + access_history("up") + elif cmd == curses.KEY_DOWN: + access_history("down") + else: + return cmd + + # Run the Textbox object with our custom validator. + curses.curs_set(True) + output = textbox.edit(validator) + curses.curs_set(False) + + # Finally, extract data from console and send to handler. Respond to + # any possible resize event here. + self.__sim.handler.handle_console(output) + self.screen.clear() + self.on_resize() + + def show_console_error(self, message): + """ Shows Salis console error messages, if any. These messages might + contain actual python exception output. + """ + self.__print_line(self.size[0] - 1, ">>>", curses.color_pair( + self.__pair_error + ) | curses.A_BOLD) + self.screen.refresh() + + # We also use a Textbox object, just so that execution gets halted + # until a key gets pressed (even on non-blocking mode). + console = curses.newwin(1, self.size[1] - 5, self.size[0] - 1, 5) + textbox = curses.textpad.Textbox(console) + + # Curses may raise an exception if printing on the edge of the screen; + # we can just ignore it. + try: + console.addstr(0, 0, message, curses.color_pair( + self.__pair_error + ) | curses.A_BOLD) + except curses.error: + pass + + # Custom validator simply exits on any key. + def validator(cmd): + EXIT = 7 + return EXIT + + textbox.edit(validator) + self.screen.clear() + self.on_resize() + + def print_page(self): + """ Print current page to screen. We use the previously generated + '__pages' dictionary to easily associate a label to a Salis function. + """ + # If in minimal mode, print only minial widget. + if self.__sim.minimal: + self.__print_minimal() + return + + # Update selected proc data if in WORLD view. + if self.current_page == "WORLD": + self.__sim.lib.sal_proc_get_proc_data(self.selected_proc, cast( + self.selected_proc_data, POINTER(c_uint32) + )) + + # Print MAIN simulation data. + self.__print_line( + 1, "SALIS[{}]".format(self.__sim.args.file), curses.color_pair( + self.__pair_header + ) | curses.A_BOLD + ) + self.__print_widget(2, self.__main) + + # Print data of currently selected page. + main_lines = len(self.__main) + 3 + self.__print_header(main_lines, self.current_page) + self.__print_widget(main_lines + 1, self.__pages[self.current_page]) + + # Print special widgets (WORLD view and PROCESS list). + if self.current_page == "WORLD": + self.world.render() + elif self.current_page == "PROCESS": + self.__print_proc_list() + + + ############################### + # Private methods + ############################### + + def __set_colors(self): + """ Define the color pairs for the data printer. + """ + curses.start_color() + curses.use_default_colors() + self.__pair_header = self.get_color_pair(curses.COLOR_BLUE) + self.__pair_selected = self.get_color_pair(curses.COLOR_YELLOW) + self.__pair_error = self.get_color_pair(curses.COLOR_RED) + + def __get_screen(self): + """ Prepare and return the main curses window. We also set a shorter + delay when responding to a pressed escape key. + """ + # Set a shorter delay to the ESCAPE key, so that we may use it to exit + # Salis. + os.environ.setdefault("ESCDELAY", "25") + + # Prepare curses screen. + screen = curses.initscr() + curses.noecho() + curses.cbreak() + screen.keypad(True) + curses.curs_set(False) + + # We need color support in order to run the printer module. + if curses.has_colors(): + self.__set_colors() + else: + raise RuntimeError("Error: no color support.") + + return screen + + def __get_inst_list(self): + """ Parse instruction set from C header file named 'instset.h'. We're + using the keyword 'SALIS_INST' to identify an instruction definition, + so be careful not to use this keyword anywhere else on the headers. + """ + inst_list = [] + inst_file = os.path.join(self.__sim.path, "../include/instset.h") + + with open(inst_file, "r") as f: + lines = f.read().splitlines() + + for line in lines: + if line and line.split()[0] == "SALIS_INST": + inst_name = line.split()[1][:4] + inst_symb = line.split()[3] + inst_list.append((inst_name, inst_symb)) + + return inst_list + + def __get_proc_elements(self): + """ Parse process structure member variables from C header file named + 'process.h'. We're using the keyword 'SALIS_PROC_ELEMENT' to identify + element declarations, so be careful not to use this keyword anywhere + else on the headers. + """ + proc_elem_list = [] + proc_elem_file = os.path.join(self.__sim.path, "../include/process.h") + + with open(proc_elem_file, "r") as f: + lines = f.read().splitlines() + + for line in lines: + if line and line.split()[0] == "SALIS_PROC_ELEMENT": + proc_elem_name = line.split()[2].split(";")[0] + + if proc_elem_name == "stack[8]": + # The stack is a special member variable, an array. We + # translate it by returning a list of stack identifiers. + proc_elem_list += ["stack[{}]".format(i) for i in range(8)] + else: + # We can assume all other struct elements are single + # variables. + proc_elem_list.append(proc_elem_name) + + return proc_elem_list + + def __get_main(self): + """ Generate main set of data fields to be printed. We associate, on a + list object, a label to each Salis function to be called. The following + elements get printed on all pages. + """ + return [ + ("e", "cycle", self.__sim.lib.sal_main_get_cycle), + ("e", "epoch", self.__sim.lib.sal_main_get_epoch), + ("e", "state", lambda: self.__sim.state), + ("e", "autosave", lambda: self.__sim.autosave), + ] + + def __get_pages(self): + """ Generate data fields to be printed on each page. We associate, on a + list object, a label to each Salis function to be called. Each list + represents a PAGE. We initialize all pages inside an ordered dictionary + object. + """ + # The following comprehensions build up widgets to help up print sets + # of data elements. The use of nested lambdas is needed to receive + # updated values. + # Instruction counter widget: + inst_widget = [("e", inst[0], (lambda j: ( + lambda: self.__sim.lib.sal_mem_get_inst_count(j) + ))(i)) for i, inst in enumerate(self.inst_list)] + + # Evolver module state widget: + state_widget = [("e", "state[{}]".format(i), (lambda j: ( + lambda: self.__sim.lib.sal_evo_get_state(j) + ))(i)) for i in range(4)] + + # Selected process state widget: + selected_widget = [("p", element, (lambda j: ( + lambda: self.selected_proc_data[j] + ))(i)) for i, element in enumerate(self.proc_elements)] + + # With the help of the widgets above, we can declare the PAGES + # dictionary object. + return OrderedDict([ + ("MEMORY", [ + ("e", "order", self.__sim.lib.sal_mem_get_order), + ("e", "size", self.__sim.lib.sal_mem_get_size), + ("e", "allocated", self.__sim.lib.sal_mem_get_allocated), + ("s", ""), + ("h", "INSTRUCTIONS"), + ] + inst_widget), + ("EVOLVER", [ + ("e", "last", self.__sim.lib.sal_evo_get_last_changed_address), + ("e", "calls", self.__sim.lib.sal_evo_get_calls_on_last_cycle), + ] + state_widget), + ("PROCESS", [ + ("e", "count", self.__sim.lib.sal_proc_get_count), + ("e", "capacity", self.__sim.lib.sal_proc_get_capacity), + ("e", "first", self.__sim.lib.sal_proc_get_first), + ("e", "last", self.__sim.lib.sal_proc_get_last), + ("e", "selected", lambda: self.selected_proc), + ]), + ("WORLD", [ + ("e", "position", lambda: self.world.pos), + ("e", "zoom", lambda: self.world.zoom), + ("e", "selected", lambda: self.selected_proc), + ("s", ""), + ("h", "SELECTED PROC"), + ] + selected_widget), + ]) + + def __print_line(self, ypos, line, attrs=curses.A_NORMAL, scroll=True): + """ Print a single line on screen only when it's visible. + """ + if scroll: + ypos -= self.__main_scroll + + if 0 <= ypos < self.size[0]: + # Curses raises an exception each time we print on the screen's + # edge. We can just catch and ignore it. + try: + line = line[:self.size[1] - 1] + self.screen.addstr(ypos, 1, line, attrs) + except curses.error: + pass + + def __print_header(self, ypos, line): + """ Print a bold header. + """ + header_attr = curses.A_BOLD | curses.color_pair(self.__pair_header) + self.__print_line(ypos, line, header_attr) + + def __print_value(self, ypos, element, value, attr=curses.A_NORMAL): + """ Print a label:value pair. + """ + if type(value) == int: + if value == ((2 ** 32) - 1): + # In Salis, UINT32_MAX is used to represent NULL. We print NULL + # as three dashes. + value = "---" + elif self.__print_hex: + value = hex(value) + + line = "{:<10} : {:>10}".format(element, value) + self.__print_line(ypos, line, attr) + + def __print_proc_element(self, ypos, element, value): + """ Print elements of currently selected process. We highlight in + YELLOW if the selected process is running. + """ + if self.__sim.lib.sal_proc_is_free(self.selected_proc): + attr = curses.A_NORMAL + else: + attr = curses.color_pair(self.__pair_selected) + + self.__print_value(ypos, element, value, attr) + + def __print_widget(self, ypos, widget): + """ Print a widget (data PAGE) on screen. + """ + for i, element in enumerate(widget): + if element[0] == "s": + continue + elif element[0] == "h": + self.__print_header(i + ypos, element[1]) + elif element[0] == "e": + self.__print_value(i + ypos, element[1], element[2]()) + elif element[0] == "p": + self.__print_proc_element(i + ypos, element[1], element[2]()) + + def __clear_line(self, ypos): + """ Clear the specified line. + """ + if 0 <= ypos < self.size[0]: + self.screen.move(ypos, 0) + self.screen.clrtoeol() + + def __print_proc_data_list(self): + """ Print list of process data elements in PROCESS page. We can toggle + between printing the data elements or the genomes by pressing the 'g' + key. + """ + # First, print the table header, by extracting element names from the + # previously generated proc element list. + ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5 + header = " | ".join(["{:<10}".format("pidx")] + [ + "{:>10}".format(element) + for element in self.proc_elements[self.__proc_element_scroll:] + ]) + self.__clear_line(ypos) + self.__print_header(ypos, header) + ypos += 1 + proc_id = self.proc_list_scroll + + # Print all proc IDs and elements in decimal or hexadecimal format, + # depending on hex-flag being set. + if self.__print_hex: + data_format = lambda x: hex(x) + else: + data_format = lambda x: x + + # Lastly, iterate all lines and print as much process data as it fits. + # We can scroll the process data table using the 'wasd' keys. + while ypos < self.size[0]: + self.__clear_line(ypos) + + if proc_id < self.__sim.lib.sal_proc_get_capacity(): + if proc_id == self.selected_proc: + # Always highlight the selected process. + attr = curses.color_pair(self.__pair_selected) + else: + attr = curses.A_NORMAL + + # Retrieve a copy of the selected process state and store it in + # a list object. + proc_data = (c_uint32 * len(self.proc_elements))() + self.__sim.lib.sal_proc_get_proc_data(proc_id, cast( + proc_data, POINTER(c_uint32)) + ) + + # Lastly, assemble and print the next table row. + row = " | ".join(["{:<10}".format(data_format(proc_id))] + [ + "{:>10}".format(data_format(element)) + for element in proc_data[self.__proc_element_scroll:] + ]) + self.__print_line(ypos, row, attr) + + proc_id += 1 + ypos += 1 + + def __print_proc_gene_block( + self, ypos, gidx, xpos, mbs, mba, ip, sp, pair + ): + """ Print a sub-set of a process genome. Namely, on of its two memory + blocks. + """ + while gidx < mbs and xpos < self.size[1]: + gaddr = mba + gidx + + if gaddr == ip: + attr = curses.color_pair(self.world.pair_sel_ip) + elif gaddr == sp: + attr = curses.color_pair(self.world.pair_sel_sp) + else: + attr = curses.color_pair(pair) + + # Retrieve instruction from memory and transform it to correct + # symbol. + inst = self.__sim.lib.sal_mem_get_inst(gaddr) + symb = self.inst_list[inst][1] + + # Curses raises an exception each time we print on the screen's + # edge. We can just catch and ignore it. + try: + self.screen.addstr(ypos, xpos, symb, attr) + except curses.error: + pass + + gidx += 1 + xpos += 1 + + return xpos + + def __print_proc_gene(self, ypos, proc_id): + """ Print a single process genome on the genome table. We use the same + colors to represent memory blocks, IP and SP of each process, as those + used to represent the selected process on WORLD view. + """ + # There's nothing to print if process is free. + if self.__sim.lib.sal_proc_is_free(proc_id): + return + + # Process is alive. Retrieve a copy of the current process state and + # store it in a list object. + proc_data = (c_uint32 * len(self.proc_elements))() + self.__sim.lib.sal_proc_get_proc_data(proc_id, cast( + proc_data, POINTER(c_uint32)) + ) + + # Let's extract all data of interest. + mb1a = proc_data[self.proc_elements.index("mb1a")] + mb1s = proc_data[self.proc_elements.index("mb1s")] + mb2a = proc_data[self.proc_elements.index("mb2a")] + mb2s = proc_data[self.proc_elements.index("mb2s")] + ip = proc_data[self.proc_elements.index("ip")] + sp = proc_data[self.proc_elements.index("sp")] + + # Always print MAIN memory block (mb1) first (on the left side). That + # way we can keep most of our attention on the parent. + xpos = self.__print_proc_gene_block( + ypos, self.__proc_gene_scroll, 14, mb1s, mb1a, ip, sp, + self.world.pair_sel_mb1 + ) + + # Reset gene counter and print child memory block, if it exists. + if mb1s < self.__proc_gene_scroll: + gidx = self.__proc_gene_scroll - mb1s + else: + gidx = 0 + + self.__print_proc_gene_block( + ypos, gidx, xpos, mb2s, mb2a, ip, sp, self.world.pair_sel_mb2 + ) + + def __print_proc_gene_list(self): + """ Print list of process genomes in PROCESS page. We can toggle + between printing the genomes or the data elements by pressing the 'g' + key. + """ + # Print all proc IDs and gene scroll in decimal or hexadecimal format, + # depending on hex-flag being set. + if self.__print_hex: + data_format = lambda x: hex(x) + else: + data_format = lambda x: x + + # First, print the table header. We print the current gene-scroll + # position for easy reference. Return back to zero scroll with the 'A' + # key. + ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5 + header = "{:<10} | genes {} -->".format( + "pidx", data_format(self.__proc_gene_scroll) + ) + self.__clear_line(ypos) + self.__print_header(ypos, header) + ypos += 1 + proc_id = self.proc_list_scroll + + # Iterate all lines and print as much genetic data as it fits. We can + # scroll the gene data table using the 'wasd' keys. + while ypos < self.size[0]: + self.__clear_line(ypos) + + if proc_id < self.__sim.lib.sal_proc_get_capacity(): + if proc_id == self.selected_proc: + # Always highlight the selected process. + attr = curses.color_pair(self.__pair_selected) + else: + attr = curses.A_NORMAL + + # Assemble and print the next table row. + row = "{:<10} |".format(data_format(proc_id)) + self.__print_line(ypos, row, attr) + self.__print_proc_gene(ypos, proc_id) + + proc_id += 1 + ypos += 1 + + def __print_proc_list(self): + """ Print list of process genomes or process data elements in PROCESS + page. We can toggle between printing the genomes or the data elements + by pressing the 'g' key. + """ + if self.__proc_gene_view: + self.__print_proc_gene_list() + else: + self.__print_proc_data_list() + + def __proc_select_by_cursor(self): + """ Select process located on address under cursor, if any exists. + """ + # First, calculate address under cursor. + ypos = self.__curs_y + xpos = self.__curs_x - World.PADDING + line_size = self.size[1] - World.PADDING + address = self.world.pos + ( + ((ypos * line_size) + xpos) * self.world.zoom + ) + + # Now, iterate all living processes and try to find one that owns the + # calculated address. + if self.__sim.lib.sal_mem_is_address_valid(address): + for proc_id in range(self.__sim.lib.sal_proc_get_capacity()): + if not self.__sim.lib.sal_proc_is_free(proc_id): + proc_data = (c_uint32 * len(self.proc_elements))() + self.__sim.lib.sal_proc_get_proc_data(proc_id, cast( + proc_data, POINTER(c_uint32)) + ) + mb1a = proc_data[self.proc_elements.index("mb1a")] + mb1s = proc_data[self.proc_elements.index("mb1s")] + mb2a = proc_data[self.proc_elements.index("mb2a")] + mb2s = proc_data[self.proc_elements.index("mb2s")] + + if ( + mb1a <= address < (mb1a + mb1s) or + mb2a <= address < (mb2a + mb2s) + ): + self.selected_proc = proc_id + break + + def __get_minimal(self): + """ Generate set of data fields to be printed on minimal mode. + """ + return [ + ("cycle", self.__sim.lib.sal_main_get_cycle), + ("epoch", self.__sim.lib.sal_main_get_epoch), + ("procs", self.__sim.lib.sal_proc_get_count), + ] + + def __print_minimal(self): + """ Print minimal mode data fields. + """ + self.__print_line(1, "Salis --- Minimal mode") + + for i, field in enumerate(self.__minimal): + self.__print_line(i + 2, "{}: {}".format(field[0], field[1]())) diff --git a/bin/modules/world.py b/bin/modules/world.py new file mode 100644 index 0000000..08f2734 --- /dev/null +++ b/bin/modules/world.py @@ -0,0 +1,279 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: world.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered an extension of the 'printer' module. It takes +care of getting a pre-redered image from Salis and post-processing it in order +to print it into the curses screen. It also keeps track of user controllable +rendering parameters (position and zoom). +""" + +import curses + +from ctypes import c_uint8, cast, POINTER + + +class World: + PADDING = 25 + + def __init__(self, printer, sim): + """ World constructor. We link to the printer and main simulation + classes. We also setup the colors for rendering the world. + """ + self.__printer = printer + self.__sim = sim + self.__set_world_colors() + self.__show_ip = True + self.pos = 0 + self.zoom = 1 + + def render(self): + """ Function for rendering the world. We get a pre-rendered buffer from + Salis' memory module (its way faster to pre-render in C) and use that + to assemble the world image in Python. + """ + # Window is so narrow that world is not visible. + if self.__printer.size[1] <= self.PADDING: + return + + # Get pre-rendered image from Salis' memory module. + line_width = self.__printer.size[1] - self.PADDING + print_area = self.__printer.size[0] * line_width + c_buffer = (c_uint8 * print_area)() + self.__sim.lib.sal_ren_get_image( + self.pos, self.zoom, print_area, cast(c_buffer, POINTER(c_uint8)) + ) + + # Get data elements of selected process, if it's running, and store + # them into a convenient dict object. + if self.__sim.lib.sal_proc_is_free(self.__printer.selected_proc): + sel_data = None + else: + out_data = self.__printer.selected_proc_data + out_elem = self.__printer.proc_elements + sel_data = { + "ip": out_data[out_elem.index("ip")], + "sp": out_data[out_elem.index("sp")], + "mb1a": out_data[out_elem.index("mb1a")], + "mb1s": out_data[out_elem.index("mb1s")], + "mb2a": out_data[out_elem.index("mb2a")], + "mb2s": out_data[out_elem.index("mb2s")], + } + + # Iterate all cells on printable area and print the post-rendered + # cells. Rendered cells contain info about bit flags and instructions + # currently written into memory. + bidx = 0 + + for y in range(self.__printer.size[0]): + for x in range(line_width): + xpad = x + self.PADDING + addr = self.pos + (self.zoom * bidx) + symb, attr = self.__render_cell(c_buffer[bidx], addr, sel_data) + + # Curses raises an exception when printing on the edge of the + # screen; we can just ignore it. + try: + self.__printer.screen.addstr(y, xpad, symb, attr) + except curses.error: + pass + + bidx += 1 + + def zoom_out(self): + """ Zoom out by a factor of 2 (zoom *= 2). + """ + if self.__is_world_editable(): + self.zoom = min(self.zoom * 2, self.__get_max_zoom()) + + def zoom_in(self): + """ Zoom in by a factor of 2 (zoom //= 2). + """ + if self.__is_world_editable(): + self.zoom = max(self.zoom // 2, 1) + + def zoom_reset(self): + """ Reset zoom to a valid value on certain events (i.e. during terminal + resizing). + """ + self.zoom = min(self.zoom, self.__get_max_zoom()) + + def pan_left(self): + """ Pan world to the left (pos -= zoom). + """ + if self.__is_world_editable(): + self.pos = max(self.pos - self.zoom, 0) + + def pan_right(self): + """ Pan world to the right (pos += zoom). + """ + if self.__is_world_editable(): + max_pos = self.__sim.lib.sal_mem_get_size() - 1 + self.pos = min(self.pos + self.zoom, max_pos) + + def pan_down(self, fast=False): + """ Pan world downward. + """ + if self.__is_world_editable(): + if fast: + self.pos = max(self.pos - self.__get_world_area(), 0) + else: + self.pos = max(self.pos - self.__get_line_area(), 0) + + def pan_up(self, fast=False): + """ Pan world upward. + """ + if self.__is_world_editable(): + max_pos = self.__sim.lib.sal_mem_get_size() - 1 + + if fast: + self.pos = min(self.pos + self.__get_world_area(), max_pos) + else: + self.pos = min(self.pos + self.__get_line_area(), max_pos) + + def pan_reset(self): + """ Set world position to zero. + """ + if self.__is_world_editable(): + self.pos = 0 + + def scroll_to(self, pos): + """ Move world pos to a specified position. + """ + if self.__is_world_editable(): + if self.__sim.lib.sal_mem_is_address_valid(pos): + self.pos = pos + else: + raise RuntimeError("Error: scrolling to an invalid address") + + def toggle_ip_view(self): + """ Turn on/off IP visualization. Turning off IPs might make it easier + to visualize the underlying memory block structure. + """ + if self.__is_world_editable(): + self.__show_ip = not self.__show_ip + + + ############################### + # Private methods + ############################### + + def __set_world_colors(self): + """ Define color pairs for rendering the world. Each color has a + special meaning, referring to the selected process IP, SP and memory + blocks, or to bit flags currently set on rendered cells. + """ + self.pair_free = self.__printer.get_color_pair( + curses.COLOR_BLUE + ) + self.pair_alloc = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_BLUE + ) + self.pair_mbstart = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_CYAN + ) + self.pair_ip = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_WHITE + ) + self.pair_sel_mb2 = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_GREEN + ) + self.pair_sel_mb1 = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_YELLOW + ) + self.pair_sel_sp = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_MAGENTA + ) + self.pair_sel_ip = self.__printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_RED + ) + + def __render_cell(self, byte, addr, sel_data=None): + """ Render a single cell on the WORLD view. All cells are rendered by + interpreting the values coming in from the buffer. We overlay special + colors for representing the selected organism's state, on top of the + more common colors used to represent memory state. + """ + # Paint black all cells that are out of memory bounds. + if not self.__sim.lib.sal_mem_is_address_valid(addr): + return " ", curses.A_NORMAL + + # Check if cell contains part of the currently selected process. + if sel_data: + top_addr = addr + self.zoom + top_mb1a = sel_data["mb1a"] + sel_data["mb1s"] + top_mb2a = sel_data["mb2a"] + sel_data["mb2s"] + + if addr <= sel_data["ip"] < top_addr: + pair = self.pair_sel_ip + elif addr <= sel_data["sp"] < top_addr: + pair = self.pair_sel_sp + elif top_addr > sel_data["mb1a"] and top_mb1a > addr: + pair = self.pair_sel_mb1 + elif top_addr > sel_data["mb2a"] and top_mb2a > addr: + pair = self.pair_sel_mb2 + + # No pair has been selected yet; select pair based on bit-flags. + if not "pair" in locals(): + if self.__show_ip and byte >= 0x80: + pair = self.pair_ip + elif (byte % 0x80) >= 0x40: + pair = self.pair_mbstart + elif (byte % 0x40) >= 0x20: + pair = self.pair_alloc + else: + pair = self.pair_free + + # Select symbol to represent instructions currently on cell. + inst = byte % 32 + + if self.zoom == 1: + symb = self.__printer.inst_list[inst][1] + elif inst > 16: + symb = ":" + else: + symb = "." + + # Return tuple containing our post-redered cell. + return symb, curses.color_pair(pair) + + def __get_max_zoom(self): + """ Calculate maximum needed zoom so that the entire world fits on the + terminal window. + """ + max_zoom = 1 + line_size = self.__printer.size[1] - self.PADDING + coverage = self.__printer.size[0] * line_size + + # We fix a maximum zoom level; otherwise, program may halt on extreme + # zoom levels. + while ( + (coverage * max_zoom) < self.__sim.lib.sal_mem_get_size() and + max_zoom < 2 ** 16 + ): + max_zoom *= 2 + + return max_zoom + + def __is_world_editable(self): + """ For this to return True, printer's current page must be WORLD page. + Additionally, the WORLD panel must be visible on the terminal window + (i.e. curses.COLS > data_margin). + """ + correct_page = self.__printer.current_page == "WORLD" + correct_size = self.__printer.size[1] > self.PADDING + return correct_page and correct_size + + def __get_line_area(self): + """ Return amount of bytes contained in a printed WORLD line. + """ + line_size = self.__printer.size[1] - self.PADDING + line_area = self.zoom * line_size + return line_area + + def __get_world_area(self): + """ Return amount of bytes contained in the entire WORLD view. + """ + return self.__get_line_area() * self.__printer.size[0] diff --git a/bin/printer.py b/bin/printer.py deleted file mode 100644 index 1fca43d..0000000 --- a/bin/printer.py +++ /dev/null @@ -1,858 +0,0 @@ -""" SALIS: Viewer/controller for the SALIS simulator. - -File: printer.py -Author: Paul Oliver -Email: paul.t.oliver.design@gmail.com - -This module should be considered the 'view' part of the Salis simulator. It -takes care of displaying the simulator's state in a nicely formatted, intuitive -format. It makes use of the curses library for terminal handling. -""" - -import curses -import curses.textpad -import os -import time - -from collections import OrderedDict -from ctypes import c_uint8, c_uint32, cast, POINTER -from handler import Handler -from world import World - - -class Printer: - ESCAPE_KEY = 27 - - def __init__(self, sim): - """ Printer constructor. It takes care of starting up curses, defining - the data pages and setting the printer on its initial state. - """ - self.__sim = sim - self.__color_pair_count = 0 - - # Initialize curses screen, instruction and proc-element list before - # other private elements that depend on them. - self.screen = self.__get_screen() - self.inst_list = self.__get_inst_list() - self.proc_elements = self.__get_proc_elements() - - # We can now initialize all other privates. - self.__main = self.__get_main() - self.__pages = self.__get_pages() - self.__minimal = self.__get_minimal() - self.__main_scroll = 0 - self.__proc_element_scroll = 0 - self.__proc_gene_scroll = 0 - self.__proc_gene_view = False - self.__curs_y = 0 - self.__curs_x = 0 - self.__print_hex = False - self.size = self.screen.getmaxyx() - self.current_page = "MEMORY" - self.selected_proc = 0 - self.selected_proc_data = (c_uint32 * len(self.proc_elements))() - self.proc_list_scroll = 0 - self.world = World(self, self.__sim) - - def __del__(self): - """ Printer destructor exits curses. - """ - curses.endwin() - - def get_color_pair(self, fg, bg=-1): - """ We use this method to set new color pairs, keeping track of the - number of pairs already set. We return the new color pair ID. - """ - self.__color_pair_count += 1 - curses.init_pair(self.__color_pair_count, fg, bg) - return self.__color_pair_count - - def get_cmd(self): - """ This returns the pressed key from the curses handler. It's called - during the simulation's main loop. Flushing input is important when in - non-blocking mode. - """ - ch = self.screen.getch() - curses.flushinp() - return ch - - def set_nodelay(self, nodelay): - """ Toggles between blocking and non-blocking mode on curses. - """ - self.screen.nodelay(nodelay) - - def toggle_hex(self): - """ Toggle between decimal or hexadecimal printing of all simulation - state elements. - """ - self.__print_hex = not self.__print_hex - - def on_resize(self): - """ Called whenever the terminal window gets resized. - """ - self.size = self.screen.getmaxyx() - self.scroll_main() - self.world.zoom_reset() - - def flip_page(self, offset): - """ Change data page by given offset (i.e. '1' for next page or '-1' - for previous one). - """ - pidx = list(self.__pages.keys()).index(self.current_page) - pidx = (pidx + offset) % len(self.__pages) - self.current_page = list(self.__pages.keys())[pidx] - self.scroll_main() - - def scroll_main(self, offset=0): - """ Scrolling is allowed whenever the current page does not fit inside - the terminal window. This method gets called, with no offset, under - certain situations, like changing pages, just to make sure the screen - gets cleared and at least some of the data is always scrolled into - view. - """ - self.screen.clear() - len_main = len(self.__main) - len_page = len(self.__pages[self.current_page]) - max_scroll = (len_main + len_page + 5) - self.size[0] - self.__main_scroll += offset - self.__main_scroll = max(0, min(self.__main_scroll, max_scroll)) - - def proc_scroll_left(self): - """ Scroll process data elements or genomes (on PROCESS view) to the - left. - """ - if self.current_page == "PROCESS": - if self.__proc_gene_view: - self.__proc_gene_scroll -= 1 - self.__proc_gene_scroll = max(0, self.__proc_gene_scroll) - else: - self.__proc_element_scroll -= 1 - self.__proc_element_scroll = max(0, self.__proc_element_scroll) - - def proc_scroll_right(self): - """ Scroll process data elements or genomes (on PROCESS view) to the - right. - """ - if self.current_page == "PROCESS": - if self.__proc_gene_view: - self.__proc_gene_scroll += 1 - else: - self.__proc_element_scroll += 1 - max_scroll = len(self.proc_elements) - 1 - self.__proc_element_scroll = min( - max_scroll, self.__proc_element_scroll - ) - - def proc_scroll_down(self, fast=False): - """ Scroll process data table (on PROCESS view) up. - """ - if self.current_page == "PROCESS": - if fast: - len_page = len(self.__main) + len(self.__pages["PROCESS"]) + 6 - scroll = max(0, self.size[0] - len_page) - else: - scroll = 1 - - self.proc_list_scroll = max(0, self.proc_list_scroll - scroll) - - def proc_scroll_up(self, fast=False): - """ Scroll process data table (on PROCESS view) down. - """ - if self.current_page == "PROCESS": - if fast: - len_page = len(self.__main) + len(self.__pages["PROCESS"]) + 6 - scroll = max(0, self.size[0] - len_page) - else: - scroll = 1 - - self.proc_list_scroll = min( - self.__sim.lib.sal_proc_get_capacity() - 1, - self.proc_list_scroll + scroll - ) - - def proc_scroll_to(self, proc_id): - """ Scroll process data table (on PROCESS view) to a specific position. - """ - if self.current_page == "PROCESS": - if proc_id < self.__sim.lib.sal_proc_get_capacity(): - self.proc_list_scroll = proc_id - else: - raise RuntimeError("Error: scrolling to invalid process") - - def proc_scroll_vertical_reset(self): - """ Scroll process data table (on PROCESS view) back to top. - """ - if self.current_page == "PROCESS": - self.proc_list_scroll = 0 - - def proc_scroll_horizontal_reset(self): - """ Scroll process data or genome table (on PROCESS view) back to the - left. - """ - if self.current_page == "PROCESS": - if self.__proc_gene_view: - self.__proc_gene_scroll = 0 - else: - self.__proc_element_scroll = 0 - - def proc_select_prev(self): - """ Select previous process. - """ - if self.current_page in ["PROCESS", "WORLD"]: - self.selected_proc -= 1 - self.selected_proc %= self.__sim.lib.sal_proc_get_capacity() - - def proc_select_next(self): - """ Select next process. - """ - if self.current_page in ["PROCESS", "WORLD"]: - self.selected_proc += 1 - self.selected_proc %= self.__sim.lib.sal_proc_get_capacity() - - def proc_select_first(self): - """ Select first process on reaper queue. - """ - if self.current_page in ["PROCESS", "WORLD"]: - if self.__sim.lib.sal_proc_get_count(): - self.selected_proc = self.__sim.lib.sal_proc_get_first() - - def proc_select_last(self): - """ Select last process on reaper queue. - """ - if self.current_page in ["PROCESS", "WORLD"]: - if self.__sim.lib.sal_proc_get_count(): - self.selected_proc = self.__sim.lib.sal_proc_get_last() - - def proc_select_by_id(self, proc_id): - """ Select process from given ID. - """ - if proc_id < self.__sim.lib.sal_proc_get_capacity(): - self.selected_proc = proc_id - else: - raise RuntimeError("Error: attempting to select non-existing proc") - - def proc_scroll_to_selected(self): - """ Scroll WORLD or PROCESS page so that selected process becomes - visible. - """ - if self.current_page == "PROCESS": - self.proc_list_scroll = self.selected_proc - elif self.current_page == "WORLD": - if not self.__sim.lib.sal_proc_is_free(self.selected_proc): - index = self.proc_elements.index("mb1a") - address = self.selected_proc_data[index] - self.world.scroll_to(address) - - def proc_toggle_gene_view(self): - """ Toggle between data element or genome view on PROCESS page. - """ - if self.current_page == "PROCESS": - self.__proc_gene_view = not self.__proc_gene_view - - def run_cursor(self): - """ We can toggle a visible cursor on WORLD view to aid us in selecting - processes. - """ - if self.current_page == "WORLD" and self.size[1] > World.PADDING: - curses.curs_set(True) - - while True: - self.__curs_y = max(0, min(self.__curs_y, self.size[0] - 1)) - self.__curs_x = max(World.PADDING, min( - self.__curs_x, self.size[1] - 1 - )) - self.screen.move(self.__curs_y, self.__curs_x) - cmd = self.screen.getch() - - if cmd in [ord("c"), curses.KEY_RESIZE, self.ESCAPE_KEY]: - self.on_resize() - break - elif cmd == curses.KEY_LEFT: - self.__curs_x -= 1 - elif cmd == curses.KEY_RIGHT: - self.__curs_x += 1 - elif cmd == curses.KEY_DOWN: - self.__curs_y += 1 - elif cmd == curses.KEY_UP: - self.__curs_y -= 1 - elif cmd == ord("\n"): - self.__proc_select_by_cursor() - break - - curses.curs_set(False) - - def run_console(self): - """ Run the Salis console. You can use the console to control all main - aspects of the simulation, like compiling genomes into memory, creating - or killing organisms, setting auto-save interval, among other stuff. - """ - # Print a pythonic prompt. - self.__print_line(self.size[0] - 1, ">>> ", scroll=False) - self.screen.refresh() - - # Create the console child window. We turn it into a Textbox object in - # order to allow line-editing and extract output easily. - console = curses.newwin(1, self.size[1] - 5, self.size[0] - 1, 5) - textbox = curses.textpad.Textbox(console, insert_mode=True) - textbox.stripspaces = True - - # Grab a copy of the console history and instantiate a pointer to the - # last element. - history = self.__sim.handler.console_history + [""] - pointer = len(history) - 1 - - # Nested method reinserts recorded commands from history into console. - def access_history(cmd): - nonlocal pointer - - if pointer == len(history) - 1: - history[-1] = console.instr().strip() - - if cmd == "up" and pointer != 0: - pointer -= 1 - elif cmd == "down" and pointer < len(history) - 1: - pointer += 1 - - console.clear() - console.addstr(0, 0, history[pointer]) - console.refresh() - - # Declare custom validator to control special commands. - def validator(cmd): - EXIT = 7 - - if cmd in [curses.KEY_RESIZE, self.ESCAPE_KEY]: - console.clear() - return EXIT - # Provide general code for back-space key, in case it's not - # correctly defined. - elif cmd in [127, curses.KEY_BACKSPACE]: - return curses.KEY_BACKSPACE - elif cmd == curses.KEY_UP: - access_history("up") - elif cmd == curses.KEY_DOWN: - access_history("down") - else: - return cmd - - # Run the Textbox object with our custom validator. - curses.curs_set(True) - output = textbox.edit(validator) - curses.curs_set(False) - - # Finally, extract data from console and send to handler. Respond to - # any possible resize event here. - self.__sim.handler.handle_console(output) - self.screen.clear() - self.on_resize() - - def show_console_error(self, message): - """ Shows Salis console error messages, if any. These messages might - contain actual python exception output. - """ - self.__print_line(self.size[0] - 1, ">>>", curses.color_pair( - self.__pair_error - ) | curses.A_BOLD) - self.screen.refresh() - - # We also use a Textbox object, just so that execution gets halted - # until a key gets pressed (even on non-blocking mode). - console = curses.newwin(1, self.size[1] - 5, self.size[0] - 1, 5) - textbox = curses.textpad.Textbox(console) - - # Curses may raise an exception if printing on the edge of the screen; - # we can just ignore it. - try: - console.addstr(0, 0, message, curses.color_pair( - self.__pair_error - ) | curses.A_BOLD) - except curses.error: - pass - - # Custom validator simply exits on any key. - def validator(cmd): - EXIT = 7 - return EXIT - - textbox.edit(validator) - self.screen.clear() - self.on_resize() - - def print_page(self): - """ Print current page to screen. We use the previously generated - '__pages' dictionary to easily associate a label to a Salis function. - """ - # If in minimal mode, print only minial widget. - if self.__sim.minimal: - self.__print_minimal() - return - - # Update selected proc data if in WORLD view. - if self.current_page == "WORLD": - self.__sim.lib.sal_proc_get_proc_data(self.selected_proc, cast( - self.selected_proc_data, POINTER(c_uint32) - )) - - # Print MAIN simulation data. - self.__print_line( - 1, "SALIS[{}]".format(self.__sim.args.file), curses.color_pair( - self.__pair_header - ) | curses.A_BOLD - ) - self.__print_widget(2, self.__main) - - # Print data of currently selected page. - main_lines = len(self.__main) + 3 - self.__print_header(main_lines, self.current_page) - self.__print_widget(main_lines + 1, self.__pages[self.current_page]) - - # Print special widgets (WORLD view and PROCESS list). - if self.current_page == "WORLD": - self.world.render() - elif self.current_page == "PROCESS": - self.__print_proc_list() - - - ############################### - # Private methods - ############################### - - def __set_colors(self): - """ Define the color pairs for the data printer. - """ - curses.start_color() - curses.use_default_colors() - self.__pair_header = self.get_color_pair(curses.COLOR_BLUE) - self.__pair_selected = self.get_color_pair(curses.COLOR_YELLOW) - self.__pair_error = self.get_color_pair(curses.COLOR_RED) - - def __get_screen(self): - """ Prepare and return the main curses window. We also set a shorter - delay when responding to a pressed escape key. - """ - # Set a shorter delay to the ESCAPE key, so that we may use it to exit - # Salis. - os.environ.setdefault("ESCDELAY", "25") - - # Prepare curses screen. - screen = curses.initscr() - curses.noecho() - curses.cbreak() - screen.keypad(True) - curses.curs_set(False) - - # We need color support in order to run the printer module. - if curses.has_colors(): - self.__set_colors() - else: - raise RuntimeError("Error: no color support.") - - return screen - - def __get_inst_list(self): - """ Parse instruction set from C header file named 'instset.h'. We're - using the keyword 'SALIS_INST' to identify an instruction definition, - so be careful not to use this keyword anywhere else on the headers. - """ - inst_list = [] - inst_file = os.path.join(self.__sim.path, "../include/instset.h") - - with open(inst_file, "r") as f: - lines = f.read().splitlines() - - for line in lines: - if line and line.split()[0] == "SALIS_INST": - inst_name = line.split()[1][:4] - inst_symb = line.split()[3] - inst_list.append((inst_name, inst_symb)) - - return inst_list - - def __get_proc_elements(self): - """ Parse process structure member variables from C header file named - 'process.h'. We're using the keyword 'SALIS_PROC_ELEMENT' to identify - element declarations, so be careful not to use this keyword anywhere - else on the headers. - """ - proc_elem_list = [] - proc_elem_file = os.path.join(self.__sim.path, "../include/process.h") - - with open(proc_elem_file, "r") as f: - lines = f.read().splitlines() - - for line in lines: - if line and line.split()[0] == "SALIS_PROC_ELEMENT": - proc_elem_name = line.split()[2].split(";")[0] - - if proc_elem_name == "stack[8]": - # The stack is a special member variable, an array. We - # translate it by returning a list of stack identifiers. - proc_elem_list += ["stack[{}]".format(i) for i in range(8)] - else: - # We can assume all other struct elements are single - # variables. - proc_elem_list.append(proc_elem_name) - - return proc_elem_list - - def __get_main(self): - """ Generate main set of data fields to be printed. We associate, on a - list object, a label to each Salis function to be called. The following - elements get printed on all pages. - """ - return [ - ("e", "cycle", self.__sim.lib.sal_main_get_cycle), - ("e", "epoch", self.__sim.lib.sal_main_get_epoch), - ("e", "state", lambda: self.__sim.state), - ("e", "autosave", lambda: self.__sim.autosave), - ] - - def __get_pages(self): - """ Generate data fields to be printed on each page. We associate, on a - list object, a label to each Salis function to be called. Each list - represents a PAGE. We initialize all pages inside an ordered dictionary - object. - """ - # The following comprehensions build up widgets to help up print sets - # of data elements. The use of nested lambdas is needed to receive - # updated values. - # Instruction counter widget: - inst_widget = [("e", inst[0], (lambda j: ( - lambda: self.__sim.lib.sal_mem_get_inst_count(j) - ))(i)) for i, inst in enumerate(self.inst_list)] - - # Evolver module state widget: - state_widget = [("e", "state[{}]".format(i), (lambda j: ( - lambda: self.__sim.lib.sal_evo_get_state(j) - ))(i)) for i in range(4)] - - # Selected process state widget: - selected_widget = [("p", element, (lambda j: ( - lambda: self.selected_proc_data[j] - ))(i)) for i, element in enumerate(self.proc_elements)] - - # With the help of the widgets above, we can declare the PAGES - # dictionary object. - return OrderedDict([ - ("MEMORY", [ - ("e", "order", self.__sim.lib.sal_mem_get_order), - ("e", "size", self.__sim.lib.sal_mem_get_size), - ("e", "allocated", self.__sim.lib.sal_mem_get_allocated), - ("s", ""), - ("h", "INSTRUCTIONS"), - ] + inst_widget), - ("EVOLVER", [ - ("e", "last", self.__sim.lib.sal_evo_get_last_changed_address), - ("e", "calls", self.__sim.lib.sal_evo_get_calls_on_last_cycle), - ] + state_widget), - ("PROCESS", [ - ("e", "count", self.__sim.lib.sal_proc_get_count), - ("e", "capacity", self.__sim.lib.sal_proc_get_capacity), - ("e", "first", self.__sim.lib.sal_proc_get_first), - ("e", "last", self.__sim.lib.sal_proc_get_last), - ("e", "selected", lambda: self.selected_proc), - ]), - ("WORLD", [ - ("e", "position", lambda: self.world.pos), - ("e", "zoom", lambda: self.world.zoom), - ("e", "selected", lambda: self.selected_proc), - ("s", ""), - ("h", "SELECTED PROC"), - ] + selected_widget), - ]) - - def __print_line(self, ypos, line, attrs=curses.A_NORMAL, scroll=True): - """ Print a single line on screen only when it's visible. - """ - if scroll: - ypos -= self.__main_scroll - - if 0 <= ypos < self.size[0]: - # Curses raises an exception each time we print on the screen's - # edge. We can just catch and ignore it. - try: - line = line[:self.size[1] - 1] - self.screen.addstr(ypos, 1, line, attrs) - except curses.error: - pass - - def __print_header(self, ypos, line): - """ Print a bold header. - """ - header_attr = curses.A_BOLD | curses.color_pair(self.__pair_header) - self.__print_line(ypos, line, header_attr) - - def __print_value(self, ypos, element, value, attr=curses.A_NORMAL): - """ Print a label:value pair. - """ - if type(value) == int: - if value == ((2 ** 32) - 1): - # In Salis, UINT32_MAX is used to represent NULL. We print NULL - # as three dashes. - value = "---" - elif self.__print_hex: - value = hex(value) - - line = "{:<10} : {:>10}".format(element, value) - self.__print_line(ypos, line, attr) - - def __print_proc_element(self, ypos, element, value): - """ Print elements of currently selected process. We highlight in - YELLOW if the selected process is running. - """ - if self.__sim.lib.sal_proc_is_free(self.selected_proc): - attr = curses.A_NORMAL - else: - attr = curses.color_pair(self.__pair_selected) - - self.__print_value(ypos, element, value, attr) - - def __print_widget(self, ypos, widget): - """ Print a widget (data PAGE) on screen. - """ - for i, element in enumerate(widget): - if element[0] == "s": - continue - elif element[0] == "h": - self.__print_header(i + ypos, element[1]) - elif element[0] == "e": - self.__print_value(i + ypos, element[1], element[2]()) - elif element[0] == "p": - self.__print_proc_element(i + ypos, element[1], element[2]()) - - def __clear_line(self, ypos): - """ Clear the specified line. - """ - if 0 <= ypos < self.size[0]: - self.screen.move(ypos, 0) - self.screen.clrtoeol() - - def __print_proc_data_list(self): - """ Print list of process data elements in PROCESS page. We can toggle - between printing the data elements or the genomes by pressing the 'g' - key. - """ - # First, print the table header, by extracting element names from the - # previously generated proc element list. - ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5 - header = " | ".join(["{:<10}".format("pidx")] + [ - "{:>10}".format(element) - for element in self.proc_elements[self.__proc_element_scroll:] - ]) - self.__clear_line(ypos) - self.__print_header(ypos, header) - ypos += 1 - proc_id = self.proc_list_scroll - - # Print all proc IDs and elements in decimal or hexadecimal format, - # depending on hex-flag being set. - if self.__print_hex: - data_format = lambda x: hex(x) - else: - data_format = lambda x: x - - # Lastly, iterate all lines and print as much process data as it fits. - # We can scroll the process data table using the 'wasd' keys. - while ypos < self.size[0]: - self.__clear_line(ypos) - - if proc_id < self.__sim.lib.sal_proc_get_capacity(): - if proc_id == self.selected_proc: - # Always highlight the selected process. - attr = curses.color_pair(self.__pair_selected) - else: - attr = curses.A_NORMAL - - # Retrieve a copy of the selected process state and store it in - # a list object. - proc_data = (c_uint32 * len(self.proc_elements))() - self.__sim.lib.sal_proc_get_proc_data(proc_id, cast( - proc_data, POINTER(c_uint32)) - ) - - # Lastly, assemble and print the next table row. - row = " | ".join(["{:<10}".format(data_format(proc_id))] + [ - "{:>10}".format(data_format(element)) - for element in proc_data[self.__proc_element_scroll:] - ]) - self.__print_line(ypos, row, attr) - - proc_id += 1 - ypos += 1 - - def __print_proc_gene_block( - self, ypos, gidx, xpos, mbs, mba, ip, sp, pair - ): - """ Print a sub-set of a process genome. Namely, on of its two memory - blocks. - """ - while gidx < mbs and xpos < self.size[1]: - gaddr = mba + gidx - - if gaddr == ip: - attr = curses.color_pair(self.world.pair_sel_ip) - elif gaddr == sp: - attr = curses.color_pair(self.world.pair_sel_sp) - else: - attr = curses.color_pair(pair) - - # Retrieve instruction from memory and transform it to correct - # symbol. - inst = self.__sim.lib.sal_mem_get_inst(gaddr) - symb = self.inst_list[inst][1] - - # Curses raises an exception each time we print on the screen's - # edge. We can just catch and ignore it. - try: - self.screen.addstr(ypos, xpos, symb, attr) - except curses.error: - pass - - gidx += 1 - xpos += 1 - - return xpos - - def __print_proc_gene(self, ypos, proc_id): - """ Print a single process genome on the genome table. We use the same - colors to represent memory blocks, IP and SP of each process, as those - used to represent the selected process on WORLD view. - """ - # There's nothing to print if process is free. - if self.__sim.lib.sal_proc_is_free(proc_id): - return - - # Process is alive. Retrieve a copy of the current process state and - # store it in a list object. - proc_data = (c_uint32 * len(self.proc_elements))() - self.__sim.lib.sal_proc_get_proc_data(proc_id, cast( - proc_data, POINTER(c_uint32)) - ) - - # Let's extract all data of interest. - mb1a = proc_data[self.proc_elements.index("mb1a")] - mb1s = proc_data[self.proc_elements.index("mb1s")] - mb2a = proc_data[self.proc_elements.index("mb2a")] - mb2s = proc_data[self.proc_elements.index("mb2s")] - ip = proc_data[self.proc_elements.index("ip")] - sp = proc_data[self.proc_elements.index("sp")] - - # Always print MAIN memory block (mb1) first (on the left side). That - # way we can keep most of our attention on the parent. - xpos = self.__print_proc_gene_block( - ypos, self.__proc_gene_scroll, 14, mb1s, mb1a, ip, sp, - self.world.pair_sel_mb1 - ) - - # Reset gene counter and print child memory block, if it exists. - if mb1s < self.__proc_gene_scroll: - gidx = self.__proc_gene_scroll - mb1s - else: - gidx = 0 - - self.__print_proc_gene_block( - ypos, gidx, xpos, mb2s, mb2a, ip, sp, self.world.pair_sel_mb2 - ) - - def __print_proc_gene_list(self): - """ Print list of process genomes in PROCESS page. We can toggle - between printing the genomes or the data elements by pressing the 'g' - key. - """ - # Print all proc IDs and gene scroll in decimal or hexadecimal format, - # depending on hex-flag being set. - if self.__print_hex: - data_format = lambda x: hex(x) - else: - data_format = lambda x: x - - # First, print the table header. We print the current gene-scroll - # position for easy reference. Return back to zero scroll with the 'A' - # key. - ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5 - header = "{:<10} | genes {} -->".format( - "pidx", data_format(self.__proc_gene_scroll) - ) - self.__clear_line(ypos) - self.__print_header(ypos, header) - ypos += 1 - proc_id = self.proc_list_scroll - - # Iterate all lines and print as much genetic data as it fits. We can - # scroll the gene data table using the 'wasd' keys. - while ypos < self.size[0]: - self.__clear_line(ypos) - - if proc_id < self.__sim.lib.sal_proc_get_capacity(): - if proc_id == self.selected_proc: - # Always highlight the selected process. - attr = curses.color_pair(self.__pair_selected) - else: - attr = curses.A_NORMAL - - # Assemble and print the next table row. - row = "{:<10} |".format(data_format(proc_id)) - self.__print_line(ypos, row, attr) - self.__print_proc_gene(ypos, proc_id) - - proc_id += 1 - ypos += 1 - - def __print_proc_list(self): - """ Print list of process genomes or process data elements in PROCESS - page. We can toggle between printing the genomes or the data elements - by pressing the 'g' key. - """ - if self.__proc_gene_view: - self.__print_proc_gene_list() - else: - self.__print_proc_data_list() - - def __proc_select_by_cursor(self): - """ Select process located on address under cursor, if any exists. - """ - # First, calculate address under cursor. - ypos = self.__curs_y - xpos = self.__curs_x - World.PADDING - line_size = self.size[1] - World.PADDING - address = self.world.pos + ( - ((ypos * line_size) + xpos) * self.world.zoom - ) - - # Now, iterate all living processes and try to find one that owns the - # calculated address. - if self.__sim.lib.sal_mem_is_address_valid(address): - for proc_id in range(self.__sim.lib.sal_proc_get_capacity()): - if not self.__sim.lib.sal_proc_is_free(proc_id): - proc_data = (c_uint32 * len(self.proc_elements))() - self.__sim.lib.sal_proc_get_proc_data(proc_id, cast( - proc_data, POINTER(c_uint32)) - ) - mb1a = proc_data[self.proc_elements.index("mb1a")] - mb1s = proc_data[self.proc_elements.index("mb1s")] - mb2a = proc_data[self.proc_elements.index("mb2a")] - mb2s = proc_data[self.proc_elements.index("mb2s")] - - if ( - mb1a <= address < (mb1a + mb1s) or - mb2a <= address < (mb2a + mb2s) - ): - self.selected_proc = proc_id - break - - def __get_minimal(self): - """ Generate set of data fields to be printed on minimal mode. - """ - return [ - ("cycle", self.__sim.lib.sal_main_get_cycle), - ("epoch", self.__sim.lib.sal_main_get_epoch), - ("procs", self.__sim.lib.sal_proc_get_count), - ] - - def __print_minimal(self): - """ Print minimal mode data fields. - """ - self.__print_line(1, "Salis --- Minimal mode") - - for i, field in enumerate(self.__minimal): - self.__print_line(i + 2, "{}: {}".format(field[0], field[1]())) diff --git a/bin/salis.py b/bin/salis.py index 200da33..b59671a 100755 --- a/bin/salis.py +++ b/bin/salis.py @@ -24,8 +24,8 @@ import traceback from argparse import ArgumentParser, HelpFormatter from ctypes import CDLL, c_bool, c_uint8, c_uint32, c_char_p, POINTER -from handler import Handler -from printer import Printer +from modules.handler import Handler +from modules.printer import Printer from subprocess import check_call diff --git a/bin/world.py b/bin/world.py deleted file mode 100644 index 08f2734..0000000 --- a/bin/world.py +++ /dev/null @@ -1,279 +0,0 @@ -""" SALIS: Viewer/controller for the SALIS simulator. - -File: world.py -Author: Paul Oliver -Email: paul.t.oliver.design@gmail.com - -This module should be considered an extension of the 'printer' module. It takes -care of getting a pre-redered image from Salis and post-processing it in order -to print it into the curses screen. It also keeps track of user controllable -rendering parameters (position and zoom). -""" - -import curses - -from ctypes import c_uint8, cast, POINTER - - -class World: - PADDING = 25 - - def __init__(self, printer, sim): - """ World constructor. We link to the printer and main simulation - classes. We also setup the colors for rendering the world. - """ - self.__printer = printer - self.__sim = sim - self.__set_world_colors() - self.__show_ip = True - self.pos = 0 - self.zoom = 1 - - def render(self): - """ Function for rendering the world. We get a pre-rendered buffer from - Salis' memory module (its way faster to pre-render in C) and use that - to assemble the world image in Python. - """ - # Window is so narrow that world is not visible. - if self.__printer.size[1] <= self.PADDING: - return - - # Get pre-rendered image from Salis' memory module. - line_width = self.__printer.size[1] - self.PADDING - print_area = self.__printer.size[0] * line_width - c_buffer = (c_uint8 * print_area)() - self.__sim.lib.sal_ren_get_image( - self.pos, self.zoom, print_area, cast(c_buffer, POINTER(c_uint8)) - ) - - # Get data elements of selected process, if it's running, and store - # them into a convenient dict object. - if self.__sim.lib.sal_proc_is_free(self.__printer.selected_proc): - sel_data = None - else: - out_data = self.__printer.selected_proc_data - out_elem = self.__printer.proc_elements - sel_data = { - "ip": out_data[out_elem.index("ip")], - "sp": out_data[out_elem.index("sp")], - "mb1a": out_data[out_elem.index("mb1a")], - "mb1s": out_data[out_elem.index("mb1s")], - "mb2a": out_data[out_elem.index("mb2a")], - "mb2s": out_data[out_elem.index("mb2s")], - } - - # Iterate all cells on printable area and print the post-rendered - # cells. Rendered cells contain info about bit flags and instructions - # currently written into memory. - bidx = 0 - - for y in range(self.__printer.size[0]): - for x in range(line_width): - xpad = x + self.PADDING - addr = self.pos + (self.zoom * bidx) - symb, attr = self.__render_cell(c_buffer[bidx], addr, sel_data) - - # Curses raises an exception when printing on the edge of the - # screen; we can just ignore it. - try: - self.__printer.screen.addstr(y, xpad, symb, attr) - except curses.error: - pass - - bidx += 1 - - def zoom_out(self): - """ Zoom out by a factor of 2 (zoom *= 2). - """ - if self.__is_world_editable(): - self.zoom = min(self.zoom * 2, self.__get_max_zoom()) - - def zoom_in(self): - """ Zoom in by a factor of 2 (zoom //= 2). - """ - if self.__is_world_editable(): - self.zoom = max(self.zoom // 2, 1) - - def zoom_reset(self): - """ Reset zoom to a valid value on certain events (i.e. during terminal - resizing). - """ - self.zoom = min(self.zoom, self.__get_max_zoom()) - - def pan_left(self): - """ Pan world to the left (pos -= zoom). - """ - if self.__is_world_editable(): - self.pos = max(self.pos - self.zoom, 0) - - def pan_right(self): - """ Pan world to the right (pos += zoom). - """ - if self.__is_world_editable(): - max_pos = self.__sim.lib.sal_mem_get_size() - 1 - self.pos = min(self.pos + self.zoom, max_pos) - - def pan_down(self, fast=False): - """ Pan world downward. - """ - if self.__is_world_editable(): - if fast: - self.pos = max(self.pos - self.__get_world_area(), 0) - else: - self.pos = max(self.pos - self.__get_line_area(), 0) - - def pan_up(self, fast=False): - """ Pan world upward. - """ - if self.__is_world_editable(): - max_pos = self.__sim.lib.sal_mem_get_size() - 1 - - if fast: - self.pos = min(self.pos + self.__get_world_area(), max_pos) - else: - self.pos = min(self.pos + self.__get_line_area(), max_pos) - - def pan_reset(self): - """ Set world position to zero. - """ - if self.__is_world_editable(): - self.pos = 0 - - def scroll_to(self, pos): - """ Move world pos to a specified position. - """ - if self.__is_world_editable(): - if self.__sim.lib.sal_mem_is_address_valid(pos): - self.pos = pos - else: - raise RuntimeError("Error: scrolling to an invalid address") - - def toggle_ip_view(self): - """ Turn on/off IP visualization. Turning off IPs might make it easier - to visualize the underlying memory block structure. - """ - if self.__is_world_editable(): - self.__show_ip = not self.__show_ip - - - ############################### - # Private methods - ############################### - - def __set_world_colors(self): - """ Define color pairs for rendering the world. Each color has a - special meaning, referring to the selected process IP, SP and memory - blocks, or to bit flags currently set on rendered cells. - """ - self.pair_free = self.__printer.get_color_pair( - curses.COLOR_BLUE - ) - self.pair_alloc = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_BLUE - ) - self.pair_mbstart = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_CYAN - ) - self.pair_ip = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_WHITE - ) - self.pair_sel_mb2 = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_GREEN - ) - self.pair_sel_mb1 = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_YELLOW - ) - self.pair_sel_sp = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_MAGENTA - ) - self.pair_sel_ip = self.__printer.get_color_pair( - curses.COLOR_BLACK, curses.COLOR_RED - ) - - def __render_cell(self, byte, addr, sel_data=None): - """ Render a single cell on the WORLD view. All cells are rendered by - interpreting the values coming in from the buffer. We overlay special - colors for representing the selected organism's state, on top of the - more common colors used to represent memory state. - """ - # Paint black all cells that are out of memory bounds. - if not self.__sim.lib.sal_mem_is_address_valid(addr): - return " ", curses.A_NORMAL - - # Check if cell contains part of the currently selected process. - if sel_data: - top_addr = addr + self.zoom - top_mb1a = sel_data["mb1a"] + sel_data["mb1s"] - top_mb2a = sel_data["mb2a"] + sel_data["mb2s"] - - if addr <= sel_data["ip"] < top_addr: - pair = self.pair_sel_ip - elif addr <= sel_data["sp"] < top_addr: - pair = self.pair_sel_sp - elif top_addr > sel_data["mb1a"] and top_mb1a > addr: - pair = self.pair_sel_mb1 - elif top_addr > sel_data["mb2a"] and top_mb2a > addr: - pair = self.pair_sel_mb2 - - # No pair has been selected yet; select pair based on bit-flags. - if not "pair" in locals(): - if self.__show_ip and byte >= 0x80: - pair = self.pair_ip - elif (byte % 0x80) >= 0x40: - pair = self.pair_mbstart - elif (byte % 0x40) >= 0x20: - pair = self.pair_alloc - else: - pair = self.pair_free - - # Select symbol to represent instructions currently on cell. - inst = byte % 32 - - if self.zoom == 1: - symb = self.__printer.inst_list[inst][1] - elif inst > 16: - symb = ":" - else: - symb = "." - - # Return tuple containing our post-redered cell. - return symb, curses.color_pair(pair) - - def __get_max_zoom(self): - """ Calculate maximum needed zoom so that the entire world fits on the - terminal window. - """ - max_zoom = 1 - line_size = self.__printer.size[1] - self.PADDING - coverage = self.__printer.size[0] * line_size - - # We fix a maximum zoom level; otherwise, program may halt on extreme - # zoom levels. - while ( - (coverage * max_zoom) < self.__sim.lib.sal_mem_get_size() and - max_zoom < 2 ** 16 - ): - max_zoom *= 2 - - return max_zoom - - def __is_world_editable(self): - """ For this to return True, printer's current page must be WORLD page. - Additionally, the WORLD panel must be visible on the terminal window - (i.e. curses.COLS > data_margin). - """ - correct_page = self.__printer.current_page == "WORLD" - correct_size = self.__printer.size[1] > self.PADDING - return correct_page and correct_size - - def __get_line_area(self): - """ Return amount of bytes contained in a printed WORLD line. - """ - line_size = self.__printer.size[1] - self.PADDING - line_area = self.zoom * line_size - return line_area - - def __get_world_area(self): - """ Return amount of bytes contained in the entire WORLD view. - """ - return self.__get_line_area() * self.__printer.size[0] -- cgit v1.2.1