diff options
author | Paul Oliver <contact@pauloliver.dev> | 2024-12-29 17:05:34 +0000 |
---|---|---|
committer | Paul Oliver <contact@pauloliver.dev> | 2025-01-16 18:30:09 -0800 |
commit | cc639b06c7126fac7b445d8f778455620d7f8f50 (patch) | |
tree | a4c5c7c0b0a9cdb5bea0891e198003035065e57d |
Initial
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | README.md | 52 | ||||
-rw-r--r-- | hsm-command/Hsm/Command/Command.hs | 52 | ||||
-rw-r--r-- | hsm-command/Hsm/Command/Readline.hs | 51 | ||||
-rw-r--r-- | hsm-command/Main.hs | 33 | ||||
-rw-r--r-- | hsm-command/hsm-command.cabal | 48 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/App.hs | 21 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Env.hs | 29 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Fsm.hs | 61 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Log.hs | 20 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Message.hs | 25 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Options.hs | 40 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq.hs | 34 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 108 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Server.hs | 81 | ||||
-rw-r--r-- | hsm-core/hsm-core.cabal | 42 | ||||
-rw-r--r-- | hsm-dummy-poller/Main.hs | 56 | ||||
-rw-r--r-- | hsm-dummy-poller/hsm-dummy-poller.cabal | 25 | ||||
-rw-r--r-- | hsm-dummy-pulser/Main.hs | 64 | ||||
-rw-r--r-- | hsm-dummy-pulser/hsm-dummy-pulser.cabal | 25 | ||||
-rw-r--r-- | hsm-dummy-receiver/Main.hs | 44 | ||||
-rw-r--r-- | hsm-dummy-receiver/hsm-dummy-receiver.cabal | 25 | ||||
-rw-r--r-- | servconf.yaml | 21 | ||||
-rw-r--r-- | stack.yaml | 11 | ||||
-rw-r--r-- | stack.yaml.lock | 26 |
25 files changed, 996 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6a97e52 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +**/.stack-work/ +.hsm_command_history diff --git a/README.md b/README.md new file mode 100644 index 0000000..01c9427 --- /dev/null +++ b/README.md @@ -0,0 +1,52 @@ +# HsMouse +Experimental control code for robotics. Tested on Raspberry Pi 5. + +## Features +- [`zeromq4-haskell`](https://hackage.haskell.org/package/zeromq4-haskell) +library is used for IPC. +- [`effectful`](https://hackage.haskell.org/package/effectful) library is used +to control effects within monadic computations. +- [`streamly`](https://hackage.haskell.org/package/streamly) library is used +to build pipelines modularly and stream data within pipeline elements. E.g. +`zmq client & processor & zmq server`. + +## Build +Install [`stack`](https://docs.haskellstack.org/en/stable/). I recommend using +[`ghcup`](https://www.haskell.org/ghcup/) for this. Run `stack build` to +compile all libraries and executables. Note: you might need to install some +system dependencies on your host first (e.g. `libzmq`, etc.) + +## Test +On one terminal, run `stack exec dummy-receiver`. This will initialize a ZMQ +client that will wait for incoming pulses. On a separate terminal, run +`stack exec dummy-pulser`. You should be able to see pulses being transmitted +from server to client. E.g.: +``` +$> stack exec dummy-receiver +2025-01-12 21:27:02 INFO receiver/client: Initializing ZMQ client +2025-01-12 21:27:16 INFO receiver/receiver: Received pulse #1 +2025-01-12 21:27:17 INFO receiver/receiver: Received pulse #2 +2025-01-12 21:27:18 INFO receiver/receiver: Received pulse #3 +2025-01-12 21:27:19 INFO receiver/receiver: Received pulse #4 +2025-01-12 21:27:20 INFO receiver/receiver: Received pulse #5 +2025-01-12 21:27:21 INFO receiver/receiver: Received pulse #6 +2025-01-12 21:27:22 INFO receiver/receiver: Received pulse #7 +2025-01-12 21:27:23 INFO receiver/receiver: Received pulse #8 +2025-01-12 21:27:24 INFO receiver/receiver: Received pulse #9 +``` + +``` +$> stack exec dummy-pulser +2025-01-12 21:27:15 INFO pulser/server: Initializing ZMQ server +2025-01-12 21:27:16 INFO pulser/fsm/run: Sending pulse #1 +2025-01-12 21:27:17 INFO pulser/fsm/run: Sending pulse #2 +2025-01-12 21:27:18 INFO pulser/fsm/run: Sending pulse #3 +2025-01-12 21:27:19 INFO pulser/fsm/run: Sending pulse #4 +2025-01-12 21:27:20 INFO pulser/fsm/run: Sending pulse #5 +2025-01-12 21:27:21 INFO pulser/fsm/run: Sending pulse #6 +2025-01-12 21:27:22 INFO pulser/fsm/run: Sending pulse #7 +2025-01-12 21:27:23 INFO pulser/fsm/run: Sending pulse #8 +2025-01-12 21:27:24 INFO pulser/fsm/run: Sending pulse #9 +2025-01-12 21:27:25 ATTENTION pulser/fsm/run: Reached 10 pulses +2025-01-12 21:27:25 ATTENTION pulser/fsm: No state returned, exiting FSM +``` diff --git a/hsm-command/Hsm/Command/Command.hs b/hsm-command/Hsm/Command/Command.hs new file mode 100644 index 0000000..3b53287 --- /dev/null +++ b/hsm-command/Hsm/Command/Command.hs @@ -0,0 +1,52 @@ +{-# LANGUAGE DeriveAnyClass #-} + +module Hsm.Command.Command + ( Direction(X, Z) + , Angle(CW, CCW) + , Speed(Slow, Mid, Fast) + , Command(Move, Rotate) + , commandStream + ) where + +import Data.Binary (Binary) +import Data.Maybe (fromJust, isJust) +import Data.Text (pack) +import Effectful (Eff, (:>)) +import Effectful.Log (Log, logAttention_) +import GHC.Generics (Generic) +import Hsm.Command.Readline (Readline, readline) +import Streamly.Data.Stream qualified as S +import Text.Read (readEither) + +data Direction + = X + | Z + deriving (Binary, Generic, Read, Show) + +data Angle + = CW + | CCW + deriving (Binary, Generic, Read, Show) + +data Speed + = Slow + | Mid + | Fast + deriving (Binary, Generic, Read, Show) + +data Command + = Move Direction Speed Int + | Rotate Angle Speed Int + deriving (Binary, Generic, Read, Show) + +commandStream :: + forall es. (Log :> es, Readline :> es) + => S.Stream (Eff es) Command +commandStream = + S.mapMaybeM (parse . fromJust) $ S.takeWhile isJust $ S.repeatM readline + where + parse :: String -> Eff es (Maybe Command) + parse string = + case readEither string of + Left err -> logAttention_ (pack err) >> return Nothing + Right command -> return $ Just command diff --git a/hsm-command/Hsm/Command/Readline.hs b/hsm-command/Hsm/Command/Readline.hs new file mode 100644 index 0000000..3c56453 --- /dev/null +++ b/hsm-command/Hsm/Command/Readline.hs @@ -0,0 +1,51 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Command.Readline + ( Readline + , readline + , runReadline + ) where + +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as S +import Effectful.Log (Log, getLoggerEnv, leLogger, waitForLogger) +import Effectful.Resource (Resource, allocate) +import System.Console.Haskeline qualified as H +import System.Console.Haskeline.IO qualified as H + +data Readline a b + +type instance DispatchOf Readline = Static S.WithSideEffects + +newtype instance S.StaticRep Readline = + Readline H.InputState + +readline :: + forall es. (Log :> es, Readline :> es) + => Eff es (Maybe String) +readline = do + flushLogger + Readline hdl <- S.getStaticRep + S.unsafeEff_ $ nextLine hdl + where + flushLogger :: Eff es () + flushLogger = getLoggerEnv >>= S.unsafeEff_ . waitForLogger . leLogger + -- + nextLine :: H.InputState -> IO (Maybe String) + nextLine hdl = + H.queryInput hdl + $ H.handleInterrupt (return Nothing) + $ H.withInterrupt + $ H.getInputLine "% " + +runReadline :: (IOE :> es, Resource :> es) => Eff (Readline : es) a -> Eff es a +runReadline action = do + handle <- snd <$> allocate istate H.cancelInput + S.evalStaticRep (Readline handle) action + where + settings :: H.Settings IO + settings = H.defaultSettings {H.historyFile = Just ".hsm_command_history"} + -- + istate :: IO H.InputState + istate = H.initializeInput settings diff --git a/hsm-command/Main.hs b/hsm-command/Main.hs new file mode 100644 index 0000000..0b24719 --- /dev/null +++ b/hsm-command/Main.hs @@ -0,0 +1,33 @@ +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TemplateHaskell #-} + +import Data.Function ((&)) +import Data.Text (Text) +import Effectful (runEff) +import Effectful.Log (runLog) +import Effectful.Reader.Static (runReader) +import Effectful.Resource (runResource) +import Hsm.Command.Command (commandStream) +import Hsm.Command.Readline (runReadline) +import Hsm.Core.App (launch) +import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Zmq.Server (runServer, send) + +data Env = Env + { name :: Text + , pubEp :: Text + } + +$(deriveFromYaml ''Env) + +main :: IO () +main = + launch @Env "command" id $ \env logger level -> + (commandStream & send @Env) + & runServer @Env + & runLog env.name logger level + & runReader env + & runReadline + & runResource + & runEff diff --git a/hsm-command/hsm-command.cabal b/hsm-command/hsm-command.cabal new file mode 100644 index 0000000..766f372 --- /dev/null +++ b/hsm-command/hsm-command.cabal @@ -0,0 +1,48 @@ +cabal-version: 3.4 +author: Paul Oliver +build-type: Simple +maintainer: contact@pauloliver.dev +name: hsm-command +version: 0.1.0.0 + +library + build-depends: + , base + , binary + , effectful-core + , haskeline + , log-effectful + , resourcet-effectful + , streamly-core + , text + + exposed-modules: + Hsm.Command.Command + Hsm.Command.Readline + + ghc-options: -Wall -Wunused-packages + default-language: GHC2021 + +executable command + build-depends: + , base + , binary + , effectful-core + , haskeline + , hsm-core + , log-effectful + , resourcet-effectful + , streamly-core + , text + + main-is: Main.hs + other-modules: + Hsm.Command.Command + Hsm.Command.Readline + + ghc-options: -Wall -Wunused-packages + + if !arch(x86_64) + ghc-options: -optl=-mno-fix-cortex-a53-835769 + + default-language: GHC2021 diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs new file mode 100644 index 0000000..11759be --- /dev/null +++ b/hsm-core/Hsm/Core/App.hs @@ -0,0 +1,21 @@ +module Hsm.Core.App + ( launch + ) where + +import Data.Aeson (FromJSON) +import Data.Text (Text) +import Effectful.Log (LogLevel, Logger) +import Hsm.Core.Env (environment) +import Hsm.Core.Options (Options(Options), options) +import Log.Backend.StandardOutput (withStdOutLogger) + +launch :: + FromJSON env + => Text + -> (IO app -> IO app) + -> (env -> Logger -> LogLevel -> IO app) + -> IO app +launch name wrapper app = do + Options path level <- options name + env <- environment name path + wrapper $ withStdOutLogger $ \logger -> app env logger level diff --git a/hsm-core/Hsm/Core/Env.hs b/hsm-core/Hsm/Core/Env.hs new file mode 100644 index 0000000..4e7986f --- /dev/null +++ b/hsm-core/Hsm/Core/Env.hs @@ -0,0 +1,29 @@ +module Hsm.Core.Env + ( environment + , deriveFromYaml + ) where + +import Data.Aeson (FromJSON, Result(Error, Success), Value, fromJSON) +import Data.Aeson.Key (fromText) +import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Aeson.TH (defaultOptions, deriveFromJSON, rejectUnknownFields) +import Data.Maybe (fromMaybe) +import Data.Text (Text, unpack) +import Data.Yaml (decodeFileThrow) +import Language.Haskell.TH (Dec, Name, Q) + +environment :: FromJSON env => Text -> Text -> IO env +environment name = fmap (check . fromJSON . load) . decodeFileThrow . unpack + where + load :: KeyMap Value -> Value + load keymap = + fromMaybe + (error $ "Service configuration for " <> unpack name <> " not found)") + $ keymap !? fromText name + -- + check :: Result env -> env + check (Success env) = env + check (Error str) = error str + +deriveFromYaml :: Name -> Q [Dec] +deriveFromYaml = deriveFromJSON defaultOptions {rejectUnknownFields = True} diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs new file mode 100644 index 0000000..43fa497 --- /dev/null +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -0,0 +1,61 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Fsm + ( FsmState(FsmState) + , FsmOutput(FsmOutput) + , FsmResult(FsmResult) + , fsm + ) where + +import Data.Maybe (fromJust, isJust) +import Data.Text (Text) +import Effectful (Eff, (:>)) +import Effectful.Log (Log, LogLevel, localDomain, logAttention_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.State.Static.Local (State, get, put) +import Hsm.Core.Log (logTup) +import Streamly.Data.Stream qualified as S (Stream, mapM, takeWhile) + +data FsmState i o env sta = + FsmState Text (i -> env -> sta -> FsmOutput i o env sta) + +data FsmOutput i o env sta = + FsmOutput (Maybe (FsmResult i o env sta)) [(LogLevel, Text)] + +data FsmResult i o env sta = + FsmResult o sta (FsmState i o env sta) + +fsm :: + forall i o env sta es. + ( Log :> es + , Reader env :> es + , State (FsmState i o env sta) :> es + , State sta :> es + ) + => S.Stream (Eff es) i + -> S.Stream (Eff es) o +fsm = S.mapM (return . fromJust) . S.takeWhile isJust . S.mapM run + where + exit :: Eff es (Maybe o) + exit = do + logAttention_ "No state returned, exiting FSM" + return Nothing + -- + push :: FsmResult i o env sta -> Eff es (Maybe o) + push (FsmResult out sta next) = do + put sta + put next + return $ Just out + -- + run :: i -> Eff es (Maybe o) + run input = + localDomain "fsm" $ do + FsmState name action <- get + sta <- get + env <- ask + logTrace_ $ "Entering state " <> name + FsmOutput res logs <- return $ action input env sta + localDomain name $ mapM_ logTup logs + logTrace_ $ "Exiting state " <> name + maybe exit push res diff --git a/hsm-core/Hsm/Core/Log.hs b/hsm-core/Hsm/Core/Log.hs new file mode 100644 index 0000000..9f1f357 --- /dev/null +++ b/hsm-core/Hsm/Core/Log.hs @@ -0,0 +1,20 @@ +module Hsm.Core.Log + ( withLogIO + , logTup + ) where + +import Data.Aeson.Types (emptyObject) +import Data.Text (Text) +import Data.Time.Clock (getCurrentTime) +import Effectful (Eff, (:>)) +import Effectful.Log (Log, LogLevel, getLoggerIO, logMessage) + +withLogIO :: Log :> es => Eff es (LogLevel -> Text -> IO ()) +withLogIO = do + logIO <- getLoggerIO + return $ \level message -> do + now <- getCurrentTime + logIO now level message emptyObject + +logTup :: Log :> es => (LogLevel, Text) -> Eff es () +logTup (level, message) = logMessage level message emptyObject diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs new file mode 100644 index 0000000..b2a9f23 --- /dev/null +++ b/hsm-core/Hsm/Core/Message.hs @@ -0,0 +1,25 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Message + ( message + , topic + , body + ) where + +import Data.Binary (Binary, decode, encode) +import Data.ByteString (ByteString, fromStrict, toStrict) +import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length) +import Data.Text (Text) +import Data.Text.Encoding (decodeUtf8, encodeUtf8) + +sep :: ByteString +sep = "//" + +message :: Binary a => Text -> a -> ByteString +message t b = encodeUtf8 t <> sep <> toStrict (encode b) + +topic :: ByteString -> Text +topic = decodeUtf8 . fst . B.breakSubstring sep + +body :: Binary a => ByteString -> a +body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep diff --git a/hsm-core/Hsm/Core/Options.hs b/hsm-core/Hsm/Core/Options.hs new file mode 100644 index 0000000..29e40a4 --- /dev/null +++ b/hsm-core/Hsm/Core/Options.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Options + ( Options(Options) + , options + ) where + +import Control.Applicative ((<**>)) +import Data.Text (Text, pack, unpack) +import Effectful.Log (LogLevel(LogInfo), readLogLevelEither, showLogLevel) +import Options.Applicative qualified as P +import Options.Applicative.Text qualified as P + +data Options = + Options Text LogLevel + +parser :: P.Parser Options +parser = + Options + <$> P.textOption + (P.help "Path to services config file" + <> P.short 'c' + <> P.long "config" + <> P.metavar "PATH" + <> P.value "servconf.yaml" + <> P.showDefault) + <*> P.option + (P.eitherReader $ readLogLevelEither . pack) + (P.help "Log level" + <> P.short 'l' + <> P.long "log-level" + <> P.metavar "LEVEL" + <> P.value LogInfo + <> P.showDefaultWith (unpack . showLogLevel)) + +options :: Text -> IO Options +options name = + P.customExecParser (P.prefs $ P.columns 100) + $ P.info (parser <**> P.helper) + $ P.fullDesc <> P.progDesc ("Launch " <> unpack name <> " service") diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..8c12133 --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,34 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Zmq + ( withSocket + ) where + +import Data.Text (Text) +import Effectful (Eff, IOE, (:>)) +import Effectful.Log (Log, LogLevel(LogTrace)) +import Effectful.Resource (Resource, allocate) +import Hsm.Core.Log (withLogIO) +import System.ZMQ4 qualified as Z + +withSocket :: + forall t es. (Z.SocketType t, IOE :> es, Log :> es, Resource :> es) + => t + -> Eff es (Z.Socket t) +withSocket stype = withLogIO >>= bracket + where + bracket :: (LogLevel -> Text -> IO ()) -> Eff es (Z.Socket t) + bracket logIO = snd . snd <$> allocate acquire release + where + acquire :: IO (Z.Context, Z.Socket t) + acquire = do + logIO LogTrace "Acquiring ZMQ context" + cont <- Z.context + sock <- Z.socket cont stype + return (cont, sock) + -- + release :: (Z.Context, Z.Socket t) -> IO () + release (cont, sock) = do + logIO LogTrace "Releasing ZMQ context" + Z.close sock + Z.shutdown cont diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs new file mode 100644 index 0000000..793841e --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -0,0 +1,108 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Client + ( Client + , receive + , poll + , runClient + ) where + +import Control.Monad.Loops (whileM) +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Data.Text.Encoding (encodeUtf8) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (body, topic) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Stream (Stream, repeatM) +import System.ZMQ4 qualified as Z + +data Client a b + +type instance DispatchOf Client = Static E.WithSideEffects + +newtype instance E.StaticRep Client = + Client (Z.Socket Z.Sub) + +domain :: Text +domain = "client" + +receiver :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Eff es a +receiver = do + Client sock <- E.getStaticRep + message <- E.unsafeEff_ $ Z.receive sock + localDomain domain + $ logTrace_ + $ "Message received [" + <> topic message + <> "]: " + <> pack (show $ body @a message) + return $ body message + +receive :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) a +receive = repeatM receiver + +poll :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) [a] +poll = repeatM poller + where + newMsg :: Eff es Bool + newMsg = do + Client sock <- E.getStaticRep + peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] + return $ peek /= [[]] + -- + poller :: Eff es [a] + poller = do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms + +runClient :: + forall env es a. + ( HasField "subEps" env [Text] + , HasField "topics" env [Text] + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Client : es) a + -> Eff es a +runClient action = withSocket Z.Sub >>= run + where + run :: Z.Socket Z.Sub -> Eff es a + run sock = E.evalStaticRep (Client sock) $ initialize >> action + where + connect :: Text -> Eff (Client : es) () + connect = E.unsafeEff_ . Z.connect sock . unpack + -- + subscribe :: Text -> Eff (Client : es) () + subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + -- + initialize :: Eff (Client : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + mapM_ connect env.subEps + mapM_ subscribe env.topics + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs new file mode 100644 index 0000000..5663cd8 --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Server + ( Server + , send + , runServer + ) where + +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (message) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Fold qualified as S (drain) +import Streamly.Data.Stream qualified as S (Stream, fold, mapM) +import System.ZMQ4 qualified as Z + +data Server a b + +type instance DispatchOf Server = Static E.WithSideEffects + +newtype instance E.StaticRep Server = + Server (Z.Socket Z.Pub) + +domain :: Text +domain = "server" + +send :: + forall env es a. + ( HasField "name" env Text + , Log :> es + , Reader env :> es + , Server :> es + , Binary a + , Show a + ) + => S.Stream (Eff es) a + -> Eff es () +send = S.fold S.drain . S.mapM sender + where + sender :: a -> Eff es () + sender payload = do + Server sock <- E.getStaticRep + env <- ask @env + E.unsafeEff_ $ Z.send sock [] $ message env.name payload + localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload) + +runServer :: + forall env es a. + ( HasField "pubEp" env Text + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Server : es) a + -> Eff es a +runServer action = withSocket Z.Pub >>= run + where + run :: Z.Socket Z.Pub -> Eff es a + run sock = E.evalStaticRep (Server sock) $ initialize >> action + where + bind :: Text -> Eff (Server : es) () + bind = E.unsafeEff_ . Z.bind sock . unpack + -- + initialize :: Eff (Server : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ server" + env <- ask @env + bind env.pubEp + logTrace_ $ "Publishing to " <> env.pubEp diff --git a/hsm-core/hsm-core.cabal b/hsm-core/hsm-core.cabal new file mode 100644 index 0000000..bbdbbb3 --- /dev/null +++ b/hsm-core/hsm-core.cabal @@ -0,0 +1,42 @@ +cabal-version: 3.4 +author: Paul Oliver +build-type: Simple +maintainer: contact@pauloliver.dev +name: hsm-core +version: 0.1.0.0 + +library + build-depends: + , aeson + , base + , binary + , bytestring + , effectful-core + , log-base + , log-effectful + , monad-loops + , optparse-applicative + , optparse-text + , resourcet-effectful + , streamly-core + , template-haskell + , text + , time + , yaml + , zeromq4-haskell + + exposed-modules: + Hsm.Core.App + Hsm.Core.Env + Hsm.Core.Fsm + Hsm.Core.Log + Hsm.Core.Message + Hsm.Core.Zmq.Client + Hsm.Core.Zmq.Server + + other-modules: + Hsm.Core.Options + Hsm.Core.Zmq + + ghc-options: -Wall -Wunused-packages + default-language: GHC2021 diff --git a/hsm-dummy-poller/Main.hs b/hsm-dummy-poller/Main.hs new file mode 100644 index 0000000..7a35230 --- /dev/null +++ b/hsm-dummy-poller/Main.hs @@ -0,0 +1,56 @@ +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TemplateHaskell #-} + +import Control.Monad (forM_) +import Data.Function ((&)) +import Data.Text (Text, pack) +import Effectful (Eff, (:>), runEff) +import Effectful.Concurrent (Concurrent, runConcurrent, threadDelay) +import Effectful.Log (Log, localDomain, logInfo_, runLog) +import Effectful.Reader.Static (Reader, asks, runReader) +import Effectful.Resource (runResource) +import Hsm.Core.App (launch) +import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Zmq.Client (poll, runClient) +import Streamly.Data.Fold qualified as S (drain) +import Streamly.Data.Stream qualified as S (Stream, fold, mapM) +import System.IO.Echo (withoutInputEcho) + +data Env = Env + { name :: Text + , subEps :: [Text] + , topics :: [Text] + , period :: Int + } + +$(deriveFromYaml ''Env) + +handle :: + forall es. (Concurrent :> es, Log :> es, Reader Env :> es) + => S.Stream (Eff es) [Int] + -> Eff es () +handle = + S.fold S.drain . S.mapM (\p -> asks period >>= threadDelay >> handler p) + where + receiverDomain :: Text + receiverDomain = "receiver" + -- + handler :: [Int] -> Eff es () + handler [] = localDomain receiverDomain $ logInfo_ "No pulse received yet" + handler ps = + forM_ ps $ \p -> + localDomain receiverDomain + $ logInfo_ + $ "Received pulse #" <> pack (show p) + +main :: IO () +main = + launch @Env "dummy-poller" withoutInputEcho $ \env logger level -> + (poll & handle) + & runClient @Env + & runConcurrent + & runLog env.name logger level + & runReader env + & runResource + & runEff diff --git a/hsm-dummy-poller/hsm-dummy-poller.cabal b/hsm-dummy-poller/hsm-dummy-poller.cabal new file mode 100644 index 0000000..801cf68 --- /dev/null +++ b/hsm-dummy-poller/hsm-dummy-poller.cabal @@ -0,0 +1,25 @@ +cabal-version: 3.4 +author: Paul Oliver +build-type: Simple +maintainer: contact@pauloliver.dev +name: hsm-dummy-poller +version: 0.1.0.0 + +executable dummy-poller + build-depends: + , base + , echo + , effectful + , hsm-core + , log-effectful + , resourcet-effectful + , streamly-core + , text + + main-is: Main.hs + ghc-options: -Wall -Wunused-packages + + if !arch(x86_64) + ghc-options: -optl=-mno-fix-cortex-a53-835769 + + default-language: GHC2021 diff --git a/hsm-dummy-pulser/Main.hs b/hsm-dummy-pulser/Main.hs new file mode 100644 index 0000000..3d734ba --- /dev/null +++ b/hsm-dummy-pulser/Main.hs @@ -0,0 +1,64 @@ +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TemplateHaskell #-} + +import Data.Function ((&)) +import Data.Text (Text, pack) +import Effectful (Eff, (:>), runEff) +import Effectful.Concurrent (Concurrent, runConcurrent, threadDelay) +import Effectful.Log (LogLevel(LogAttention, LogInfo), runLog) +import Effectful.Reader.Static (Reader, asks, runReader) +import Effectful.Resource (runResource) +import Effectful.State.Static.Local (evalState) +import Hsm.Core.App (launch) +import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Fsm qualified as F +import Hsm.Core.Zmq.Server (runServer, send) +import Streamly.Data.Stream (Stream, repeatM) +import System.IO.Echo (withoutInputEcho) + +data Env = Env + { name :: Text + , pubEp :: Text + , period :: Int + , pulses :: Int + } + +$(deriveFromYaml ''Env) + +pulse :: (Concurrent :> es, Reader Env :> es) => Stream (Eff es) () +pulse = repeatM $ asks period >>= threadDelay + +stateRun :: F.FsmState () Int Env Int +stateRun = F.FsmState "run" action + where + action :: () -> Env -> Int -> F.FsmOutput () Int Env Int + action _ env sta = + if sta < env.pulses + then next + else exit + where + next :: F.FsmOutput () Int Env Int + next = + F.FsmOutput + (Just $ F.FsmResult sta (succ sta) stateRun) + [(LogInfo, "Sending pulse #" <> pack (show sta))] + -- + exit :: F.FsmOutput () Int Env Int + exit = + F.FsmOutput + Nothing + [(LogAttention, "Reached " <> pack (show env.pulses) <> " pulses")] + +main :: IO () +main = + launch @Env "dummy-pulser" withoutInputEcho $ \env logger level -> + (pulse & F.fsm @_ @_ @Env @Int & send @Env @_ @Int) + & runServer @Env + & evalState @Int 1 + & evalState stateRun + & runConcurrent + & runLog env.name logger level + & runReader env + & runResource + & runEff diff --git a/hsm-dummy-pulser/hsm-dummy-pulser.cabal b/hsm-dummy-pulser/hsm-dummy-pulser.cabal new file mode 100644 index 0000000..747f62c --- /dev/null +++ b/hsm-dummy-pulser/hsm-dummy-pulser.cabal @@ -0,0 +1,25 @@ +cabal-version: 3.4 +author: Paul Oliver +build-type: Simple +maintainer: contact@pauloliver.dev +name: hsm-dummy-pulser +version: 0.1.0.0 + +executable dummy-pulser + build-depends: + , base + , echo + , effectful + , hsm-core + , log-effectful + , resourcet-effectful + , streamly-core + , text + + main-is: Main.hs + ghc-options: -Wall -Wunused-packages + + if !arch(x86_64) + ghc-options: -optl=-mno-fix-cortex-a53-835769 + + default-language: GHC2021 diff --git a/hsm-dummy-receiver/Main.hs b/hsm-dummy-receiver/Main.hs new file mode 100644 index 0000000..c0aa98a --- /dev/null +++ b/hsm-dummy-receiver/Main.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TemplateHaskell #-} + +import Data.Function ((&)) +import Data.Text (Text, pack) +import Effectful (Eff, (:>), runEff) +import Effectful.Log (Log, localDomain, logInfo_, runLog) +import Effectful.Reader.Static (runReader) +import Effectful.Resource (runResource) +import Hsm.Core.App (launch) +import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Zmq.Client (receive, runClient) +import Streamly.Data.Fold qualified as S (drain) +import Streamly.Data.Stream qualified as S (Stream, fold, mapM) +import System.IO.Echo (withoutInputEcho) + +data Env = Env + { name :: Text + , subEps :: [Text] + , topics :: [Text] + } + +$(deriveFromYaml ''Env) + +handle :: + forall es. Log :> es + => S.Stream (Eff es) Int + -> Eff es () +handle = S.fold S.drain . S.mapM handler + where + handler :: Int -> Eff es () + handler p = + localDomain "receiver" $ logInfo_ $ "Received pulse #" <> pack (show p) + +main :: IO () +main = + launch @Env "dummy-receiver" withoutInputEcho $ \env logger level -> + (receive & handle) + & runClient @Env + & runLog env.name logger level + & runReader env + & runResource + & runEff diff --git a/hsm-dummy-receiver/hsm-dummy-receiver.cabal b/hsm-dummy-receiver/hsm-dummy-receiver.cabal new file mode 100644 index 0000000..9738bbe --- /dev/null +++ b/hsm-dummy-receiver/hsm-dummy-receiver.cabal @@ -0,0 +1,25 @@ +cabal-version: 3.4 +author: Paul Oliver +build-type: Simple +maintainer: contact@pauloliver.dev +name: hsm-dummy-receiver +version: 0.1.0.0 + +executable dummy-receiver + build-depends: + , base + , echo + , effectful-core + , hsm-core + , log-effectful + , resourcet-effectful + , streamly-core + , text + + main-is: Main.hs + ghc-options: -Wall -Wunused-packages + + if !arch(x86_64) + ghc-options: -optl=-mno-fix-cortex-a53-835769 + + default-language: GHC2021 diff --git a/servconf.yaml b/servconf.yaml new file mode 100644 index 0000000..9bf4c63 --- /dev/null +++ b/servconf.yaml @@ -0,0 +1,21 @@ +command: + name: command + pubEp: tcp://0.0.0.0:10000 +dummy-poller: + name: poller + period: 3000000 + subEps: + - tcp://0.0.0.0:10001 + topics: + - pulser +dummy-pulser: + name: pulser + period: 1000000 + pubEp: tcp://0.0.0.0:10001 + pulses: 10 +dummy-receiver: + name: receiver + subEps: + - tcp://0.0.0.0:10001 + topics: + - pulser diff --git a/stack.yaml b/stack.yaml new file mode 100644 index 0000000..aff8282 --- /dev/null +++ b/stack.yaml @@ -0,0 +1,11 @@ +allow-newer: true +extra-deps: + - log-effectful-1.0.1.0 + - resourcet-effectful-1.0.1.0 +packages: + - hsm-command + - hsm-core + - hsm-dummy-poller + - hsm-dummy-pulser + - hsm-dummy-receiver +snapshot: lts-23.3 diff --git a/stack.yaml.lock b/stack.yaml.lock new file mode 100644 index 0000000..aa298d4 --- /dev/null +++ b/stack.yaml.lock @@ -0,0 +1,26 @@ +# This file was autogenerated by Stack. +# You should not edit this file by hand. +# For more information, please see the documentation at: +# https://docs.haskellstack.org/en/stable/topics/lock_files + +packages: +- completed: + hackage: log-effectful-1.0.1.0@sha256:79d1c821db1c1d95cf109f813f13a2588e45dbacb5f797eefc200ff7a5984923,2466 + pantry-tree: + sha256: 5ab7c6b553ea50ce7b6218e86db9ff3632b0482dd00aaf749ebece93b968e0ca + size: 326 + original: + hackage: log-effectful-1.0.1.0 +- completed: + hackage: resourcet-effectful-1.0.1.0@sha256:13f94c9832d0d1573abbabcddc5c3aa3c341973d1d442445795593e355e7803e,2115 + pantry-tree: + sha256: ef0db7bdeca5df1e722958cf5c8f3205ed5bf92b111e0fbc5d1a3c592d1c210e + size: 283 + original: + hackage: resourcet-effectful-1.0.1.0 +snapshots: +- completed: + sha256: dd89d2322cb5af74c6ab9d96c0c5f6c8e6653e0c991d619b4bb141a49cb98668 + size: 679282 + url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/23/3.yaml + original: lts-23.3 |