From 70d3e37b1a088209fe84abf07a39d14dec116c6b Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sat, 24 Aug 2024 11:57:18 -0700 Subject: Initial commit --- hsm-core/Hsm/Core/App.hs | 75 +++++++++++++++++++++++++++++++++++++++++ hsm-core/Hsm/Core/Fsm.hs | 43 ++++++++++++++++++++++++ hsm-core/Hsm/Core/LogIO.hs | 25 ++++++++++++++ hsm-core/Hsm/Core/Message.hs | 29 ++++++++++++++++ hsm-core/Hsm/Core/Pipes.hs | 72 +++++++++++++++++++++++++++++++++++++++ hsm-core/Hsm/Core/Zmq.hs | 80 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 324 insertions(+) create mode 100644 hsm-core/Hsm/Core/App.hs create mode 100644 hsm-core/Hsm/Core/Fsm.hs create mode 100644 hsm-core/Hsm/Core/LogIO.hs create mode 100644 hsm-core/Hsm/Core/Message.hs create mode 100644 hsm-core/Hsm/Core/Pipes.hs create mode 100644 hsm-core/Hsm/Core/Zmq.hs (limited to 'hsm-core/Hsm/Core') diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs new file mode 100644 index 0000000..51f6d94 --- /dev/null +++ b/hsm-core/Hsm/Core/App.hs @@ -0,0 +1,75 @@ +{-# LANGUAGE ImportQualifiedPost #-} + +-- Module : Hsm.Core.App +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.App + ( launch + ) +where + +import Control.Applicative ((<**>)) +import Data.Aeson (FromJSON, Result (Error, Success), Value, fromJSON) +import Data.Aeson.Key (fromString) +import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Maybe (fromMaybe) +import Data.Text (pack, unpack) +import Data.Yaml (decodeFileThrow) +import Effectful.Log (LogLevel (LogInfo), Logger, readLogLevelEither, showLogLevel) +import Log.Backend.StandardOutput (withStdOutLogger) +import Options.Applicative qualified as P +import System.IO.Echo (withoutInputEcho) + +data Options = Options String LogLevel + +parser :: P.Parser Options +parser = + Options + <$> P.strOption + ( P.help "Path to services config file" + <> P.short 'c' + <> P.long "config" + <> P.metavar "PATH" + <> P.value "config.yaml" + <> P.showDefault + ) + <*> P.option + (P.eitherReader $ readLogLevelEither . pack) + ( P.help "Log level" + <> P.short 'l' + <> P.long "log-level" + <> P.metavar "LEVEL" + <> P.value LogInfo + <> P.showDefaultWith (unpack . showLogLevel) + ) + +-- Bootstraps an application by reading its settings from a provided +-- configuration file. A configuration must exist in the config file for the name +-- provided. A logger object, log level and read-only configuration are passed +-- down to the provided application. +launch :: FromJSON e => String -> (Logger -> LogLevel -> e -> IO a) -> IO a +launch name app = + P.execParser info >>= \(Options path level) -> + returnEnv path >>= \env -> + withoutInputEcho $ withStdOutLogger $ \logger -> app logger level env + where + title :: String + title = "Launch " <> name <> " service" + + description :: P.InfoMod Options + description = P.fullDesc <> P.progDesc title + + info :: P.ParserInfo Options + info = P.info (parser <**> P.helper) description + + err :: String + err = "Service configuration for " <> name <> " not found" + + load :: KeyMap Value -> Value + load configs = fromMaybe (error err) $ configs !? fromString name + + check :: Result e -> e + check (Success e) = e + check (Error str) = error str + + returnEnv :: FromJSON e => String -> IO e + returnEnv = fmap (check . fromJSON . load) . decodeFileThrow diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs new file mode 100644 index 0000000..caa2e7e --- /dev/null +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE OverloadedStrings #-} + +-- Module : Hsm.Core.Fsm +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.Fsm + ( Method (Method) + , FsmC + , fsm + ) +where + +import Control.Exception.Safe (Exception, catch, displayException) +import Data.Text (Text, pack) +import Effectful ((:>)) +import Effectful.Log (Log, localDomain, logAttention_, logTrace_) +import Effectful.Reader.Static (Reader) +import Effectful.State.Static.Local (State) +import GHC.Records (HasField) +import Hsm.Core.Pipes (Pipe) + +data Method a b e s x es = Method Text (Pipe a b e s es (Method a b e s x es)) + +type FsmC e s x es = (HasField "name" e Text, Exception x, Log :> es, Reader e :> es, State s :> es) + +-- Builds an FSM with an initial and a default method. Because both @Proxy@ +-- and @Eff@ are instances of @MonadCatch@ and @MonadThrow@, exceptions may be +-- thrown from within state methods. The FSM transitions to its default state +-- if an exception is thrown. +fsm :: forall a b e s x es. FsmC e s x es => Method a b e s x es -> Method a b e s x es -> Pipe a b e s es () +fsm actionInit actionDefault = localDomain "fsm" $ run actionInit + where + run :: Method a b e s x es -> Pipe a b e s es () + run (Method name action) = do + logTrace_ $ "Entering state " <> name + next <- localDomain name action `catch` handle + logTrace_ $ "Exiting state " <> name + run next + where + handle :: x -> Pipe a b e s es (Method a b e s x es) + handle exception = do + logAttention_ $ "Exception caught while on state " <> name + logAttention_ $ pack $ displayException exception + return actionDefault diff --git a/hsm-core/Hsm/Core/LogIO.hs b/hsm-core/Hsm/Core/LogIO.hs new file mode 100644 index 0000000..fb187e9 --- /dev/null +++ b/hsm-core/Hsm/Core/LogIO.hs @@ -0,0 +1,25 @@ +{-# LANGUAGE ImportQualifiedPost #-} + +-- Module : Hsm.Core.LogIO +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.LogIO + ( LoggerIO + , getLoggerIO + ) +where + +import Data.Aeson.Types (emptyObject) +import Data.Text (Text) +import Data.Time.Clock (getCurrentTime) +import Effectful.Log (LogLevel, MonadLog) +import Effectful.Log qualified as L (getLoggerIO) + +-- Wraps logger returned by @getLoggerIO@ into simpler type. +type LoggerIO = LogLevel -> Text -> IO () + +getLoggerIO :: MonadLog m => m LoggerIO +getLoggerIO = + L.getLoggerIO >>= \logIO -> + return $ \level msg -> + getCurrentTime >>= \now -> + logIO now level msg emptyObject diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs new file mode 100644 index 0000000..069ab99 --- /dev/null +++ b/hsm-core/Hsm/Core/Message.hs @@ -0,0 +1,29 @@ +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedStrings #-} + +-- Module : Hsm.Core.Message +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.Message + ( message + , topic + , body + ) +where + +import Data.Binary (Binary, decode, encode) +import Data.ByteString (ByteString, fromStrict, toStrict) +import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length) +import Data.Text (Text) +import Data.Text.Encoding (encodeUtf8) + +sep :: ByteString +sep = "//" + +message :: Binary a => Text -> a -> ByteString +message t b = encodeUtf8 t <> sep <> toStrict (encode b) + +topic :: ByteString -> ByteString +topic = fst . B.breakSubstring sep + +body :: Binary a => ByteString -> a +body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep diff --git a/hsm-core/Hsm/Core/Pipes.hs b/hsm-core/Hsm/Core/Pipes.hs new file mode 100644 index 0000000..2c63c59 --- /dev/null +++ b/hsm-core/Hsm/Core/Pipes.hs @@ -0,0 +1,72 @@ +{-# LANGUAGE ImportQualifiedPost #-} + +-- Module : Hsm.Core.Pipes +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.Pipes + ( Proxy + , (>->) + , await + , yield + , runEffect + , Producer + , Pipe + , Consumer + ) +where + +import Control.Exception.Safe (MonadCatch, MonadThrow) +import Control.Monad.Reader (MonadReader, ask, local) +import Control.Monad.State (MonadState, get, put) +import Control.Monad.Trans.Resource (MonadResource, liftResourceT) +import Data.Composition ((.:.)) +import Effectful (Eff, IOE, (:>)) +import Effectful.Log (Log, MonadLog, getLoggerEnv, localData, localDomain, localMaxLogLevel, logMessage) +import Effectful.Reader.Static qualified as E (Reader, ask, local) +import Effectful.Resource (Resource) +import Effectful.State.Static.Local qualified as E (State, get, put) +import Pipes (MonadIO, X, hoist, lift, liftIO) +import Pipes qualified as P (Proxy, await, runEffect, yield, (>->)) + +-- Wraps @Pipes.Proxy@ with @Eff@ as its internal monad. This provides +-- composable streaming plus @Eff@ as a means to constrain effects within +-- individual pipeline components. +newtype Proxy a' a b' b e s es r = Proxy (P.Proxy a' a b' b (Eff es) r) deriving (Applicative, Functor, Monad, MonadCatch, MonadThrow) + +(>->) :: Proxy a' a () b e s es r -> Proxy () b c' c e s es r -> Proxy a' a c' c e s es r +Proxy a >-> Proxy b = Proxy $ a P.>-> b + +await :: Proxy () a y' y e s es a +await = Proxy P.await + +yield :: a -> Proxy x' x () a e s es () +yield = Proxy . P.yield + +runEffect :: Proxy X () () X e s es r -> Eff es r +runEffect (Proxy effect) = P.runEffect effect + +instance Log :> es => MonadLog (Proxy a' a b' b e s es) where + getLoggerEnv = Proxy $ lift getLoggerEnv + localData env (Proxy action) = Proxy $ hoist (localData env) action + localDomain domain (Proxy action) = Proxy $ hoist (localDomain domain) action + localMaxLogLevel level (Proxy action) = Proxy $ hoist (localMaxLogLevel level) action + logMessage = Proxy . lift .:. logMessage + +instance E.Reader e :> es => MonadReader e (Proxy a' a b' b e s es) where + ask = Proxy $ lift E.ask + local f (Proxy action) = Proxy $ hoist (E.local f) action + +instance (IOE :> es, Resource :> es) => MonadResource (Proxy a' a b' b e s es) where + liftResourceT = Proxy . lift . liftResourceT + +instance E.State s :> es => MonadState s (Proxy a' a b' b e s es) where + get = Proxy $ lift E.get + put = Proxy . lift . E.put + +instance IOE :> es => MonadIO (Proxy a' a b' b e s es) where + liftIO = Proxy . lift . liftIO + +type Producer b e s es = Proxy X () () b e s es + +type Pipe a b e s es = Proxy () a () b e s es + +type Consumer a e s es = Proxy () a () X e s es diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..69ff59a --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} + +-- Module : Hsm.Core.Zmq +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.Zmq + ( client + , server + ) +where + +import Control.Monad (forM_, forever) +import Control.Monad.IO.Class (liftIO) +import Control.Monad.Reader (ask) +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Data.Text.Encoding (encodeUtf8) +import Effectful (IOE, (:>)) +import Effectful.Log (Log, LogLevel (LogTrace), localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader) +import Effectful.Resource (Resource, allocate) +import GHC.Records (HasField) +import Hsm.Core.LogIO (LoggerIO, getLoggerIO) +import Hsm.Core.Message (body, message) +import Hsm.Core.Pipes (Consumer, Producer, Proxy, await, yield) +import System.ZMQ4 qualified as Z + +data ZmqResource t = ZmqResource Z.Context (Z.Socket t) + +type ZmqC e es = (IOE :> es, Log :> es, Reader e :> es, Resource :> es) + +type UsingC t e es = (Z.SocketType t, ZmqC e es) + +type ClientC a e es = (Binary a, HasField "name" e Text, HasField "subEps" e [Text], HasField "topics" e [Text], ZmqC e es) + +type ServerC a e es = (Binary a, HasField "name" e Text, HasField "pubEp" e Text, ZmqC e es) + +type Action t a' a b' b e s es = Z.Socket t -> e -> Proxy a' a b' b e s es () + +using :: forall t a' a b' b e s es. UsingC t e es => t -> Action t a' a b' b e s es -> Proxy a' a b' b e s es () +using stype action = getLoggerIO >>= safely + where + safely :: LoggerIO -> Proxy a' a b' b e s es () + safely logger = + allocate acquire release >>= \(_, ZmqResource _ socket) -> + ask >>= action socket + where + acquire :: IO (ZmqResource t) + acquire = do + logger LogTrace "Acquiring ZMQ context" + context <- Z.context + socket <- Z.socket context stype + return $ ZmqResource context socket + + release :: ZmqResource t -> IO () + release (ZmqResource context socket) = do + logger LogTrace "Releasing ZMQ context" + Z.close socket + Z.shutdown context + +client :: ClientC a e es => Producer a e s es () +client = + using Z.Sub $ \socket e -> + localDomain "client" $ do + logInfo_ "Initializing ZMQ client" + liftIO $ forM_ e.subEps $ Z.connect socket . unpack + liftIO $ forM_ e.topics $ Z.subscribe socket . encodeUtf8 + logTrace_ $ "Listening to " <> pack (show e.subEps) + logTrace_ $ "Subscribed to " <> pack (show e.topics) + forever $ liftIO (Z.receive socket) >>= yield . body + +server :: ServerC a e es => Consumer a e s es () +server = + using Z.Pub $ \socket e -> + localDomain "server" $ do + logInfo_ "Initializing ZMQ server" + liftIO $ Z.bind socket $ unpack e.pubEp + logTrace_ $ "Publishing to " <> e.pubEp + forever $ await >>= liftIO . Z.send socket [] . message e.name -- cgit v1.2.1