aboutsummaryrefslogtreecommitdiff
path: root/bin/modules
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-02-29 02:29:14 +0100
committerPaul Oliver <contact@pauloliver.dev>2024-02-29 02:29:14 +0100
commit2250b4db92bd272dbb1fd717eb791e293c17e37a (patch)
tree0bde7ed8a2ba8b2a04da629b0317568dea2a6b3c /bin/modules
parentde427d319c699b8bed7ed73289b3698f13ac3acc (diff)
Store python modules on './bin/modules/' subdirectory. [#37]
Diffstat (limited to 'bin/modules')
-rw-r--r--bin/modules/__init__.py0
-rw-r--r--bin/modules/handler.py435
-rw-r--r--bin/modules/printer.py858
-rw-r--r--bin/modules/world.py279
4 files changed, 1572 insertions, 0 deletions
diff --git a/bin/modules/__init__.py b/bin/modules/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bin/modules/__init__.py
diff --git a/bin/modules/handler.py b/bin/modules/handler.py
new file mode 100644
index 0000000..3325391
--- /dev/null
+++ b/bin/modules/handler.py
@@ -0,0 +1,435 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+file: handler.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered the 'controller' part of the Salis simulator.
+It receives and parses all user input via keyboard and console commands. It
+also takes care of genome compilation (via genome files located on the
+'genomes' directory).
+
+An user may open the Salis console by pressing the 'c' key while in a running
+session. A nice quirk is the possibility to run python commands from within the
+Salis console. As an example, to get the memory size, an user could type:
+
+>>> exec output = self.__sim.lib.sal_mem_get_size()
+
+Note that 'output' denotes a storage variable that will get printed on the
+console response. This ability gives an user a whole lot of power, and should
+be used with care.
+"""
+
+import curses
+import os
+import time
+
+
+class Handler:
+ KEY_ESCAPE = 27
+ CYCLE_TIMEOUT = 0.1
+
+ def __init__(self, sim):
+ """ Handler constructor. Simply link this class to the main simulation
+ class and printer class and create symbol dictionary.
+ """
+ self.__sim = sim
+ self.__printer = sim.printer
+ self.__inst_dict = self.__get_inst_dict()
+ self.__min_commands = [
+ ord("M"),
+ ord(" "),
+ curses.KEY_RESIZE,
+ self.KEY_ESCAPE,
+ ]
+ self.console_history = []
+
+ # Set short delay for ESCAPE key (which is used to exit the simulator).
+ os.environ.setdefault("ESCDELAY", "25")
+
+ def process_cmd(self, cmd):
+ """ Process incoming commands from curses. Commands are received via
+ ncurses' getch() function, thus, they must be transformed into their
+ character representations with 'ord()'.
+ """
+ # If in minimal mode, only listen to a subset of commands.
+ if self.__sim.minimal and cmd not in self.__min_commands:
+ return
+
+ if cmd == self.KEY_ESCAPE:
+ self.__on_quit([None], save=True)
+ elif cmd == ord("M"):
+ self.__printer.screen.clear()
+ self.__sim.minimal = not self.__sim.minimal
+ elif cmd == ord(" "):
+ self.__sim.toggle_state()
+ elif cmd == curses.KEY_LEFT:
+ self.__printer.flip_page(-1)
+ elif cmd == curses.KEY_RIGHT:
+ self.__printer.flip_page(1)
+ elif cmd == curses.KEY_DOWN:
+ self.__printer.scroll_main(-1)
+ elif cmd == curses.KEY_UP:
+ self.__printer.scroll_main(1)
+ elif cmd == curses.KEY_RESIZE:
+ self.__printer.on_resize()
+ elif cmd == ord("X"):
+ self.__printer.toggle_hex()
+ elif cmd == ord("x"):
+ self.__printer.world.zoom_out()
+ elif cmd == ord("z"):
+ self.__printer.world.zoom_in()
+ elif cmd == ord("a"):
+ self.__printer.world.pan_left()
+ self.__printer.proc_scroll_left()
+ elif cmd == ord("d"):
+ self.__printer.world.pan_right()
+ self.__printer.proc_scroll_right()
+ elif cmd == ord("s"):
+ self.__printer.world.pan_down()
+ self.__printer.proc_scroll_down()
+ elif cmd == ord("w"):
+ self.__printer.world.pan_up()
+ self.__printer.proc_scroll_up()
+ elif cmd == ord("S"):
+ self.__printer.world.pan_down(fast=True)
+ self.__printer.proc_scroll_down(fast=True)
+ elif cmd == ord("W"):
+ self.__printer.world.pan_up(fast=True)
+ self.__printer.proc_scroll_up(fast=True)
+ elif cmd == ord("Q"):
+ self.__printer.world.pan_reset()
+ self.__printer.proc_scroll_vertical_reset()
+ elif cmd == ord("A"):
+ self.__printer.world.pan_reset()
+ self.__printer.proc_scroll_horizontal_reset()
+ elif cmd == ord("o"):
+ self.__printer.proc_select_prev()
+ elif cmd == ord("p"):
+ self.__printer.proc_select_next()
+ elif cmd == ord("f"):
+ self.__printer.proc_select_first()
+ elif cmd == ord("l"):
+ self.__printer.proc_select_last()
+ elif cmd == ord("k"):
+ self.__printer.proc_scroll_to_selected()
+ elif cmd == ord("g"):
+ self.__printer.proc_toggle_gene_view()
+ elif cmd == ord("i"):
+ self.__printer.world.toggle_ip_view()
+ elif cmd == ord("\n"):
+ self.__printer.run_cursor()
+ elif cmd == ord("c"):
+ self.__printer.run_console()
+ else:
+ # Check for numeric input. Number keys [1 to 0] cycle the
+ # simulation [2 ** ((n - 1) % 10] times.
+ try:
+ if chr(cmd).isdigit():
+ factor = int(chr(cmd))
+ factor = int(2 ** ((factor - 1) % 10))
+ self.__cycle_sim(factor)
+ except ValueError:
+ pass
+
+ def handle_console(self, command_raw):
+ """ Process console commands. We parse and check for input errors. Any
+ python exception messages are redirected to the console-response
+ window.
+ """
+ if command_raw:
+ command = command_raw.split()
+
+ try:
+ # Handle both python and self-thrown exceptions.
+ if command[0] in ["q", "quit"]:
+ self.__on_quit(command, save=True)
+ elif command[0] in ["q!", "quit!"]:
+ self.__on_quit(command, save=False)
+ elif command[0] in ["i", "input"]:
+ self.__on_input(command)
+ elif command[0] in ["c", "compile"]:
+ self.__on_compile(command)
+ elif command[0] in ["n", "new"]:
+ self.__on_new(command)
+ elif command[0] in ["k", "kill"]:
+ self.__on_kill(command)
+ elif command[0] in ["e", "exec"]:
+ self.__on_exec(command)
+ elif command[0] in ["s", "scroll"]:
+ self.__on_scroll(command)
+ elif command[0] in ["p", "process"]:
+ self.__on_proc_select(command)
+ elif command[0] in ["r", "rename"]:
+ self.__on_rename(command)
+ elif command[0] in ["save"]:
+ self.__on_save(command)
+ elif command[0] in ["a", "auto"]:
+ self.__on_set_autosave(command)
+ else:
+ # Raise if a non-existing command has been given.
+ self.__raise("Invalid command: '{}'".format(command[0]))
+ except BaseException as exep:
+ # We parse and redirect python exceptions to the error
+ # console-window.
+ message = str(exep).strip()
+ message = message[0].upper() + message[1:]
+ self.__printer.show_console_error(message)
+ finally:
+ # Store command on console history.
+ self.console_history.append(command_raw.strip())
+
+
+ ###############################
+ # Private methods
+ ###############################
+
+ def __raise(self, message):
+ """ Generic exception thrower. Throws a 'RuntimeError' initialized with
+ the given message.
+ """
+ raise RuntimeError("ERROR: {}".format(message))
+
+ def __respond(self, message):
+ """ Generic console responder. Throws a 'RuntimeError' initialized with
+ the given message.
+ """
+ raise RuntimeError(message)
+
+ def __cycle_sim(self, factor):
+ """ Simply cycle Salis 'factor' number of times. Do not cycle for more
+ than a given amount of time.
+ """
+ time_max = time.time() + self.CYCLE_TIMEOUT
+
+ for _ in range(factor):
+ self.__sim.lib.sal_main_cycle()
+ self.__sim.check_autosave()
+
+ if time.time() > time_max:
+ break
+
+ def __get_inst_dict(self):
+ """ Transform the instruction list of the printer module into a
+ dictionary that's more useful for genome compilation. Instruction
+ symbols are keys, values are the actual byte representation.
+ """
+ inst_dict = {}
+
+ for i, inst in enumerate(self.__printer.inst_list):
+ inst_dict[inst[1]] = i
+
+ return inst_dict
+
+ def __on_quit(self, command, save):
+ """ Exit simulation. We can choose whether to save the simulation into
+ a save file or not.
+ """
+ if len(command) > 1:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ if save:
+ self.__sim.lib.sal_main_save(
+ self.__sim.save_file_path.encode("utf-8")
+ )
+
+ self.__sim.exit()
+
+ def __write_genome(self, genome, address_list):
+ """ Write genome stream into a given list of memory addresses. All
+ addresses must be valid or an exception is thrown.
+ """
+ # All addresses we will write to must be valid.
+ for base_addr in address_list:
+ address = int(base_addr, 0)
+
+ for _ in range(len(genome)):
+ if not self.__sim.lib.sal_mem_is_address_valid(address):
+ self.__raise("Address '{}' is invalid".format(address))
+
+ address += 1
+
+ # All looks well! Let's compile the genome into memory.
+ for base_addr in address_list:
+ address = int(base_addr, 0)
+
+ for symbol in genome:
+ self.__sim.lib.sal_mem_set_inst(
+ address, self.__inst_dict[symbol]
+ )
+ address += 1
+
+ def __on_input(self, command):
+ """ Compile organism from user typed input. Compilation can only occur
+ on valid memory addresses. An exception will be thrown when trying to
+ write into non-valid address or when input stream is invalid.
+ """
+ if len(command) < 3:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # All characters in file must be actual instruction symbols.
+ for character in command[1]:
+ if character not in self.__inst_dict:
+ self.__raise("Invalid symbol '{}' found on stream".format(
+ character
+ ))
+
+ # All looks well, Let's write the genome into memory.
+ self.__write_genome(command[1], command[2:])
+
+ def __on_compile(self, command):
+ """ Compile organism from source genome file. Genomes must be placed on
+ the './genomes' directory. Compilation can only occur on valid memory
+ addresses. An exception will be thrown when trying to write into
+ non-valid address or when genome file is invalid.
+ """
+ if len(command) < 3:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Open genome file for compilation.
+ gen_file = os.path.join(self.__sim.path, "genomes", command[1])
+
+ with open(gen_file, "r") as f:
+ genome = f.read().strip()
+
+ # Entire genome must be written on a single line.
+ if "\n" in genome:
+ self.__raise("Newline detected on '{}'".format(gen_file))
+
+ # All characters in file must be actual instruction symbols.
+ for character in genome:
+ if character not in self.__inst_dict:
+ self.__raise("Invalid symbol '{}' found on '{}'".format(
+ character, gen_file
+ ))
+
+ # All looks well, Let's write the genome into memory.
+ self.__write_genome(genome, command[2:])
+
+ def __on_new(self, command):
+ """ Instantiate new organism of given size on given address. These
+ memory areas must be free and valid or an exception is thrown.
+ """
+ if len(command) < 3:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Check that all addresses we will allocate are free and valid.
+ for base_addr in command[2:]:
+ address = int(base_addr, 0)
+
+ for _ in range(int(command[1])):
+ if not self.__sim.lib.sal_mem_is_address_valid(address):
+ self.__raise("Address '{}' is invalid".format(address))
+ elif self.__sim.lib.sal_mem_is_allocated(address):
+ self.__raise("Address '{}' is allocated".format(address))
+
+ address += 1
+
+ # All looks well! Let's instantiate our new organism.
+ for base_addr in command[2:]:
+ address = int(base_addr, 0)
+ size = int(command[1], 0)
+ self.__sim.lib.sal_proc_create(address, size)
+
+ def __on_kill(self, command):
+ """ Kill organism on bottom of reaper queue.
+ """
+ if len(command) > 1:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Call proc kill function only if there's any organisms to kill.
+ if not self.__sim.lib.sal_proc_get_count():
+ self.__raise("No organisms currently alive")
+ else:
+ self.__sim.lib.sal_proc_kill()
+
+ def __on_exec(self, command):
+ """ Allow a user to execute a python command from within the console.
+ Using this is very hack-ish, and not recommended unless you're certain
+ of what you're doing!
+ """
+ if len(command) < 2:
+ self.__raise(
+ "'{}' must be followed by an executable string".format(
+ command[0]
+ )
+ )
+
+ # User may query any simulation variable or status and the console will
+ # respond. For example, to query memory size or order, type one of the
+ # following:
+ #
+ # >>> exec output = self.__sim.lib.sal_mem_get_size()
+ # >>> exec output = self.__sim.lib.sal_mem_get_order()
+ #
+ output = {}
+ exec(" ".join(command[1:]), locals(), output)
+
+ if output:
+ self.__respond("EXEC RESPONDS: {}".format(str(output)))
+
+ def __on_scroll(self, command):
+ """ We can scroll to a specific process (on PROCESS view) or to a
+ specific world address (on WORLD view) via the console.
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ target = int(command[1], 0)
+
+ # If on PROCESS page, scroll to given process.
+ if self.__printer.current_page == "PROCESS":
+ if target < self.__sim.lib.sal_proc_get_capacity():
+ self.__printer.proc_scroll_to(target)
+ else:
+ self.__raise("No process with ID '{}' found".format(target))
+ elif self.__printer.current_page == "WORLD":
+ if self.__sim.lib.sal_mem_is_address_valid(target):
+ self.__printer.world.scroll_to(target)
+ else:
+ self.__raise("Address '{}' is invalid".format(address))
+ else:
+ self.__raise("'{}' must be called on PROCESS or WORLD page".format(
+ command[0])
+ )
+
+ def __on_proc_select(self, command):
+ """ Select a specific process (on PROCESS or WORLD page).
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ target = int(command[1], 0)
+
+ # If on PROCESS page, scroll to given process.
+ if target < self.__sim.lib.sal_proc_get_capacity():
+ self.__printer.proc_select_by_id(target)
+ else:
+ self.__raise("No process with ID '{}' found".format(target))
+
+ def __on_rename(self, command):
+ """ Set a new simulation name. Future auto-saved files will use this
+ name as prefix.
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ self.__sim.rename(command[1])
+
+ def __on_save(self, command):
+ """ Save simulation on its current state.
+ """
+ if len(command) != 1:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ self.__sim.lib.sal_main_save(self.__sim.save_file_path.encode("utf-8"))
+
+ def __on_set_autosave(self, command):
+ """ Set the simulation's auto save interval. Provide any integer
+ between 0 and (2**32 - 1). If zero is provided, auto saving will be
+ disabled.
+ """
+ if len(command) != 2:
+ self.__raise("Invalid parameters for '{}'".format(command[0]))
+
+ self.__sim.set_autosave(int(command[1], 0))
diff --git a/bin/modules/printer.py b/bin/modules/printer.py
new file mode 100644
index 0000000..51a5b33
--- /dev/null
+++ b/bin/modules/printer.py
@@ -0,0 +1,858 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+File: printer.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered the 'view' part of the Salis simulator. It
+takes care of displaying the simulator's state in a nicely formatted, intuitive
+format. It makes use of the curses library for terminal handling.
+"""
+
+import curses
+import curses.textpad
+import os
+import time
+
+from collections import OrderedDict
+from ctypes import c_uint8, c_uint32, cast, POINTER
+from modules.handler import Handler
+from modules.world import World
+
+
+class Printer:
+ ESCAPE_KEY = 27
+
+ def __init__(self, sim):
+ """ Printer constructor. It takes care of starting up curses, defining
+ the data pages and setting the printer on its initial state.
+ """
+ self.__sim = sim
+ self.__color_pair_count = 0
+
+ # Initialize curses screen, instruction and proc-element list before
+ # other private elements that depend on them.
+ self.screen = self.__get_screen()
+ self.inst_list = self.__get_inst_list()
+ self.proc_elements = self.__get_proc_elements()
+
+ # We can now initialize all other privates.
+ self.__main = self.__get_main()
+ self.__pages = self.__get_pages()
+ self.__minimal = self.__get_minimal()
+ self.__main_scroll = 0
+ self.__proc_element_scroll = 0
+ self.__proc_gene_scroll = 0
+ self.__proc_gene_view = False
+ self.__curs_y = 0
+ self.__curs_x = 0
+ self.__print_hex = False
+ self.size = self.screen.getmaxyx()
+ self.current_page = "MEMORY"
+ self.selected_proc = 0
+ self.selected_proc_data = (c_uint32 * len(self.proc_elements))()
+ self.proc_list_scroll = 0
+ self.world = World(self, self.__sim)
+
+ def __del__(self):
+ """ Printer destructor exits curses.
+ """
+ curses.endwin()
+
+ def get_color_pair(self, fg, bg=-1):
+ """ We use this method to set new color pairs, keeping track of the
+ number of pairs already set. We return the new color pair ID.
+ """
+ self.__color_pair_count += 1
+ curses.init_pair(self.__color_pair_count, fg, bg)
+ return self.__color_pair_count
+
+ def get_cmd(self):
+ """ This returns the pressed key from the curses handler. It's called
+ during the simulation's main loop. Flushing input is important when in
+ non-blocking mode.
+ """
+ ch = self.screen.getch()
+ curses.flushinp()
+ return ch
+
+ def set_nodelay(self, nodelay):
+ """ Toggles between blocking and non-blocking mode on curses.
+ """
+ self.screen.nodelay(nodelay)
+
+ def toggle_hex(self):
+ """ Toggle between decimal or hexadecimal printing of all simulation
+ state elements.
+ """
+ self.__print_hex = not self.__print_hex
+
+ def on_resize(self):
+ """ Called whenever the terminal window gets resized.
+ """
+ self.size = self.screen.getmaxyx()
+ self.scroll_main()
+ self.world.zoom_reset()
+
+ def flip_page(self, offset):
+ """ Change data page by given offset (i.e. '1' for next page or '-1'
+ for previous one).
+ """
+ pidx = list(self.__pages.keys()).index(self.current_page)
+ pidx = (pidx + offset) % len(self.__pages)
+ self.current_page = list(self.__pages.keys())[pidx]
+ self.scroll_main()
+
+ def scroll_main(self, offset=0):
+ """ Scrolling is allowed whenever the current page does not fit inside
+ the terminal window. This method gets called, with no offset, under
+ certain situations, like changing pages, just to make sure the screen
+ gets cleared and at least some of the data is always scrolled into
+ view.
+ """
+ self.screen.clear()
+ len_main = len(self.__main)
+ len_page = len(self.__pages[self.current_page])
+ max_scroll = (len_main + len_page + 5) - self.size[0]
+ self.__main_scroll += offset
+ self.__main_scroll = max(0, min(self.__main_scroll, max_scroll))
+
+ def proc_scroll_left(self):
+ """ Scroll process data elements or genomes (on PROCESS view) to the
+ left.
+ """
+ if self.current_page == "PROCESS":
+ if self.__proc_gene_view:
+ self.__proc_gene_scroll -= 1
+ self.__proc_gene_scroll = max(0, self.__proc_gene_scroll)
+ else:
+ self.__proc_element_scroll -= 1
+ self.__proc_element_scroll = max(0, self.__proc_element_scroll)
+
+ def proc_scroll_right(self):
+ """ Scroll process data elements or genomes (on PROCESS view) to the
+ right.
+ """
+ if self.current_page == "PROCESS":
+ if self.__proc_gene_view:
+ self.__proc_gene_scroll += 1
+ else:
+ self.__proc_element_scroll += 1
+ max_scroll = len(self.proc_elements) - 1
+ self.__proc_element_scroll = min(
+ max_scroll, self.__proc_element_scroll
+ )
+
+ def proc_scroll_down(self, fast=False):
+ """ Scroll process data table (on PROCESS view) up.
+ """
+ if self.current_page == "PROCESS":
+ if fast:
+ len_page = len(self.__main) + len(self.__pages["PROCESS"]) + 6
+ scroll = max(0, self.size[0] - len_page)
+ else:
+ scroll = 1
+
+ self.proc_list_scroll = max(0, self.proc_list_scroll - scroll)
+
+ def proc_scroll_up(self, fast=False):
+ """ Scroll process data table (on PROCESS view) down.
+ """
+ if self.current_page == "PROCESS":
+ if fast:
+ len_page = len(self.__main) + len(self.__pages["PROCESS"]) + 6
+ scroll = max(0, self.size[0] - len_page)
+ else:
+ scroll = 1
+
+ self.proc_list_scroll = min(
+ self.__sim.lib.sal_proc_get_capacity() - 1,
+ self.proc_list_scroll + scroll
+ )
+
+ def proc_scroll_to(self, proc_id):
+ """ Scroll process data table (on PROCESS view) to a specific position.
+ """
+ if self.current_page == "PROCESS":
+ if proc_id < self.__sim.lib.sal_proc_get_capacity():
+ self.proc_list_scroll = proc_id
+ else:
+ raise RuntimeError("Error: scrolling to invalid process")
+
+ def proc_scroll_vertical_reset(self):
+ """ Scroll process data table (on PROCESS view) back to top.
+ """
+ if self.current_page == "PROCESS":
+ self.proc_list_scroll = 0
+
+ def proc_scroll_horizontal_reset(self):
+ """ Scroll process data or genome table (on PROCESS view) back to the
+ left.
+ """
+ if self.current_page == "PROCESS":
+ if self.__proc_gene_view:
+ self.__proc_gene_scroll = 0
+ else:
+ self.__proc_element_scroll = 0
+
+ def proc_select_prev(self):
+ """ Select previous process.
+ """
+ if self.current_page in ["PROCESS", "WORLD"]:
+ self.selected_proc -= 1
+ self.selected_proc %= self.__sim.lib.sal_proc_get_capacity()
+
+ def proc_select_next(self):
+ """ Select next process.
+ """
+ if self.current_page in ["PROCESS", "WORLD"]:
+ self.selected_proc += 1
+ self.selected_proc %= self.__sim.lib.sal_proc_get_capacity()
+
+ def proc_select_first(self):
+ """ Select first process on reaper queue.
+ """
+ if self.current_page in ["PROCESS", "WORLD"]:
+ if self.__sim.lib.sal_proc_get_count():
+ self.selected_proc = self.__sim.lib.sal_proc_get_first()
+
+ def proc_select_last(self):
+ """ Select last process on reaper queue.
+ """
+ if self.current_page in ["PROCESS", "WORLD"]:
+ if self.__sim.lib.sal_proc_get_count():
+ self.selected_proc = self.__sim.lib.sal_proc_get_last()
+
+ def proc_select_by_id(self, proc_id):
+ """ Select process from given ID.
+ """
+ if proc_id < self.__sim.lib.sal_proc_get_capacity():
+ self.selected_proc = proc_id
+ else:
+ raise RuntimeError("Error: attempting to select non-existing proc")
+
+ def proc_scroll_to_selected(self):
+ """ Scroll WORLD or PROCESS page so that selected process becomes
+ visible.
+ """
+ if self.current_page == "PROCESS":
+ self.proc_list_scroll = self.selected_proc
+ elif self.current_page == "WORLD":
+ if not self.__sim.lib.sal_proc_is_free(self.selected_proc):
+ index = self.proc_elements.index("mb1a")
+ address = self.selected_proc_data[index]
+ self.world.scroll_to(address)
+
+ def proc_toggle_gene_view(self):
+ """ Toggle between data element or genome view on PROCESS page.
+ """
+ if self.current_page == "PROCESS":
+ self.__proc_gene_view = not self.__proc_gene_view
+
+ def run_cursor(self):
+ """ We can toggle a visible cursor on WORLD view to aid us in selecting
+ processes.
+ """
+ if self.current_page == "WORLD" and self.size[1] > World.PADDING:
+ curses.curs_set(True)
+
+ while True:
+ self.__curs_y = max(0, min(self.__curs_y, self.size[0] - 1))
+ self.__curs_x = max(World.PADDING, min(
+ self.__curs_x, self.size[1] - 1
+ ))
+ self.screen.move(self.__curs_y, self.__curs_x)
+ cmd = self.screen.getch()
+
+ if cmd in [ord("c"), curses.KEY_RESIZE, self.ESCAPE_KEY]:
+ self.on_resize()
+ break
+ elif cmd == curses.KEY_LEFT:
+ self.__curs_x -= 1
+ elif cmd == curses.KEY_RIGHT:
+ self.__curs_x += 1
+ elif cmd == curses.KEY_DOWN:
+ self.__curs_y += 1
+ elif cmd == curses.KEY_UP:
+ self.__curs_y -= 1
+ elif cmd == ord("\n"):
+ self.__proc_select_by_cursor()
+ break
+
+ curses.curs_set(False)
+
+ def run_console(self):
+ """ Run the Salis console. You can use the console to control all main
+ aspects of the simulation, like compiling genomes into memory, creating
+ or killing organisms, setting auto-save interval, among other stuff.
+ """
+ # Print a pythonic prompt.
+ self.__print_line(self.size[0] - 1, ">>> ", scroll=False)
+ self.screen.refresh()
+
+ # Create the console child window. We turn it into a Textbox object in
+ # order to allow line-editing and extract output easily.
+ console = curses.newwin(1, self.size[1] - 5, self.size[0] - 1, 5)
+ textbox = curses.textpad.Textbox(console, insert_mode=True)
+ textbox.stripspaces = True
+
+ # Grab a copy of the console history and instantiate a pointer to the
+ # last element.
+ history = self.__sim.handler.console_history + [""]
+ pointer = len(history) - 1
+
+ # Nested method reinserts recorded commands from history into console.
+ def access_history(cmd):
+ nonlocal pointer
+
+ if pointer == len(history) - 1:
+ history[-1] = console.instr().strip()
+
+ if cmd == "up" and pointer != 0:
+ pointer -= 1
+ elif cmd == "down" and pointer < len(history) - 1:
+ pointer += 1
+
+ console.clear()
+ console.addstr(0, 0, history[pointer])
+ console.refresh()
+
+ # Declare custom validator to control special commands.
+ def validator(cmd):
+ EXIT = 7
+
+ if cmd in [curses.KEY_RESIZE, self.ESCAPE_KEY]:
+ console.clear()
+ return EXIT
+ # Provide general code for back-space key, in case it's not
+ # correctly defined.
+ elif cmd in [127, curses.KEY_BACKSPACE]:
+ return curses.KEY_BACKSPACE
+ elif cmd == curses.KEY_UP:
+ access_history("up")
+ elif cmd == curses.KEY_DOWN:
+ access_history("down")
+ else:
+ return cmd
+
+ # Run the Textbox object with our custom validator.
+ curses.curs_set(True)
+ output = textbox.edit(validator)
+ curses.curs_set(False)
+
+ # Finally, extract data from console and send to handler. Respond to
+ # any possible resize event here.
+ self.__sim.handler.handle_console(output)
+ self.screen.clear()
+ self.on_resize()
+
+ def show_console_error(self, message):
+ """ Shows Salis console error messages, if any. These messages might
+ contain actual python exception output.
+ """
+ self.__print_line(self.size[0] - 1, ">>>", curses.color_pair(
+ self.__pair_error
+ ) | curses.A_BOLD)
+ self.screen.refresh()
+
+ # We also use a Textbox object, just so that execution gets halted
+ # until a key gets pressed (even on non-blocking mode).
+ console = curses.newwin(1, self.size[1] - 5, self.size[0] - 1, 5)
+ textbox = curses.textpad.Textbox(console)
+
+ # Curses may raise an exception if printing on the edge of the screen;
+ # we can just ignore it.
+ try:
+ console.addstr(0, 0, message, curses.color_pair(
+ self.__pair_error
+ ) | curses.A_BOLD)
+ except curses.error:
+ pass
+
+ # Custom validator simply exits on any key.
+ def validator(cmd):
+ EXIT = 7
+ return EXIT
+
+ textbox.edit(validator)
+ self.screen.clear()
+ self.on_resize()
+
+ def print_page(self):
+ """ Print current page to screen. We use the previously generated
+ '__pages' dictionary to easily associate a label to a Salis function.
+ """
+ # If in minimal mode, print only minial widget.
+ if self.__sim.minimal:
+ self.__print_minimal()
+ return
+
+ # Update selected proc data if in WORLD view.
+ if self.current_page == "WORLD":
+ self.__sim.lib.sal_proc_get_proc_data(self.selected_proc, cast(
+ self.selected_proc_data, POINTER(c_uint32)
+ ))
+
+ # Print MAIN simulation data.
+ self.__print_line(
+ 1, "SALIS[{}]".format(self.__sim.args.file), curses.color_pair(
+ self.__pair_header
+ ) | curses.A_BOLD
+ )
+ self.__print_widget(2, self.__main)
+
+ # Print data of currently selected page.
+ main_lines = len(self.__main) + 3
+ self.__print_header(main_lines, self.current_page)
+ self.__print_widget(main_lines + 1, self.__pages[self.current_page])
+
+ # Print special widgets (WORLD view and PROCESS list).
+ if self.current_page == "WORLD":
+ self.world.render()
+ elif self.current_page == "PROCESS":
+ self.__print_proc_list()
+
+
+ ###############################
+ # Private methods
+ ###############################
+
+ def __set_colors(self):
+ """ Define the color pairs for the data printer.
+ """
+ curses.start_color()
+ curses.use_default_colors()
+ self.__pair_header = self.get_color_pair(curses.COLOR_BLUE)
+ self.__pair_selected = self.get_color_pair(curses.COLOR_YELLOW)
+ self.__pair_error = self.get_color_pair(curses.COLOR_RED)
+
+ def __get_screen(self):
+ """ Prepare and return the main curses window. We also set a shorter
+ delay when responding to a pressed escape key.
+ """
+ # Set a shorter delay to the ESCAPE key, so that we may use it to exit
+ # Salis.
+ os.environ.setdefault("ESCDELAY", "25")
+
+ # Prepare curses screen.
+ screen = curses.initscr()
+ curses.noecho()
+ curses.cbreak()
+ screen.keypad(True)
+ curses.curs_set(False)
+
+ # We need color support in order to run the printer module.
+ if curses.has_colors():
+ self.__set_colors()
+ else:
+ raise RuntimeError("Error: no color support.")
+
+ return screen
+
+ def __get_inst_list(self):
+ """ Parse instruction set from C header file named 'instset.h'. We're
+ using the keyword 'SALIS_INST' to identify an instruction definition,
+ so be careful not to use this keyword anywhere else on the headers.
+ """
+ inst_list = []
+ inst_file = os.path.join(self.__sim.path, "../include/instset.h")
+
+ with open(inst_file, "r") as f:
+ lines = f.read().splitlines()
+
+ for line in lines:
+ if line and line.split()[0] == "SALIS_INST":
+ inst_name = line.split()[1][:4]
+ inst_symb = line.split()[3]
+ inst_list.append((inst_name, inst_symb))
+
+ return inst_list
+
+ def __get_proc_elements(self):
+ """ Parse process structure member variables from C header file named
+ 'process.h'. We're using the keyword 'SALIS_PROC_ELEMENT' to identify
+ element declarations, so be careful not to use this keyword anywhere
+ else on the headers.
+ """
+ proc_elem_list = []
+ proc_elem_file = os.path.join(self.__sim.path, "../include/process.h")
+
+ with open(proc_elem_file, "r") as f:
+ lines = f.read().splitlines()
+
+ for line in lines:
+ if line and line.split()[0] == "SALIS_PROC_ELEMENT":
+ proc_elem_name = line.split()[2].split(";")[0]
+
+ if proc_elem_name == "stack[8]":
+ # The stack is a special member variable, an array. We
+ # translate it by returning a list of stack identifiers.
+ proc_elem_list += ["stack[{}]".format(i) for i in range(8)]
+ else:
+ # We can assume all other struct elements are single
+ # variables.
+ proc_elem_list.append(proc_elem_name)
+
+ return proc_elem_list
+
+ def __get_main(self):
+ """ Generate main set of data fields to be printed. We associate, on a
+ list object, a label to each Salis function to be called. The following
+ elements get printed on all pages.
+ """
+ return [
+ ("e", "cycle", self.__sim.lib.sal_main_get_cycle),
+ ("e", "epoch", self.__sim.lib.sal_main_get_epoch),
+ ("e", "state", lambda: self.__sim.state),
+ ("e", "autosave", lambda: self.__sim.autosave),
+ ]
+
+ def __get_pages(self):
+ """ Generate data fields to be printed on each page. We associate, on a
+ list object, a label to each Salis function to be called. Each list
+ represents a PAGE. We initialize all pages inside an ordered dictionary
+ object.
+ """
+ # The following comprehensions build up widgets to help up print sets
+ # of data elements. The use of nested lambdas is needed to receive
+ # updated values.
+ # Instruction counter widget:
+ inst_widget = [("e", inst[0], (lambda j: (
+ lambda: self.__sim.lib.sal_mem_get_inst_count(j)
+ ))(i)) for i, inst in enumerate(self.inst_list)]
+
+ # Evolver module state widget:
+ state_widget = [("e", "state[{}]".format(i), (lambda j: (
+ lambda: self.__sim.lib.sal_evo_get_state(j)
+ ))(i)) for i in range(4)]
+
+ # Selected process state widget:
+ selected_widget = [("p", element, (lambda j: (
+ lambda: self.selected_proc_data[j]
+ ))(i)) for i, element in enumerate(self.proc_elements)]
+
+ # With the help of the widgets above, we can declare the PAGES
+ # dictionary object.
+ return OrderedDict([
+ ("MEMORY", [
+ ("e", "order", self.__sim.lib.sal_mem_get_order),
+ ("e", "size", self.__sim.lib.sal_mem_get_size),
+ ("e", "allocated", self.__sim.lib.sal_mem_get_allocated),
+ ("s", ""),
+ ("h", "INSTRUCTIONS"),
+ ] + inst_widget),
+ ("EVOLVER", [
+ ("e", "last", self.__sim.lib.sal_evo_get_last_changed_address),
+ ("e", "calls", self.__sim.lib.sal_evo_get_calls_on_last_cycle),
+ ] + state_widget),
+ ("PROCESS", [
+ ("e", "count", self.__sim.lib.sal_proc_get_count),
+ ("e", "capacity", self.__sim.lib.sal_proc_get_capacity),
+ ("e", "first", self.__sim.lib.sal_proc_get_first),
+ ("e", "last", self.__sim.lib.sal_proc_get_last),
+ ("e", "selected", lambda: self.selected_proc),
+ ]),
+ ("WORLD", [
+ ("e", "position", lambda: self.world.pos),
+ ("e", "zoom", lambda: self.world.zoom),
+ ("e", "selected", lambda: self.selected_proc),
+ ("s", ""),
+ ("h", "SELECTED PROC"),
+ ] + selected_widget),
+ ])
+
+ def __print_line(self, ypos, line, attrs=curses.A_NORMAL, scroll=True):
+ """ Print a single line on screen only when it's visible.
+ """
+ if scroll:
+ ypos -= self.__main_scroll
+
+ if 0 <= ypos < self.size[0]:
+ # Curses raises an exception each time we print on the screen's
+ # edge. We can just catch and ignore it.
+ try:
+ line = line[:self.size[1] - 1]
+ self.screen.addstr(ypos, 1, line, attrs)
+ except curses.error:
+ pass
+
+ def __print_header(self, ypos, line):
+ """ Print a bold header.
+ """
+ header_attr = curses.A_BOLD | curses.color_pair(self.__pair_header)
+ self.__print_line(ypos, line, header_attr)
+
+ def __print_value(self, ypos, element, value, attr=curses.A_NORMAL):
+ """ Print a label:value pair.
+ """
+ if type(value) == int:
+ if value == ((2 ** 32) - 1):
+ # In Salis, UINT32_MAX is used to represent NULL. We print NULL
+ # as three dashes.
+ value = "---"
+ elif self.__print_hex:
+ value = hex(value)
+
+ line = "{:<10} : {:>10}".format(element, value)
+ self.__print_line(ypos, line, attr)
+
+ def __print_proc_element(self, ypos, element, value):
+ """ Print elements of currently selected process. We highlight in
+ YELLOW if the selected process is running.
+ """
+ if self.__sim.lib.sal_proc_is_free(self.selected_proc):
+ attr = curses.A_NORMAL
+ else:
+ attr = curses.color_pair(self.__pair_selected)
+
+ self.__print_value(ypos, element, value, attr)
+
+ def __print_widget(self, ypos, widget):
+ """ Print a widget (data PAGE) on screen.
+ """
+ for i, element in enumerate(widget):
+ if element[0] == "s":
+ continue
+ elif element[0] == "h":
+ self.__print_header(i + ypos, element[1])
+ elif element[0] == "e":
+ self.__print_value(i + ypos, element[1], element[2]())
+ elif element[0] == "p":
+ self.__print_proc_element(i + ypos, element[1], element[2]())
+
+ def __clear_line(self, ypos):
+ """ Clear the specified line.
+ """
+ if 0 <= ypos < self.size[0]:
+ self.screen.move(ypos, 0)
+ self.screen.clrtoeol()
+
+ def __print_proc_data_list(self):
+ """ Print list of process data elements in PROCESS page. We can toggle
+ between printing the data elements or the genomes by pressing the 'g'
+ key.
+ """
+ # First, print the table header, by extracting element names from the
+ # previously generated proc element list.
+ ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5
+ header = " | ".join(["{:<10}".format("pidx")] + [
+ "{:>10}".format(element)
+ for element in self.proc_elements[self.__proc_element_scroll:]
+ ])
+ self.__clear_line(ypos)
+ self.__print_header(ypos, header)
+ ypos += 1
+ proc_id = self.proc_list_scroll
+
+ # Print all proc IDs and elements in decimal or hexadecimal format,
+ # depending on hex-flag being set.
+ if self.__print_hex:
+ data_format = lambda x: hex(x)
+ else:
+ data_format = lambda x: x
+
+ # Lastly, iterate all lines and print as much process data as it fits.
+ # We can scroll the process data table using the 'wasd' keys.
+ while ypos < self.size[0]:
+ self.__clear_line(ypos)
+
+ if proc_id < self.__sim.lib.sal_proc_get_capacity():
+ if proc_id == self.selected_proc:
+ # Always highlight the selected process.
+ attr = curses.color_pair(self.__pair_selected)
+ else:
+ attr = curses.A_NORMAL
+
+ # Retrieve a copy of the selected process state and store it in
+ # a list object.
+ proc_data = (c_uint32 * len(self.proc_elements))()
+ self.__sim.lib.sal_proc_get_proc_data(proc_id, cast(
+ proc_data, POINTER(c_uint32))
+ )
+
+ # Lastly, assemble and print the next table row.
+ row = " | ".join(["{:<10}".format(data_format(proc_id))] + [
+ "{:>10}".format(data_format(element))
+ for element in proc_data[self.__proc_element_scroll:]
+ ])
+ self.__print_line(ypos, row, attr)
+
+ proc_id += 1
+ ypos += 1
+
+ def __print_proc_gene_block(
+ self, ypos, gidx, xpos, mbs, mba, ip, sp, pair
+ ):
+ """ Print a sub-set of a process genome. Namely, on of its two memory
+ blocks.
+ """
+ while gidx < mbs and xpos < self.size[1]:
+ gaddr = mba + gidx
+
+ if gaddr == ip:
+ attr = curses.color_pair(self.world.pair_sel_ip)
+ elif gaddr == sp:
+ attr = curses.color_pair(self.world.pair_sel_sp)
+ else:
+ attr = curses.color_pair(pair)
+
+ # Retrieve instruction from memory and transform it to correct
+ # symbol.
+ inst = self.__sim.lib.sal_mem_get_inst(gaddr)
+ symb = self.inst_list[inst][1]
+
+ # Curses raises an exception each time we print on the screen's
+ # edge. We can just catch and ignore it.
+ try:
+ self.screen.addstr(ypos, xpos, symb, attr)
+ except curses.error:
+ pass
+
+ gidx += 1
+ xpos += 1
+
+ return xpos
+
+ def __print_proc_gene(self, ypos, proc_id):
+ """ Print a single process genome on the genome table. We use the same
+ colors to represent memory blocks, IP and SP of each process, as those
+ used to represent the selected process on WORLD view.
+ """
+ # There's nothing to print if process is free.
+ if self.__sim.lib.sal_proc_is_free(proc_id):
+ return
+
+ # Process is alive. Retrieve a copy of the current process state and
+ # store it in a list object.
+ proc_data = (c_uint32 * len(self.proc_elements))()
+ self.__sim.lib.sal_proc_get_proc_data(proc_id, cast(
+ proc_data, POINTER(c_uint32))
+ )
+
+ # Let's extract all data of interest.
+ mb1a = proc_data[self.proc_elements.index("mb1a")]
+ mb1s = proc_data[self.proc_elements.index("mb1s")]
+ mb2a = proc_data[self.proc_elements.index("mb2a")]
+ mb2s = proc_data[self.proc_elements.index("mb2s")]
+ ip = proc_data[self.proc_elements.index("ip")]
+ sp = proc_data[self.proc_elements.index("sp")]
+
+ # Always print MAIN memory block (mb1) first (on the left side). That
+ # way we can keep most of our attention on the parent.
+ xpos = self.__print_proc_gene_block(
+ ypos, self.__proc_gene_scroll, 14, mb1s, mb1a, ip, sp,
+ self.world.pair_sel_mb1
+ )
+
+ # Reset gene counter and print child memory block, if it exists.
+ if mb1s < self.__proc_gene_scroll:
+ gidx = self.__proc_gene_scroll - mb1s
+ else:
+ gidx = 0
+
+ self.__print_proc_gene_block(
+ ypos, gidx, xpos, mb2s, mb2a, ip, sp, self.world.pair_sel_mb2
+ )
+
+ def __print_proc_gene_list(self):
+ """ Print list of process genomes in PROCESS page. We can toggle
+ between printing the genomes or the data elements by pressing the 'g'
+ key.
+ """
+ # Print all proc IDs and gene scroll in decimal or hexadecimal format,
+ # depending on hex-flag being set.
+ if self.__print_hex:
+ data_format = lambda x: hex(x)
+ else:
+ data_format = lambda x: x
+
+ # First, print the table header. We print the current gene-scroll
+ # position for easy reference. Return back to zero scroll with the 'A'
+ # key.
+ ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5
+ header = "{:<10} | genes {} -->".format(
+ "pidx", data_format(self.__proc_gene_scroll)
+ )
+ self.__clear_line(ypos)
+ self.__print_header(ypos, header)
+ ypos += 1
+ proc_id = self.proc_list_scroll
+
+ # Iterate all lines and print as much genetic data as it fits. We can
+ # scroll the gene data table using the 'wasd' keys.
+ while ypos < self.size[0]:
+ self.__clear_line(ypos)
+
+ if proc_id < self.__sim.lib.sal_proc_get_capacity():
+ if proc_id == self.selected_proc:
+ # Always highlight the selected process.
+ attr = curses.color_pair(self.__pair_selected)
+ else:
+ attr = curses.A_NORMAL
+
+ # Assemble and print the next table row.
+ row = "{:<10} |".format(data_format(proc_id))
+ self.__print_line(ypos, row, attr)
+ self.__print_proc_gene(ypos, proc_id)
+
+ proc_id += 1
+ ypos += 1
+
+ def __print_proc_list(self):
+ """ Print list of process genomes or process data elements in PROCESS
+ page. We can toggle between printing the genomes or the data elements
+ by pressing the 'g' key.
+ """
+ if self.__proc_gene_view:
+ self.__print_proc_gene_list()
+ else:
+ self.__print_proc_data_list()
+
+ def __proc_select_by_cursor(self):
+ """ Select process located on address under cursor, if any exists.
+ """
+ # First, calculate address under cursor.
+ ypos = self.__curs_y
+ xpos = self.__curs_x - World.PADDING
+ line_size = self.size[1] - World.PADDING
+ address = self.world.pos + (
+ ((ypos * line_size) + xpos) * self.world.zoom
+ )
+
+ # Now, iterate all living processes and try to find one that owns the
+ # calculated address.
+ if self.__sim.lib.sal_mem_is_address_valid(address):
+ for proc_id in range(self.__sim.lib.sal_proc_get_capacity()):
+ if not self.__sim.lib.sal_proc_is_free(proc_id):
+ proc_data = (c_uint32 * len(self.proc_elements))()
+ self.__sim.lib.sal_proc_get_proc_data(proc_id, cast(
+ proc_data, POINTER(c_uint32))
+ )
+ mb1a = proc_data[self.proc_elements.index("mb1a")]
+ mb1s = proc_data[self.proc_elements.index("mb1s")]
+ mb2a = proc_data[self.proc_elements.index("mb2a")]
+ mb2s = proc_data[self.proc_elements.index("mb2s")]
+
+ if (
+ mb1a <= address < (mb1a + mb1s) or
+ mb2a <= address < (mb2a + mb2s)
+ ):
+ self.selected_proc = proc_id
+ break
+
+ def __get_minimal(self):
+ """ Generate set of data fields to be printed on minimal mode.
+ """
+ return [
+ ("cycle", self.__sim.lib.sal_main_get_cycle),
+ ("epoch", self.__sim.lib.sal_main_get_epoch),
+ ("procs", self.__sim.lib.sal_proc_get_count),
+ ]
+
+ def __print_minimal(self):
+ """ Print minimal mode data fields.
+ """
+ self.__print_line(1, "Salis --- Minimal mode")
+
+ for i, field in enumerate(self.__minimal):
+ self.__print_line(i + 2, "{}: {}".format(field[0], field[1]()))
diff --git a/bin/modules/world.py b/bin/modules/world.py
new file mode 100644
index 0000000..08f2734
--- /dev/null
+++ b/bin/modules/world.py
@@ -0,0 +1,279 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+File: world.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered an extension of the 'printer' module. It takes
+care of getting a pre-redered image from Salis and post-processing it in order
+to print it into the curses screen. It also keeps track of user controllable
+rendering parameters (position and zoom).
+"""
+
+import curses
+
+from ctypes import c_uint8, cast, POINTER
+
+
+class World:
+ PADDING = 25
+
+ def __init__(self, printer, sim):
+ """ World constructor. We link to the printer and main simulation
+ classes. We also setup the colors for rendering the world.
+ """
+ self.__printer = printer
+ self.__sim = sim
+ self.__set_world_colors()
+ self.__show_ip = True
+ self.pos = 0
+ self.zoom = 1
+
+ def render(self):
+ """ Function for rendering the world. We get a pre-rendered buffer from
+ Salis' memory module (its way faster to pre-render in C) and use that
+ to assemble the world image in Python.
+ """
+ # Window is so narrow that world is not visible.
+ if self.__printer.size[1] <= self.PADDING:
+ return
+
+ # Get pre-rendered image from Salis' memory module.
+ line_width = self.__printer.size[1] - self.PADDING
+ print_area = self.__printer.size[0] * line_width
+ c_buffer = (c_uint8 * print_area)()
+ self.__sim.lib.sal_ren_get_image(
+ self.pos, self.zoom, print_area, cast(c_buffer, POINTER(c_uint8))
+ )
+
+ # Get data elements of selected process, if it's running, and store
+ # them into a convenient dict object.
+ if self.__sim.lib.sal_proc_is_free(self.__printer.selected_proc):
+ sel_data = None
+ else:
+ out_data = self.__printer.selected_proc_data
+ out_elem = self.__printer.proc_elements
+ sel_data = {
+ "ip": out_data[out_elem.index("ip")],
+ "sp": out_data[out_elem.index("sp")],
+ "mb1a": out_data[out_elem.index("mb1a")],
+ "mb1s": out_data[out_elem.index("mb1s")],
+ "mb2a": out_data[out_elem.index("mb2a")],
+ "mb2s": out_data[out_elem.index("mb2s")],
+ }
+
+ # Iterate all cells on printable area and print the post-rendered
+ # cells. Rendered cells contain info about bit flags and instructions
+ # currently written into memory.
+ bidx = 0
+
+ for y in range(self.__printer.size[0]):
+ for x in range(line_width):
+ xpad = x + self.PADDING
+ addr = self.pos + (self.zoom * bidx)
+ symb, attr = self.__render_cell(c_buffer[bidx], addr, sel_data)
+
+ # Curses raises an exception when printing on the edge of the
+ # screen; we can just ignore it.
+ try:
+ self.__printer.screen.addstr(y, xpad, symb, attr)
+ except curses.error:
+ pass
+
+ bidx += 1
+
+ def zoom_out(self):
+ """ Zoom out by a factor of 2 (zoom *= 2).
+ """
+ if self.__is_world_editable():
+ self.zoom = min(self.zoom * 2, self.__get_max_zoom())
+
+ def zoom_in(self):
+ """ Zoom in by a factor of 2 (zoom //= 2).
+ """
+ if self.__is_world_editable():
+ self.zoom = max(self.zoom // 2, 1)
+
+ def zoom_reset(self):
+ """ Reset zoom to a valid value on certain events (i.e. during terminal
+ resizing).
+ """
+ self.zoom = min(self.zoom, self.__get_max_zoom())
+
+ def pan_left(self):
+ """ Pan world to the left (pos -= zoom).
+ """
+ if self.__is_world_editable():
+ self.pos = max(self.pos - self.zoom, 0)
+
+ def pan_right(self):
+ """ Pan world to the right (pos += zoom).
+ """
+ if self.__is_world_editable():
+ max_pos = self.__sim.lib.sal_mem_get_size() - 1
+ self.pos = min(self.pos + self.zoom, max_pos)
+
+ def pan_down(self, fast=False):
+ """ Pan world downward.
+ """
+ if self.__is_world_editable():
+ if fast:
+ self.pos = max(self.pos - self.__get_world_area(), 0)
+ else:
+ self.pos = max(self.pos - self.__get_line_area(), 0)
+
+ def pan_up(self, fast=False):
+ """ Pan world upward.
+ """
+ if self.__is_world_editable():
+ max_pos = self.__sim.lib.sal_mem_get_size() - 1
+
+ if fast:
+ self.pos = min(self.pos + self.__get_world_area(), max_pos)
+ else:
+ self.pos = min(self.pos + self.__get_line_area(), max_pos)
+
+ def pan_reset(self):
+ """ Set world position to zero.
+ """
+ if self.__is_world_editable():
+ self.pos = 0
+
+ def scroll_to(self, pos):
+ """ Move world pos to a specified position.
+ """
+ if self.__is_world_editable():
+ if self.__sim.lib.sal_mem_is_address_valid(pos):
+ self.pos = pos
+ else:
+ raise RuntimeError("Error: scrolling to an invalid address")
+
+ def toggle_ip_view(self):
+ """ Turn on/off IP visualization. Turning off IPs might make it easier
+ to visualize the underlying memory block structure.
+ """
+ if self.__is_world_editable():
+ self.__show_ip = not self.__show_ip
+
+
+ ###############################
+ # Private methods
+ ###############################
+
+ def __set_world_colors(self):
+ """ Define color pairs for rendering the world. Each color has a
+ special meaning, referring to the selected process IP, SP and memory
+ blocks, or to bit flags currently set on rendered cells.
+ """
+ self.pair_free = self.__printer.get_color_pair(
+ curses.COLOR_BLUE
+ )
+ self.pair_alloc = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_BLUE
+ )
+ self.pair_mbstart = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_CYAN
+ )
+ self.pair_ip = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_WHITE
+ )
+ self.pair_sel_mb2 = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_GREEN
+ )
+ self.pair_sel_mb1 = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_YELLOW
+ )
+ self.pair_sel_sp = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_MAGENTA
+ )
+ self.pair_sel_ip = self.__printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_RED
+ )
+
+ def __render_cell(self, byte, addr, sel_data=None):
+ """ Render a single cell on the WORLD view. All cells are rendered by
+ interpreting the values coming in from the buffer. We overlay special
+ colors for representing the selected organism's state, on top of the
+ more common colors used to represent memory state.
+ """
+ # Paint black all cells that are out of memory bounds.
+ if not self.__sim.lib.sal_mem_is_address_valid(addr):
+ return " ", curses.A_NORMAL
+
+ # Check if cell contains part of the currently selected process.
+ if sel_data:
+ top_addr = addr + self.zoom
+ top_mb1a = sel_data["mb1a"] + sel_data["mb1s"]
+ top_mb2a = sel_data["mb2a"] + sel_data["mb2s"]
+
+ if addr <= sel_data["ip"] < top_addr:
+ pair = self.pair_sel_ip
+ elif addr <= sel_data["sp"] < top_addr:
+ pair = self.pair_sel_sp
+ elif top_addr > sel_data["mb1a"] and top_mb1a > addr:
+ pair = self.pair_sel_mb1
+ elif top_addr > sel_data["mb2a"] and top_mb2a > addr:
+ pair = self.pair_sel_mb2
+
+ # No pair has been selected yet; select pair based on bit-flags.
+ if not "pair" in locals():
+ if self.__show_ip and byte >= 0x80:
+ pair = self.pair_ip
+ elif (byte % 0x80) >= 0x40:
+ pair = self.pair_mbstart
+ elif (byte % 0x40) >= 0x20:
+ pair = self.pair_alloc
+ else:
+ pair = self.pair_free
+
+ # Select symbol to represent instructions currently on cell.
+ inst = byte % 32
+
+ if self.zoom == 1:
+ symb = self.__printer.inst_list[inst][1]
+ elif inst > 16:
+ symb = ":"
+ else:
+ symb = "."
+
+ # Return tuple containing our post-redered cell.
+ return symb, curses.color_pair(pair)
+
+ def __get_max_zoom(self):
+ """ Calculate maximum needed zoom so that the entire world fits on the
+ terminal window.
+ """
+ max_zoom = 1
+ line_size = self.__printer.size[1] - self.PADDING
+ coverage = self.__printer.size[0] * line_size
+
+ # We fix a maximum zoom level; otherwise, program may halt on extreme
+ # zoom levels.
+ while (
+ (coverage * max_zoom) < self.__sim.lib.sal_mem_get_size() and
+ max_zoom < 2 ** 16
+ ):
+ max_zoom *= 2
+
+ return max_zoom
+
+ def __is_world_editable(self):
+ """ For this to return True, printer's current page must be WORLD page.
+ Additionally, the WORLD panel must be visible on the terminal window
+ (i.e. curses.COLS > data_margin).
+ """
+ correct_page = self.__printer.current_page == "WORLD"
+ correct_size = self.__printer.size[1] > self.PADDING
+ return correct_page and correct_size
+
+ def __get_line_area(self):
+ """ Return amount of bytes contained in a printed WORLD line.
+ """
+ line_size = self.__printer.size[1] - self.PADDING
+ line_area = self.zoom * line_size
+ return line_area
+
+ def __get_world_area(self):
+ """ Return amount of bytes contained in the entire WORLD view.
+ """
+ return self.__get_line_area() * self.__printer.size[0]