From ca118555214a176728b9aab87849391344306d6d Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Thu, 29 Feb 2024 02:29:13 +0100 Subject: Initial commit. --- bin/common/.keep | 0 bin/genomes/.keep | 0 bin/genomes/86.anc | 1 + bin/handler.py | 403 +++++++++++++++++++++++++ bin/lib/.keep | 0 bin/printer.py | 833 ++++++++++++++++++++++++++++++++++++++++++++++++++++ bin/salis.py | 346 ++++++++++++++++++++++ bin/sims/.keep | 0 bin/sims/auto/.keep | 0 bin/world.py | 277 +++++++++++++++++ 10 files changed, 1860 insertions(+) create mode 100644 bin/common/.keep create mode 100644 bin/genomes/.keep create mode 100644 bin/genomes/86.anc create mode 100644 bin/handler.py create mode 100644 bin/lib/.keep create mode 100644 bin/printer.py create mode 100755 bin/salis.py create mode 100644 bin/sims/.keep create mode 100644 bin/sims/auto/.keep create mode 100644 bin/world.py (limited to 'bin') diff --git a/bin/common/.keep b/bin/common/.keep new file mode 100644 index 0000000..e69de29 diff --git a/bin/genomes/.keep b/bin/genomes/.keep new file mode 100644 index 0000000..e69de29 diff --git a/bin/genomes/86.anc b/bin/genomes/86.anc new file mode 100644 index 0000000..30e76f3 --- /dev/null +++ b/bin/genomes/86.anc @@ -0,0 +1 @@ +:::[a...]b...^b^b^b-bba::.!d#d#b?d).:.{bc).::a:.:}bc:..LadWcd^a^cvb?b(.::$~b~d(..:a::: diff --git a/bin/handler.py b/bin/handler.py new file mode 100644 index 0000000..ad3b14e --- /dev/null +++ b/bin/handler.py @@ -0,0 +1,403 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +file: handler.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered the 'controller' part of the Salis simulator. +It receives and parses all user input via keyboard and console commands. It +also takes care of genome compilation (via genome files located on the +'genomes' directory). + +An user may open the Salis console by pressing the 'c' key while in a running +session. A nice quirk is the possibility to run python commands from within the +Salis console. As an example, to get the memory size, an user could type: + +>>> exec output = self._sim.lib.sal_mem_get_size() + +Note that 'output' denotes a storage variable that will get printed on the +console response. This ability gives an user a whole lot of power, and should +be used with care. +""" + +import os +import curses + + +class Handler: + ESCAPE_KEY = 27 + + def __init__(self, sim): + """ Handler constructor. Simply link this class to the main simulation + class and printer class and create symbol dictionary. + """ + self._sim = sim + self._printer = sim.printer + self._inst_dict = self._get_inst_dict() + self._console_history = [] + + def process_cmd(self, cmd): + """ Process incoming commands from curses. Commands are received via + ncurses' getch() function, thus, they must be transformed into their + character representations with 'ord()'. + """ + if cmd == self.ESCAPE_KEY: + self._sim.lib.sal_main_save( + self._sim.save_file_path.encode("utf-8") + ) + self._sim.exit() + elif cmd == ord(" "): + self._sim.toggle_state() + elif cmd == curses.KEY_LEFT: + self._printer.flip_page(-1) + elif cmd == curses.KEY_RIGHT: + self._printer.flip_page(1) + elif cmd == curses.KEY_DOWN: + self._printer.scroll_main(-1) + elif cmd == curses.KEY_UP: + self._printer.scroll_main(1) + elif cmd == curses.KEY_RESIZE: + self._printer.on_resize() + elif cmd == ord("X"): + self._printer.toggle_hex() + elif cmd == ord("x"): + self._printer.world.zoom_out() + elif cmd == ord("z"): + self._printer.world.zoom_in() + elif cmd == ord("a"): + self._printer.world.pan_left() + self._printer.proc_scroll_left() + elif cmd == ord("d"): + self._printer.world.pan_right() + self._printer.proc_scroll_right() + elif cmd == ord("s"): + self._printer.world.pan_down() + self._printer.proc_scroll_down() + elif cmd == ord("w"): + self._printer.world.pan_up() + self._printer.proc_scroll_up() + elif cmd == ord("S"): + self._printer.world.pan_reset() + self._printer.proc_scroll_vertical_reset() + elif cmd == ord("A"): + self._printer.world.pan_reset() + self._printer.proc_scroll_horizontal_reset() + elif cmd == ord("o"): + self._printer.proc_select_prev() + elif cmd == ord("p"): + self._printer.proc_select_next() + elif cmd == ord("f"): + self._printer.proc_select_first() + elif cmd == ord("l"): + self._printer.proc_select_last() + elif cmd == ord("k"): + self._printer.proc_scroll_to_selected() + elif cmd == ord("g"): + self._printer.proc_toggle_gene_view() + elif cmd == ord("\n"): + self._printer.run_cursor() + elif cmd == ord("c"): + self._printer.run_console() + else: + # Check for numeric input. Number keys [1 to 0] cycle the + # simulation [2 ** ((n - 1) % 10] times. + try: + if chr(cmd).isdigit(): + factor = int(chr(cmd)) + factor = int(2 ** ((factor - 1) % 10)) + self._cycle_sim(factor) + except ValueError: + pass + + def handle_console(self, command_raw): + """ Process console commands. We parse and check for input errors. Any + python exception messages are redirected to the console-response + window. + """ + if command_raw: + command = command_raw.split() + + try: + # Handle both python and self-thrown exceptions. + if command[0] in ["q", "quit"]: + self._on_quit(command, save=True) + elif command[0] in ["q!", "quit!"]: + self._on_quit(command, save=False) + elif command[0] in ["i", "input"]: + self._on_input(command) + elif command[0] in ["c", "compile"]: + self._on_compile(command) + elif command[0] in ["n", "new"]: + self._on_new(command) + elif command[0] in ["k", "kill"]: + self._on_kill(command) + elif command[0] in ["e", "exec"]: + self._on_exec(command) + elif command[0] in ["s", "scroll"]: + self._on_scroll(command) + elif command[0] in ["p", "process"]: + self._on_proc_select(command) + elif command[0] in ["r", "rename"]: + self._on_rename(command) + elif command[0] in ["save"]: + self._on_save(command) + elif command[0] in ["a", "auto"]: + self._on_set_autosave(command) + else: + # Raise if a non-existing command has been given. + self._raise("Invalid command: '{}'".format(command[0])) + except BaseException as exep: + # We parse and redirect python exceptions to the error + # console-window. + message = str(exep).strip() + message = message[0].upper() + message[1:] + self._printer.show_console_error(message) + finally: + # Store command on console history. + self._console_history.append(command_raw.strip()) + + @property + def console_history(self): + return self._console_history + + def _raise(self, message): + """ Generic exception thrower. Throws a 'RuntimeError' initialized with + the given message. + """ + raise RuntimeError("ERROR: {}".format(message)) + + def _respond(self, message): + """ Generic console responder. Throws a 'RuntimeError' initialized with + the given message. + """ + raise RuntimeError(message) + + def _cycle_sim(self, factor): + """ Simply cycle Salis 'factor' number of times. + """ + for _ in range(factor): + self._sim.lib.sal_main_cycle() + self._sim.check_autosave() + + def _get_inst_dict(self): + """ Transform the instruction list of the printer module into a + dictionary that's more useful for genome compilation. Instruction + symbols are keys, values are the actual byte representation. + """ + inst_dict = {} + + for i, inst in enumerate(self._printer.inst_list): + inst_dict[inst[1]] = i + + return inst_dict + + def _on_quit(self, command, save): + """ Exit simulation. We can choose whether to save the simulation into a + save file or not. + """ + if len(command) > 1: + self._raise("Invalid parameters for '{}'".format(command[0])) + + if save: + self._sim.lib.sal_main_save( + self._sim.save_file_path.encode("utf-8") + ) + + self._sim.exit() + + def _write_genome(self, genome, address_list): + """ Write genome stream into a given list of memory addresses. All + addresses must be valid or an exception is thrown. + """ + # All addresses we will write to must be valid. + for base_addr in address_list: + address = int(base_addr, 0) + + for _ in range(len(genome)): + if not self._sim.lib.sal_mem_is_address_valid(address): + self._raise("Address '{}' is invalid".format(address)) + + address += 1 + + # All looks well! Let's compile the genome into memory. + for base_addr in address_list: + address = int(base_addr, 0) + + for symbol in genome: + self._sim.lib.sal_mem_set_inst( + address, self._inst_dict[symbol] + ) + address += 1 + + def _on_input(self, command): + """ Compile organism from user typed input. Compilation can only occur + on valid memory addresses. An exception will be thrown when trying to + write into non-valid address or when input stream is invalid. + """ + if len(command) < 3: + self._raise("Invalid parameters for '{}'".format(command[0])) + + # All characters in file must be actual instruction symbols. + for character in command[1]: + if character not in self._inst_dict: + self._raise("Invalid symbol '{}' found on stream".format( + character + )) + + # All looks well, Let's write the genome into memory. + self._write_genome(command[1], command[2:]) + + def _on_compile(self, command): + """ Compile organism from source genome file. Genomes must be placed on + the './genomes' directory. Compilation can only occur on valid memory + addresses. An exception will be thrown when trying to write into + non-valid address or when genome file is invalid. + """ + if len(command) < 3: + self._raise("Invalid parameters for '{}'".format(command[0])) + + # Open genome file for compilation. + gen_file = os.path.join(self._sim.path, "genomes", command[1]) + + with open(gen_file, "r") as f: + genome = f.read().strip() + + # Entire genome must be written on a single line. + if "\n" in genome: + self._raise("Newline detected on '{}'".format(gen_file)) + + # All characters in file must be actual instruction symbols. + for character in genome: + if character not in self._inst_dict: + self._raise("Invalid symbol '{}' found on '{}'".format( + character, gen_file + )) + + # All looks well, Let's write the genome into memory. + self._write_genome(genome, command[2:]) + + def _on_new(self, command): + """ Instantiate new organism of given size on given address. These + memory areas must be free and valid or an exception is thrown. + """ + if len(command) < 3: + self._raise("Invalid parameters for '{}'".format(command[0])) + + # Check that all addresses we will allocate are free and valid. + for base_addr in command[2:]: + address = int(base_addr, 0) + + for _ in range(int(command[1])): + if not self._sim.lib.sal_mem_is_address_valid(address): + self._raise("Address '{}' is invalid".format(address)) + elif self._sim.lib.sal_mem_is_allocated(address): + self._raise("Address '{}' is allocated".format(address)) + + address += 1 + + # All looks well! Let's instantiate our new organism. + for base_addr in command[2:]: + address = int(base_addr, 0) + size = int(command[1], 0) + self._sim.lib.sal_proc_create(address, size) + + def _on_kill(self, command): + """ Kill organism on bottom of reaper queue. + """ + if len(command) > 1: + self._raise("Invalid parameters for '{}'".format(command[0])) + + # Call proc kill function only if there's any organisms to kill. + if not self._sim.lib.sal_proc_get_count(): + self._raise("No organisms currently alive") + else: + self._sim.lib.sal_proc_kill() + + def _on_exec(self, command): + """ Allow a user to execute a python command from within the console. + Using this is very hack-ish, and not recommended unless you're certain + of what you're doing! + """ + if len(command) < 2: + self._raise("'{}' must be followed by an executable string".format( + command[0]) + ) + + # User may query any simulation variable or status and the console will + # respond. For example, to query memory size or order, type one of the + # following: + # + # >>> exec output = self._sim.lib.sal_mem_get_size() + # >>> exec output = self._sim.lib.sal_mem_get_order() + # + output = {} + exec(" ".join(command[1:]), locals(), output) + + if output: + self._respond("EXEC RESPONDS: {}".format(str(output))) + + def _on_scroll(self, command): + """ We can scroll to a specific process (on PROCESS view) or to a + specific world address (on WORLD view) via the console. + """ + if len(command) != 2: + self._raise("Invalid parameters for '{}'".format(command[0])) + + target = int(command[1], 0) + + # If on PROCESS page, scroll to given process. + if self._printer.current_page == "PROCESS": + if target < self._sim.lib.sal_proc_get_capacity(): + self._printer.proc_scroll_to(target) + else: + self._raise("No process with ID '{}' found".format(target)) + elif self._printer.current_page == "WORLD": + if self._sim.lib.sal_mem_is_address_valid(target): + self._printer.world.scroll_to(target) + else: + self._raise("Address '{}' is invalid".format(address)) + else: + self._raise("'{}' must be called on PROCESS or WORLD page".format( + command[0]) + ) + + def _on_proc_select(self, command): + """ Select a specific process (on PROCESS or WORLD page). + """ + if len(command) != 2: + self._raise("Invalid parameters for '{}'".format(command[0])) + + target = int(command[1], 0) + + # If on PROCESS page, scroll to given process. + if target < self._sim.lib.sal_proc_get_capacity(): + self._printer.proc_select_by_id(target) + else: + self._raise("No process with ID '{}' found".format(target)) + + def _on_rename(self, command): + """ Set a new simulation name. Future auto-saved files will use this + name as prefix. + """ + if len(command) != 2: + self._raise("Invalid parameters for '{}'".format(command[0])) + + self._sim.rename(command[1]) + + def _on_save(self, command): + """ Save simulation on its current state. + """ + if len(command) != 1: + self._raise("Invalid parameters for '{}'".format(command[0])) + + self._sim.lib.sal_main_save(self._sim.save_file_path.encode("utf-8")) + + def _on_set_autosave(self, command): + """ Set the simulation's auto save interval. Provide any integer + between 0 and (2**32 - 1). If zero is provided, auto saving will be + disabled. + """ + if len(command) != 2: + self._raise("Invalid parameters for '{}'".format(command[0])) + + self._sim.set_autosave(int(command[1], 0)) diff --git a/bin/lib/.keep b/bin/lib/.keep new file mode 100644 index 0000000..e69de29 diff --git a/bin/printer.py b/bin/printer.py new file mode 100644 index 0000000..135e220 --- /dev/null +++ b/bin/printer.py @@ -0,0 +1,833 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: printer.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered the 'view' part of the Salis simulator. It +takes care of displaying the simulator's state in a nicely formatted, intuitive +format. It makes use of the curses library for terminal handling. +""" + +import curses +import curses.textpad +import os +import time +from collections import OrderedDict +from ctypes import c_uint8, c_uint32, cast, POINTER +from handler import Handler +from world import World + + +class Printer: + def __init__(self, sim): + """ Printer constructor. It takes care of starting up curses, defining + the data pages and setting the printer on its initial state. + """ + self._sim = sim + self._color_pair_count = 0 + self._screen = self._get_screen() + self._inst_list = self._get_inst_list() + self._proc_elements = self._get_proc_elements() + self._main = self._get_main() + self._pages = self._get_pages() + self._size = self._screen.getmaxyx() + self._current_page = "MEMORY" + self._main_scroll = 0 + self._selected_proc = 0 + self._selected_proc_data = (c_uint32 * len(self._proc_elements))() + self._proc_list_scroll = 0 + self._proc_element_scroll = 0 + self._proc_gene_scroll = 0 + self._proc_gene_view = False + self._curs_y = 0 + self._curs_x = 0 + self._print_hex = False + self._world = World(self, self._sim) + + def __del__(self): + """ Printer destructor exits curses. + """ + curses.endwin() + + def get_color_pair(self, fg, bg=-1): + """ We use this method to set new color pairs, keeping track of the + number of pairs already set. We return the new color pair ID. + """ + self._color_pair_count += 1 + curses.init_pair(self._color_pair_count, fg, bg) + return self._color_pair_count + + def get_cmd(self): + """ This returns the pressed key from the curses handler. It's called + during the simulation's main loop. Flushing input is important when in + non-blocking mode. + """ + ch = self._screen.getch() + curses.flushinp() + return ch + + def set_nodelay(self, nodelay): + """ Toggles between blocking and non-blocking mode on curses. + """ + self._screen.nodelay(nodelay) + + def toggle_hex(self): + """ Toggle between decimal or hexadecimal printing of all simulation + state elements. + """ + self._print_hex = not self._print_hex + + def on_resize(self): + """ Called whenever the terminal window gets resized. + """ + self._size = self._screen.getmaxyx() + self.scroll_main() + self._world.zoom_reset() + + def flip_page(self, offset): + """ Change data page by given offset (i.e. '1' for next page or '-1' + for previous one). + """ + pidx = list(self._pages.keys()).index(self._current_page) + pidx = (pidx + offset) % len(self._pages) + self._current_page = list(self._pages.keys())[pidx] + self.scroll_main() + + def scroll_main(self, offset=0): + """ Scrolling is allowed whenever the current page does not fit inside + the terminal window. This method gets called, with no offset, under + certain situations, like changing pages, just to make sure the screen + gets cleared and at least some of the data is always scrolled into + view. + """ + self._screen.clear() + len_main = len(self._main) + len_page = len(self._pages[self._current_page]) + max_scroll = (len_main + len_page + 5) - self._size[0] + self._main_scroll += offset + self._main_scroll = max(0, min(self._main_scroll, max_scroll)) + + def proc_scroll_left(self): + """ Scroll process data elements or genomes (on PROCESS view) to the + left. + """ + if self._current_page == "PROCESS": + if self._proc_gene_view: + self._proc_gene_scroll -= 1 + self._proc_gene_scroll = max(0, self._proc_gene_scroll) + else: + self._proc_element_scroll -= 1 + self._proc_element_scroll = max(0, self._proc_element_scroll) + + def proc_scroll_right(self): + """ Scroll process data elements or genomes (on PROCESS view) to the + right. + """ + if self._current_page == "PROCESS": + if self._proc_gene_view: + self._proc_gene_scroll += 1 + else: + self._proc_element_scroll += 1 + max_scroll = len(self._proc_elements) - 1 + self._proc_element_scroll = min( + max_scroll, self._proc_element_scroll + ) + + def proc_scroll_down(self): + """ Scroll process data table (on PROCESS view) up. + """ + if self._current_page == "PROCESS": + self._proc_list_scroll = max(0, self._proc_list_scroll - 1) + + def proc_scroll_up(self): + """ Scroll process data table (on PROCESS view) down. + """ + if self._current_page == "PROCESS": + self._proc_list_scroll = min( + self._sim.lib.sal_proc_get_capacity() - 1, + self._proc_list_scroll + 1 + ) + + def proc_scroll_to(self, proc_id): + """ Scroll process data table (on PROCESS view) to a specific position. + """ + if self._current_page == "PROCESS": + if proc_id < self._sim.lib.sal_proc_get_capacity(): + self._proc_list_scroll = proc_id + else: + raise RuntimeError("Error: scrolling to invalid process") + + def proc_scroll_vertical_reset(self): + """ Scroll process data table (on PROCESS view) back to top. + """ + if self._current_page == "PROCESS": + self._proc_list_scroll = 0 + + def proc_scroll_horizontal_reset(self): + """ Scroll process data or genome table (on PROCESS view) back to the + left. + """ + if self._current_page == "PROCESS": + if self._proc_gene_view: + self._proc_gene_scroll = 0 + else: + self._proc_element_scroll = 0 + + def proc_select_prev(self): + """ Select previous process. + """ + if self._current_page in ["PROCESS", "WORLD"]: + self._selected_proc -= 1 + self._selected_proc %= self._sim.lib.sal_proc_get_capacity() + + def proc_select_next(self): + """ Select next process. + """ + if self._current_page in ["PROCESS", "WORLD"]: + self._selected_proc += 1 + self._selected_proc %= self._sim.lib.sal_proc_get_capacity() + + def proc_select_first(self): + """ Select first process on reaper queue. + """ + if self._current_page in ["PROCESS", "WORLD"]: + if self._sim.lib.sal_proc_get_count(): + self._selected_proc = self._sim.lib.sal_proc_get_first() + + def proc_select_last(self): + """ Select last process on reaper queue. + """ + if self._current_page in ["PROCESS", "WORLD"]: + if self._sim.lib.sal_proc_get_count(): + self._selected_proc = self._sim.lib.sal_proc_get_last() + + def proc_select_by_id(self, proc_id): + """ Select process from given ID. + """ + if proc_id < self._sim.lib.sal_proc_get_capacity(): + self._selected_proc = proc_id + else: + raise RuntimeError("Error: attempting to select non-existing proc") + + def proc_scroll_to_selected(self): + """ Scroll WORLD or PROCESS page so that selected process becomes + visible. + """ + if self._current_page == "PROCESS": + self._proc_list_scroll = self._selected_proc + elif self._current_page == "WORLD": + if not self._sim.lib.sal_proc_is_free(self._selected_proc): + index = self._proc_elements.index("mb1a") + address = self._selected_proc_data[index] + self._world.scroll_to(address) + + def proc_toggle_gene_view(self): + """ Toggle between data element or genome view on PROCESS page. + """ + if self._current_page == "PROCESS": + self._proc_gene_view = not self._proc_gene_view + + def run_cursor(self): + """ We can toggle a visible cursor on WORLD view to aid us in selecting + processes. + """ + if self._current_page == "WORLD" and self._size[1] > World.PADDING: + curses.curs_set(True) + + while True: + self._curs_y = max(0, min(self._curs_y, self._size[0] - 1)) + self._curs_x = max(World.PADDING, min( + self._curs_x, self._size[1] - 1 + )) + self._screen.move(self._curs_y, self._curs_x) + cmd = self._screen.getch() + + if cmd in [ord("c"), curses.KEY_RESIZE, Handler.ESCAPE_KEY]: + self.on_resize() + break + elif cmd == curses.KEY_LEFT: + self._curs_x -= 1 + elif cmd == curses.KEY_RIGHT: + self._curs_x += 1 + elif cmd == curses.KEY_DOWN: + self._curs_y += 1 + elif cmd == curses.KEY_UP: + self._curs_y -= 1 + elif cmd == ord("\n"): + self._proc_select_by_cursor() + break + + curses.curs_set(False) + + def run_console(self): + """ Run the Salis console. You can use the console to control all main + aspects of the simulation, like compiling genomes into memory, creating + or killing organisms, setting auto-save interval, among other stuff. + """ + # Print a pythonic prompt. + self._print_line(self._size[0] - 1, ">>> ", scroll=False) + self._screen.refresh() + + # Create the console child window. We turn it into a Textbox object in + # order to allow line-editing and extract output easily. + console = curses.newwin(1, self._size[1] - 5, self._size[0] - 1, 5) + textbox = curses.textpad.Textbox(console, insert_mode=True) + textbox.stripspaces = True + + # Grab a copy of the console history and instantiate a pointer to the + # last element. + history = self._sim.handler.console_history + [""] + pointer = len(history) - 1 + + # Nested method reinserts recorded commands from history into console. + def access_history(cmd): + nonlocal pointer + + if pointer == len(history) - 1: + history[-1] = console.instr().strip() + + if cmd == "up" and pointer != 0: + pointer -= 1 + elif cmd == "down" and pointer < len(history) - 1: + pointer += 1 + + console.clear() + console.addstr(0, 0, history[pointer]) + console.refresh() + + # Declare custom validator to control special commands. + def validator(cmd): + EXIT = 7 + + if cmd in [curses.KEY_RESIZE, Handler.ESCAPE_KEY]: + console.clear() + return EXIT + elif cmd == curses.KEY_UP: + access_history("up") + elif cmd == curses.KEY_DOWN: + access_history("down") + else: + return cmd + + # Run the Textbox object with our custom validator. + curses.curs_set(True) + output = textbox.edit(validator) + curses.curs_set(False) + + # Finally, extract data from console and send to handler. + self._sim.handler.handle_console(output) + self._screen.clear() + + def show_console_error(self, message): + """ Shows Salis console error messages, if any. These messages might + contain actual python exception output. + """ + self._print_line(self._size[0] - 1, ">>>", curses.color_pair( + self._pair_error + ) | curses.A_BOLD) + self._screen.refresh() + + # We also use a Textbox object, just so that execution gets halted + # until a key gets pressed (even on non-blocking mode). + console = curses.newwin(1, self._size[1] - 5, self._size[0] - 1, 5) + textbox = curses.textpad.Textbox(console) + + # Curses may raise an exception if printing on the edge of the screen; + # we can just ignore it. + try: + console.addstr(0, 0, message, curses.color_pair( + self._pair_error + ) | curses.A_BOLD) + except curses.error: + pass + + # Custom validator simply exits on any key. + def validator(cmd): + EXIT = 7 + return EXIT + + textbox.edit(validator) + self._screen.clear() + + def print_page(self): + """ Print current page to screen. We use the previously generated + '_pages' dictionary to easily associate a label to a Salis function. + """ + # Update selected proc data if in WORLD view. + if self._current_page == "WORLD": + self._sim.lib.sal_proc_get_proc_data(self._selected_proc, cast( + self._selected_proc_data, POINTER(c_uint32) + )) + + # Print MAIN simulation data. + self._print_line( + 1, "SALIS[{}]".format(self._sim.args.file), curses.color_pair( + self._pair_header + ) | curses.A_BOLD + ) + self._print_widget(2, self._main) + + # Print data of currently selected page. + main_lines = len(self._main) + 3 + self._print_header(main_lines, self._current_page) + self._print_widget(main_lines + 1, self._pages[self._current_page]) + + # Print special widgets (WORLD view and PROCESS list). + if self._current_page == "WORLD": + self._world.render() + elif self._current_page == "PROCESS": + self._print_proc_list() + + @property + def screen(self): + return self._screen + + @property + def inst_list(self): + return self._inst_list + + @property + def proc_elements(self): + return self._proc_elements + + @property + def size(self): + return self._size + + @property + def current_page(self): + return self._current_page + + @property + def selected_proc(self): + return self._selected_proc + + @property + def selected_proc_data(self): + return self._selected_proc_data + + @property + def proc_list_scroll(self): + return self._proc_list_scroll + + @property + def world(self): + return self._world + + def _set_colors(self): + """ Define the color pairs for the data printer. + """ + curses.start_color() + curses.use_default_colors() + self._pair_header = self.get_color_pair(curses.COLOR_BLUE) + self._pair_selected = self.get_color_pair(curses.COLOR_YELLOW) + self._pair_error = self.get_color_pair(curses.COLOR_RED) + + def _get_screen(self): + """ Prepare and return the main curses window. We also set a shorter + delay when responding to a pressed escape key. + """ + # Set a shorter delay to the ESCAPE key, so that we may use it to exit + # Salis. + os.environ.setdefault("ESCDELAY", "25") + + # Prepare curses screen. + screen = curses.initscr() + curses.noecho() + curses.cbreak() + screen.keypad(True) + curses.curs_set(False) + + # We need color support in order to run the printer module. + if curses.has_colors(): + self._set_colors() + else: + raise RuntimeError("Error: no color support.") + + return screen + + def _get_inst_list(self): + """ Parse instruction set from C header file named 'instset.h'. We're + using the keyword 'SALIS_INST' to identify an instruction definition, + so be careful not to use this keyword anywhere else on the headers. + """ + inst_list = [] + inst_file = os.path.join(self._sim.path, "../include/instset.h") + + with open(inst_file, "r") as f: + lines = f.read().splitlines() + + for line in lines: + if line and line.split()[0] == "SALIS_INST": + inst_name = line.split()[1][:4] + inst_symb = line.split()[3] + inst_list.append((inst_name, inst_symb)) + + return inst_list + + def _get_proc_elements(self): + """ Parse process structure member variables from C header file named + 'process.h'. We're using the keyword 'SALIS_PROC_ELEMENT' to identify + element declarations, so be careful not to use this keyword anywhere + else on the headers. + """ + proc_elem_list = [] + proc_elem_file = os.path.join(self._sim.path, "../include/process.h") + + with open(proc_elem_file, "r") as f: + lines = f.read().splitlines() + + for line in lines: + if line and line.split()[0] == "SALIS_PROC_ELEMENT": + proc_elem_name = line.split()[2].split(";")[0] + + if proc_elem_name == "stack[8]": + # The stack is a special member variable, an array. We + # translate it by returning a list of stack identifiers. + proc_elem_list += ["stack[{}]".format(i) for i in range(8)] + else: + # We can assume all other struct elements are single + # variables. + proc_elem_list.append(proc_elem_name) + + return proc_elem_list + + def _get_main(self): + """ Generate main set of data fields to be printed. We associate, on a + list object, a label to each Salis function to be called. The following + elements get printed on all pages. + """ + return [ + ("e", "cycle", self._sim.lib.sal_main_get_cycle), + ("e", "epoch", self._sim.lib.sal_main_get_epoch), + ("e", "state", lambda: self._sim.state), + ("e", "autosave", lambda: self._sim.autosave), + ] + + def _get_pages(self): + """ Generate data fields to be printed on each page. We associate, on a + list object, a label to each Salis function to be called. Each list + represents a PAGE. We initialize all pages inside an ordered dictionary + object. + """ + # The following widgets help up print special sets of data elements. + # The use of nested lambdas is needed to receive updated values. + # Instruction counter widget: + inst_widget = [("e", inst[0], (lambda j: ( + lambda: self._sim.lib.sal_mem_get_inst_count(j) + ))(i)) for i, inst in enumerate(self._inst_list)] + + # Evolver module state widget: + state_widget = [("e", "state[{}]".format(i), (lambda j: ( + lambda: self._sim.lib.sal_evo_get_state(j) + ))(i)) for i in range(4)] + + # Selected process state widget: + selected_widget = [("p", element, (lambda j: ( + lambda: self._selected_proc_data[j] + ))(i)) for i, element in enumerate(self._proc_elements)] + + # With the help of the widgets above, we can declare the PAGES + # dictionary object. + return OrderedDict([ + ("MEMORY", [ + ("e", "order", self._sim.lib.sal_mem_get_order), + ("e", "size", self._sim.lib.sal_mem_get_size), + ("e", "blocks", self._sim.lib.sal_mem_get_block_start_count), + ("e", "allocated", self._sim.lib.sal_mem_get_allocated_count), + ("e", "ips", self._sim.lib.sal_mem_get_ip_count), + ("s", ""), + ("h", "INSTRUCTIONS"), + ] + inst_widget), + ("EVOLVER", [ + ("e", "last", self._sim.lib.sal_evo_get_last_changed_address), + ("e", "calls", self._sim.lib.sal_evo_get_calls_on_last_cycle), + ] + state_widget), + ("PROCESS", [ + ("e", "count", self._sim.lib.sal_proc_get_count), + ("e", "capacity", self._sim.lib.sal_proc_get_capacity), + ("e", "first", self._sim.lib.sal_proc_get_first), + ("e", "last", self._sim.lib.sal_proc_get_last), + ("e", "exec", + self._sim.lib.sal_proc_get_instructions_executed + ), + ]), + ("WORLD", [ + ("e", "position", lambda: self._world.pos), + ("e", "zoom", lambda: self._world.zoom), + ("e", "selected", lambda: self._selected_proc), + ("s", ""), + ("h", "SELECTED PROC"), + ] + selected_widget), + ]) + + def _print_line(self, ypos, line, attrs=curses.A_NORMAL, scroll=True): + """ Print a single line on screen only when it's visible. + """ + if scroll: + ypos -= self._main_scroll + + if 0 <= ypos < self._size[0]: + # Curses raises an exception each time we print on the screen's + # edge. We can just catch and ignore it. + try: + line = line[:self._size[1] - 1] + self._screen.addstr(ypos, 1, line, attrs) + except curses.error: + pass + + def _print_header(self, ypos, line): + """ Print a bold header. + """ + header_attr = curses.A_BOLD | curses.color_pair(self._pair_header) + self._print_line(ypos, line, header_attr) + + def _print_value(self, ypos, element, value, attr=curses.A_NORMAL): + """ Print a label:value pair. + """ + if type(value) == int: + if value == ((2 ** 32) - 1): + # In Salis, UINT32_MAX is used to represent NULL. We print NULL + # as three dashes. + value = "---" + elif self._print_hex: + value = hex(value) + + line = "{:<10} : {:>10}".format(element, value) + self._print_line(ypos, line, attr) + + def _print_proc_element(self, ypos, element, value): + """ Print elements of currently selected process. We highlight in + YELLOW if the selected process is running. + """ + if self._sim.lib.sal_proc_is_free(self._selected_proc): + attr = curses.A_NORMAL + else: + attr = curses.color_pair(self._pair_selected) + + self._print_value(ypos, element, value, attr) + + def _print_widget(self, ypos, widget): + """ Print a widget (data PAGE) on screen. + """ + for i, element in enumerate(widget): + if element[0] == "s": + continue + elif element[0] == "h": + self._print_header(i + ypos, element[1]) + elif element[0] == "e": + self._print_value(i + ypos, element[1], element[2]()) + elif element[0] == "p": + self._print_proc_element(i + ypos, element[1], element[2]()) + + def _clear_line(self, ypos): + """ Clear the specified line. + """ + if 0 <= ypos < self._size[0]: + self._screen.move(ypos, 0) + self._screen.clrtoeol() + + def _print_proc_data_list(self): + """ Print list of process data elements in PROCESS page. We can toggle + between printing the data elements or the genomes by pressing the 'g' + key. + """ + # First, print the table header, by extracting element names from the + # previously generated proc element list. + ypos = len(self._main) + len(self._pages["PROCESS"]) + 5 + header = " | ".join(["{:<10}".format("pidx")] + [ + "{:>10}".format(element) + for element in self._proc_elements[self._proc_element_scroll:] + ]) + self._clear_line(ypos) + self._print_header(ypos, header) + ypos += 1 + proc_id = self._proc_list_scroll + + # Print all proc elements in decimal or hexadecimal format, depending + # on hex-flag being set. + if self._print_hex: + data_format = lambda x: hex(x) + else: + data_format = lambda x: x + + # Lastly, iterate all lines and print as much process data as it fits. + # We can scroll the process data table using the 'wasd' keys. + while ypos < self._size[0]: + self._clear_line(ypos) + + if proc_id < self._sim.lib.sal_proc_get_capacity(): + if proc_id == self._selected_proc: + # Always highlight the selected process. + attr = curses.color_pair(self._pair_selected) + else: + attr = curses.A_NORMAL + + # Retrieve a copy of the selected process state and store it in + # a list object. + proc_data = (c_uint32 * len(self._proc_elements))() + self._sim.lib.sal_proc_get_proc_data(proc_id, cast( + proc_data, POINTER(c_uint32)) + ) + + # Lastly, assemble and print the next table row. + row = " | ".join(["{:<10}".format(proc_id)] + [ + "{:>10}".format(data_format(element)) + for element in proc_data[self._proc_element_scroll:] + ]) + self._print_line(ypos, row, attr) + + proc_id += 1 + ypos += 1 + + def _print_proc_gene_block(self, ypos, gidx, xpos, mbs, mba, ip, sp, pair): + """ Print a sub-set of a process genome. Namely, on of its two memory + blocks. + """ + while gidx < mbs and xpos < curses.COLS: + gaddr = mba + gidx + + if gaddr == ip: + attr = curses.color_pair(self._world.pair_sel_ip) + elif gaddr == sp: + attr = curses.color_pair(self._world.pair_sel_sp) + else: + attr = curses.color_pair(pair) + + # Retrieve instruction from memory and transform it to correct + # symbol. + inst = self._sim.lib.sal_mem_get_inst(gaddr) + symb = self._inst_list[inst][1] + + # Curses raises an exception each time we print on the screen's + # edge. We can just catch and ignore it. + try: + self._screen.addch(ypos, xpos, symb, attr) + except curses.error: + pass + + gidx += 1 + xpos += 1 + + return xpos + + def _print_proc_gene(self, ypos, proc_id): + """ Print a single process genome on the genome table. We use the same + colors to represent memory blocks, IP and SP of each process, as those + used to represent the selected process on WORLD view. + """ + # There's nothing to print if process is free. + if self._sim.lib.sal_proc_is_free(proc_id): + return + + # Process is alive. Retrieve a copy of the current process state and + # store it in a list object. + proc_data = (c_uint32 * len(self._proc_elements))() + self._sim.lib.sal_proc_get_proc_data(proc_id, cast( + proc_data, POINTER(c_uint32)) + ) + + # Let's extract all data of interest. + mb1a = proc_data[self._proc_elements.index("mb1a")] + mb1s = proc_data[self._proc_elements.index("mb1s")] + mb2a = proc_data[self._proc_elements.index("mb2a")] + mb2s = proc_data[self._proc_elements.index("mb2s")] + ip = proc_data[self._proc_elements.index("ip")] + sp = proc_data[self._proc_elements.index("sp")] + + # Always print MAIN memory block (mb1) first (on the left side). That + # way we can keep most of our attention on the parent. + xpos = self._print_proc_gene_block( + ypos, self._proc_gene_scroll, 14, mb1s, mb1a, ip, sp, + self._world.pair_sel_mb1 + ) + + # Reset gene counter and print child memory block, if it exists. + if mb1s < self._proc_gene_scroll: + gidx = self._proc_gene_scroll - mb1s + else: + gidx = 0 + + self._print_proc_gene_block( + ypos, gidx, xpos, mb2s, mb2a, ip, sp, self._world.pair_sel_mb2 + ) + + def _print_proc_gene_list(self): + """ Print list of process genomes in PROCESS page. We can toggle + between printing the genomes or the data elements by pressing the 'g' + key. + """ + # First, print the table header. We print the current gene-scroll + # position for easy reference. Return back to zero scroll with the 'A' + # key. + ypos = len(self._main) + len(self._pages["PROCESS"]) + 5 + header = "{:<10} | genes {} -->".format( + "pidx", self._proc_gene_scroll + ) + self._clear_line(ypos) + self._print_header(ypos, header) + ypos += 1 + proc_id = self._proc_list_scroll + + # Iterate all lines and print as much genetic data as it fits. We can + # scroll the gene data table using the 'wasd' keys. + while ypos < self._size[0]: + self._clear_line(ypos) + + if proc_id < self._sim.lib.sal_proc_get_capacity(): + if proc_id == self._selected_proc: + # Always highlight the selected process. + attr = curses.color_pair(self._pair_selected) + else: + attr = curses.A_NORMAL + + # Assemble and print the next table row. + row = "{:<10} |".format(proc_id) + self._print_line(ypos, row, attr) + self._print_proc_gene(ypos, proc_id) + + proc_id += 1 + ypos += 1 + + def _print_proc_list(self): + """ Print list of process genomes or process data elements in PROCESS + page. We can toggle between printing the genomes or the data elements + by pressing the 'g' key. + """ + if self._proc_gene_view: + self._print_proc_gene_list() + else: + self._print_proc_data_list() + + def _proc_select_by_cursor(self): + """ Select process located on address under cursor, if any exists. + """ + # First, calculate address under cursor. + ypos = self._curs_y + xpos = self._curs_x - World.PADDING + line_size = self._size[1] - World.PADDING + address = self._world.pos + ( + ((ypos * line_size) + xpos) * self._world.zoom + ) + + # Now, iterate all living processes and try to find one that owns the + # calculated address. + if self._sim.lib.sal_mem_is_address_valid(address): + for proc_id in range(self._sim.lib.sal_proc_get_count()): + if not self._sim.lib.sal_proc_is_free(proc_id): + proc_data = (c_uint32 * len(self._proc_elements))() + self._sim.lib.sal_proc_get_proc_data(proc_id, cast( + proc_data, POINTER(c_uint32)) + ) + mb1a = proc_data[self._proc_elements.index("mb1a")] + mb1s = proc_data[self._proc_elements.index("mb1s")] + mb2a = proc_data[self._proc_elements.index("mb2a")] + mb2s = proc_data[self._proc_elements.index("mb2s")] + + if ( + mb1a <= address < (mb1a + mb1s) or + mb2a <= address < (mb2a + mb2s) + ): + self._selected_proc = proc_id + break diff --git a/bin/salis.py b/bin/salis.py new file mode 100755 index 0000000..6dd82b0 --- /dev/null +++ b/bin/salis.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 + +""" SALIS: Viewer/controller for the SALIS simulator. + +File: salis.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +Main handler for the Salis simulator. The Salis class takes care of +initializing, running and shutting down the simulator and other sub-modules. It +also takes care of parsing the command-line arguments and linking to the Salis +library with the help of ctypes. + +To execute this script, make sure to have python3 installed and in your path, +as well as the cython package. Also, make sure it has correct execute +permissions (chmod). +""" + +import os +import re +import sys +import time +import traceback +from argparse import ArgumentParser, HelpFormatter +from ctypes import CDLL, c_bool, c_uint8, c_uint32, c_char_p, POINTER +from handler import Handler +from printer import Printer + + +__version__ = "2.0" + + +class Salis: + def __init__(self): + """ Salis constructor. Arguments are passed through the command line + and parsed with the 'argparse' module. Library is loaded with 'CDLL' + and C headers are parsed to detect function argument and return types. + """ + self._path = self._get_path() + self._args = self._parse_args() + self._log = self._open_log_file() + self._save_file_path = self._get_save_file_path() + self._common_pipe = self._get_common_pipe() + self._lib = self._parse_lib() + self._printer = Printer(self) + self._handler = Handler(self) + self._state = "paused" + self._autosave = "---" + self._exit = False + + # Based on CLI arguments, initialize a new Salis simulation or load + # existing one from file. + if self._args.action == "new": + self._lib.sal_main_init( + self._args.order, self._common_pipe.encode("utf-8") + ) + elif self._args.action == "load": + self._lib.sal_main_load( + self._save_file_path.encode("utf-8"), + self._common_pipe.encode("utf-8") + ) + + def __del__(self): + """ Salis destructor. + """ + # In case an error occurred early during initialization, checks whether + # Salis has been initialized correctly before attempting to shut it + # down. + if hasattr(self, "_lib") and hasattr(self._lib, "sal_main_quit"): + if self._lib.sal_main_is_init(): + self._lib.sal_main_quit() + + # If simulation ended correctly, 'error.log' should be empty. Delete + # file it exists and its empty. + if ( + hasattr(self, "_log") and + os.path.isfile(self._log) and + os.stat(self._log).st_size == 0 + ): + os.remove(self._log) + + def run(self): + """ Runs main simulation loop. Curses may be placed on non-blocking + mode, which allows simulation to run freely while still listening to + user input. + """ + while not self._exit: + self._printer.print_page() + self._handler.process_cmd(self._printer.get_cmd()) + + # If in non-blocking mode, re-print data once every 15 + # milliseconds. + if self._state == "running": + end = time.time() + 0.015 + + while time.time() < end: + self._lib.sal_main_cycle() + self.check_autosave() + + def toggle_state(self): + """ Toggle between 'paused' and 'running' states. On 'running' curses + gets placed in non-blocking mode. + """ + if self._state == "paused": + self._state = "running" + self._printer.set_nodelay(True) + else: + self._state = "paused" + self._printer.set_nodelay(False) + + def rename(self, new_name): + """ Give the simulation a new name. + """ + self._args.file = new_name + self._save_file_path = self._get_save_file_path() + + def set_autosave(self, interval): + """ Set the simulation's auto-save interval. When set to zero, auto + saving is disabled, + """ + if not interval: + self._autosave = "---" + else: + self._autosave = interval + + def check_autosave(self): + """ Save simulation to './sims/auto/*' whenever the autosave interval + is reached. We use the following naming convention for auto-saved files: + + >>> ./sims/auto/...auto + """ + if self._autosave != "---": + if not self._lib.sal_main_get_cycle() % self._autosave: + auto_path = os.path.join(self._path, "sims/auto", ".".join([ + self._args.file, + "{:08x}".format(self._lib.sal_main_get_epoch()), + "{:08x}".format(self._lib.sal_main_get_cycle()), + "auto" + ])) + self._lib.sal_main_save(auto_path.encode("utf-8")) + + def exit(self): + """ Signal we want to exit the simulator. + """ + self._exit = True + + @property + def path(self): + return self._path + + @property + def save_file_path(self): + return self._save_file_path + + @property + def common_pipe(self): + return self._common_pipe + + @property + def args(self): + return self._args + + @property + def lib(self): + return self._lib + + @property + def printer(self): + return self._printer + + @property + def handler(self): + return self._handler + + @property + def state(self): + return self._state + + @property + def autosave(self): + return self._autosave + + def _get_path(self): + """ Retrieve the absolute path of this script. We need to do this in + order to detect the './lib', './sims' and './genomes' subdirectories. + """ + return os.path.dirname(__file__) + + def _get_save_file_path(self): + """ Retrieve the absolute path of the file to which we will save this + simulation when we exit Salis. + """ + return os.path.join(self._path, "sims", self._args.file) + + def _get_common_pipe(self): + """ Get absolute path of the common pipe. This FIFO object may be used + by concurrent Salis simulations to share data between themselves. + """ + return os.path.join(self._path, "common/pipe") + + def _parse_args(self): + """ Parse command-line arguments with the 'argparse' module. To learn + more about each command, invoke the simulator in one of the following + ways: + + (venv) $ python tsalis.py --help + (venv) $ python tsalis.py new --help + (venv) $ python tsalis.py load --help + + """ + # Custom formatter helps keep all help data aligned. + formatter = lambda prog: HelpFormatter(prog, max_help_position=30) + + # Initialize the main parser with our custom formatter. + parser = ArgumentParser( + description="Viewer/controller for the Salis simulator.", + formatter_class=formatter + ) + parser.add_argument( + "-v", "--version", action="version", + version="Salis: A-Life Simulator (" + __version__ + ")" + ) + + # Initialize the 'new/load' action subparsers. + subparsers = parser.add_subparsers( + dest="action", help="Possible actions..." + ) + subparsers.required = True + + # Set up subparser for the create 'new' action. + new_parser = subparsers.add_parser("new", formatter_class=formatter) + new_parser.add_argument( + "-o", "--order", required=True, type=lambda x: int(x, 0), + metavar="[1-31]", help="Create new simulation of given ORDER" + ) + new_parser.add_argument( + "-f", "--file", required=True, type=str, metavar="FILE", + help="Name of FILE to save simulation to on exit" + ) + + # Set up subparser for the 'load' existing action. + load_parser = subparsers.add_parser("load", formatter_class=formatter) + load_parser.add_argument( + "-f", "--file", required=True, type=str, metavar="FILE", + help="Load previously saved simulation from FILE" + ) + + # Finally, parse all arguments. + args = parser.parse_args() + + # Revise that parsed CL arguments are valid. + if args.action == "new": + if args.order not in range(1, 32): + parser.error("Order must be an integer between 1 and 31") + else: + savefile = os.path.join(self._path, "sims", args.file) + + # No save-file with given name has been detected. + if not os.path.isfile(savefile): + parser.error( + "Save file provided '{}' does not exist".format(savefile) + ) + + return args + + def _open_log_file(self): + """ Create a log file to store errors on. It will get deleted if no + errors are detected. + """ + log_file = os.path.join(self._path, "error.log") + sys.stderr = open(log_file, "w") + return log_file + + def _parse_lib(self): + """ Dynamically parse the Salis library C header files. We do this in + order to more easily set the correct input/output types of all loaded + functions. C functions to be parsed must be declared in a '.h' file + located on the '../include' directory, using the following syntax: + + SALIS_API restype func_name(arg1_type arg1, arg2_type arg2); + + Note to developers: the 'SALIS_API' keyword should *NOT* be used + anywhere else in the header files (not even in comments)! + """ + lib = CDLL(os.path.join(self._path, "lib/libsalis.so")) + include_dir = os.path.join(self._path, "../include") + c_includes = [ + os.path.join(include_dir, f) + for f in os.listdir(include_dir) + # Only parse '.h' header files. + if os.path.isfile(os.path.join(include_dir, f)) and f[-2:] == ".h" + ] + funcs_to_set = [] + + for include in c_includes: + with open(include, "r") as f: + text = f.read() + + # Regexp to detect C functions to parse. This is a *very lazy* + # parser. So, if you want to expand/tweak Salis, be careful when + # declaring new functions! + funcs = re.findall(r"SALIS_API([\s\S]+?);", text, re.MULTILINE) + + for func in funcs: + func = func.replace("\n", "") + func = func.replace("\t", "") + func = func.strip() + restype = func.split()[0] + name = func.split()[1].split("(")[0] + args = [ + arg.split()[0] + for arg in func.split("(")[1].split(")")[0].split(",") + ] + funcs_to_set.append({ + "name": name, + "restype": restype, + "args": args + }) + + # All Salis typedefs must be included here, associated to their CTYPES + # equivalents. + type_convert = { + "void": None, + "boolean": c_bool, + "uint8": c_uint8, + "uint8_p": POINTER(c_uint8), + "uint32": c_uint32, + "uint32_p": POINTER(c_uint32), + "string": c_char_p, + "Process": None, + } + + # Finally, set correct arguments and return types of all Salis + # functions. + for func in funcs_to_set: + func["restype"] = type_convert[func["restype"]] + func["args"] = [type_convert[arg] for arg in func["args"]] + getattr(lib, func["name"]).restype = func["restype"] + getattr(lib, func["name"]).argtype = func["args"] + + return lib + +if __name__ == "__main__": + """ Entry point... + """ + Salis().run() diff --git a/bin/sims/.keep b/bin/sims/.keep new file mode 100644 index 0000000..e69de29 diff --git a/bin/sims/auto/.keep b/bin/sims/auto/.keep new file mode 100644 index 0000000..e69de29 diff --git a/bin/world.py b/bin/world.py new file mode 100644 index 0000000..60fd427 --- /dev/null +++ b/bin/world.py @@ -0,0 +1,277 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: world.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered an extension of the 'printer' module. It takes +care of getting a pre-redered image from Salis and post-processing it in order +to print it into the curses screen. It also keeps track of user cntrollable +rendering parameters (position and zoom). +""" + +import curses +from ctypes import c_uint8, cast, POINTER + + +class World: + PADDING = 25 + + def __init__(self, printer, sim): + """ World constructor. We link to the printer and main simulation + classes. We also setup the colors for rendering the world. + """ + self._printer = printer + self._sim = sim + self._pos = 0 + self._zoom = 1 + self._set_world_colors() + + def render(self): + """ Function for rendering the world. We get a pre-rendered buffer from + Salis' memory module (its way faster to pre-render in C) and use that + to assemble the world image in Python. + """ + # Window is so narrow that world is not visible. + if self._printer.size[1] <= self.PADDING: + return + + # Get pre-rendered image from Salis' memory module. + line_width = self._printer.size[1] - self.PADDING + print_area = self._printer.size[0] * line_width + c_buffer = (c_uint8 * print_area)() + self._sim.lib.sal_mem_render_image( + self._pos, self._zoom, print_area, cast(c_buffer, POINTER(c_uint8)) + ) + + # Get data elements of selected process, if it's running, and store + # them into a convenient dict object. + if self._sim.lib.sal_proc_is_free(self._printer.selected_proc): + sel_data = None + else: + out_data = self._printer.selected_proc_data + out_elem = self._printer.proc_elements + sel_data = { + "ip": out_data[out_elem.index("ip")], + "sp": out_data[out_elem.index("sp")], + "mb1a": out_data[out_elem.index("mb1a")], + "mb1s": out_data[out_elem.index("mb1s")], + "mb2a": out_data[out_elem.index("mb2a")], + "mb2s": out_data[out_elem.index("mb2s")], + } + + # Iterate all cells on printable area and print the post-rendered + # cells. Rendered cells contain info about bit flags and instructions + # currently written into memory. + bidx = 0 + + for y in range(self._printer.size[0]): + for x in range(line_width): + xpad = x + self.PADDING + addr = self._pos + (self._zoom * bidx) + symb, attr = self._render_cell(c_buffer[bidx], addr, sel_data) + + # Curses raises an exception when printing on the edge of the + # screen; we can just ignore it. + try: + self._printer.screen.addch(y, xpad, symb, attr) + except curses.error: + pass + + bidx += 1 + + def zoom_out(self): + """ Zoom out by a factor of 2 (zoom *= 2). + """ + if self._is_world_editable(): + self._zoom = min(self._zoom * 2, self._get_max_zoom()) + + def zoom_in(self): + """ Zoom in by a factor of 2 (zoom //= 2). + """ + if self._is_world_editable(): + self._zoom = max(self._zoom // 2, 1) + + def zoom_reset(self): + """ Reset zoom to a valid value on certain events (i.e. during terminal + resizing). + """ + self._zoom = min(self._zoom, self._get_max_zoom()) + + def pan_left(self): + """ Pan world to the left (pos -= zoom). + """ + if self._is_world_editable(): + self._pos = max(self._pos - self._zoom, 0) + + def pan_right(self): + """ Pan world to the right (pos += zoom). + """ + if self._is_world_editable(): + max_pos = self._sim.lib.sal_mem_get_size() - 1 + self._pos = min(self._pos + self._zoom, max_pos) + + def pan_down(self): + """ Pan world downward (pos += zoom * columns). + """ + if self._is_world_editable(): + self._pos = max(self._pos - self._get_line_area(), 0) + + def pan_up(self): + """ Pan world upward (pos -= zoom * columns). + """ + if self._is_world_editable(): + max_pos = self._sim.lib.sal_mem_get_size() - 1 + self._pos = min(self._pos + self._get_line_area(), max_pos) + + def pan_reset(self): + """ Set world position to zero. + """ + if self._is_world_editable(): + self._pos = 0 + + def scroll_to(self, pos): + """ Move world pos to a specified position. + """ + if self._is_world_editable(): + if self._sim.lib.sal_mem_is_address_valid(pos): + self._pos = pos + else: + raise RuntimeError("Error: scrolling to an invalid address") + + @property + def pos(self): + return self._pos + + @property + def zoom(self): + return self._zoom + + @property + def pair_sel_mb2(self): + return self._pair_sel_mb2 + + @property + def pair_sel_mb1(self): + return self._pair_sel_mb1 + + @property + def pair_sel_sp(self): + return self._pair_sel_sp + + @property + def pair_sel_ip(self): + return self._pair_sel_ip + + def _set_world_colors(self): + """ Define color pairs for rendering the world. Each color has a + special meaning, referring to the selected process IP, SP and memory + blocks, or to bit flags currently set on rendered cells. + """ + self._pair_free = self._printer.get_color_pair( + curses.COLOR_BLUE + ) + self._pair_alloc = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_BLUE + ) + self._pair_mbstart = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_CYAN + ) + self._pair_ip = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_WHITE + ) + self._pair_sel_mb2 = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_GREEN + ) + self._pair_sel_mb1 = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_YELLOW + ) + self._pair_sel_sp = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_MAGENTA + ) + self._pair_sel_ip = self._printer.get_color_pair( + curses.COLOR_BLACK, curses.COLOR_RED + ) + + def _render_cell(self, byte, addr, sel_data=None): + """ Render a single cell on the WORLD view. All cells are rendered by + interpreting the values coming in from the buffer. We overlay special + colors for representing the selected organism's state, on top of the + more common colors used to represent memory state. + """ + # Paint black all cells that are out of memory bounds. + if not self._sim.lib.sal_mem_is_address_valid(addr): + return " ", curses.A_NORMAL + + # Check if cell contains part of the currently selected process. + if sel_data: + top_addr = addr + self._zoom + top_mb1a = sel_data["mb1a"] + sel_data["mb1s"] + top_mb2a = sel_data["mb2a"] + sel_data["mb2s"] + + if addr <= sel_data["ip"] < top_addr: + pair = self._pair_sel_ip + elif addr <= sel_data["sp"] < top_addr: + pair = self._pair_sel_sp + elif top_addr > sel_data["mb1a"] and top_mb1a > addr: + pair = self._pair_sel_mb1 + elif top_addr > sel_data["mb2a"] and top_mb2a > addr: + pair = self._pair_sel_mb2 + + # No pair has been selected yet; select pair based on bit-flags. + if not "pair" in locals(): + if byte >= 0x80: + pair = self._pair_ip + elif byte >= 0x40: + pair = self._pair_mbstart + elif byte >= 0x20: + pair = self._pair_alloc + else: + pair = self._pair_free + + # Select symbol to represent instructions currently on cell. + inst = byte % 32 + + if self._zoom == 1: + symb = self._printer.inst_list[inst][1] + elif inst > 16: + symb = ":" + else: + symb = "." + + # Return tuple containing our post-redered cell. + return symb, curses.color_pair(pair) + + def _get_max_zoom(self): + """ Calculate maximum needed zoom so that the entire world fits on the + terminal window. + """ + max_zoom = 1 + line_size = self._printer.size[1] - self.PADDING + coverage = self._printer.size[0] * line_size + + # We fix a maximum zoom level; otherwise, program may halt on extreme + # zoom levels. + while ( + (coverage * max_zoom) < self._sim.lib.sal_mem_get_size() and + max_zoom < 2 ** 16 + ): + max_zoom *= 2 + + return max_zoom + + def _is_world_editable(self): + """ For this to return True, printer's current page must be WORLD page. + Additionally, the WORLD panel must be visible on the terminal window + (i.e. curses.COLS > data_margin). + """ + correct_page = self._printer.current_page == "WORLD" + correct_size = self._printer.size[1] > self.PADDING + return correct_page and correct_size + + def _get_line_area(self): + """ Return amount of bytes contained in a printed WORLD line. + """ + line_size = self._printer.size[1] - self.PADDING + line_area = self._zoom * line_size + return line_area -- cgit v1.2.1