aboutsummaryrefslogtreecommitdiff
path: root/bin
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-02-29 02:29:13 +0100
committerPaul Oliver <contact@pauloliver.dev>2024-02-29 02:29:13 +0100
commitca118555214a176728b9aab87849391344306d6d (patch)
tree833cffdd4066a7114b1d79d6eeaa2e0152408fc8 /bin
Initial commit.
Diffstat (limited to 'bin')
-rw-r--r--bin/common/.keep0
-rw-r--r--bin/genomes/.keep0
-rw-r--r--bin/genomes/86.anc1
-rw-r--r--bin/handler.py403
-rw-r--r--bin/lib/.keep0
-rw-r--r--bin/printer.py833
-rwxr-xr-xbin/salis.py346
-rw-r--r--bin/sims/.keep0
-rw-r--r--bin/sims/auto/.keep0
-rw-r--r--bin/world.py277
10 files changed, 1860 insertions, 0 deletions
diff --git a/bin/common/.keep b/bin/common/.keep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bin/common/.keep
diff --git a/bin/genomes/.keep b/bin/genomes/.keep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bin/genomes/.keep
diff --git a/bin/genomes/86.anc b/bin/genomes/86.anc
new file mode 100644
index 0000000..30e76f3
--- /dev/null
+++ b/bin/genomes/86.anc
@@ -0,0 +1 @@
+:::[a...]b...^b^b^b-bba::.!d#d#b?d).:.{bc).::a:.:}bc:..LadWcd^a^cvb?b(.::$~b~d(..:a:::
diff --git a/bin/handler.py b/bin/handler.py
new file mode 100644
index 0000000..ad3b14e
--- /dev/null
+++ b/bin/handler.py
@@ -0,0 +1,403 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+file: handler.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered the 'controller' part of the Salis simulator.
+It receives and parses all user input via keyboard and console commands. It
+also takes care of genome compilation (via genome files located on the
+'genomes' directory).
+
+An user may open the Salis console by pressing the 'c' key while in a running
+session. A nice quirk is the possibility to run python commands from within the
+Salis console. As an example, to get the memory size, an user could type:
+
+>>> exec output = self._sim.lib.sal_mem_get_size()
+
+Note that 'output' denotes a storage variable that will get printed on the
+console response. This ability gives an user a whole lot of power, and should
+be used with care.
+"""
+
+import os
+import curses
+
+
+class Handler:
+ ESCAPE_KEY = 27
+
+ def __init__(self, sim):
+ """ Handler constructor. Simply link this class to the main simulation
+ class and printer class and create symbol dictionary.
+ """
+ self._sim = sim
+ self._printer = sim.printer
+ self._inst_dict = self._get_inst_dict()
+ self._console_history = []
+
+ def process_cmd(self, cmd):
+ """ Process incoming commands from curses. Commands are received via
+ ncurses' getch() function, thus, they must be transformed into their
+ character representations with 'ord()'.
+ """
+ if cmd == self.ESCAPE_KEY:
+ self._sim.lib.sal_main_save(
+ self._sim.save_file_path.encode("utf-8")
+ )
+ self._sim.exit()
+ elif cmd == ord(" "):
+ self._sim.toggle_state()
+ elif cmd == curses.KEY_LEFT:
+ self._printer.flip_page(-1)
+ elif cmd == curses.KEY_RIGHT:
+ self._printer.flip_page(1)
+ elif cmd == curses.KEY_DOWN:
+ self._printer.scroll_main(-1)
+ elif cmd == curses.KEY_UP:
+ self._printer.scroll_main(1)
+ elif cmd == curses.KEY_RESIZE:
+ self._printer.on_resize()
+ elif cmd == ord("X"):
+ self._printer.toggle_hex()
+ elif cmd == ord("x"):
+ self._printer.world.zoom_out()
+ elif cmd == ord("z"):
+ self._printer.world.zoom_in()
+ elif cmd == ord("a"):
+ self._printer.world.pan_left()
+ self._printer.proc_scroll_left()
+ elif cmd == ord("d"):
+ self._printer.world.pan_right()
+ self._printer.proc_scroll_right()
+ elif cmd == ord("s"):
+ self._printer.world.pan_down()
+ self._printer.proc_scroll_down()
+ elif cmd == ord("w"):
+ self._printer.world.pan_up()
+ self._printer.proc_scroll_up()
+ elif cmd == ord("S"):
+ self._printer.world.pan_reset()
+ self._printer.proc_scroll_vertical_reset()
+ elif cmd == ord("A"):
+ self._printer.world.pan_reset()
+ self._printer.proc_scroll_horizontal_reset()
+ elif cmd == ord("o"):
+ self._printer.proc_select_prev()
+ elif cmd == ord("p"):
+ self._printer.proc_select_next()
+ elif cmd == ord("f"):
+ self._printer.proc_select_first()
+ elif cmd == ord("l"):
+ self._printer.proc_select_last()
+ elif cmd == ord("k"):
+ self._printer.proc_scroll_to_selected()
+ elif cmd == ord("g"):
+ self._printer.proc_toggle_gene_view()
+ elif cmd == ord("\n"):
+ self._printer.run_cursor()
+ elif cmd == ord("c"):
+ self._printer.run_console()
+ else:
+ # Check for numeric input. Number keys [1 to 0] cycle the
+ # simulation [2 ** ((n - 1) % 10] times.
+ try:
+ if chr(cmd).isdigit():
+ factor = int(chr(cmd))
+ factor = int(2 ** ((factor - 1) % 10))
+ self._cycle_sim(factor)
+ except ValueError:
+ pass
+
+ def handle_console(self, command_raw):
+ """ Process console commands. We parse and check for input errors. Any
+ python exception messages are redirected to the console-response
+ window.
+ """
+ if command_raw:
+ command = command_raw.split()
+
+ try:
+ # Handle both python and self-thrown exceptions.
+ if command[0] in ["q", "quit"]:
+ self._on_quit(command, save=True)
+ elif command[0] in ["q!", "quit!"]:
+ self._on_quit(command, save=False)
+ elif command[0] in ["i", "input"]:
+ self._on_input(command)
+ elif command[0] in ["c", "compile"]:
+ self._on_compile(command)
+ elif command[0] in ["n", "new"]:
+ self._on_new(command)
+ elif command[0] in ["k", "kill"]:
+ self._on_kill(command)
+ elif command[0] in ["e", "exec"]:
+ self._on_exec(command)
+ elif command[0] in ["s", "scroll"]:
+ self._on_scroll(command)
+ elif command[0] in ["p", "process"]:
+ self._on_proc_select(command)
+ elif command[0] in ["r", "rename"]:
+ self._on_rename(command)
+ elif command[0] in ["save"]:
+ self._on_save(command)
+ elif command[0] in ["a", "auto"]:
+ self._on_set_autosave(command)
+ else:
+ # Raise if a non-existing command has been given.
+ self._raise("Invalid command: '{}'".format(command[0]))
+ except BaseException as exep:
+ # We parse and redirect python exceptions to the error
+ # console-window.
+ message = str(exep).strip()
+ message = message[0].upper() + message[1:]
+ self._printer.show_console_error(message)
+ finally:
+ # Store command on console history.
+ self._console_history.append(command_raw.strip())
+
+ @property
+ def console_history(self):
+ return self._console_history
+
+ def _raise(self, message):
+ """ Generic exception thrower. Throws a 'RuntimeError' initialized with
+ the given message.
+ """
+ raise RuntimeError("ERROR: {}".format(message))
+
+ def _respond(self, message):
+ """ Generic console responder. Throws a 'RuntimeError' initialized with
+ the given message.
+ """
+ raise RuntimeError(message)
+
+ def _cycle_sim(self, factor):
+ """ Simply cycle Salis 'factor' number of times.
+ """
+ for _ in range(factor):
+ self._sim.lib.sal_main_cycle()
+ self._sim.check_autosave()
+
+ def _get_inst_dict(self):
+ """ Transform the instruction list of the printer module into a
+ dictionary that's more useful for genome compilation. Instruction
+ symbols are keys, values are the actual byte representation.
+ """
+ inst_dict = {}
+
+ for i, inst in enumerate(self._printer.inst_list):
+ inst_dict[inst[1]] = i
+
+ return inst_dict
+
+ def _on_quit(self, command, save):
+ """ Exit simulation. We can choose whether to save the simulation into a
+ save file or not.
+ """
+ if len(command) > 1:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ if save:
+ self._sim.lib.sal_main_save(
+ self._sim.save_file_path.encode("utf-8")
+ )
+
+ self._sim.exit()
+
+ def _write_genome(self, genome, address_list):
+ """ Write genome stream into a given list of memory addresses. All
+ addresses must be valid or an exception is thrown.
+ """
+ # All addresses we will write to must be valid.
+ for base_addr in address_list:
+ address = int(base_addr, 0)
+
+ for _ in range(len(genome)):
+ if not self._sim.lib.sal_mem_is_address_valid(address):
+ self._raise("Address '{}' is invalid".format(address))
+
+ address += 1
+
+ # All looks well! Let's compile the genome into memory.
+ for base_addr in address_list:
+ address = int(base_addr, 0)
+
+ for symbol in genome:
+ self._sim.lib.sal_mem_set_inst(
+ address, self._inst_dict[symbol]
+ )
+ address += 1
+
+ def _on_input(self, command):
+ """ Compile organism from user typed input. Compilation can only occur
+ on valid memory addresses. An exception will be thrown when trying to
+ write into non-valid address or when input stream is invalid.
+ """
+ if len(command) < 3:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ # All characters in file must be actual instruction symbols.
+ for character in command[1]:
+ if character not in self._inst_dict:
+ self._raise("Invalid symbol '{}' found on stream".format(
+ character
+ ))
+
+ # All looks well, Let's write the genome into memory.
+ self._write_genome(command[1], command[2:])
+
+ def _on_compile(self, command):
+ """ Compile organism from source genome file. Genomes must be placed on
+ the './genomes' directory. Compilation can only occur on valid memory
+ addresses. An exception will be thrown when trying to write into
+ non-valid address or when genome file is invalid.
+ """
+ if len(command) < 3:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Open genome file for compilation.
+ gen_file = os.path.join(self._sim.path, "genomes", command[1])
+
+ with open(gen_file, "r") as f:
+ genome = f.read().strip()
+
+ # Entire genome must be written on a single line.
+ if "\n" in genome:
+ self._raise("Newline detected on '{}'".format(gen_file))
+
+ # All characters in file must be actual instruction symbols.
+ for character in genome:
+ if character not in self._inst_dict:
+ self._raise("Invalid symbol '{}' found on '{}'".format(
+ character, gen_file
+ ))
+
+ # All looks well, Let's write the genome into memory.
+ self._write_genome(genome, command[2:])
+
+ def _on_new(self, command):
+ """ Instantiate new organism of given size on given address. These
+ memory areas must be free and valid or an exception is thrown.
+ """
+ if len(command) < 3:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Check that all addresses we will allocate are free and valid.
+ for base_addr in command[2:]:
+ address = int(base_addr, 0)
+
+ for _ in range(int(command[1])):
+ if not self._sim.lib.sal_mem_is_address_valid(address):
+ self._raise("Address '{}' is invalid".format(address))
+ elif self._sim.lib.sal_mem_is_allocated(address):
+ self._raise("Address '{}' is allocated".format(address))
+
+ address += 1
+
+ # All looks well! Let's instantiate our new organism.
+ for base_addr in command[2:]:
+ address = int(base_addr, 0)
+ size = int(command[1], 0)
+ self._sim.lib.sal_proc_create(address, size)
+
+ def _on_kill(self, command):
+ """ Kill organism on bottom of reaper queue.
+ """
+ if len(command) > 1:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ # Call proc kill function only if there's any organisms to kill.
+ if not self._sim.lib.sal_proc_get_count():
+ self._raise("No organisms currently alive")
+ else:
+ self._sim.lib.sal_proc_kill()
+
+ def _on_exec(self, command):
+ """ Allow a user to execute a python command from within the console.
+ Using this is very hack-ish, and not recommended unless you're certain
+ of what you're doing!
+ """
+ if len(command) < 2:
+ self._raise("'{}' must be followed by an executable string".format(
+ command[0])
+ )
+
+ # User may query any simulation variable or status and the console will
+ # respond. For example, to query memory size or order, type one of the
+ # following:
+ #
+ # >>> exec output = self._sim.lib.sal_mem_get_size()
+ # >>> exec output = self._sim.lib.sal_mem_get_order()
+ #
+ output = {}
+ exec(" ".join(command[1:]), locals(), output)
+
+ if output:
+ self._respond("EXEC RESPONDS: {}".format(str(output)))
+
+ def _on_scroll(self, command):
+ """ We can scroll to a specific process (on PROCESS view) or to a
+ specific world address (on WORLD view) via the console.
+ """
+ if len(command) != 2:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ target = int(command[1], 0)
+
+ # If on PROCESS page, scroll to given process.
+ if self._printer.current_page == "PROCESS":
+ if target < self._sim.lib.sal_proc_get_capacity():
+ self._printer.proc_scroll_to(target)
+ else:
+ self._raise("No process with ID '{}' found".format(target))
+ elif self._printer.current_page == "WORLD":
+ if self._sim.lib.sal_mem_is_address_valid(target):
+ self._printer.world.scroll_to(target)
+ else:
+ self._raise("Address '{}' is invalid".format(address))
+ else:
+ self._raise("'{}' must be called on PROCESS or WORLD page".format(
+ command[0])
+ )
+
+ def _on_proc_select(self, command):
+ """ Select a specific process (on PROCESS or WORLD page).
+ """
+ if len(command) != 2:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ target = int(command[1], 0)
+
+ # If on PROCESS page, scroll to given process.
+ if target < self._sim.lib.sal_proc_get_capacity():
+ self._printer.proc_select_by_id(target)
+ else:
+ self._raise("No process with ID '{}' found".format(target))
+
+ def _on_rename(self, command):
+ """ Set a new simulation name. Future auto-saved files will use this
+ name as prefix.
+ """
+ if len(command) != 2:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ self._sim.rename(command[1])
+
+ def _on_save(self, command):
+ """ Save simulation on its current state.
+ """
+ if len(command) != 1:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ self._sim.lib.sal_main_save(self._sim.save_file_path.encode("utf-8"))
+
+ def _on_set_autosave(self, command):
+ """ Set the simulation's auto save interval. Provide any integer
+ between 0 and (2**32 - 1). If zero is provided, auto saving will be
+ disabled.
+ """
+ if len(command) != 2:
+ self._raise("Invalid parameters for '{}'".format(command[0]))
+
+ self._sim.set_autosave(int(command[1], 0))
diff --git a/bin/lib/.keep b/bin/lib/.keep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bin/lib/.keep
diff --git a/bin/printer.py b/bin/printer.py
new file mode 100644
index 0000000..135e220
--- /dev/null
+++ b/bin/printer.py
@@ -0,0 +1,833 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+File: printer.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered the 'view' part of the Salis simulator. It
+takes care of displaying the simulator's state in a nicely formatted, intuitive
+format. It makes use of the curses library for terminal handling.
+"""
+
+import curses
+import curses.textpad
+import os
+import time
+from collections import OrderedDict
+from ctypes import c_uint8, c_uint32, cast, POINTER
+from handler import Handler
+from world import World
+
+
+class Printer:
+ def __init__(self, sim):
+ """ Printer constructor. It takes care of starting up curses, defining
+ the data pages and setting the printer on its initial state.
+ """
+ self._sim = sim
+ self._color_pair_count = 0
+ self._screen = self._get_screen()
+ self._inst_list = self._get_inst_list()
+ self._proc_elements = self._get_proc_elements()
+ self._main = self._get_main()
+ self._pages = self._get_pages()
+ self._size = self._screen.getmaxyx()
+ self._current_page = "MEMORY"
+ self._main_scroll = 0
+ self._selected_proc = 0
+ self._selected_proc_data = (c_uint32 * len(self._proc_elements))()
+ self._proc_list_scroll = 0
+ self._proc_element_scroll = 0
+ self._proc_gene_scroll = 0
+ self._proc_gene_view = False
+ self._curs_y = 0
+ self._curs_x = 0
+ self._print_hex = False
+ self._world = World(self, self._sim)
+
+ def __del__(self):
+ """ Printer destructor exits curses.
+ """
+ curses.endwin()
+
+ def get_color_pair(self, fg, bg=-1):
+ """ We use this method to set new color pairs, keeping track of the
+ number of pairs already set. We return the new color pair ID.
+ """
+ self._color_pair_count += 1
+ curses.init_pair(self._color_pair_count, fg, bg)
+ return self._color_pair_count
+
+ def get_cmd(self):
+ """ This returns the pressed key from the curses handler. It's called
+ during the simulation's main loop. Flushing input is important when in
+ non-blocking mode.
+ """
+ ch = self._screen.getch()
+ curses.flushinp()
+ return ch
+
+ def set_nodelay(self, nodelay):
+ """ Toggles between blocking and non-blocking mode on curses.
+ """
+ self._screen.nodelay(nodelay)
+
+ def toggle_hex(self):
+ """ Toggle between decimal or hexadecimal printing of all simulation
+ state elements.
+ """
+ self._print_hex = not self._print_hex
+
+ def on_resize(self):
+ """ Called whenever the terminal window gets resized.
+ """
+ self._size = self._screen.getmaxyx()
+ self.scroll_main()
+ self._world.zoom_reset()
+
+ def flip_page(self, offset):
+ """ Change data page by given offset (i.e. '1' for next page or '-1'
+ for previous one).
+ """
+ pidx = list(self._pages.keys()).index(self._current_page)
+ pidx = (pidx + offset) % len(self._pages)
+ self._current_page = list(self._pages.keys())[pidx]
+ self.scroll_main()
+
+ def scroll_main(self, offset=0):
+ """ Scrolling is allowed whenever the current page does not fit inside
+ the terminal window. This method gets called, with no offset, under
+ certain situations, like changing pages, just to make sure the screen
+ gets cleared and at least some of the data is always scrolled into
+ view.
+ """
+ self._screen.clear()
+ len_main = len(self._main)
+ len_page = len(self._pages[self._current_page])
+ max_scroll = (len_main + len_page + 5) - self._size[0]
+ self._main_scroll += offset
+ self._main_scroll = max(0, min(self._main_scroll, max_scroll))
+
+ def proc_scroll_left(self):
+ """ Scroll process data elements or genomes (on PROCESS view) to the
+ left.
+ """
+ if self._current_page == "PROCESS":
+ if self._proc_gene_view:
+ self._proc_gene_scroll -= 1
+ self._proc_gene_scroll = max(0, self._proc_gene_scroll)
+ else:
+ self._proc_element_scroll -= 1
+ self._proc_element_scroll = max(0, self._proc_element_scroll)
+
+ def proc_scroll_right(self):
+ """ Scroll process data elements or genomes (on PROCESS view) to the
+ right.
+ """
+ if self._current_page == "PROCESS":
+ if self._proc_gene_view:
+ self._proc_gene_scroll += 1
+ else:
+ self._proc_element_scroll += 1
+ max_scroll = len(self._proc_elements) - 1
+ self._proc_element_scroll = min(
+ max_scroll, self._proc_element_scroll
+ )
+
+ def proc_scroll_down(self):
+ """ Scroll process data table (on PROCESS view) up.
+ """
+ if self._current_page == "PROCESS":
+ self._proc_list_scroll = max(0, self._proc_list_scroll - 1)
+
+ def proc_scroll_up(self):
+ """ Scroll process data table (on PROCESS view) down.
+ """
+ if self._current_page == "PROCESS":
+ self._proc_list_scroll = min(
+ self._sim.lib.sal_proc_get_capacity() - 1,
+ self._proc_list_scroll + 1
+ )
+
+ def proc_scroll_to(self, proc_id):
+ """ Scroll process data table (on PROCESS view) to a specific position.
+ """
+ if self._current_page == "PROCESS":
+ if proc_id < self._sim.lib.sal_proc_get_capacity():
+ self._proc_list_scroll = proc_id
+ else:
+ raise RuntimeError("Error: scrolling to invalid process")
+
+ def proc_scroll_vertical_reset(self):
+ """ Scroll process data table (on PROCESS view) back to top.
+ """
+ if self._current_page == "PROCESS":
+ self._proc_list_scroll = 0
+
+ def proc_scroll_horizontal_reset(self):
+ """ Scroll process data or genome table (on PROCESS view) back to the
+ left.
+ """
+ if self._current_page == "PROCESS":
+ if self._proc_gene_view:
+ self._proc_gene_scroll = 0
+ else:
+ self._proc_element_scroll = 0
+
+ def proc_select_prev(self):
+ """ Select previous process.
+ """
+ if self._current_page in ["PROCESS", "WORLD"]:
+ self._selected_proc -= 1
+ self._selected_proc %= self._sim.lib.sal_proc_get_capacity()
+
+ def proc_select_next(self):
+ """ Select next process.
+ """
+ if self._current_page in ["PROCESS", "WORLD"]:
+ self._selected_proc += 1
+ self._selected_proc %= self._sim.lib.sal_proc_get_capacity()
+
+ def proc_select_first(self):
+ """ Select first process on reaper queue.
+ """
+ if self._current_page in ["PROCESS", "WORLD"]:
+ if self._sim.lib.sal_proc_get_count():
+ self._selected_proc = self._sim.lib.sal_proc_get_first()
+
+ def proc_select_last(self):
+ """ Select last process on reaper queue.
+ """
+ if self._current_page in ["PROCESS", "WORLD"]:
+ if self._sim.lib.sal_proc_get_count():
+ self._selected_proc = self._sim.lib.sal_proc_get_last()
+
+ def proc_select_by_id(self, proc_id):
+ """ Select process from given ID.
+ """
+ if proc_id < self._sim.lib.sal_proc_get_capacity():
+ self._selected_proc = proc_id
+ else:
+ raise RuntimeError("Error: attempting to select non-existing proc")
+
+ def proc_scroll_to_selected(self):
+ """ Scroll WORLD or PROCESS page so that selected process becomes
+ visible.
+ """
+ if self._current_page == "PROCESS":
+ self._proc_list_scroll = self._selected_proc
+ elif self._current_page == "WORLD":
+ if not self._sim.lib.sal_proc_is_free(self._selected_proc):
+ index = self._proc_elements.index("mb1a")
+ address = self._selected_proc_data[index]
+ self._world.scroll_to(address)
+
+ def proc_toggle_gene_view(self):
+ """ Toggle between data element or genome view on PROCESS page.
+ """
+ if self._current_page == "PROCESS":
+ self._proc_gene_view = not self._proc_gene_view
+
+ def run_cursor(self):
+ """ We can toggle a visible cursor on WORLD view to aid us in selecting
+ processes.
+ """
+ if self._current_page == "WORLD" and self._size[1] > World.PADDING:
+ curses.curs_set(True)
+
+ while True:
+ self._curs_y = max(0, min(self._curs_y, self._size[0] - 1))
+ self._curs_x = max(World.PADDING, min(
+ self._curs_x, self._size[1] - 1
+ ))
+ self._screen.move(self._curs_y, self._curs_x)
+ cmd = self._screen.getch()
+
+ if cmd in [ord("c"), curses.KEY_RESIZE, Handler.ESCAPE_KEY]:
+ self.on_resize()
+ break
+ elif cmd == curses.KEY_LEFT:
+ self._curs_x -= 1
+ elif cmd == curses.KEY_RIGHT:
+ self._curs_x += 1
+ elif cmd == curses.KEY_DOWN:
+ self._curs_y += 1
+ elif cmd == curses.KEY_UP:
+ self._curs_y -= 1
+ elif cmd == ord("\n"):
+ self._proc_select_by_cursor()
+ break
+
+ curses.curs_set(False)
+
+ def run_console(self):
+ """ Run the Salis console. You can use the console to control all main
+ aspects of the simulation, like compiling genomes into memory, creating
+ or killing organisms, setting auto-save interval, among other stuff.
+ """
+ # Print a pythonic prompt.
+ self._print_line(self._size[0] - 1, ">>> ", scroll=False)
+ self._screen.refresh()
+
+ # Create the console child window. We turn it into a Textbox object in
+ # order to allow line-editing and extract output easily.
+ console = curses.newwin(1, self._size[1] - 5, self._size[0] - 1, 5)
+ textbox = curses.textpad.Textbox(console, insert_mode=True)
+ textbox.stripspaces = True
+
+ # Grab a copy of the console history and instantiate a pointer to the
+ # last element.
+ history = self._sim.handler.console_history + [""]
+ pointer = len(history) - 1
+
+ # Nested method reinserts recorded commands from history into console.
+ def access_history(cmd):
+ nonlocal pointer
+
+ if pointer == len(history) - 1:
+ history[-1] = console.instr().strip()
+
+ if cmd == "up" and pointer != 0:
+ pointer -= 1
+ elif cmd == "down" and pointer < len(history) - 1:
+ pointer += 1
+
+ console.clear()
+ console.addstr(0, 0, history[pointer])
+ console.refresh()
+
+ # Declare custom validator to control special commands.
+ def validator(cmd):
+ EXIT = 7
+
+ if cmd in [curses.KEY_RESIZE, Handler.ESCAPE_KEY]:
+ console.clear()
+ return EXIT
+ elif cmd == curses.KEY_UP:
+ access_history("up")
+ elif cmd == curses.KEY_DOWN:
+ access_history("down")
+ else:
+ return cmd
+
+ # Run the Textbox object with our custom validator.
+ curses.curs_set(True)
+ output = textbox.edit(validator)
+ curses.curs_set(False)
+
+ # Finally, extract data from console and send to handler.
+ self._sim.handler.handle_console(output)
+ self._screen.clear()
+
+ def show_console_error(self, message):
+ """ Shows Salis console error messages, if any. These messages might
+ contain actual python exception output.
+ """
+ self._print_line(self._size[0] - 1, ">>>", curses.color_pair(
+ self._pair_error
+ ) | curses.A_BOLD)
+ self._screen.refresh()
+
+ # We also use a Textbox object, just so that execution gets halted
+ # until a key gets pressed (even on non-blocking mode).
+ console = curses.newwin(1, self._size[1] - 5, self._size[0] - 1, 5)
+ textbox = curses.textpad.Textbox(console)
+
+ # Curses may raise an exception if printing on the edge of the screen;
+ # we can just ignore it.
+ try:
+ console.addstr(0, 0, message, curses.color_pair(
+ self._pair_error
+ ) | curses.A_BOLD)
+ except curses.error:
+ pass
+
+ # Custom validator simply exits on any key.
+ def validator(cmd):
+ EXIT = 7
+ return EXIT
+
+ textbox.edit(validator)
+ self._screen.clear()
+
+ def print_page(self):
+ """ Print current page to screen. We use the previously generated
+ '_pages' dictionary to easily associate a label to a Salis function.
+ """
+ # Update selected proc data if in WORLD view.
+ if self._current_page == "WORLD":
+ self._sim.lib.sal_proc_get_proc_data(self._selected_proc, cast(
+ self._selected_proc_data, POINTER(c_uint32)
+ ))
+
+ # Print MAIN simulation data.
+ self._print_line(
+ 1, "SALIS[{}]".format(self._sim.args.file), curses.color_pair(
+ self._pair_header
+ ) | curses.A_BOLD
+ )
+ self._print_widget(2, self._main)
+
+ # Print data of currently selected page.
+ main_lines = len(self._main) + 3
+ self._print_header(main_lines, self._current_page)
+ self._print_widget(main_lines + 1, self._pages[self._current_page])
+
+ # Print special widgets (WORLD view and PROCESS list).
+ if self._current_page == "WORLD":
+ self._world.render()
+ elif self._current_page == "PROCESS":
+ self._print_proc_list()
+
+ @property
+ def screen(self):
+ return self._screen
+
+ @property
+ def inst_list(self):
+ return self._inst_list
+
+ @property
+ def proc_elements(self):
+ return self._proc_elements
+
+ @property
+ def size(self):
+ return self._size
+
+ @property
+ def current_page(self):
+ return self._current_page
+
+ @property
+ def selected_proc(self):
+ return self._selected_proc
+
+ @property
+ def selected_proc_data(self):
+ return self._selected_proc_data
+
+ @property
+ def proc_list_scroll(self):
+ return self._proc_list_scroll
+
+ @property
+ def world(self):
+ return self._world
+
+ def _set_colors(self):
+ """ Define the color pairs for the data printer.
+ """
+ curses.start_color()
+ curses.use_default_colors()
+ self._pair_header = self.get_color_pair(curses.COLOR_BLUE)
+ self._pair_selected = self.get_color_pair(curses.COLOR_YELLOW)
+ self._pair_error = self.get_color_pair(curses.COLOR_RED)
+
+ def _get_screen(self):
+ """ Prepare and return the main curses window. We also set a shorter
+ delay when responding to a pressed escape key.
+ """
+ # Set a shorter delay to the ESCAPE key, so that we may use it to exit
+ # Salis.
+ os.environ.setdefault("ESCDELAY", "25")
+
+ # Prepare curses screen.
+ screen = curses.initscr()
+ curses.noecho()
+ curses.cbreak()
+ screen.keypad(True)
+ curses.curs_set(False)
+
+ # We need color support in order to run the printer module.
+ if curses.has_colors():
+ self._set_colors()
+ else:
+ raise RuntimeError("Error: no color support.")
+
+ return screen
+
+ def _get_inst_list(self):
+ """ Parse instruction set from C header file named 'instset.h'. We're
+ using the keyword 'SALIS_INST' to identify an instruction definition,
+ so be careful not to use this keyword anywhere else on the headers.
+ """
+ inst_list = []
+ inst_file = os.path.join(self._sim.path, "../include/instset.h")
+
+ with open(inst_file, "r") as f:
+ lines = f.read().splitlines()
+
+ for line in lines:
+ if line and line.split()[0] == "SALIS_INST":
+ inst_name = line.split()[1][:4]
+ inst_symb = line.split()[3]
+ inst_list.append((inst_name, inst_symb))
+
+ return inst_list
+
+ def _get_proc_elements(self):
+ """ Parse process structure member variables from C header file named
+ 'process.h'. We're using the keyword 'SALIS_PROC_ELEMENT' to identify
+ element declarations, so be careful not to use this keyword anywhere
+ else on the headers.
+ """
+ proc_elem_list = []
+ proc_elem_file = os.path.join(self._sim.path, "../include/process.h")
+
+ with open(proc_elem_file, "r") as f:
+ lines = f.read().splitlines()
+
+ for line in lines:
+ if line and line.split()[0] == "SALIS_PROC_ELEMENT":
+ proc_elem_name = line.split()[2].split(";")[0]
+
+ if proc_elem_name == "stack[8]":
+ # The stack is a special member variable, an array. We
+ # translate it by returning a list of stack identifiers.
+ proc_elem_list += ["stack[{}]".format(i) for i in range(8)]
+ else:
+ # We can assume all other struct elements are single
+ # variables.
+ proc_elem_list.append(proc_elem_name)
+
+ return proc_elem_list
+
+ def _get_main(self):
+ """ Generate main set of data fields to be printed. We associate, on a
+ list object, a label to each Salis function to be called. The following
+ elements get printed on all pages.
+ """
+ return [
+ ("e", "cycle", self._sim.lib.sal_main_get_cycle),
+ ("e", "epoch", self._sim.lib.sal_main_get_epoch),
+ ("e", "state", lambda: self._sim.state),
+ ("e", "autosave", lambda: self._sim.autosave),
+ ]
+
+ def _get_pages(self):
+ """ Generate data fields to be printed on each page. We associate, on a
+ list object, a label to each Salis function to be called. Each list
+ represents a PAGE. We initialize all pages inside an ordered dictionary
+ object.
+ """
+ # The following widgets help up print special sets of data elements.
+ # The use of nested lambdas is needed to receive updated values.
+ # Instruction counter widget:
+ inst_widget = [("e", inst[0], (lambda j: (
+ lambda: self._sim.lib.sal_mem_get_inst_count(j)
+ ))(i)) for i, inst in enumerate(self._inst_list)]
+
+ # Evolver module state widget:
+ state_widget = [("e", "state[{}]".format(i), (lambda j: (
+ lambda: self._sim.lib.sal_evo_get_state(j)
+ ))(i)) for i in range(4)]
+
+ # Selected process state widget:
+ selected_widget = [("p", element, (lambda j: (
+ lambda: self._selected_proc_data[j]
+ ))(i)) for i, element in enumerate(self._proc_elements)]
+
+ # With the help of the widgets above, we can declare the PAGES
+ # dictionary object.
+ return OrderedDict([
+ ("MEMORY", [
+ ("e", "order", self._sim.lib.sal_mem_get_order),
+ ("e", "size", self._sim.lib.sal_mem_get_size),
+ ("e", "blocks", self._sim.lib.sal_mem_get_block_start_count),
+ ("e", "allocated", self._sim.lib.sal_mem_get_allocated_count),
+ ("e", "ips", self._sim.lib.sal_mem_get_ip_count),
+ ("s", ""),
+ ("h", "INSTRUCTIONS"),
+ ] + inst_widget),
+ ("EVOLVER", [
+ ("e", "last", self._sim.lib.sal_evo_get_last_changed_address),
+ ("e", "calls", self._sim.lib.sal_evo_get_calls_on_last_cycle),
+ ] + state_widget),
+ ("PROCESS", [
+ ("e", "count", self._sim.lib.sal_proc_get_count),
+ ("e", "capacity", self._sim.lib.sal_proc_get_capacity),
+ ("e", "first", self._sim.lib.sal_proc_get_first),
+ ("e", "last", self._sim.lib.sal_proc_get_last),
+ ("e", "exec",
+ self._sim.lib.sal_proc_get_instructions_executed
+ ),
+ ]),
+ ("WORLD", [
+ ("e", "position", lambda: self._world.pos),
+ ("e", "zoom", lambda: self._world.zoom),
+ ("e", "selected", lambda: self._selected_proc),
+ ("s", ""),
+ ("h", "SELECTED PROC"),
+ ] + selected_widget),
+ ])
+
+ def _print_line(self, ypos, line, attrs=curses.A_NORMAL, scroll=True):
+ """ Print a single line on screen only when it's visible.
+ """
+ if scroll:
+ ypos -= self._main_scroll
+
+ if 0 <= ypos < self._size[0]:
+ # Curses raises an exception each time we print on the screen's
+ # edge. We can just catch and ignore it.
+ try:
+ line = line[:self._size[1] - 1]
+ self._screen.addstr(ypos, 1, line, attrs)
+ except curses.error:
+ pass
+
+ def _print_header(self, ypos, line):
+ """ Print a bold header.
+ """
+ header_attr = curses.A_BOLD | curses.color_pair(self._pair_header)
+ self._print_line(ypos, line, header_attr)
+
+ def _print_value(self, ypos, element, value, attr=curses.A_NORMAL):
+ """ Print a label:value pair.
+ """
+ if type(value) == int:
+ if value == ((2 ** 32) - 1):
+ # In Salis, UINT32_MAX is used to represent NULL. We print NULL
+ # as three dashes.
+ value = "---"
+ elif self._print_hex:
+ value = hex(value)
+
+ line = "{:<10} : {:>10}".format(element, value)
+ self._print_line(ypos, line, attr)
+
+ def _print_proc_element(self, ypos, element, value):
+ """ Print elements of currently selected process. We highlight in
+ YELLOW if the selected process is running.
+ """
+ if self._sim.lib.sal_proc_is_free(self._selected_proc):
+ attr = curses.A_NORMAL
+ else:
+ attr = curses.color_pair(self._pair_selected)
+
+ self._print_value(ypos, element, value, attr)
+
+ def _print_widget(self, ypos, widget):
+ """ Print a widget (data PAGE) on screen.
+ """
+ for i, element in enumerate(widget):
+ if element[0] == "s":
+ continue
+ elif element[0] == "h":
+ self._print_header(i + ypos, element[1])
+ elif element[0] == "e":
+ self._print_value(i + ypos, element[1], element[2]())
+ elif element[0] == "p":
+ self._print_proc_element(i + ypos, element[1], element[2]())
+
+ def _clear_line(self, ypos):
+ """ Clear the specified line.
+ """
+ if 0 <= ypos < self._size[0]:
+ self._screen.move(ypos, 0)
+ self._screen.clrtoeol()
+
+ def _print_proc_data_list(self):
+ """ Print list of process data elements in PROCESS page. We can toggle
+ between printing the data elements or the genomes by pressing the 'g'
+ key.
+ """
+ # First, print the table header, by extracting element names from the
+ # previously generated proc element list.
+ ypos = len(self._main) + len(self._pages["PROCESS"]) + 5
+ header = " | ".join(["{:<10}".format("pidx")] + [
+ "{:>10}".format(element)
+ for element in self._proc_elements[self._proc_element_scroll:]
+ ])
+ self._clear_line(ypos)
+ self._print_header(ypos, header)
+ ypos += 1
+ proc_id = self._proc_list_scroll
+
+ # Print all proc elements in decimal or hexadecimal format, depending
+ # on hex-flag being set.
+ if self._print_hex:
+ data_format = lambda x: hex(x)
+ else:
+ data_format = lambda x: x
+
+ # Lastly, iterate all lines and print as much process data as it fits.
+ # We can scroll the process data table using the 'wasd' keys.
+ while ypos < self._size[0]:
+ self._clear_line(ypos)
+
+ if proc_id < self._sim.lib.sal_proc_get_capacity():
+ if proc_id == self._selected_proc:
+ # Always highlight the selected process.
+ attr = curses.color_pair(self._pair_selected)
+ else:
+ attr = curses.A_NORMAL
+
+ # Retrieve a copy of the selected process state and store it in
+ # a list object.
+ proc_data = (c_uint32 * len(self._proc_elements))()
+ self._sim.lib.sal_proc_get_proc_data(proc_id, cast(
+ proc_data, POINTER(c_uint32))
+ )
+
+ # Lastly, assemble and print the next table row.
+ row = " | ".join(["{:<10}".format(proc_id)] + [
+ "{:>10}".format(data_format(element))
+ for element in proc_data[self._proc_element_scroll:]
+ ])
+ self._print_line(ypos, row, attr)
+
+ proc_id += 1
+ ypos += 1
+
+ def _print_proc_gene_block(self, ypos, gidx, xpos, mbs, mba, ip, sp, pair):
+ """ Print a sub-set of a process genome. Namely, on of its two memory
+ blocks.
+ """
+ while gidx < mbs and xpos < curses.COLS:
+ gaddr = mba + gidx
+
+ if gaddr == ip:
+ attr = curses.color_pair(self._world.pair_sel_ip)
+ elif gaddr == sp:
+ attr = curses.color_pair(self._world.pair_sel_sp)
+ else:
+ attr = curses.color_pair(pair)
+
+ # Retrieve instruction from memory and transform it to correct
+ # symbol.
+ inst = self._sim.lib.sal_mem_get_inst(gaddr)
+ symb = self._inst_list[inst][1]
+
+ # Curses raises an exception each time we print on the screen's
+ # edge. We can just catch and ignore it.
+ try:
+ self._screen.addch(ypos, xpos, symb, attr)
+ except curses.error:
+ pass
+
+ gidx += 1
+ xpos += 1
+
+ return xpos
+
+ def _print_proc_gene(self, ypos, proc_id):
+ """ Print a single process genome on the genome table. We use the same
+ colors to represent memory blocks, IP and SP of each process, as those
+ used to represent the selected process on WORLD view.
+ """
+ # There's nothing to print if process is free.
+ if self._sim.lib.sal_proc_is_free(proc_id):
+ return
+
+ # Process is alive. Retrieve a copy of the current process state and
+ # store it in a list object.
+ proc_data = (c_uint32 * len(self._proc_elements))()
+ self._sim.lib.sal_proc_get_proc_data(proc_id, cast(
+ proc_data, POINTER(c_uint32))
+ )
+
+ # Let's extract all data of interest.
+ mb1a = proc_data[self._proc_elements.index("mb1a")]
+ mb1s = proc_data[self._proc_elements.index("mb1s")]
+ mb2a = proc_data[self._proc_elements.index("mb2a")]
+ mb2s = proc_data[self._proc_elements.index("mb2s")]
+ ip = proc_data[self._proc_elements.index("ip")]
+ sp = proc_data[self._proc_elements.index("sp")]
+
+ # Always print MAIN memory block (mb1) first (on the left side). That
+ # way we can keep most of our attention on the parent.
+ xpos = self._print_proc_gene_block(
+ ypos, self._proc_gene_scroll, 14, mb1s, mb1a, ip, sp,
+ self._world.pair_sel_mb1
+ )
+
+ # Reset gene counter and print child memory block, if it exists.
+ if mb1s < self._proc_gene_scroll:
+ gidx = self._proc_gene_scroll - mb1s
+ else:
+ gidx = 0
+
+ self._print_proc_gene_block(
+ ypos, gidx, xpos, mb2s, mb2a, ip, sp, self._world.pair_sel_mb2
+ )
+
+ def _print_proc_gene_list(self):
+ """ Print list of process genomes in PROCESS page. We can toggle
+ between printing the genomes or the data elements by pressing the 'g'
+ key.
+ """
+ # First, print the table header. We print the current gene-scroll
+ # position for easy reference. Return back to zero scroll with the 'A'
+ # key.
+ ypos = len(self._main) + len(self._pages["PROCESS"]) + 5
+ header = "{:<10} | genes {} -->".format(
+ "pidx", self._proc_gene_scroll
+ )
+ self._clear_line(ypos)
+ self._print_header(ypos, header)
+ ypos += 1
+ proc_id = self._proc_list_scroll
+
+ # Iterate all lines and print as much genetic data as it fits. We can
+ # scroll the gene data table using the 'wasd' keys.
+ while ypos < self._size[0]:
+ self._clear_line(ypos)
+
+ if proc_id < self._sim.lib.sal_proc_get_capacity():
+ if proc_id == self._selected_proc:
+ # Always highlight the selected process.
+ attr = curses.color_pair(self._pair_selected)
+ else:
+ attr = curses.A_NORMAL
+
+ # Assemble and print the next table row.
+ row = "{:<10} |".format(proc_id)
+ self._print_line(ypos, row, attr)
+ self._print_proc_gene(ypos, proc_id)
+
+ proc_id += 1
+ ypos += 1
+
+ def _print_proc_list(self):
+ """ Print list of process genomes or process data elements in PROCESS
+ page. We can toggle between printing the genomes or the data elements
+ by pressing the 'g' key.
+ """
+ if self._proc_gene_view:
+ self._print_proc_gene_list()
+ else:
+ self._print_proc_data_list()
+
+ def _proc_select_by_cursor(self):
+ """ Select process located on address under cursor, if any exists.
+ """
+ # First, calculate address under cursor.
+ ypos = self._curs_y
+ xpos = self._curs_x - World.PADDING
+ line_size = self._size[1] - World.PADDING
+ address = self._world.pos + (
+ ((ypos * line_size) + xpos) * self._world.zoom
+ )
+
+ # Now, iterate all living processes and try to find one that owns the
+ # calculated address.
+ if self._sim.lib.sal_mem_is_address_valid(address):
+ for proc_id in range(self._sim.lib.sal_proc_get_count()):
+ if not self._sim.lib.sal_proc_is_free(proc_id):
+ proc_data = (c_uint32 * len(self._proc_elements))()
+ self._sim.lib.sal_proc_get_proc_data(proc_id, cast(
+ proc_data, POINTER(c_uint32))
+ )
+ mb1a = proc_data[self._proc_elements.index("mb1a")]
+ mb1s = proc_data[self._proc_elements.index("mb1s")]
+ mb2a = proc_data[self._proc_elements.index("mb2a")]
+ mb2s = proc_data[self._proc_elements.index("mb2s")]
+
+ if (
+ mb1a <= address < (mb1a + mb1s) or
+ mb2a <= address < (mb2a + mb2s)
+ ):
+ self._selected_proc = proc_id
+ break
diff --git a/bin/salis.py b/bin/salis.py
new file mode 100755
index 0000000..6dd82b0
--- /dev/null
+++ b/bin/salis.py
@@ -0,0 +1,346 @@
+#!/usr/bin/env python3
+
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+File: salis.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+Main handler for the Salis simulator. The Salis class takes care of
+initializing, running and shutting down the simulator and other sub-modules. It
+also takes care of parsing the command-line arguments and linking to the Salis
+library with the help of ctypes.
+
+To execute this script, make sure to have python3 installed and in your path,
+as well as the cython package. Also, make sure it has correct execute
+permissions (chmod).
+"""
+
+import os
+import re
+import sys
+import time
+import traceback
+from argparse import ArgumentParser, HelpFormatter
+from ctypes import CDLL, c_bool, c_uint8, c_uint32, c_char_p, POINTER
+from handler import Handler
+from printer import Printer
+
+
+__version__ = "2.0"
+
+
+class Salis:
+ def __init__(self):
+ """ Salis constructor. Arguments are passed through the command line
+ and parsed with the 'argparse' module. Library is loaded with 'CDLL'
+ and C headers are parsed to detect function argument and return types.
+ """
+ self._path = self._get_path()
+ self._args = self._parse_args()
+ self._log = self._open_log_file()
+ self._save_file_path = self._get_save_file_path()
+ self._common_pipe = self._get_common_pipe()
+ self._lib = self._parse_lib()
+ self._printer = Printer(self)
+ self._handler = Handler(self)
+ self._state = "paused"
+ self._autosave = "---"
+ self._exit = False
+
+ # Based on CLI arguments, initialize a new Salis simulation or load
+ # existing one from file.
+ if self._args.action == "new":
+ self._lib.sal_main_init(
+ self._args.order, self._common_pipe.encode("utf-8")
+ )
+ elif self._args.action == "load":
+ self._lib.sal_main_load(
+ self._save_file_path.encode("utf-8"),
+ self._common_pipe.encode("utf-8")
+ )
+
+ def __del__(self):
+ """ Salis destructor.
+ """
+ # In case an error occurred early during initialization, checks whether
+ # Salis has been initialized correctly before attempting to shut it
+ # down.
+ if hasattr(self, "_lib") and hasattr(self._lib, "sal_main_quit"):
+ if self._lib.sal_main_is_init():
+ self._lib.sal_main_quit()
+
+ # If simulation ended correctly, 'error.log' should be empty. Delete
+ # file it exists and its empty.
+ if (
+ hasattr(self, "_log") and
+ os.path.isfile(self._log) and
+ os.stat(self._log).st_size == 0
+ ):
+ os.remove(self._log)
+
+ def run(self):
+ """ Runs main simulation loop. Curses may be placed on non-blocking
+ mode, which allows simulation to run freely while still listening to
+ user input.
+ """
+ while not self._exit:
+ self._printer.print_page()
+ self._handler.process_cmd(self._printer.get_cmd())
+
+ # If in non-blocking mode, re-print data once every 15
+ # milliseconds.
+ if self._state == "running":
+ end = time.time() + 0.015
+
+ while time.time() < end:
+ self._lib.sal_main_cycle()
+ self.check_autosave()
+
+ def toggle_state(self):
+ """ Toggle between 'paused' and 'running' states. On 'running' curses
+ gets placed in non-blocking mode.
+ """
+ if self._state == "paused":
+ self._state = "running"
+ self._printer.set_nodelay(True)
+ else:
+ self._state = "paused"
+ self._printer.set_nodelay(False)
+
+ def rename(self, new_name):
+ """ Give the simulation a new name.
+ """
+ self._args.file = new_name
+ self._save_file_path = self._get_save_file_path()
+
+ def set_autosave(self, interval):
+ """ Set the simulation's auto-save interval. When set to zero, auto
+ saving is disabled,
+ """
+ if not interval:
+ self._autosave = "---"
+ else:
+ self._autosave = interval
+
+ def check_autosave(self):
+ """ Save simulation to './sims/auto/*' whenever the autosave interval
+ is reached. We use the following naming convention for auto-saved files:
+
+ >>> ./sims/auto/<file-name>.<sim-epoch>.<sim-cycle>.auto
+ """
+ if self._autosave != "---":
+ if not self._lib.sal_main_get_cycle() % self._autosave:
+ auto_path = os.path.join(self._path, "sims/auto", ".".join([
+ self._args.file,
+ "{:08x}".format(self._lib.sal_main_get_epoch()),
+ "{:08x}".format(self._lib.sal_main_get_cycle()),
+ "auto"
+ ]))
+ self._lib.sal_main_save(auto_path.encode("utf-8"))
+
+ def exit(self):
+ """ Signal we want to exit the simulator.
+ """
+ self._exit = True
+
+ @property
+ def path(self):
+ return self._path
+
+ @property
+ def save_file_path(self):
+ return self._save_file_path
+
+ @property
+ def common_pipe(self):
+ return self._common_pipe
+
+ @property
+ def args(self):
+ return self._args
+
+ @property
+ def lib(self):
+ return self._lib
+
+ @property
+ def printer(self):
+ return self._printer
+
+ @property
+ def handler(self):
+ return self._handler
+
+ @property
+ def state(self):
+ return self._state
+
+ @property
+ def autosave(self):
+ return self._autosave
+
+ def _get_path(self):
+ """ Retrieve the absolute path of this script. We need to do this in
+ order to detect the './lib', './sims' and './genomes' subdirectories.
+ """
+ return os.path.dirname(__file__)
+
+ def _get_save_file_path(self):
+ """ Retrieve the absolute path of the file to which we will save this
+ simulation when we exit Salis.
+ """
+ return os.path.join(self._path, "sims", self._args.file)
+
+ def _get_common_pipe(self):
+ """ Get absolute path of the common pipe. This FIFO object may be used
+ by concurrent Salis simulations to share data between themselves.
+ """
+ return os.path.join(self._path, "common/pipe")
+
+ def _parse_args(self):
+ """ Parse command-line arguments with the 'argparse' module. To learn
+ more about each command, invoke the simulator in one of the following
+ ways:
+
+ (venv) $ python tsalis.py --help
+ (venv) $ python tsalis.py new --help
+ (venv) $ python tsalis.py load --help
+
+ """
+ # Custom formatter helps keep all help data aligned.
+ formatter = lambda prog: HelpFormatter(prog, max_help_position=30)
+
+ # Initialize the main parser with our custom formatter.
+ parser = ArgumentParser(
+ description="Viewer/controller for the Salis simulator.",
+ formatter_class=formatter
+ )
+ parser.add_argument(
+ "-v", "--version", action="version",
+ version="Salis: A-Life Simulator (" + __version__ + ")"
+ )
+
+ # Initialize the 'new/load' action subparsers.
+ subparsers = parser.add_subparsers(
+ dest="action", help="Possible actions..."
+ )
+ subparsers.required = True
+
+ # Set up subparser for the create 'new' action.
+ new_parser = subparsers.add_parser("new", formatter_class=formatter)
+ new_parser.add_argument(
+ "-o", "--order", required=True, type=lambda x: int(x, 0),
+ metavar="[1-31]", help="Create new simulation of given ORDER"
+ )
+ new_parser.add_argument(
+ "-f", "--file", required=True, type=str, metavar="FILE",
+ help="Name of FILE to save simulation to on exit"
+ )
+
+ # Set up subparser for the 'load' existing action.
+ load_parser = subparsers.add_parser("load", formatter_class=formatter)
+ load_parser.add_argument(
+ "-f", "--file", required=True, type=str, metavar="FILE",
+ help="Load previously saved simulation from FILE"
+ )
+
+ # Finally, parse all arguments.
+ args = parser.parse_args()
+
+ # Revise that parsed CL arguments are valid.
+ if args.action == "new":
+ if args.order not in range(1, 32):
+ parser.error("Order must be an integer between 1 and 31")
+ else:
+ savefile = os.path.join(self._path, "sims", args.file)
+
+ # No save-file with given name has been detected.
+ if not os.path.isfile(savefile):
+ parser.error(
+ "Save file provided '{}' does not exist".format(savefile)
+ )
+
+ return args
+
+ def _open_log_file(self):
+ """ Create a log file to store errors on. It will get deleted if no
+ errors are detected.
+ """
+ log_file = os.path.join(self._path, "error.log")
+ sys.stderr = open(log_file, "w")
+ return log_file
+
+ def _parse_lib(self):
+ """ Dynamically parse the Salis library C header files. We do this in
+ order to more easily set the correct input/output types of all loaded
+ functions. C functions to be parsed must be declared in a '.h' file
+ located on the '../include' directory, using the following syntax:
+
+ SALIS_API restype func_name(arg1_type arg1, arg2_type arg2);
+
+ Note to developers: the 'SALIS_API' keyword should *NOT* be used
+ anywhere else in the header files (not even in comments)!
+ """
+ lib = CDLL(os.path.join(self._path, "lib/libsalis.so"))
+ include_dir = os.path.join(self._path, "../include")
+ c_includes = [
+ os.path.join(include_dir, f)
+ for f in os.listdir(include_dir)
+ # Only parse '.h' header files.
+ if os.path.isfile(os.path.join(include_dir, f)) and f[-2:] == ".h"
+ ]
+ funcs_to_set = []
+
+ for include in c_includes:
+ with open(include, "r") as f:
+ text = f.read()
+
+ # Regexp to detect C functions to parse. This is a *very lazy*
+ # parser. So, if you want to expand/tweak Salis, be careful when
+ # declaring new functions!
+ funcs = re.findall(r"SALIS_API([\s\S]+?);", text, re.MULTILINE)
+
+ for func in funcs:
+ func = func.replace("\n", "")
+ func = func.replace("\t", "")
+ func = func.strip()
+ restype = func.split()[0]
+ name = func.split()[1].split("(")[0]
+ args = [
+ arg.split()[0]
+ for arg in func.split("(")[1].split(")")[0].split(",")
+ ]
+ funcs_to_set.append({
+ "name": name,
+ "restype": restype,
+ "args": args
+ })
+
+ # All Salis typedefs must be included here, associated to their CTYPES
+ # equivalents.
+ type_convert = {
+ "void": None,
+ "boolean": c_bool,
+ "uint8": c_uint8,
+ "uint8_p": POINTER(c_uint8),
+ "uint32": c_uint32,
+ "uint32_p": POINTER(c_uint32),
+ "string": c_char_p,
+ "Process": None,
+ }
+
+ # Finally, set correct arguments and return types of all Salis
+ # functions.
+ for func in funcs_to_set:
+ func["restype"] = type_convert[func["restype"]]
+ func["args"] = [type_convert[arg] for arg in func["args"]]
+ getattr(lib, func["name"]).restype = func["restype"]
+ getattr(lib, func["name"]).argtype = func["args"]
+
+ return lib
+
+if __name__ == "__main__":
+ """ Entry point...
+ """
+ Salis().run()
diff --git a/bin/sims/.keep b/bin/sims/.keep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bin/sims/.keep
diff --git a/bin/sims/auto/.keep b/bin/sims/auto/.keep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/bin/sims/auto/.keep
diff --git a/bin/world.py b/bin/world.py
new file mode 100644
index 0000000..60fd427
--- /dev/null
+++ b/bin/world.py
@@ -0,0 +1,277 @@
+""" SALIS: Viewer/controller for the SALIS simulator.
+
+File: world.py
+Author: Paul Oliver
+Email: paul.t.oliver.design@gmail.com
+
+This module should be considered an extension of the 'printer' module. It takes
+care of getting a pre-redered image from Salis and post-processing it in order
+to print it into the curses screen. It also keeps track of user cntrollable
+rendering parameters (position and zoom).
+"""
+
+import curses
+from ctypes import c_uint8, cast, POINTER
+
+
+class World:
+ PADDING = 25
+
+ def __init__(self, printer, sim):
+ """ World constructor. We link to the printer and main simulation
+ classes. We also setup the colors for rendering the world.
+ """
+ self._printer = printer
+ self._sim = sim
+ self._pos = 0
+ self._zoom = 1
+ self._set_world_colors()
+
+ def render(self):
+ """ Function for rendering the world. We get a pre-rendered buffer from
+ Salis' memory module (its way faster to pre-render in C) and use that
+ to assemble the world image in Python.
+ """
+ # Window is so narrow that world is not visible.
+ if self._printer.size[1] <= self.PADDING:
+ return
+
+ # Get pre-rendered image from Salis' memory module.
+ line_width = self._printer.size[1] - self.PADDING
+ print_area = self._printer.size[0] * line_width
+ c_buffer = (c_uint8 * print_area)()
+ self._sim.lib.sal_mem_render_image(
+ self._pos, self._zoom, print_area, cast(c_buffer, POINTER(c_uint8))
+ )
+
+ # Get data elements of selected process, if it's running, and store
+ # them into a convenient dict object.
+ if self._sim.lib.sal_proc_is_free(self._printer.selected_proc):
+ sel_data = None
+ else:
+ out_data = self._printer.selected_proc_data
+ out_elem = self._printer.proc_elements
+ sel_data = {
+ "ip": out_data[out_elem.index("ip")],
+ "sp": out_data[out_elem.index("sp")],
+ "mb1a": out_data[out_elem.index("mb1a")],
+ "mb1s": out_data[out_elem.index("mb1s")],
+ "mb2a": out_data[out_elem.index("mb2a")],
+ "mb2s": out_data[out_elem.index("mb2s")],
+ }
+
+ # Iterate all cells on printable area and print the post-rendered
+ # cells. Rendered cells contain info about bit flags and instructions
+ # currently written into memory.
+ bidx = 0
+
+ for y in range(self._printer.size[0]):
+ for x in range(line_width):
+ xpad = x + self.PADDING
+ addr = self._pos + (self._zoom * bidx)
+ symb, attr = self._render_cell(c_buffer[bidx], addr, sel_data)
+
+ # Curses raises an exception when printing on the edge of the
+ # screen; we can just ignore it.
+ try:
+ self._printer.screen.addch(y, xpad, symb, attr)
+ except curses.error:
+ pass
+
+ bidx += 1
+
+ def zoom_out(self):
+ """ Zoom out by a factor of 2 (zoom *= 2).
+ """
+ if self._is_world_editable():
+ self._zoom = min(self._zoom * 2, self._get_max_zoom())
+
+ def zoom_in(self):
+ """ Zoom in by a factor of 2 (zoom //= 2).
+ """
+ if self._is_world_editable():
+ self._zoom = max(self._zoom // 2, 1)
+
+ def zoom_reset(self):
+ """ Reset zoom to a valid value on certain events (i.e. during terminal
+ resizing).
+ """
+ self._zoom = min(self._zoom, self._get_max_zoom())
+
+ def pan_left(self):
+ """ Pan world to the left (pos -= zoom).
+ """
+ if self._is_world_editable():
+ self._pos = max(self._pos - self._zoom, 0)
+
+ def pan_right(self):
+ """ Pan world to the right (pos += zoom).
+ """
+ if self._is_world_editable():
+ max_pos = self._sim.lib.sal_mem_get_size() - 1
+ self._pos = min(self._pos + self._zoom, max_pos)
+
+ def pan_down(self):
+ """ Pan world downward (pos += zoom * columns).
+ """
+ if self._is_world_editable():
+ self._pos = max(self._pos - self._get_line_area(), 0)
+
+ def pan_up(self):
+ """ Pan world upward (pos -= zoom * columns).
+ """
+ if self._is_world_editable():
+ max_pos = self._sim.lib.sal_mem_get_size() - 1
+ self._pos = min(self._pos + self._get_line_area(), max_pos)
+
+ def pan_reset(self):
+ """ Set world position to zero.
+ """
+ if self._is_world_editable():
+ self._pos = 0
+
+ def scroll_to(self, pos):
+ """ Move world pos to a specified position.
+ """
+ if self._is_world_editable():
+ if self._sim.lib.sal_mem_is_address_valid(pos):
+ self._pos = pos
+ else:
+ raise RuntimeError("Error: scrolling to an invalid address")
+
+ @property
+ def pos(self):
+ return self._pos
+
+ @property
+ def zoom(self):
+ return self._zoom
+
+ @property
+ def pair_sel_mb2(self):
+ return self._pair_sel_mb2
+
+ @property
+ def pair_sel_mb1(self):
+ return self._pair_sel_mb1
+
+ @property
+ def pair_sel_sp(self):
+ return self._pair_sel_sp
+
+ @property
+ def pair_sel_ip(self):
+ return self._pair_sel_ip
+
+ def _set_world_colors(self):
+ """ Define color pairs for rendering the world. Each color has a
+ special meaning, referring to the selected process IP, SP and memory
+ blocks, or to bit flags currently set on rendered cells.
+ """
+ self._pair_free = self._printer.get_color_pair(
+ curses.COLOR_BLUE
+ )
+ self._pair_alloc = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_BLUE
+ )
+ self._pair_mbstart = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_CYAN
+ )
+ self._pair_ip = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_WHITE
+ )
+ self._pair_sel_mb2 = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_GREEN
+ )
+ self._pair_sel_mb1 = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_YELLOW
+ )
+ self._pair_sel_sp = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_MAGENTA
+ )
+ self._pair_sel_ip = self._printer.get_color_pair(
+ curses.COLOR_BLACK, curses.COLOR_RED
+ )
+
+ def _render_cell(self, byte, addr, sel_data=None):
+ """ Render a single cell on the WORLD view. All cells are rendered by
+ interpreting the values coming in from the buffer. We overlay special
+ colors for representing the selected organism's state, on top of the
+ more common colors used to represent memory state.
+ """
+ # Paint black all cells that are out of memory bounds.
+ if not self._sim.lib.sal_mem_is_address_valid(addr):
+ return " ", curses.A_NORMAL
+
+ # Check if cell contains part of the currently selected process.
+ if sel_data:
+ top_addr = addr + self._zoom
+ top_mb1a = sel_data["mb1a"] + sel_data["mb1s"]
+ top_mb2a = sel_data["mb2a"] + sel_data["mb2s"]
+
+ if addr <= sel_data["ip"] < top_addr:
+ pair = self._pair_sel_ip
+ elif addr <= sel_data["sp"] < top_addr:
+ pair = self._pair_sel_sp
+ elif top_addr > sel_data["mb1a"] and top_mb1a > addr:
+ pair = self._pair_sel_mb1
+ elif top_addr > sel_data["mb2a"] and top_mb2a > addr:
+ pair = self._pair_sel_mb2
+
+ # No pair has been selected yet; select pair based on bit-flags.
+ if not "pair" in locals():
+ if byte >= 0x80:
+ pair = self._pair_ip
+ elif byte >= 0x40:
+ pair = self._pair_mbstart
+ elif byte >= 0x20:
+ pair = self._pair_alloc
+ else:
+ pair = self._pair_free
+
+ # Select symbol to represent instructions currently on cell.
+ inst = byte % 32
+
+ if self._zoom == 1:
+ symb = self._printer.inst_list[inst][1]
+ elif inst > 16:
+ symb = ":"
+ else:
+ symb = "."
+
+ # Return tuple containing our post-redered cell.
+ return symb, curses.color_pair(pair)
+
+ def _get_max_zoom(self):
+ """ Calculate maximum needed zoom so that the entire world fits on the
+ terminal window.
+ """
+ max_zoom = 1
+ line_size = self._printer.size[1] - self.PADDING
+ coverage = self._printer.size[0] * line_size
+
+ # We fix a maximum zoom level; otherwise, program may halt on extreme
+ # zoom levels.
+ while (
+ (coverage * max_zoom) < self._sim.lib.sal_mem_get_size() and
+ max_zoom < 2 ** 16
+ ):
+ max_zoom *= 2
+
+ return max_zoom
+
+ def _is_world_editable(self):
+ """ For this to return True, printer's current page must be WORLD page.
+ Additionally, the WORLD panel must be visible on the terminal window
+ (i.e. curses.COLS > data_margin).
+ """
+ correct_page = self._printer.current_page == "WORLD"
+ correct_size = self._printer.size[1] > self.PADDING
+ return correct_page and correct_size
+
+ def _get_line_area(self):
+ """ Return amount of bytes contained in a printed WORLD line.
+ """
+ line_size = self._printer.size[1] - self.PADDING
+ line_area = self._zoom * line_size
+ return line_area