diff options
author | Paul Oliver <contact@pauloliver.dev> | 2024-12-29 17:05:34 +0000 |
---|---|---|
committer | Paul Oliver <contact@pauloliver.dev> | 2025-01-16 18:30:09 -0800 |
commit | cc639b06c7126fac7b445d8f778455620d7f8f50 (patch) | |
tree | a4c5c7c0b0a9cdb5bea0891e198003035065e57d /hsm-core/Hsm |
Initial
Diffstat (limited to 'hsm-core/Hsm')
-rw-r--r-- | hsm-core/Hsm/Core/App.hs | 21 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Env.hs | 29 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Fsm.hs | 61 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Log.hs | 20 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Message.hs | 25 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Options.hs | 40 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq.hs | 34 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 108 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Server.hs | 81 |
9 files changed, 419 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs new file mode 100644 index 0000000..11759be --- /dev/null +++ b/hsm-core/Hsm/Core/App.hs @@ -0,0 +1,21 @@ +module Hsm.Core.App + ( launch + ) where + +import Data.Aeson (FromJSON) +import Data.Text (Text) +import Effectful.Log (LogLevel, Logger) +import Hsm.Core.Env (environment) +import Hsm.Core.Options (Options(Options), options) +import Log.Backend.StandardOutput (withStdOutLogger) + +launch :: + FromJSON env + => Text + -> (IO app -> IO app) + -> (env -> Logger -> LogLevel -> IO app) + -> IO app +launch name wrapper app = do + Options path level <- options name + env <- environment name path + wrapper $ withStdOutLogger $ \logger -> app env logger level diff --git a/hsm-core/Hsm/Core/Env.hs b/hsm-core/Hsm/Core/Env.hs new file mode 100644 index 0000000..4e7986f --- /dev/null +++ b/hsm-core/Hsm/Core/Env.hs @@ -0,0 +1,29 @@ +module Hsm.Core.Env + ( environment + , deriveFromYaml + ) where + +import Data.Aeson (FromJSON, Result(Error, Success), Value, fromJSON) +import Data.Aeson.Key (fromText) +import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Aeson.TH (defaultOptions, deriveFromJSON, rejectUnknownFields) +import Data.Maybe (fromMaybe) +import Data.Text (Text, unpack) +import Data.Yaml (decodeFileThrow) +import Language.Haskell.TH (Dec, Name, Q) + +environment :: FromJSON env => Text -> Text -> IO env +environment name = fmap (check . fromJSON . load) . decodeFileThrow . unpack + where + load :: KeyMap Value -> Value + load keymap = + fromMaybe + (error $ "Service configuration for " <> unpack name <> " not found)") + $ keymap !? fromText name + -- + check :: Result env -> env + check (Success env) = env + check (Error str) = error str + +deriveFromYaml :: Name -> Q [Dec] +deriveFromYaml = deriveFromJSON defaultOptions {rejectUnknownFields = True} diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs new file mode 100644 index 0000000..43fa497 --- /dev/null +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -0,0 +1,61 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Fsm + ( FsmState(FsmState) + , FsmOutput(FsmOutput) + , FsmResult(FsmResult) + , fsm + ) where + +import Data.Maybe (fromJust, isJust) +import Data.Text (Text) +import Effectful (Eff, (:>)) +import Effectful.Log (Log, LogLevel, localDomain, logAttention_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.State.Static.Local (State, get, put) +import Hsm.Core.Log (logTup) +import Streamly.Data.Stream qualified as S (Stream, mapM, takeWhile) + +data FsmState i o env sta = + FsmState Text (i -> env -> sta -> FsmOutput i o env sta) + +data FsmOutput i o env sta = + FsmOutput (Maybe (FsmResult i o env sta)) [(LogLevel, Text)] + +data FsmResult i o env sta = + FsmResult o sta (FsmState i o env sta) + +fsm :: + forall i o env sta es. + ( Log :> es + , Reader env :> es + , State (FsmState i o env sta) :> es + , State sta :> es + ) + => S.Stream (Eff es) i + -> S.Stream (Eff es) o +fsm = S.mapM (return . fromJust) . S.takeWhile isJust . S.mapM run + where + exit :: Eff es (Maybe o) + exit = do + logAttention_ "No state returned, exiting FSM" + return Nothing + -- + push :: FsmResult i o env sta -> Eff es (Maybe o) + push (FsmResult out sta next) = do + put sta + put next + return $ Just out + -- + run :: i -> Eff es (Maybe o) + run input = + localDomain "fsm" $ do + FsmState name action <- get + sta <- get + env <- ask + logTrace_ $ "Entering state " <> name + FsmOutput res logs <- return $ action input env sta + localDomain name $ mapM_ logTup logs + logTrace_ $ "Exiting state " <> name + maybe exit push res diff --git a/hsm-core/Hsm/Core/Log.hs b/hsm-core/Hsm/Core/Log.hs new file mode 100644 index 0000000..9f1f357 --- /dev/null +++ b/hsm-core/Hsm/Core/Log.hs @@ -0,0 +1,20 @@ +module Hsm.Core.Log + ( withLogIO + , logTup + ) where + +import Data.Aeson.Types (emptyObject) +import Data.Text (Text) +import Data.Time.Clock (getCurrentTime) +import Effectful (Eff, (:>)) +import Effectful.Log (Log, LogLevel, getLoggerIO, logMessage) + +withLogIO :: Log :> es => Eff es (LogLevel -> Text -> IO ()) +withLogIO = do + logIO <- getLoggerIO + return $ \level message -> do + now <- getCurrentTime + logIO now level message emptyObject + +logTup :: Log :> es => (LogLevel, Text) -> Eff es () +logTup (level, message) = logMessage level message emptyObject diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs new file mode 100644 index 0000000..b2a9f23 --- /dev/null +++ b/hsm-core/Hsm/Core/Message.hs @@ -0,0 +1,25 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Message + ( message + , topic + , body + ) where + +import Data.Binary (Binary, decode, encode) +import Data.ByteString (ByteString, fromStrict, toStrict) +import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length) +import Data.Text (Text) +import Data.Text.Encoding (decodeUtf8, encodeUtf8) + +sep :: ByteString +sep = "//" + +message :: Binary a => Text -> a -> ByteString +message t b = encodeUtf8 t <> sep <> toStrict (encode b) + +topic :: ByteString -> Text +topic = decodeUtf8 . fst . B.breakSubstring sep + +body :: Binary a => ByteString -> a +body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep diff --git a/hsm-core/Hsm/Core/Options.hs b/hsm-core/Hsm/Core/Options.hs new file mode 100644 index 0000000..29e40a4 --- /dev/null +++ b/hsm-core/Hsm/Core/Options.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Options + ( Options(Options) + , options + ) where + +import Control.Applicative ((<**>)) +import Data.Text (Text, pack, unpack) +import Effectful.Log (LogLevel(LogInfo), readLogLevelEither, showLogLevel) +import Options.Applicative qualified as P +import Options.Applicative.Text qualified as P + +data Options = + Options Text LogLevel + +parser :: P.Parser Options +parser = + Options + <$> P.textOption + (P.help "Path to services config file" + <> P.short 'c' + <> P.long "config" + <> P.metavar "PATH" + <> P.value "servconf.yaml" + <> P.showDefault) + <*> P.option + (P.eitherReader $ readLogLevelEither . pack) + (P.help "Log level" + <> P.short 'l' + <> P.long "log-level" + <> P.metavar "LEVEL" + <> P.value LogInfo + <> P.showDefaultWith (unpack . showLogLevel)) + +options :: Text -> IO Options +options name = + P.customExecParser (P.prefs $ P.columns 100) + $ P.info (parser <**> P.helper) + $ P.fullDesc <> P.progDesc ("Launch " <> unpack name <> " service") diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..8c12133 --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,34 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Zmq + ( withSocket + ) where + +import Data.Text (Text) +import Effectful (Eff, IOE, (:>)) +import Effectful.Log (Log, LogLevel(LogTrace)) +import Effectful.Resource (Resource, allocate) +import Hsm.Core.Log (withLogIO) +import System.ZMQ4 qualified as Z + +withSocket :: + forall t es. (Z.SocketType t, IOE :> es, Log :> es, Resource :> es) + => t + -> Eff es (Z.Socket t) +withSocket stype = withLogIO >>= bracket + where + bracket :: (LogLevel -> Text -> IO ()) -> Eff es (Z.Socket t) + bracket logIO = snd . snd <$> allocate acquire release + where + acquire :: IO (Z.Context, Z.Socket t) + acquire = do + logIO LogTrace "Acquiring ZMQ context" + cont <- Z.context + sock <- Z.socket cont stype + return (cont, sock) + -- + release :: (Z.Context, Z.Socket t) -> IO () + release (cont, sock) = do + logIO LogTrace "Releasing ZMQ context" + Z.close sock + Z.shutdown cont diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs new file mode 100644 index 0000000..793841e --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -0,0 +1,108 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Client + ( Client + , receive + , poll + , runClient + ) where + +import Control.Monad.Loops (whileM) +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Data.Text.Encoding (encodeUtf8) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (body, topic) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Stream (Stream, repeatM) +import System.ZMQ4 qualified as Z + +data Client a b + +type instance DispatchOf Client = Static E.WithSideEffects + +newtype instance E.StaticRep Client = + Client (Z.Socket Z.Sub) + +domain :: Text +domain = "client" + +receiver :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Eff es a +receiver = do + Client sock <- E.getStaticRep + message <- E.unsafeEff_ $ Z.receive sock + localDomain domain + $ logTrace_ + $ "Message received [" + <> topic message + <> "]: " + <> pack (show $ body @a message) + return $ body message + +receive :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) a +receive = repeatM receiver + +poll :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) [a] +poll = repeatM poller + where + newMsg :: Eff es Bool + newMsg = do + Client sock <- E.getStaticRep + peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] + return $ peek /= [[]] + -- + poller :: Eff es [a] + poller = do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms + +runClient :: + forall env es a. + ( HasField "subEps" env [Text] + , HasField "topics" env [Text] + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Client : es) a + -> Eff es a +runClient action = withSocket Z.Sub >>= run + where + run :: Z.Socket Z.Sub -> Eff es a + run sock = E.evalStaticRep (Client sock) $ initialize >> action + where + connect :: Text -> Eff (Client : es) () + connect = E.unsafeEff_ . Z.connect sock . unpack + -- + subscribe :: Text -> Eff (Client : es) () + subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + -- + initialize :: Eff (Client : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + mapM_ connect env.subEps + mapM_ subscribe env.topics + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs new file mode 100644 index 0000000..5663cd8 --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Server + ( Server + , send + , runServer + ) where + +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (message) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Fold qualified as S (drain) +import Streamly.Data.Stream qualified as S (Stream, fold, mapM) +import System.ZMQ4 qualified as Z + +data Server a b + +type instance DispatchOf Server = Static E.WithSideEffects + +newtype instance E.StaticRep Server = + Server (Z.Socket Z.Pub) + +domain :: Text +domain = "server" + +send :: + forall env es a. + ( HasField "name" env Text + , Log :> es + , Reader env :> es + , Server :> es + , Binary a + , Show a + ) + => S.Stream (Eff es) a + -> Eff es () +send = S.fold S.drain . S.mapM sender + where + sender :: a -> Eff es () + sender payload = do + Server sock <- E.getStaticRep + env <- ask @env + E.unsafeEff_ $ Z.send sock [] $ message env.name payload + localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload) + +runServer :: + forall env es a. + ( HasField "pubEp" env Text + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Server : es) a + -> Eff es a +runServer action = withSocket Z.Pub >>= run + where + run :: Z.Socket Z.Pub -> Eff es a + run sock = E.evalStaticRep (Server sock) $ initialize >> action + where + bind :: Text -> Eff (Server : es) () + bind = E.unsafeEff_ . Z.bind sock . unpack + -- + initialize :: Eff (Server : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ server" + env <- ask @env + bind env.pubEp + logTrace_ $ "Publishing to " <> env.pubEp |