diff options
Diffstat (limited to 'bin')
| -rw-r--r-- | bin/common/.keep | 0 | ||||
| -rw-r--r-- | bin/genomes/.keep | 0 | ||||
| -rw-r--r-- | bin/genomes/86.anc | 1 | ||||
| -rw-r--r-- | bin/handler.py | 403 | ||||
| -rw-r--r-- | bin/lib/.keep | 0 | ||||
| -rw-r--r-- | bin/printer.py | 833 | ||||
| -rwxr-xr-x | bin/salis.py | 346 | ||||
| -rw-r--r-- | bin/sims/.keep | 0 | ||||
| -rw-r--r-- | bin/sims/auto/.keep | 0 | ||||
| -rw-r--r-- | bin/world.py | 277 | 
10 files changed, 1860 insertions, 0 deletions
| diff --git a/bin/common/.keep b/bin/common/.keep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/bin/common/.keep diff --git a/bin/genomes/.keep b/bin/genomes/.keep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/bin/genomes/.keep diff --git a/bin/genomes/86.anc b/bin/genomes/86.anc new file mode 100644 index 0000000..30e76f3 --- /dev/null +++ b/bin/genomes/86.anc @@ -0,0 +1 @@ +:::[a...]b...^b^b^b-bba::.!d#d#b?d).:.{bc).::a:.:}bc:..LadWcd^a^cvb?b(.::$~b~d(..:a::: diff --git a/bin/handler.py b/bin/handler.py new file mode 100644 index 0000000..ad3b14e --- /dev/null +++ b/bin/handler.py @@ -0,0 +1,403 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +file: handler.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered the 'controller' part of the Salis simulator. +It receives and parses all user input via keyboard and console commands. It +also takes care of genome compilation (via genome files located on the +'genomes' directory). + +An user may open the Salis console by pressing the 'c' key while in a running +session. A nice quirk is the possibility to run python commands from within the +Salis console. As an example, to get the memory size, an user could type: + +>>> exec output = self._sim.lib.sal_mem_get_size() + +Note that 'output' denotes a storage variable that will get printed on the +console response. This ability gives an user a whole lot of power, and should +be used with care. +""" + +import os +import curses + + +class Handler: +	ESCAPE_KEY = 27 + +	def __init__(self, sim): +		""" Handler constructor. Simply link this class to the main simulation +		class and printer class and create symbol dictionary. +		""" +		self._sim = sim +		self._printer = sim.printer +		self._inst_dict = self._get_inst_dict() +		self._console_history = [] + +	def process_cmd(self, cmd): +		""" Process incoming commands from curses. Commands are received via +		ncurses' getch() function, thus, they must be transformed into their +		character representations with 'ord()'. +		""" +		if cmd == self.ESCAPE_KEY: +			self._sim.lib.sal_main_save( +				self._sim.save_file_path.encode("utf-8") +			) +			self._sim.exit() +		elif cmd == ord(" "): +			self._sim.toggle_state() +		elif cmd == curses.KEY_LEFT: +			self._printer.flip_page(-1) +		elif cmd == curses.KEY_RIGHT: +			self._printer.flip_page(1) +		elif cmd == curses.KEY_DOWN: +			self._printer.scroll_main(-1) +		elif cmd == curses.KEY_UP: +			self._printer.scroll_main(1) +		elif cmd == curses.KEY_RESIZE: +			self._printer.on_resize() +		elif cmd == ord("X"): +			self._printer.toggle_hex() +		elif cmd == ord("x"): +			self._printer.world.zoom_out() +		elif cmd == ord("z"): +			self._printer.world.zoom_in() +		elif cmd == ord("a"): +			self._printer.world.pan_left() +			self._printer.proc_scroll_left() +		elif cmd == ord("d"): +			self._printer.world.pan_right() +			self._printer.proc_scroll_right() +		elif cmd == ord("s"): +			self._printer.world.pan_down() +			self._printer.proc_scroll_down() +		elif cmd == ord("w"): +			self._printer.world.pan_up() +			self._printer.proc_scroll_up() +		elif cmd == ord("S"): +			self._printer.world.pan_reset() +			self._printer.proc_scroll_vertical_reset() +		elif cmd == ord("A"): +			self._printer.world.pan_reset() +			self._printer.proc_scroll_horizontal_reset() +		elif cmd == ord("o"): +			self._printer.proc_select_prev() +		elif cmd == ord("p"): +			self._printer.proc_select_next() +		elif cmd == ord("f"): +			self._printer.proc_select_first() +		elif cmd == ord("l"): +			self._printer.proc_select_last() +		elif cmd == ord("k"): +			self._printer.proc_scroll_to_selected() +		elif cmd == ord("g"): +			self._printer.proc_toggle_gene_view() +		elif cmd == ord("\n"): +			self._printer.run_cursor() +		elif cmd == ord("c"): +			self._printer.run_console() +		else: +			# Check for numeric input. Number keys [1 to 0] cycle the +			# simulation [2 ** ((n - 1) % 10] times. +			try: +				if chr(cmd).isdigit(): +					factor = int(chr(cmd)) +					factor = int(2 ** ((factor - 1) % 10)) +					self._cycle_sim(factor) +			except ValueError: +				pass + +	def handle_console(self, command_raw): +		""" Process console commands. We parse and check for input errors. Any +		python exception messages are redirected to the console-response +		window. +		""" +		if command_raw: +			command = command_raw.split() + +			try: +				# Handle both python and self-thrown exceptions. +				if command[0] in ["q", "quit"]: +					self._on_quit(command, save=True) +				elif command[0] in ["q!", "quit!"]: +					self._on_quit(command, save=False) +				elif command[0] in ["i", "input"]: +					self._on_input(command) +				elif command[0] in ["c", "compile"]: +					self._on_compile(command) +				elif command[0] in ["n", "new"]: +					self._on_new(command) +				elif command[0] in ["k", "kill"]: +					self._on_kill(command) +				elif command[0] in ["e", "exec"]: +					self._on_exec(command) +				elif command[0] in ["s", "scroll"]: +					self._on_scroll(command) +				elif command[0] in ["p", "process"]: +					self._on_proc_select(command) +				elif command[0] in ["r", "rename"]: +					self._on_rename(command) +				elif command[0] in ["save"]: +					self._on_save(command) +				elif command[0] in ["a", "auto"]: +					self._on_set_autosave(command) +				else: +					# Raise if a non-existing command has been given. +					self._raise("Invalid command: '{}'".format(command[0])) +			except BaseException as exep: +				# We parse and redirect python exceptions to the error +				# console-window. +				message = str(exep).strip() +				message = message[0].upper() + message[1:] +				self._printer.show_console_error(message) +			finally: +				# Store command on console history. +				self._console_history.append(command_raw.strip()) + +	@property +	def console_history(self): +		return self._console_history + +	def _raise(self, message): +		""" Generic exception thrower. Throws a 'RuntimeError' initialized with +		the given message. +		""" +		raise RuntimeError("ERROR: {}".format(message)) + +	def _respond(self, message): +		""" Generic console responder. Throws a 'RuntimeError' initialized with +		the given message. +		""" +		raise RuntimeError(message) + +	def _cycle_sim(self, factor): +		""" Simply cycle Salis 'factor' number of times. +		""" +		for _ in range(factor): +			self._sim.lib.sal_main_cycle() +			self._sim.check_autosave() + +	def _get_inst_dict(self): +		""" Transform the instruction list of the printer module into a +		dictionary that's more useful for genome compilation. Instruction +		symbols are keys, values are the actual byte representation. +		""" +		inst_dict = {} + +		for i, inst in enumerate(self._printer.inst_list): +			inst_dict[inst[1]] = i + +		return inst_dict + +	def _on_quit(self, command, save): +		""" Exit simulation. We can choose whether to save the simulation into a +		save file or not. +		""" +		if len(command) > 1: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		if save: +			self._sim.lib.sal_main_save( +				self._sim.save_file_path.encode("utf-8") +			) + +		self._sim.exit() + +	def _write_genome(self, genome, address_list): +		""" Write genome stream into a given list of memory addresses. All +		addresses must be valid or an exception is thrown. +		""" +		# All addresses we will write to must be valid. +		for base_addr in address_list: +			address = int(base_addr, 0) + +			for _ in range(len(genome)): +				if not self._sim.lib.sal_mem_is_address_valid(address): +					self._raise("Address '{}' is invalid".format(address)) + +				address += 1 + +		# All looks well! Let's compile the genome into memory. +		for base_addr in address_list: +			address = int(base_addr, 0) + +			for symbol in genome: +				self._sim.lib.sal_mem_set_inst( +					address, self._inst_dict[symbol] +				) +				address += 1 + +	def _on_input(self, command): +		""" Compile organism from user typed input. Compilation can only occur +		on valid memory addresses. An exception will be thrown when trying to +		write into non-valid address or when input stream is invalid. +		""" +		if len(command) < 3: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		# All characters in file must be actual instruction symbols. +		for character in command[1]: +			if character not in self._inst_dict: +				self._raise("Invalid symbol '{}' found on stream".format( +					character +				)) + +		# All looks well, Let's write the genome into memory. +		self._write_genome(command[1], command[2:]) + +	def _on_compile(self, command): +		""" Compile organism from source genome file. Genomes must be placed on +		the './genomes' directory. Compilation can only occur on valid memory +		addresses. An exception will be thrown when trying to write into +		non-valid address or when genome file is invalid. +		""" +		if len(command) < 3: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		# Open genome file for compilation. +		gen_file = os.path.join(self._sim.path, "genomes", command[1]) + +		with open(gen_file, "r") as f: +			genome = f.read().strip() + +		# Entire genome must be written on a single line. +		if "\n" in genome: +			self._raise("Newline detected on '{}'".format(gen_file)) + +		# All characters in file must be actual instruction symbols. +		for character in genome: +			if character not in self._inst_dict: +				self._raise("Invalid symbol '{}' found on '{}'".format( +					character, gen_file +				)) + +		# All looks well, Let's write the genome into memory. +		self._write_genome(genome, command[2:]) + +	def _on_new(self, command): +		""" Instantiate new organism of given size on given address. These +		memory areas must be free and valid or an exception is thrown. +		""" +		if len(command) < 3: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		# Check that all addresses we will allocate are free and valid. +		for base_addr in command[2:]: +			address = int(base_addr, 0) + +			for _ in range(int(command[1])): +				if not self._sim.lib.sal_mem_is_address_valid(address): +					self._raise("Address '{}' is invalid".format(address)) +				elif self._sim.lib.sal_mem_is_allocated(address): +					self._raise("Address '{}' is allocated".format(address)) + +				address += 1 + +		# All looks well! Let's instantiate our new organism. +		for base_addr in command[2:]: +			address = int(base_addr, 0) +			size = int(command[1], 0) +			self._sim.lib.sal_proc_create(address, size) + +	def _on_kill(self, command): +		""" Kill organism on bottom of reaper queue. +		""" +		if len(command) > 1: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		# Call proc kill function only if there's any organisms to kill. +		if not self._sim.lib.sal_proc_get_count(): +			self._raise("No organisms currently alive") +		else: +			self._sim.lib.sal_proc_kill() + +	def _on_exec(self, command): +		""" Allow a user to execute a python command from within the console. +		Using this is very hack-ish, and not recommended unless you're certain +		of what you're doing! +		""" +		if len(command) < 2: +			self._raise("'{}' must be followed by an executable string".format( +				command[0]) +			) + +		# User may query any simulation variable or status and the console will +		# respond. For example, to query memory size or order, type one of the +		# following: +		# +		#     >>> exec output = self._sim.lib.sal_mem_get_size() +		#     >>> exec output = self._sim.lib.sal_mem_get_order() +		# +		output = {} +		exec(" ".join(command[1:]), locals(), output) + +		if output: +			self._respond("EXEC RESPONDS: {}".format(str(output))) + +	def _on_scroll(self, command): +		""" We can scroll to a specific process (on PROCESS view) or to a +		specific world address (on WORLD view) via the console. +		""" +		if len(command) != 2: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		target = int(command[1], 0) + +		# If on PROCESS page, scroll to given process. +		if self._printer.current_page == "PROCESS": +			if target < self._sim.lib.sal_proc_get_capacity(): +				self._printer.proc_scroll_to(target) +			else: +				self._raise("No process with ID '{}' found".format(target)) +		elif self._printer.current_page == "WORLD": +			if self._sim.lib.sal_mem_is_address_valid(target): +				self._printer.world.scroll_to(target) +			else: +				self._raise("Address '{}' is invalid".format(address)) +		else: +			self._raise("'{}' must be called on PROCESS or WORLD page".format( +				command[0]) +			) + +	def _on_proc_select(self, command): +		""" Select a specific process (on PROCESS or WORLD page). +		""" +		if len(command) != 2: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		target = int(command[1], 0) + +		# If on PROCESS page, scroll to given process. +		if target < self._sim.lib.sal_proc_get_capacity(): +			self._printer.proc_select_by_id(target) +		else: +			self._raise("No process with ID '{}' found".format(target)) + +	def _on_rename(self, command): +		""" Set a new simulation name. Future auto-saved files will use this +		name as prefix. +		""" +		if len(command) != 2: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		self._sim.rename(command[1]) + +	def _on_save(self, command): +		""" Save simulation on its current state. +		""" +		if len(command) != 1: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		self._sim.lib.sal_main_save(self._sim.save_file_path.encode("utf-8")) + +	def _on_set_autosave(self, command): +		""" Set the simulation's auto save interval. Provide any integer +		between 0 and (2**32 - 1). If zero is provided, auto saving will be +		disabled. +		""" +		if len(command) != 2: +			self._raise("Invalid parameters for '{}'".format(command[0])) + +		self._sim.set_autosave(int(command[1], 0)) diff --git a/bin/lib/.keep b/bin/lib/.keep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/bin/lib/.keep diff --git a/bin/printer.py b/bin/printer.py new file mode 100644 index 0000000..135e220 --- /dev/null +++ b/bin/printer.py @@ -0,0 +1,833 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: printer.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered the 'view' part of the Salis simulator. It +takes care of displaying the simulator's state in a nicely formatted, intuitive +format. It makes use of the curses library for terminal handling. +""" + +import curses +import curses.textpad +import os +import time +from collections import OrderedDict +from ctypes import c_uint8, c_uint32, cast, POINTER +from handler import Handler +from world import World + + +class Printer: +	def __init__(self, sim): +		""" Printer constructor. It takes care of starting up curses, defining +		the data pages and setting the printer on its initial state. +		""" +		self._sim = sim +		self._color_pair_count = 0 +		self._screen = self._get_screen() +		self._inst_list = self._get_inst_list() +		self._proc_elements = self._get_proc_elements() +		self._main = self._get_main() +		self._pages = self._get_pages() +		self._size = self._screen.getmaxyx() +		self._current_page = "MEMORY" +		self._main_scroll = 0 +		self._selected_proc = 0 +		self._selected_proc_data = (c_uint32 * len(self._proc_elements))() +		self._proc_list_scroll = 0 +		self._proc_element_scroll = 0 +		self._proc_gene_scroll = 0 +		self._proc_gene_view = False +		self._curs_y = 0 +		self._curs_x = 0 +		self._print_hex = False +		self._world = World(self, self._sim) + +	def __del__(self): +		""" Printer destructor exits curses. +		""" +		curses.endwin() + +	def get_color_pair(self, fg, bg=-1): +		""" We use this method to set new color pairs, keeping track of the +		number of pairs already set. We return the new color pair ID. +		""" +		self._color_pair_count += 1 +		curses.init_pair(self._color_pair_count, fg, bg) +		return self._color_pair_count + +	def get_cmd(self): +		""" This returns the pressed key from the curses handler. It's called +		during the simulation's main loop. Flushing input is important when in +		non-blocking mode. +		""" +		ch = self._screen.getch() +		curses.flushinp() +		return ch + +	def set_nodelay(self, nodelay): +		""" Toggles between blocking and non-blocking mode on curses. +		""" +		self._screen.nodelay(nodelay) + +	def toggle_hex(self): +		""" Toggle between decimal or hexadecimal printing of all simulation +		state elements. +		""" +		self._print_hex = not self._print_hex + +	def on_resize(self): +		""" Called whenever the terminal window gets resized. +		""" +		self._size = self._screen.getmaxyx() +		self.scroll_main() +		self._world.zoom_reset() + +	def flip_page(self, offset): +		""" Change data page by given offset (i.e. '1' for next page or '-1' +		for previous one). +		""" +		pidx = list(self._pages.keys()).index(self._current_page) +		pidx = (pidx + offset) % len(self._pages) +		self._current_page = list(self._pages.keys())[pidx] +		self.scroll_main() + +	def scroll_main(self, offset=0): +		""" Scrolling is allowed whenever the current page does not fit inside +		the terminal window. This method gets called, with no offset, under +		certain situations, like changing pages, just to make sure the screen +		gets cleared and at least some of the data is always scrolled into +		view. +		""" +		self._screen.clear() +		len_main = len(self._main) +		len_page = len(self._pages[self._current_page]) +		max_scroll = (len_main + len_page + 5) - self._size[0] +		self._main_scroll += offset +		self._main_scroll = max(0, min(self._main_scroll, max_scroll)) + +	def proc_scroll_left(self): +		""" Scroll process data elements or genomes (on PROCESS view) to the +		left. +		""" +		if self._current_page == "PROCESS": +			if self._proc_gene_view: +				self._proc_gene_scroll -= 1 +				self._proc_gene_scroll = max(0, self._proc_gene_scroll) +			else: +				self._proc_element_scroll -= 1 +				self._proc_element_scroll = max(0, self._proc_element_scroll) + +	def proc_scroll_right(self): +		""" Scroll process data elements or genomes (on PROCESS view) to the +		right. +		""" +		if self._current_page == "PROCESS": +			if self._proc_gene_view: +				self._proc_gene_scroll += 1 +			else: +				self._proc_element_scroll += 1 +				max_scroll = len(self._proc_elements) - 1 +				self._proc_element_scroll = min( +					max_scroll, self._proc_element_scroll +				) + +	def proc_scroll_down(self): +		""" Scroll process data table (on PROCESS view) up. +		""" +		if self._current_page == "PROCESS": +			self._proc_list_scroll = max(0, self._proc_list_scroll - 1) + +	def proc_scroll_up(self): +		""" Scroll process data table (on PROCESS view) down. +		""" +		if self._current_page == "PROCESS": +			self._proc_list_scroll = min( +				self._sim.lib.sal_proc_get_capacity() - 1, +				self._proc_list_scroll + 1 +			) + +	def proc_scroll_to(self, proc_id): +		""" Scroll process data table (on PROCESS view) to a specific position. +		""" +		if self._current_page == "PROCESS": +			if proc_id < self._sim.lib.sal_proc_get_capacity(): +				self._proc_list_scroll = proc_id +			else: +				raise RuntimeError("Error: scrolling to invalid process") + +	def proc_scroll_vertical_reset(self): +		""" Scroll process data table (on PROCESS view) back to top. +		""" +		if self._current_page == "PROCESS": +			self._proc_list_scroll = 0 + +	def proc_scroll_horizontal_reset(self): +		""" Scroll process data or genome table (on PROCESS view) back to the +		left. +		""" +		if self._current_page == "PROCESS": +			if self._proc_gene_view: +				self._proc_gene_scroll = 0 +			else: +				self._proc_element_scroll = 0 + +	def proc_select_prev(self): +		""" Select previous process. +		""" +		if self._current_page in ["PROCESS", "WORLD"]: +			self._selected_proc -= 1 +			self._selected_proc %= self._sim.lib.sal_proc_get_capacity() + +	def proc_select_next(self): +		""" Select next process. +		""" +		if self._current_page in ["PROCESS", "WORLD"]: +			self._selected_proc += 1 +			self._selected_proc %= self._sim.lib.sal_proc_get_capacity() + +	def proc_select_first(self): +		""" Select first process on reaper queue. +		""" +		if self._current_page in ["PROCESS", "WORLD"]: +			if self._sim.lib.sal_proc_get_count(): +				self._selected_proc = self._sim.lib.sal_proc_get_first() + +	def proc_select_last(self): +		""" Select last process on reaper queue. +		""" +		if self._current_page in ["PROCESS", "WORLD"]: +			if self._sim.lib.sal_proc_get_count(): +				self._selected_proc = self._sim.lib.sal_proc_get_last() + +	def proc_select_by_id(self, proc_id): +		""" Select process from given ID. +		""" +		if proc_id < self._sim.lib.sal_proc_get_capacity(): +			self._selected_proc = proc_id +		else: +			raise RuntimeError("Error: attempting to select non-existing proc") + +	def proc_scroll_to_selected(self): +		""" Scroll WORLD or PROCESS page so that selected process becomes +		visible. +		""" +		if self._current_page == "PROCESS": +			self._proc_list_scroll = self._selected_proc +		elif self._current_page == "WORLD": +			if not self._sim.lib.sal_proc_is_free(self._selected_proc): +				index = self._proc_elements.index("mb1a") +				address = self._selected_proc_data[index] +				self._world.scroll_to(address) + +	def proc_toggle_gene_view(self): +		""" Toggle between data element or genome view on PROCESS page. +		""" +		if self._current_page == "PROCESS": +			self._proc_gene_view = not self._proc_gene_view + +	def run_cursor(self): +		""" We can toggle a visible cursor on WORLD view to aid us in selecting +		processes. +		""" +		if self._current_page == "WORLD" and self._size[1] > World.PADDING: +			curses.curs_set(True) + +			while True: +				self._curs_y = max(0, min(self._curs_y, self._size[0] - 1)) +				self._curs_x = max(World.PADDING, min( +					self._curs_x, self._size[1] - 1 +				)) +				self._screen.move(self._curs_y, self._curs_x) +				cmd = self._screen.getch() + +				if cmd in [ord("c"), curses.KEY_RESIZE, Handler.ESCAPE_KEY]: +					self.on_resize() +					break +				elif cmd == curses.KEY_LEFT: +					self._curs_x -= 1 +				elif cmd == curses.KEY_RIGHT: +					self._curs_x += 1 +				elif cmd == curses.KEY_DOWN: +					self._curs_y += 1 +				elif cmd == curses.KEY_UP: +					self._curs_y -= 1 +				elif cmd == ord("\n"): +					self._proc_select_by_cursor() +					break + +			curses.curs_set(False) + +	def run_console(self): +		""" Run the Salis console. You can use the console to control all main +		aspects of the simulation, like compiling genomes into memory, creating +		or killing organisms, setting auto-save interval, among other stuff. +		""" +		# Print a pythonic prompt. +		self._print_line(self._size[0] - 1, ">>> ", scroll=False) +		self._screen.refresh() + +		# Create the console child window. We turn it into a Textbox object in +		# order to allow line-editing and extract output easily. +		console = curses.newwin(1, self._size[1] - 5, self._size[0] - 1, 5) +		textbox = curses.textpad.Textbox(console, insert_mode=True) +		textbox.stripspaces = True + +		# Grab a copy of the console history and instantiate a pointer to the +		# last element. +		history = self._sim.handler.console_history + [""] +		pointer = len(history) - 1 + +		# Nested method reinserts recorded commands from history into console. +		def access_history(cmd): +			nonlocal pointer + +			if pointer == len(history) - 1: +				history[-1] = console.instr().strip() + +			if cmd == "up" and pointer != 0: +				pointer -= 1 +			elif cmd == "down" and pointer < len(history) - 1: +				pointer += 1 + +			console.clear() +			console.addstr(0, 0, history[pointer]) +			console.refresh() + +		# Declare custom validator to control special commands. +		def validator(cmd): +			EXIT = 7 + +			if cmd in [curses.KEY_RESIZE, Handler.ESCAPE_KEY]: +				console.clear() +				return EXIT +			elif cmd == curses.KEY_UP: +				access_history("up") +			elif cmd == curses.KEY_DOWN: +				access_history("down") +			else: +				return cmd + +		# Run the Textbox object with our custom validator. +		curses.curs_set(True) +		output = textbox.edit(validator) +		curses.curs_set(False) + +		# Finally, extract data from console and send to handler. +		self._sim.handler.handle_console(output) +		self._screen.clear() + +	def show_console_error(self, message): +		""" Shows Salis console error messages, if any. These messages might +		contain actual python exception output. +		""" +		self._print_line(self._size[0] - 1, ">>>", curses.color_pair( +			self._pair_error +		) | curses.A_BOLD) +		self._screen.refresh() + +		# We also use a Textbox object, just so that execution gets halted +		# until a key gets pressed (even on non-blocking mode). +		console = curses.newwin(1, self._size[1] - 5, self._size[0] - 1, 5) +		textbox = curses.textpad.Textbox(console) + +		# Curses may raise an exception if printing on the edge of the screen; +		# we can just ignore it. +		try: +			console.addstr(0, 0, message, curses.color_pair( +				self._pair_error +			) | curses.A_BOLD) +		except curses.error: +			pass + +		# Custom validator simply exits on any key. +		def validator(cmd): +			EXIT = 7 +			return EXIT + +		textbox.edit(validator) +		self._screen.clear() + +	def print_page(self): +		""" Print current page to screen. We use the previously generated +		'_pages' dictionary to easily associate a label to a Salis function. +		""" +		# Update selected proc data if in WORLD view. +		if self._current_page == "WORLD": +			self._sim.lib.sal_proc_get_proc_data(self._selected_proc, cast( +				self._selected_proc_data, POINTER(c_uint32) +			)) + +		# Print MAIN simulation data. +		self._print_line( +			1, "SALIS[{}]".format(self._sim.args.file), curses.color_pair( +				self._pair_header +			) | curses.A_BOLD +		) +		self._print_widget(2, self._main) + +		# Print data of currently selected page. +		main_lines = len(self._main) + 3 +		self._print_header(main_lines, self._current_page) +		self._print_widget(main_lines + 1, self._pages[self._current_page]) + +		# Print special widgets (WORLD view and PROCESS list). +		if self._current_page == "WORLD": +			self._world.render() +		elif self._current_page == "PROCESS": +			self._print_proc_list() + +	@property +	def screen(self): +		return self._screen + +	@property +	def inst_list(self): +		return self._inst_list + +	@property +	def proc_elements(self): +		return self._proc_elements + +	@property +	def size(self): +		return self._size + +	@property +	def current_page(self): +		return self._current_page + +	@property +	def selected_proc(self): +		return self._selected_proc + +	@property +	def selected_proc_data(self): +		return self._selected_proc_data + +	@property +	def proc_list_scroll(self): +		return self._proc_list_scroll + +	@property +	def world(self): +		return self._world + +	def _set_colors(self): +		""" Define the color pairs for the data printer. +		""" +		curses.start_color() +		curses.use_default_colors() +		self._pair_header = self.get_color_pair(curses.COLOR_BLUE) +		self._pair_selected = self.get_color_pair(curses.COLOR_YELLOW) +		self._pair_error = self.get_color_pair(curses.COLOR_RED) + +	def _get_screen(self): +		""" Prepare and return the main curses window. We also set a shorter +		delay when responding to a pressed escape key. +		""" +		# Set a shorter delay to the ESCAPE key, so that we may use it to exit +		# Salis. +		os.environ.setdefault("ESCDELAY", "25") + +		# Prepare curses screen. +		screen = curses.initscr() +		curses.noecho() +		curses.cbreak() +		screen.keypad(True) +		curses.curs_set(False) + +		# We need color support in order to run the printer module. +		if curses.has_colors(): +			self._set_colors() +		else: +			raise RuntimeError("Error: no color support.") + +		return screen + +	def _get_inst_list(self): +		""" Parse instruction set from C header file named 'instset.h'. We're +		using the keyword 'SALIS_INST' to identify an instruction definition, +		so be careful not to use this keyword anywhere else on the headers. +		""" +		inst_list = [] +		inst_file = os.path.join(self._sim.path, "../include/instset.h") + +		with open(inst_file, "r") as f: +			lines = f.read().splitlines() + +		for line in lines: +			if line and line.split()[0] == "SALIS_INST": +				inst_name = line.split()[1][:4] +				inst_symb = line.split()[3] +				inst_list.append((inst_name, inst_symb)) + +		return inst_list + +	def _get_proc_elements(self): +		""" Parse process structure member variables from C header file named +		'process.h'. We're using the keyword 'SALIS_PROC_ELEMENT' to identify +		element declarations, so be careful not to use this keyword anywhere +		else on the headers. +		""" +		proc_elem_list = [] +		proc_elem_file = os.path.join(self._sim.path, "../include/process.h") + +		with open(proc_elem_file, "r") as f: +			lines = f.read().splitlines() + +		for line in lines: +			if line and line.split()[0] == "SALIS_PROC_ELEMENT": +				proc_elem_name = line.split()[2].split(";")[0] + +				if proc_elem_name == "stack[8]": +					# The stack is a special member variable, an array. We +					# translate it by returning a list of stack identifiers. +					proc_elem_list += ["stack[{}]".format(i) for i in range(8)] +				else: +					# We can assume all other struct elements are single +					# variables. +					proc_elem_list.append(proc_elem_name) + +		return proc_elem_list + +	def _get_main(self): +		""" Generate main set of data fields to be printed. We associate, on a +		list object, a label to each Salis function to be called. The following +		elements get printed on all pages. +		""" +		return [ +			("e", "cycle", self._sim.lib.sal_main_get_cycle), +			("e", "epoch", self._sim.lib.sal_main_get_epoch), +			("e", "state", lambda: self._sim.state), +			("e", "autosave", lambda: self._sim.autosave), +		] + +	def _get_pages(self): +		""" Generate data fields to be printed on each page. We associate, on a +		list object, a label to each Salis function to be called. Each list +		represents a PAGE. We initialize all pages inside an ordered dictionary +		object. +		""" +		# The following widgets help up print special sets of data elements. +		# The use of nested lambdas is needed to receive updated values. +		# Instruction counter widget: +		inst_widget = [("e", inst[0], (lambda j: ( +			lambda: self._sim.lib.sal_mem_get_inst_count(j) +		))(i)) for i, inst in enumerate(self._inst_list)] + +		# Evolver module state widget: +		state_widget = [("e", "state[{}]".format(i), (lambda j: ( +			lambda: self._sim.lib.sal_evo_get_state(j) +		))(i)) for i in range(4)] + +		# Selected process state widget: +		selected_widget = [("p", element, (lambda j: ( +			lambda: self._selected_proc_data[j] +		))(i)) for i, element in enumerate(self._proc_elements)] + +		# With the help of the widgets above, we can declare the PAGES +		# dictionary object. +		return OrderedDict([ +			("MEMORY", [ +				("e", "order", self._sim.lib.sal_mem_get_order), +				("e", "size", self._sim.lib.sal_mem_get_size), +				("e", "blocks", self._sim.lib.sal_mem_get_block_start_count), +				("e", "allocated", self._sim.lib.sal_mem_get_allocated_count), +				("e", "ips", self._sim.lib.sal_mem_get_ip_count), +				("s", ""), +				("h", "INSTRUCTIONS"), +			] + inst_widget), +			("EVOLVER", [ +				("e", "last", self._sim.lib.sal_evo_get_last_changed_address), +				("e", "calls", self._sim.lib.sal_evo_get_calls_on_last_cycle), +			] + state_widget), +			("PROCESS", [ +				("e", "count", self._sim.lib.sal_proc_get_count), +				("e", "capacity", self._sim.lib.sal_proc_get_capacity), +				("e", "first", self._sim.lib.sal_proc_get_first), +				("e", "last", self._sim.lib.sal_proc_get_last), +				("e", "exec", +					self._sim.lib.sal_proc_get_instructions_executed +				), +			]), +			("WORLD", [ +				("e", "position", lambda: self._world.pos), +				("e", "zoom", lambda: self._world.zoom), +				("e", "selected", lambda: self._selected_proc), +				("s", ""), +				("h", "SELECTED PROC"), +			] + selected_widget), +		]) + +	def _print_line(self, ypos, line, attrs=curses.A_NORMAL, scroll=True): +		""" Print a single line on screen only when it's visible. +		""" +		if scroll: +			ypos -= self._main_scroll + +		if 0 <= ypos < self._size[0]: +			# Curses raises an exception each time we print on the screen's +			# edge. We can just catch and ignore it. +			try: +				line = line[:self._size[1] - 1] +				self._screen.addstr(ypos, 1, line, attrs) +			except curses.error: +				pass + +	def _print_header(self, ypos, line): +		""" Print a bold header. +		""" +		header_attr = curses.A_BOLD | curses.color_pair(self._pair_header) +		self._print_line(ypos, line, header_attr) + +	def _print_value(self, ypos, element, value, attr=curses.A_NORMAL): +		""" Print a label:value pair. +		""" +		if type(value) == int: +			if value == ((2 ** 32) - 1): +				# In Salis, UINT32_MAX is used to represent NULL. We print NULL +				# as three dashes. +				value = "---" +			elif self._print_hex: +				value = hex(value) + +		line = "{:<10} : {:>10}".format(element, value) +		self._print_line(ypos, line, attr) + +	def _print_proc_element(self, ypos, element, value): +		""" Print elements of currently selected process. We highlight in +		YELLOW if the selected process is running. +		""" +		if self._sim.lib.sal_proc_is_free(self._selected_proc): +			attr = curses.A_NORMAL +		else: +			attr = curses.color_pair(self._pair_selected) + +		self._print_value(ypos, element, value, attr) + +	def _print_widget(self, ypos, widget): +		""" Print a widget (data PAGE) on screen. +		""" +		for i, element in enumerate(widget): +			if element[0] == "s": +				continue +			elif element[0] == "h": +				self._print_header(i + ypos, element[1]) +			elif element[0] == "e": +				self._print_value(i + ypos, element[1], element[2]()) +			elif element[0] == "p": +				self._print_proc_element(i + ypos, element[1], element[2]()) + +	def _clear_line(self, ypos): +		""" Clear the specified line. +		""" +		if 0 <= ypos < self._size[0]: +			self._screen.move(ypos, 0) +			self._screen.clrtoeol() + +	def _print_proc_data_list(self): +		""" Print list of process data elements in PROCESS page. We can toggle +		between printing the data elements or the genomes by pressing the 'g' +		key. +		""" +		# First, print the table header, by extracting element names from the +		# previously generated proc element list. +		ypos = len(self._main) + len(self._pages["PROCESS"]) + 5 +		header = " | ".join(["{:<10}".format("pidx")] + [ +			"{:>10}".format(element) +			for element in self._proc_elements[self._proc_element_scroll:] +		]) +		self._clear_line(ypos) +		self._print_header(ypos, header) +		ypos += 1 +		proc_id = self._proc_list_scroll + +		# Print all proc elements in decimal or hexadecimal format, depending +		# on hex-flag being set. +		if self._print_hex: +			data_format = lambda x: hex(x) +		else: +			data_format = lambda x: x + +		# Lastly, iterate all lines and print as much process data as it fits. +		# We can scroll the process data table using the 'wasd' keys. +		while ypos < self._size[0]: +			self._clear_line(ypos) + +			if proc_id < self._sim.lib.sal_proc_get_capacity(): +				if proc_id == self._selected_proc: +					# Always highlight the selected process. +					attr = curses.color_pair(self._pair_selected) +				else: +					attr = curses.A_NORMAL + +				# Retrieve a copy of the selected process state and store it in +				# a list object. +				proc_data = (c_uint32 * len(self._proc_elements))() +				self._sim.lib.sal_proc_get_proc_data(proc_id, cast( +					proc_data, POINTER(c_uint32)) +				) + +				# Lastly, assemble and print the next table row. +				row = " | ".join(["{:<10}".format(proc_id)] + [ +					"{:>10}".format(data_format(element)) +					for element in proc_data[self._proc_element_scroll:] +				]) +				self._print_line(ypos, row, attr) + +			proc_id += 1 +			ypos += 1 + +	def _print_proc_gene_block(self, ypos, gidx, xpos, mbs, mba, ip, sp, pair): +		""" Print a sub-set of a process genome. Namely, on of its two memory +		blocks. +		""" +		while gidx < mbs and xpos < curses.COLS: +			gaddr = mba + gidx + +			if gaddr == ip: +				attr = curses.color_pair(self._world.pair_sel_ip) +			elif gaddr == sp: +				attr = curses.color_pair(self._world.pair_sel_sp) +			else: +				attr = curses.color_pair(pair) + +			# Retrieve instruction from memory and transform it to correct +			# symbol. +			inst = self._sim.lib.sal_mem_get_inst(gaddr) +			symb = self._inst_list[inst][1] + +			# Curses raises an exception each time we print on the screen's +			# edge. We can just catch and ignore it. +			try: +				self._screen.addch(ypos, xpos, symb, attr) +			except curses.error: +				pass + +			gidx += 1 +			xpos += 1 + +		return xpos + +	def _print_proc_gene(self, ypos, proc_id): +		""" Print a single process genome on the genome table. We use the same +		colors to represent memory blocks, IP and SP of each process, as those +		used to represent the selected process on WORLD view. +		""" +		# There's nothing to print if process is free. +		if self._sim.lib.sal_proc_is_free(proc_id): +			return + +		# Process is alive. Retrieve a copy of the current process state and +		# store it in a list object. +		proc_data = (c_uint32 * len(self._proc_elements))() +		self._sim.lib.sal_proc_get_proc_data(proc_id, cast( +			proc_data, POINTER(c_uint32)) +		) + +		# Let's extract all data of interest. +		mb1a = proc_data[self._proc_elements.index("mb1a")] +		mb1s = proc_data[self._proc_elements.index("mb1s")] +		mb2a = proc_data[self._proc_elements.index("mb2a")] +		mb2s = proc_data[self._proc_elements.index("mb2s")] +		ip = proc_data[self._proc_elements.index("ip")] +		sp = proc_data[self._proc_elements.index("sp")] + +		# Always print MAIN memory block (mb1) first (on the left side). That +		# way we can keep most of our attention on the parent. +		xpos = self._print_proc_gene_block( +			ypos, self._proc_gene_scroll, 14, mb1s, mb1a, ip, sp, +			self._world.pair_sel_mb1 +		) + +		# Reset gene counter and print child memory block, if it exists. +		if mb1s < self._proc_gene_scroll: +			gidx = self._proc_gene_scroll - mb1s +		else: +			gidx = 0 + +		self._print_proc_gene_block( +			ypos, gidx, xpos, mb2s, mb2a, ip, sp, self._world.pair_sel_mb2 +		) + +	def _print_proc_gene_list(self): +		""" Print list of process genomes in PROCESS page. We can toggle +		between printing the genomes or the data elements by pressing the 'g' +		key. +		""" +		# First, print the table header. We print the current gene-scroll +		# position for easy reference. Return back to zero scroll with the 'A' +		# key. +		ypos = len(self._main) + len(self._pages["PROCESS"]) + 5 +		header = "{:<10} | genes {} -->".format( +			"pidx", self._proc_gene_scroll +		) +		self._clear_line(ypos) +		self._print_header(ypos, header) +		ypos += 1 +		proc_id = self._proc_list_scroll + +		# Iterate all lines and print as much genetic data as it fits. We can +		# scroll the gene data table using the 'wasd' keys. +		while ypos < self._size[0]: +			self._clear_line(ypos) + +			if proc_id < self._sim.lib.sal_proc_get_capacity(): +				if proc_id == self._selected_proc: +					# Always highlight the selected process. +					attr = curses.color_pair(self._pair_selected) +				else: +					attr = curses.A_NORMAL + +				# Assemble and print the next table row. +				row = "{:<10} |".format(proc_id) +				self._print_line(ypos, row, attr) +				self._print_proc_gene(ypos, proc_id) + +			proc_id += 1 +			ypos += 1 + +	def _print_proc_list(self): +		""" Print list of process genomes or process data elements in PROCESS +		page. We can toggle between printing the genomes or the data elements +		by pressing the 'g' key. +		""" +		if self._proc_gene_view: +			self._print_proc_gene_list() +		else: +			self._print_proc_data_list() + +	def _proc_select_by_cursor(self): +		""" Select process located on address under cursor, if any exists. +		""" +		# First, calculate address under cursor. +		ypos = self._curs_y +		xpos = self._curs_x - World.PADDING +		line_size = self._size[1] - World.PADDING +		address = self._world.pos + ( +			((ypos * line_size) + xpos) * self._world.zoom +		) + +		# Now, iterate all living processes and try to find one that owns the +		# calculated address. +		if self._sim.lib.sal_mem_is_address_valid(address): +			for proc_id in range(self._sim.lib.sal_proc_get_count()): +				if not self._sim.lib.sal_proc_is_free(proc_id): +					proc_data = (c_uint32 * len(self._proc_elements))() +					self._sim.lib.sal_proc_get_proc_data(proc_id, cast( +						proc_data, POINTER(c_uint32)) +					) +					mb1a = proc_data[self._proc_elements.index("mb1a")] +					mb1s = proc_data[self._proc_elements.index("mb1s")] +					mb2a = proc_data[self._proc_elements.index("mb2a")] +					mb2s = proc_data[self._proc_elements.index("mb2s")] + +					if ( +						mb1a <= address < (mb1a + mb1s) or +						mb2a <= address < (mb2a + mb2s) +					): +						self._selected_proc = proc_id +						break diff --git a/bin/salis.py b/bin/salis.py new file mode 100755 index 0000000..6dd82b0 --- /dev/null +++ b/bin/salis.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 + +""" SALIS: Viewer/controller for the SALIS simulator. + +File: salis.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +Main handler for the Salis simulator. The Salis class takes care of +initializing, running and shutting down the simulator and other sub-modules. It +also takes care of parsing the command-line arguments and linking to the Salis +library with the help of ctypes. + +To execute this script, make sure to have python3 installed and in your path, +as well as the cython package. Also, make sure it has correct execute +permissions (chmod). +""" + +import os +import re +import sys +import time +import traceback +from argparse import ArgumentParser, HelpFormatter +from ctypes import CDLL, c_bool, c_uint8, c_uint32, c_char_p, POINTER +from handler import Handler +from printer import Printer + + +__version__ = "2.0" + + +class Salis: +	def __init__(self): +		""" Salis constructor. Arguments are passed through the command line +		and parsed with the 'argparse' module. Library is loaded with 'CDLL' +		and C headers are parsed to detect function argument and return types. +		""" +		self._path = self._get_path() +		self._args = self._parse_args() +		self._log = self._open_log_file() +		self._save_file_path = self._get_save_file_path() +		self._common_pipe = self._get_common_pipe() +		self._lib = self._parse_lib() +		self._printer = Printer(self) +		self._handler = Handler(self) +		self._state = "paused" +		self._autosave = "---" +		self._exit = False + +		# Based on CLI arguments, initialize a new Salis simulation or load +		# existing one from file. +		if self._args.action == "new": +			self._lib.sal_main_init( +				self._args.order, self._common_pipe.encode("utf-8") +			) +		elif self._args.action == "load": +			self._lib.sal_main_load( +				self._save_file_path.encode("utf-8"), +				self._common_pipe.encode("utf-8") +			) + +	def __del__(self): +		""" Salis destructor. +		""" +		# In case an error occurred early during initialization, checks whether +		# Salis has been initialized correctly before attempting to shut it +		# down. +		if hasattr(self, "_lib") and hasattr(self._lib, "sal_main_quit"): +			if self._lib.sal_main_is_init(): +				self._lib.sal_main_quit() + +		# If simulation ended correctly, 'error.log' should be empty. Delete +		# file it exists and its empty. +		if ( +			hasattr(self, "_log") and +			os.path.isfile(self._log) and +			os.stat(self._log).st_size == 0 +		): +			os.remove(self._log) + +	def run(self): +		""" Runs main simulation loop. Curses may be placed on non-blocking +		mode, which allows simulation to run freely while still listening to +		user input. +		""" +		while not self._exit: +			self._printer.print_page() +			self._handler.process_cmd(self._printer.get_cmd()) + +			# If in non-blocking mode, re-print data once every 15 +			# milliseconds. +			if self._state == "running": +				end = time.time() + 0.015 + +				while time.time() < end: +					self._lib.sal_main_cycle() +					self.check_autosave() + +	def toggle_state(self): +		""" Toggle between 'paused' and 'running' states. On 'running' curses +		gets placed in non-blocking mode. +		""" +		if self._state == "paused": +			self._state = "running" +			self._printer.set_nodelay(True) +		else: +			self._state = "paused" +			self._printer.set_nodelay(False) + +	def rename(self, new_name): +		""" Give the simulation a new name. +		""" +		self._args.file = new_name +		self._save_file_path = self._get_save_file_path() + +	def set_autosave(self, interval): +		""" Set the simulation's auto-save interval. When set to zero, auto +		saving is disabled, +		""" +		if not interval: +			self._autosave = "---" +		else: +			self._autosave = interval + +	def check_autosave(self): +		""" Save simulation to './sims/auto/*' whenever the autosave interval +		is reached. We use the following naming convention for auto-saved files: + +		>>> ./sims/auto/<file-name>.<sim-epoch>.<sim-cycle>.auto +		""" +		if self._autosave != "---": +			if not self._lib.sal_main_get_cycle() % self._autosave: +				auto_path = os.path.join(self._path, "sims/auto", ".".join([ +					self._args.file, +					"{:08x}".format(self._lib.sal_main_get_epoch()), +					"{:08x}".format(self._lib.sal_main_get_cycle()), +					"auto" +				])) +				self._lib.sal_main_save(auto_path.encode("utf-8")) + +	def exit(self): +		""" Signal we want to exit the simulator. +		""" +		self._exit = True + +	@property +	def path(self): +		return self._path + +	@property +	def save_file_path(self): +		return self._save_file_path + +	@property +	def common_pipe(self): +		return self._common_pipe + +	@property +	def args(self): +		return self._args + +	@property +	def lib(self): +		return self._lib + +	@property +	def printer(self): +		return self._printer + +	@property +	def handler(self): +		return self._handler + +	@property +	def state(self): +		return self._state + +	@property +	def autosave(self): +		return self._autosave + +	def _get_path(self): +		""" Retrieve the absolute path of this script. We need to do this in +		order to detect the './lib', './sims' and './genomes' subdirectories. +		""" +		return os.path.dirname(__file__) + +	def _get_save_file_path(self): +		""" Retrieve the absolute path of the file to which we will save this +		simulation when we exit Salis. +		""" +		return os.path.join(self._path, "sims", self._args.file) + +	def _get_common_pipe(self): +		""" Get absolute path of the common pipe. This FIFO object may be used +		by concurrent Salis simulations to share data between themselves. +		""" +		return os.path.join(self._path, "common/pipe") + +	def _parse_args(self): +		""" Parse command-line arguments with the 'argparse' module. To learn +		more about each command, invoke the simulator in one of the following +		ways: + +			(venv) $ python tsalis.py --help +			(venv) $ python tsalis.py new --help +			(venv) $ python tsalis.py load --help + +		""" +		# Custom formatter helps keep all help data aligned. +		formatter = lambda prog: HelpFormatter(prog, max_help_position=30) + +		# Initialize the main parser with our custom formatter. +		parser = ArgumentParser( +			description="Viewer/controller for the Salis simulator.", +			formatter_class=formatter +		) +		parser.add_argument( +			"-v", "--version", action="version", +			version="Salis: A-Life Simulator (" + __version__ + ")" +		) + +		# Initialize the 'new/load' action subparsers. +		subparsers = parser.add_subparsers( +			dest="action", help="Possible actions..." +		) +		subparsers.required = True + +		# Set up subparser for the create 'new' action. +		new_parser = subparsers.add_parser("new", formatter_class=formatter) +		new_parser.add_argument( +			"-o", "--order", required=True, type=lambda x: int(x, 0), +			metavar="[1-31]", help="Create new simulation of given ORDER" +		) +		new_parser.add_argument( +			"-f", "--file", required=True, type=str, metavar="FILE", +			help="Name of FILE to save simulation to on exit" +		) + +		# Set up subparser for the 'load' existing action. +		load_parser = subparsers.add_parser("load", formatter_class=formatter) +		load_parser.add_argument( +			"-f", "--file", required=True, type=str, metavar="FILE", +			help="Load previously saved simulation from FILE" +		) + +		# Finally, parse all arguments. +		args = parser.parse_args() + +		# Revise that parsed CL arguments are valid. +		if args.action == "new": +			if args.order not in range(1, 32): +				parser.error("Order must be an integer between 1 and 31") +		else: +			savefile = os.path.join(self._path, "sims", args.file) + +			# No save-file with given name has been detected. +			if not os.path.isfile(savefile): +				parser.error( +					"Save file provided '{}' does not exist".format(savefile) +				) + +		return args + +	def _open_log_file(self): +		""" Create a log file to store errors on. It will get deleted if no +		errors are detected. +		""" +		log_file = os.path.join(self._path, "error.log") +		sys.stderr = open(log_file, "w") +		return log_file + +	def _parse_lib(self): +		""" Dynamically parse the Salis library C header files. We do this in +		order to more easily set the correct input/output types of all loaded +		functions. C functions to be parsed must be declared in a '.h' file +		located on the '../include' directory, using the following syntax: + +			SALIS_API restype func_name(arg1_type arg1, arg2_type arg2); + +		Note to developers: the 'SALIS_API' keyword should *NOT* be used +		anywhere else in the header files (not even in comments)! +		""" +		lib = CDLL(os.path.join(self._path, "lib/libsalis.so")) +		include_dir = os.path.join(self._path, "../include") +		c_includes = [ +			os.path.join(include_dir, f) +			for f in os.listdir(include_dir) +			# Only parse '.h' header files. +			if os.path.isfile(os.path.join(include_dir, f)) and f[-2:] == ".h" +		] +		funcs_to_set = [] + +		for include in c_includes: +			with open(include, "r") as f: +				text = f.read() + +			# Regexp to detect C functions to parse. This is a *very lazy* +			# parser. So, if you want to expand/tweak Salis, be careful when +			# declaring new functions! +			funcs = re.findall(r"SALIS_API([\s\S]+?);", text, re.MULTILINE) + +			for func in funcs: +				func = func.replace("\n", "") +				func = func.replace("\t", "") +				func = func.strip() +				restype = func.split()[0] +				name = func.split()[1].split("(")[0] +				args = [ +					arg.split()[0] +					for arg in func.split("(")[1].split(")")[0].split(",") +				] +				funcs_to_set.append({ +					"name": name, +					"restype": restype, +					"args": args +				}) + +		# All Salis typedefs must be included here, associated to their CTYPES +		# equivalents. +		type_convert = { +			"void": None, +			"boolean": c_bool, +			"uint8": c_uint8, +			"uint8_p": POINTER(c_uint8), +			"uint32": c_uint32, +			"uint32_p": POINTER(c_uint32), +			"string": c_char_p, +			"Process": None, +		} + +		# Finally, set correct arguments and return types of all Salis +		# functions. +		for func in funcs_to_set: +			func["restype"] = type_convert[func["restype"]] +			func["args"] = [type_convert[arg] for arg in func["args"]] +			getattr(lib, func["name"]).restype = func["restype"] +			getattr(lib, func["name"]).argtype = func["args"] + +		return lib + +if __name__ == "__main__": +	""" Entry point... +	""" +	Salis().run() diff --git a/bin/sims/.keep b/bin/sims/.keep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/bin/sims/.keep diff --git a/bin/sims/auto/.keep b/bin/sims/auto/.keep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/bin/sims/auto/.keep diff --git a/bin/world.py b/bin/world.py new file mode 100644 index 0000000..60fd427 --- /dev/null +++ b/bin/world.py @@ -0,0 +1,277 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: world.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +This module should be considered an extension of the 'printer' module. It takes +care of getting a pre-redered image from Salis and post-processing it in order +to print it into the curses screen. It also keeps track of user cntrollable +rendering parameters (position and zoom). +""" + +import curses +from ctypes import c_uint8, cast, POINTER + + +class World: +	PADDING = 25 + +	def __init__(self, printer, sim): +		""" World constructor. We link to the printer and main simulation +		classes. We also setup the colors for rendering the world. +		""" +		self._printer = printer +		self._sim = sim +		self._pos = 0 +		self._zoom = 1 +		self._set_world_colors() + +	def render(self): +		""" Function for rendering the world. We get a pre-rendered buffer from +		Salis' memory module (its way faster to pre-render in C) and use that +		to assemble the world image in Python. +		""" +		# Window is so narrow that world is not visible. +		if self._printer.size[1] <= self.PADDING: +			return + +		# Get pre-rendered image from Salis' memory module. +		line_width = self._printer.size[1] - self.PADDING +		print_area = self._printer.size[0] * line_width +		c_buffer = (c_uint8 * print_area)() +		self._sim.lib.sal_mem_render_image( +			self._pos, self._zoom, print_area, cast(c_buffer, POINTER(c_uint8)) +		) + +		# Get data elements of selected process, if it's running, and store +		# them into a convenient dict object. +		if self._sim.lib.sal_proc_is_free(self._printer.selected_proc): +			sel_data = None +		else: +			out_data = self._printer.selected_proc_data +			out_elem = self._printer.proc_elements +			sel_data = { +				"ip": out_data[out_elem.index("ip")], +				"sp": out_data[out_elem.index("sp")], +				"mb1a": out_data[out_elem.index("mb1a")], +				"mb1s": out_data[out_elem.index("mb1s")], +				"mb2a": out_data[out_elem.index("mb2a")], +				"mb2s": out_data[out_elem.index("mb2s")], +			} + +		# Iterate all cells on printable area and print the post-rendered +		# cells. Rendered cells contain info about bit flags and instructions +		# currently written into memory. +		bidx = 0 + +		for y in range(self._printer.size[0]): +			for x in range(line_width): +				xpad = x + self.PADDING +				addr = self._pos + (self._zoom * bidx) +				symb, attr = self._render_cell(c_buffer[bidx], addr, sel_data) + +				# Curses raises an exception when printing on the edge of the +				# screen; we can just ignore it. +				try: +					self._printer.screen.addch(y, xpad, symb, attr) +				except curses.error: +					pass + +				bidx += 1 + +	def zoom_out(self): +		""" Zoom out by a factor of 2 (zoom *= 2). +		""" +		if self._is_world_editable(): +			self._zoom = min(self._zoom * 2, self._get_max_zoom()) + +	def zoom_in(self): +		""" Zoom in by a factor of 2 (zoom //= 2). +		""" +		if self._is_world_editable(): +			self._zoom = max(self._zoom // 2, 1) + +	def zoom_reset(self): +		""" Reset zoom to a valid value on certain events (i.e. during terminal +		resizing). +		""" +		self._zoom = min(self._zoom, self._get_max_zoom()) + +	def pan_left(self): +		""" Pan world to the left (pos -= zoom). +		""" +		if self._is_world_editable(): +			self._pos = max(self._pos - self._zoom, 0) + +	def pan_right(self): +		""" Pan world to the right (pos += zoom). +		""" +		if self._is_world_editable(): +			max_pos = self._sim.lib.sal_mem_get_size() - 1 +			self._pos = min(self._pos + self._zoom, max_pos) + +	def pan_down(self): +		""" Pan world downward (pos += zoom * columns). +		""" +		if self._is_world_editable(): +			self._pos = max(self._pos - self._get_line_area(), 0) + +	def pan_up(self): +		""" Pan world upward (pos -= zoom * columns). +		""" +		if self._is_world_editable(): +			max_pos = self._sim.lib.sal_mem_get_size() - 1 +			self._pos = min(self._pos + self._get_line_area(), max_pos) + +	def pan_reset(self): +		""" Set world position to zero. +		""" +		if self._is_world_editable(): +			self._pos = 0 + +	def scroll_to(self, pos): +		""" Move world pos to a specified position. +		""" +		if self._is_world_editable(): +			if self._sim.lib.sal_mem_is_address_valid(pos): +				self._pos = pos +			else: +				raise RuntimeError("Error: scrolling to an invalid address") + +	@property +	def pos(self): +		return self._pos + +	@property +	def zoom(self): +		return self._zoom + +	@property +	def pair_sel_mb2(self): +		return self._pair_sel_mb2 + +	@property +	def pair_sel_mb1(self): +		return self._pair_sel_mb1 + +	@property +	def pair_sel_sp(self): +		return self._pair_sel_sp + +	@property +	def pair_sel_ip(self): +		return self._pair_sel_ip + +	def _set_world_colors(self): +		""" Define color pairs for rendering the world. Each color has a +		special meaning, referring to the selected process IP, SP and memory +		blocks, or to bit flags currently set on rendered cells. +		""" +		self._pair_free = self._printer.get_color_pair( +			curses.COLOR_BLUE +		) +		self._pair_alloc = self._printer.get_color_pair( +			curses.COLOR_BLACK, curses.COLOR_BLUE +		) +		self._pair_mbstart = self._printer.get_color_pair( +			curses.COLOR_BLACK, curses.COLOR_CYAN +		) +		self._pair_ip = self._printer.get_color_pair( +			curses.COLOR_BLACK, curses.COLOR_WHITE +		) +		self._pair_sel_mb2 = self._printer.get_color_pair( +			curses.COLOR_BLACK, curses.COLOR_GREEN +		) +		self._pair_sel_mb1 = self._printer.get_color_pair( +			curses.COLOR_BLACK, curses.COLOR_YELLOW +		) +		self._pair_sel_sp = self._printer.get_color_pair( +			 curses.COLOR_BLACK, curses.COLOR_MAGENTA +		) +		self._pair_sel_ip = self._printer.get_color_pair( +			 curses.COLOR_BLACK, curses.COLOR_RED +		) + +	def _render_cell(self, byte, addr, sel_data=None): +		""" Render a single cell on the WORLD view. All cells are rendered by +		interpreting the values coming in from the buffer. We overlay special +		colors for representing the selected organism's state, on top of the +		more common colors used to represent memory state. +		""" +		# Paint black all cells that are out of memory bounds. +		if not self._sim.lib.sal_mem_is_address_valid(addr): +			return " ", curses.A_NORMAL + +		# Check if cell contains part of the currently selected process. +		if sel_data: +			top_addr = addr + self._zoom +			top_mb1a = sel_data["mb1a"] + sel_data["mb1s"] +			top_mb2a = sel_data["mb2a"] + sel_data["mb2s"] + +			if addr <= sel_data["ip"] < top_addr: +				pair = self._pair_sel_ip +			elif addr <= sel_data["sp"] < top_addr: +				pair = self._pair_sel_sp +			elif top_addr > sel_data["mb1a"] and top_mb1a > addr: +				pair = self._pair_sel_mb1 +			elif top_addr > sel_data["mb2a"] and top_mb2a > addr: +				pair = self._pair_sel_mb2 + +		# No pair has been selected yet; select pair based on bit-flags. +		if not "pair" in locals(): +			if byte >= 0x80: +				pair = self._pair_ip +			elif byte >= 0x40: +				pair = self._pair_mbstart +			elif byte >= 0x20: +				pair = self._pair_alloc +			else: +				pair = self._pair_free + +		# Select symbol to represent instructions currently on cell. +		inst = byte % 32 + +		if self._zoom == 1: +			symb = self._printer.inst_list[inst][1] +		elif inst > 16: +			symb = ":" +		else: +			symb = "." + +		# Return tuple containing our post-redered cell. +		return symb, curses.color_pair(pair) + +	def _get_max_zoom(self): +		""" Calculate maximum needed zoom so that the entire world fits on the +		terminal window. +		""" +		max_zoom = 1 +		line_size = self._printer.size[1] - self.PADDING +		coverage = self._printer.size[0] * line_size + +		# We fix a maximum zoom level; otherwise, program may halt on extreme +		# zoom levels. +		while ( +			(coverage * max_zoom) < self._sim.lib.sal_mem_get_size() and +			max_zoom < 2 ** 16 +		): +			max_zoom *= 2 + +		return max_zoom + +	def _is_world_editable(self): +		""" For this to return True, printer's current page must be WORLD page. +		Additionally, the WORLD panel must be visible on the terminal window +		(i.e. curses.COLS > data_margin). +		""" +		correct_page = self._printer.current_page == "WORLD" +		correct_size = self._printer.size[1] > self.PADDING +		return correct_page and correct_size + +	def _get_line_area(self): +		""" Return amount of bytes contained in a printed WORLD line. +		""" +		line_size = self._printer.size[1] - self.PADDING +		line_area = self._zoom * line_size +		return line_area | 
