From 0e7b2e49585e3725736ae380cbd2dfb28fea5093 Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Thu, 29 Feb 2024 02:29:14 +0100 Subject: Added 'Common.py' module. [#27] Not exactly a 'tappable pipe', this simple communications module allows for IPC between individual Salis organisms and different simulations via UDP sockets. --- bin/modules/common.py | 174 +++++++++++++++++++++++++++++++++++++++++++++++++ bin/modules/handler.py | 97 +++++++++++++++++++++++++-- bin/modules/printer.py | 138 +++++++++++++++++++++++++++++++++------ 3 files changed, 382 insertions(+), 27 deletions(-) create mode 100644 bin/modules/common.py (limited to 'bin/modules') diff --git a/bin/modules/common.py b/bin/modules/common.py new file mode 100644 index 0000000..25163d1 --- /dev/null +++ b/bin/modules/common.py @@ -0,0 +1,174 @@ +""" SALIS: Viewer/controller for the SALIS simulator. + +File: common.py +Author: Paul Oliver +Email: paul.t.oliver.design@gmail.com + +Network communications module for Salis simulator. This module allows for IPC +between individual Salis organisms and different simulations via UDP sockets. +""" + +import json +import os +import socket + +from ctypes import c_int, c_uint8, CFUNCTYPE + + +class Common: + SENDER_TYPE = CFUNCTYPE(c_int, c_uint8) + RECEIVER_TYPE = CFUNCTYPE(c_uint8) + + def __init__(self, sim, max_buffer_size=4096): + """ Initialize module with a default buffer size of 4 KiB. + """ + self.__sim = sim + self.__settings_path = self.__get_settings_path() + self.max_buffer_size = max_buffer_size + self.in_buffer = bytearray() + self.out_buffer = bytearray() + self.sources = [] + self.targets = [] + + # Use a global client socket for all output operations. + self.__client = self.__get_socket() + + def define_functors(self): + """ Define the C callbacks which we'll pass to the Salis simulator. + These simply push and pop instructions from the input and output + buffers whenever organisms call the SEND and RCVE instructions. + """ + def sender(inst): + if len(self.out_buffer) < self.max_buffer_size: + self.out_buffer.append(inst) + + def receiver(): + if len(self.in_buffer): + res = self.in_buffer[0] + self.in_buffer = self.in_buffer[1:] + return c_uint8(res) + else: + return c_uint8(0) + + self.__sender = self.SENDER_TYPE(sender) + self.__receiver = self.RECEIVER_TYPE(receiver) + self.__sim.lib.sal_comm_set_sender(self.__sender) + self.__sim.lib.sal_comm_set_receiver(self.__receiver) + + def add_source(self, address, port): + """ Create new input socket. + """ + sock = self.__get_server(address, port) + self.sources.append(sock) + + def add_target(self, address, port): + """ Create new output address/port tuple. We use global output socket + ('self.__client') for output operations. + """ + self.targets.append((address, port)) + + def remove_source(self, address, port): + """ Remove an input socket. + """ + source = (address, port) + self.sources = [s for s in self.sources if s.getsockname() != source] + + def remove_target(self, address, port): + """ Remove an output address/port pair. + """ + target = (address, port) + self.targets = [t for t in self.targets if t != target] + + def link_to_self(self, port): + """ Create input and output links to 'localhost'. + """ + self.add_source(socket.gethostbyname(socket.gethostname()), port) + self.add_target(socket.gethostbyname(socket.gethostname()), port) + + def cycle(self): + """ We push all data on the output buffer to all targets and clear it. + We withdraw incoming data from all source sockets and append it to the + input buffer. + """ + if len(self.out_buffer) and self.targets: + for target in self.targets: + self.__client.sendto(self.out_buffer, target) + + # Clear output buffer. + self.out_buffer = bytearray() + + # Receive data and store on input buffer. + if len(self.in_buffer) < self.max_buffer_size: + for source in self.sources: + try: + self.in_buffer += source.recv( + self.max_buffer_size - len(self.in_buffer) + ) + except socket.error: + pass + + def load_network_config(self, filename): + """ Load network configuration from a JSON file. + """ + with open(os.path.join(self.__settings_path, filename), "r") as f: + in_dict = json.load(f) + + self.max_buffer_size = in_dict["max_buffer_size"] + + for source in in_dict["sources"]: + self.add_source(*source) + + for target in in_dict["targets"]: + self.add_target(*target) + + for inst in in_dict["in_buffer"]: + self.in_buffer.append(self.__sim.handler.inst_dict[inst]) + + for inst in in_dict["out_buffer"]: + self.out_buffer.append(self.__sim.handler.inst_dict[inst]) + + def save_network_config(self, filename): + """ Save network configuration to a JSON file. + """ + out_dict = { + "max_buffer_size": self.max_buffer_size, + "in_buffer": "", + "out_buffer": "", + "sources": [s.getsockname() for s in self.sources], + "targets": self.targets, + } + + for byte in self.in_buffer: + out_dict["in_buffer"] += self.__sim.printer.inst_list[byte][1] + + for byte in self.out_buffer: + out_dict["out_buffer"] += self.__sim.printer.inst_list[byte][1] + + with open(os.path.join(self.__settings_path, filename), "w") as f: + json.dump(out_dict, f, indent="\t") + + + ############################### + # Private methods + ############################### + + def __get_settings_path(self): + """ Get path to network settings directory. + """ + self_path = os.path.dirname(__file__) + return os.path.join(self_path, "../network") + + def __get_socket(self): + """ Generate a non-blocking UDP socket. + """ + sock = socket.socket( + socket.AF_INET, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK + ) + return sock + + def __get_server(self, address, port): + """ Generate a socket and bind to an address/port pair. + """ + serv_socket = self.__get_socket() + serv_socket.bind((address, port)) + return serv_socket diff --git a/bin/modules/handler.py b/bin/modules/handler.py index 3325391..ba2b249 100644 --- a/bin/modules/handler.py +++ b/bin/modules/handler.py @@ -35,13 +35,13 @@ class Handler: """ self.__sim = sim self.__printer = sim.printer - self.__inst_dict = self.__get_inst_dict() self.__min_commands = [ ord("M"), ord(" "), curses.KEY_RESIZE, self.KEY_ESCAPE, ] + self.inst_dict = self.__get_inst_dict() self.console_history = [] # Set short delay for ESCAPE key (which is used to exit the simulator). @@ -82,9 +82,11 @@ class Handler: elif cmd == ord("a"): self.__printer.world.pan_left() self.__printer.proc_scroll_left() + self.__printer.comm_scroll_left() elif cmd == ord("d"): self.__printer.world.pan_right() self.__printer.proc_scroll_right() + self.__printer.comm_scroll_right() elif cmd == ord("s"): self.__printer.world.pan_down() self.__printer.proc_scroll_down() @@ -103,6 +105,7 @@ class Handler: elif cmd == ord("A"): self.__printer.world.pan_reset() self.__printer.proc_scroll_horizontal_reset() + self.__printer.comm_scroll_horizontal_reset() elif cmd == ord("o"): self.__printer.proc_select_prev() elif cmd == ord("p"): @@ -166,6 +169,20 @@ class Handler: self.__on_save(command) elif command[0] in ["a", "auto"]: self.__on_set_autosave(command) + elif command[0] in ["l", "link"]: + self.__on_link_to_self(command) + elif command[0] in ["source"]: + self.__on_add_source(command) + elif command[0] in ["target"]: + self.__on_add_target(command) + elif command[0] in ["rem_source"]: + self.__on_remove_source(command) + elif command[0] in ["rem_target"]: + self.__on_remove_target(command) + elif command[0] in ["net_load"]: + self.__on_network_load(command) + elif command[0] in ["net_save"]: + self.__on_network_save(command) else: # Raise if a non-existing command has been given. self.__raise("Invalid command: '{}'".format(command[0])) @@ -203,8 +220,7 @@ class Handler: time_max = time.time() + self.CYCLE_TIMEOUT for _ in range(factor): - self.__sim.lib.sal_main_cycle() - self.__sim.check_autosave() + self.__sim.cycle() if time.time() > time_max: break @@ -255,7 +271,7 @@ class Handler: for symbol in genome: self.__sim.lib.sal_mem_set_inst( - address, self.__inst_dict[symbol] + address, self.inst_dict[symbol] ) address += 1 @@ -269,7 +285,7 @@ class Handler: # All characters in file must be actual instruction symbols. for character in command[1]: - if character not in self.__inst_dict: + if character not in self.inst_dict: self.__raise("Invalid symbol '{}' found on stream".format( character )) @@ -298,7 +314,7 @@ class Handler: # All characters in file must be actual instruction symbols. for character in genome: - if character not in self.__inst_dict: + if character not in self.inst_dict: self.__raise("Invalid symbol '{}' found on '{}'".format( character, gen_file )) @@ -364,6 +380,8 @@ class Handler: # output = {} exec(" ".join(command[1:]), locals(), output) + self.__sim.printer.screen.clear() + self.__sim.printer.print_page() if output: self.__respond("EXEC RESPONDS: {}".format(str(output))) @@ -433,3 +451,70 @@ class Handler: self.__raise("Invalid parameters for '{}'".format(command[0])) self.__sim.set_autosave(int(command[1], 0)) + + def __on_link_to_self(self, command): + """ Add self as network target and source. + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + port = int(command[1]) + self.__sim.common.link_to_self(int(command[1])) + + def __on_add_source(self, command): + """ Add new network source. + """ + if len(command) != 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + address = command[1] + port = int(command[2]) + self.__sim.common.add_source(address, port) + + def __on_add_target(self, command): + """ Add new network target. + """ + if len(command) != 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + address = command[1] + port = int(command[2]) + self.__sim.common.add_target(address, port) + + def __on_remove_source(self, command): + """ Remove existing network source. + """ + if len(command) != 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + address = command[1] + port = int(command[2]) + self.__sim.common.remove_source(address, port) + + def __on_remove_target(self, command): + """ Remove existing network target. + """ + if len(command) != 3: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + address = command[1] + port = int(command[2]) + self.__sim.common.remove_target(address, port) + + def __on_network_load(self, command): + """ Load network settings from JSON file (located on network settings + directory. + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + self.__sim.common.load_network_config(command[1]) + + def __on_network_save(self, command): + """ Save network settings to a JSON file (which will be placed on the + network settings directory). + """ + if len(command) != 2: + self.__raise("Invalid parameters for '{}'".format(command[0])) + + self.__sim.common.save_network_config(command[1]) diff --git a/bin/modules/printer.py b/bin/modules/printer.py index 51a5b33..cfbca84 100644 --- a/bin/modules/printer.py +++ b/bin/modules/printer.py @@ -12,11 +12,9 @@ format. It makes use of the curses library for terminal handling. import curses import curses.textpad import os -import time from collections import OrderedDict -from ctypes import c_uint8, c_uint32, cast, POINTER -from modules.handler import Handler +from ctypes import c_uint32, cast, POINTER from modules.world import World @@ -44,6 +42,7 @@ class Printer: self.__proc_element_scroll = 0 self.__proc_gene_scroll = 0 self.__proc_gene_view = False + self.__common_buffer_scroll = 0 self.__curs_y = 0 self.__curs_x = 0 self.__print_hex = False @@ -143,6 +142,19 @@ class Printer: max_scroll, self.__proc_element_scroll ) + def comm_scroll_left(self): + """ Scroll buffers (on COMMON view) to the left. + """ + if self.current_page == "COMMON": + self.__common_buffer_scroll -= 1 + self.__common_buffer_scroll = max(0, self.__common_buffer_scroll) + + def comm_scroll_right(self): + """ Scroll buffers (on COMMON view) to the right. + """ + if self.current_page == "COMMON": + self.__common_buffer_scroll += 1 + def proc_scroll_down(self, fast=False): """ Scroll process data table (on PROCESS view) up. """ @@ -195,6 +207,12 @@ class Printer: else: self.__proc_element_scroll = 0 + def comm_scroll_horizontal_reset(self): + """ Scroll common in and out buffers back to the left. + """ + if self.current_page == "COMMON": + self.__common_buffer_scroll = 0 + def proc_select_prev(self): """ Select previous process. """ @@ -411,6 +429,8 @@ class Printer: self.world.render() elif self.current_page == "PROCESS": self.__print_proc_list() + elif self.current_page == "COMMON": + self.__print_common_data() ############################### @@ -552,6 +572,11 @@ class Printer: ("e", "last", self.__sim.lib.sal_proc_get_last), ("e", "selected", lambda: self.selected_proc), ]), + ("COMMON", [ + ("e", "in", lambda: len(self.__sim.common.in_buffer)), + ("e", "out", lambda: len(self.__sim.common.out_buffer)), + ("e", "max", lambda: self.__sim.common.max_buffer_size), + ]), ("WORLD", [ ("e", "position", lambda: self.world.pos), ("e", "zoom", lambda: self.world.zoom), @@ -627,6 +652,15 @@ class Printer: self.screen.move(ypos, 0) self.screen.clrtoeol() + def __data_format(self, x): + """ Print all proc IDs and elements in decimal or hexadecimal format, + depending on hex-flag being set. + """ + if self.__print_hex: + return hex(x) + else: + return x + def __print_proc_data_list(self): """ Print list of process data elements in PROCESS page. We can toggle between printing the data elements or the genomes by pressing the 'g' @@ -644,13 +678,6 @@ class Printer: ypos += 1 proc_id = self.proc_list_scroll - # Print all proc IDs and elements in decimal or hexadecimal format, - # depending on hex-flag being set. - if self.__print_hex: - data_format = lambda x: hex(x) - else: - data_format = lambda x: x - # Lastly, iterate all lines and print as much process data as it fits. # We can scroll the process data table using the 'wasd' keys. while ypos < self.size[0]: @@ -671,8 +698,8 @@ class Printer: ) # Lastly, assemble and print the next table row. - row = " | ".join(["{:<10}".format(data_format(proc_id))] + [ - "{:>10}".format(data_format(element)) + row = " | ".join(["{:<10}".format(self.__data_format(proc_id))] + [ + "{:>10}".format(self.__data_format(element)) for element in proc_data[self.__proc_element_scroll:] ]) self.__print_line(ypos, row, attr) @@ -759,19 +786,12 @@ class Printer: between printing the genomes or the data elements by pressing the 'g' key. """ - # Print all proc IDs and gene scroll in decimal or hexadecimal format, - # depending on hex-flag being set. - if self.__print_hex: - data_format = lambda x: hex(x) - else: - data_format = lambda x: x - # First, print the table header. We print the current gene-scroll # position for easy reference. Return back to zero scroll with the 'A' # key. ypos = len(self.__main) + len(self.__pages["PROCESS"]) + 5 header = "{:<10} | genes {} -->".format( - "pidx", data_format(self.__proc_gene_scroll) + "pidx", self.__data_format(self.__proc_gene_scroll) ) self.__clear_line(ypos) self.__print_header(ypos, header) @@ -791,13 +811,89 @@ class Printer: attr = curses.A_NORMAL # Assemble and print the next table row. - row = "{:<10} |".format(data_format(proc_id)) + row = "{:<10} |".format(self.__data_format(proc_id)) self.__print_line(ypos, row, attr) self.__print_proc_gene(ypos, proc_id) proc_id += 1 ypos += 1 + def __print_buffer(self, ypos, buff): + """ Print contents of a network buffer as a list of instruction + symbols. + """ + if not ypos < self.size[0]: + return + + if not len(buff): + self.__print_line(ypos, "---") + return + + xpos = 1 + bpos = self.__common_buffer_scroll + self.__clear_line(ypos) + + while xpos < self.size[1] - 1 and bpos < len(buff): + symbol = self.inst_list[int(buff[bpos])][1] + self.screen.addstr(ypos, xpos, symbol) + xpos += 1 + bpos += 1 + + def __print_common_widget( + self, ypos_s, ypos_b, head_s, head_b, sockets, buff, sources=False + ): + """ Print data pertaining input or output network buffers, sources and + targets. + """ + self.__print_header(ypos_s, head_s) + + # Socket info is stored differently for input and output. + if sources: + fmt_sock = lambda s: "{} {}".format(*s.getsockname()) + else: + fmt_sock = lambda s: "{} {}".format(*s) + + # Print active socket list. + if sockets: + for socket in sockets: + ypos_s += 1 + self.__print_line(ypos_s, fmt_sock(socket)) + else: + self.__print_line(ypos_s + 1, "---") + + # Print current contents of the network buffer. + self.__clear_line(ypos_b) + self.__print_header(ypos_b, "{:<10} | {} -->".format( + head_b, self.__data_format(self.__common_buffer_scroll) + )) + self.__clear_line(ypos_b + 1) + self.__print_buffer(ypos_b + 1, buff) + + def __print_common_data(self): + """ Print active socket list and network buffer data. + """ + ypos_src = len(self.__main) + len(self.__pages["COMMON"]) + 5 + ypos_tgt = ypos_src + max(3, len(self.__sim.common.sources) + 2) + ypos_ibf = ypos_tgt + max(3, len(self.__sim.common.targets) + 2) + ypos_obf = ypos_ibf + 3 + self.__print_common_widget( + ypos_src, + ypos_ibf, + "SOURCES", + "IN BUFFER", + self.__sim.common.sources, + self.__sim.common.in_buffer, + sources=True, + ) + self.__print_common_widget( + ypos_tgt, + ypos_obf, + "TARGETS", + "OUT BUFFER", + self.__sim.common.targets, + self.__sim.common.out_buffer, + ) + def __print_proc_list(self): """ Print list of process genomes or process data elements in PROCESS page. We can toggle between printing the genomes or the data elements -- cgit v1.2.1