From f0854265f7a1b59078308965d33fe2583a5c0f9c Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sat, 24 Aug 2024 11:57:18 -0700 Subject: Initial commit --- hsm-core/Hsm/Core/App.hs | 88 ++++++++++++++++++++++ hsm-core/Hsm/Core/Fsm.hs | 89 ++++++++++++++++++++++ hsm-core/Hsm/Core/Message.hs | 27 +++++++ hsm-core/Hsm/Core/Zmq.hs | 175 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 379 insertions(+) create mode 100644 hsm-core/Hsm/Core/App.hs create mode 100644 hsm-core/Hsm/Core/Fsm.hs create mode 100644 hsm-core/Hsm/Core/Message.hs create mode 100644 hsm-core/Hsm/Core/Zmq.hs (limited to 'hsm-core/Hsm/Core') diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs new file mode 100644 index 0000000..1bb3465 --- /dev/null +++ b/hsm-core/Hsm/Core/App.hs @@ -0,0 +1,88 @@ +{-# LANGUAGE ApplicativeDo #-} +{-# LANGUAGE ImportQualifiedPost #-} + +module Hsm.Core.App + ( launch + , launchWithEcho + ) +where + +import Control.Applicative ((<**>)) +import Data.Aeson (FromJSON, Result (Error, Success), Value, fromJSON) +import Data.Aeson.Key (fromString) +import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Composition ((.:)) +import Data.Function.Slip (slipl) +import Data.Maybe (fromMaybe) +import Data.Text (pack, unpack) +import Data.Yaml (decodeFileThrow) +import Effectful.Log qualified as L +import Log.Backend.StandardOutput (withStdOutLogger) +import Options.Applicative qualified as P +import System.IO.Echo (withoutInputEcho) + +data Options = Options String L.LogLevel + +type App e a = e -> L.Logger -> L.LogLevel -> IO a + +parser :: P.Parser Options +parser = do + config <- + P.strOption $ + P.help "Path to services config file" + <> P.short 'c' + <> P.long "config" + <> P.metavar "PATH" + <> P.value "config.yaml" + <> P.showDefault + level <- + P.option (P.eitherReader $ L.readLogLevelEither . pack) $ + P.help "Log level" + <> P.short 'l' + <> P.long "log-level" + <> P.metavar "LEVEL" + <> P.value L.LogInfo + <> P.showDefaultWith (unpack . L.showLogLevel) + pure $ Options config level + +launchWith + :: forall e a + . FromJSON e + => String + -> App e a + -> (IO a -> IO a) + -> IO a +launchWith name app wrapper = do + Options path level <- P.execParser info + returnEnv path >>= runApp level + where + title :: String + title = "Launch " <> name <> " service" + + description :: P.InfoMod Options + description = P.fullDesc <> P.progDesc title + + info :: P.ParserInfo Options + info = P.info (parser <**> P.helper) description + + err :: String + err = "Service configuration for " <> name <> " not found" + + load :: KeyMap Value -> Value + load configs = fromMaybe (error err) $ configs !? fromString name + + check :: Result e -> e + check (Success e) = e + check (Error str) = error str + + returnEnv :: String -> IO e + returnEnv = fmap (check . fromJSON . load) . decodeFileThrow + + runApp :: L.LogLevel -> e -> IO a + runApp = wrapper . withStdOutLogger .: slipl app + +launch :: FromJSON e => String -> App e a -> IO a +launch = slipl launchWith withoutInputEcho + +launchWithEcho :: FromJSON e => String -> App e a -> IO a +launchWithEcho = slipl launchWith id diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs new file mode 100644 index 0000000..e0f54a3 --- /dev/null +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -0,0 +1,89 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Fsm + ( FsmState (FsmState) + , FsmResult (FsmResult) + , FsmOutput (FsmOutput) + , fsm + , fsmStream + , pLogAttention + , pLogInfo + , pLogTrace + ) +where + +import Control.Monad (forM_) +import Data.Aeson.Types (emptyObject) +import Data.Composition ((.:)) +import Data.Function.Slip (slipl) +import Data.List (singleton) +import Data.Maybe (fromJust, isJust) +import Data.Text (Text) +import Effectful (Eff, (:>)) +import Effectful.Log qualified as L +import Effectful.Reader.Static (Reader, ask) +import Effectful.State.Static.Local (State, get, put) +import Streamly.Data.Stream (Stream, mapM, takeWhile) +import Prelude hiding (mapM, takeWhile) + +data FsmState i o e s + = FsmState Text (i -> e -> s -> FsmOutput i o e s) + +data FsmResult i o e s + = FsmResult o s (FsmState i o e s) + +data FsmOutput i o e s + = FsmOutput (Maybe (FsmResult i o e s)) [(L.LogLevel, Text)] + +type FsmConstraint i o e s es = + ( L.Log :> es + , Reader e :> es + , State (FsmState i o e s) :> es + , State s :> es + ) + +fsm :: forall i o e s es. FsmConstraint i o e s es => i -> Eff es (Maybe o) +fsm input = + L.localDomain "fsm" $ do + FsmState name action <- get + state <- get + env <- ask + L.logTrace_ $ "Entering state " <> name + FsmOutput res logs <- return $ action input env state + L.localDomain name $ forM_ logs $ uncurry $ slipl L.logMessage emptyObject + L.logTrace_ $ "Exiting state " <> name + maybe exit push res + where + push :: FsmResult i o e s -> Eff es (Maybe o) + push (FsmResult output state next) = do + put state + put next + return $ Just output + + exit :: Eff es (Maybe o) + exit = do + L.logAttention_ "No state returned, exiting FSM" + return Nothing + +fsmStream + :: forall i o e s es + . FsmConstraint i o e s es + => Stream (Eff es) i + -> Stream (Eff es) o +fsmStream = mapM (return . fromJust) . takeWhile isJust . mapM (fsm @_ @_ @e @s) + +type LogList = [(L.LogLevel, Text)] + +pLog :: L.LogLevel -> Text -> LogList +pLog = singleton .: (,) + +pLogAttention :: Text -> LogList +pLogAttention = pLog L.LogAttention + +pLogInfo :: Text -> LogList +pLogInfo = pLog L.LogInfo + +pLogTrace :: Text -> LogList +pLogTrace = pLog L.LogTrace diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs new file mode 100644 index 0000000..2fb5d3c --- /dev/null +++ b/hsm-core/Hsm/Core/Message.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Message + ( message + , topic + , body + ) +where + +import Data.Binary (Binary, decode, encode) +import Data.ByteString (ByteString, fromStrict, toStrict) +import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length) +import Data.Text (Text) +import Data.Text.Encoding (decodeUtf8, encodeUtf8) + +sep :: ByteString +sep = "//" + +message :: Binary a => Text -> a -> ByteString +message t b = encodeUtf8 t <> sep <> toStrict (encode b) + +topic :: ByteString -> Text +topic = decodeUtf8 . fst . B.breakSubstring sep + +body :: Binary a => ByteString -> a +body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..fe764eb --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,175 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq + ( Client + , receive + , runClient + , Server + , send + , runServer + ) +where + +import Control.Monad (forM_) +import Data.Aeson.Types (Value, emptyObject) +import Data.Binary (Binary) +import Data.Function ((&)) +import Data.Text (Text, show, unpack) +import Data.Text.Encoding (encodeUtf8) +import Data.Time.Clock (UTCTime, getCurrentTime) +import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>)) +import Effectful.Dispatch.Static qualified as S +import Effectful.Log qualified as L +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource, allocate) +import GHC.Records (HasField) +import Hsm.Core.Message (body, message, topic) +import System.ZMQ4 qualified as Z +import Prelude hiding (show) + +type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO () + +type LoggerIO = L.LogLevel -> Text -> IO () + +type ZmqResource t = (Z.Context, Z.Socket t) + +type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es) + +withSocket + :: forall t es + . ( Z.SocketType t + , ZmqConstraint es + ) + => t + -> Eff es (Z.Socket t) +withSocket stype = L.getLoggerIO >>= withResource . logIO + where + logIO :: DefaultLoggerIO -> LoggerIO + logIO loggerIO level msg = do + now <- getCurrentTime + loggerIO now level msg emptyObject + + withResource :: LoggerIO -> Eff es (Z.Socket t) + withResource logger = snd . snd <$> allocate acquire release + where + acquire :: IO (ZmqResource t) + acquire = do + logger L.LogTrace "Acquiring ZMQ context" + context <- Z.context + socket <- Z.socket context stype + return (context, socket) + + release :: ZmqResource t -> IO () + release (context, socket) = do + logger L.LogTrace "Releasing ZMQ context" + Z.close socket + Z.shutdown context + +data Client :: Effect + +type instance DispatchOf Client = Static S.WithSideEffects + +newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub) + +clientDomain :: Text +clientDomain = "client" + +receive + :: forall a es + . ( Binary a + , Client :> es + , L.Log :> es + , Show a + ) + => Eff es a +receive = do + Client socket <- S.getStaticRep + msg <- S.unsafeEff_ $ Z.receive socket + "Message received [" <> topic msg <> "]: " <> show (body @a msg) + & L.logTrace_ + & L.localDomain clientDomain + return $ body msg + +runClient + :: forall e es a + . ( HasField "subEps" e [Text] + , HasField "topics" e [Text] + , Reader e :> es + , ZmqConstraint es + ) + => Eff (Client : es) a + -> Eff es a +runClient action = withSocket Z.Sub >>= runAction + where + runAction :: Z.Socket Z.Sub -> Eff es a + runAction socket = S.evalStaticRep (Client socket) $ initialize >> action + where + connect :: Text -> Eff (Client : es) () + connect = S.unsafeEff_ . Z.connect socket . unpack + + subscribe :: Text -> Eff (Client : es) () + subscribe = S.unsafeEff_ . Z.subscribe socket . encodeUtf8 + + initialize :: Eff (Client : es) () + initialize = + L.localDomain clientDomain $ do + L.logInfo_ "Initializing ZMQ client" + env <- ask @e + forM_ env.subEps connect + forM_ env.topics subscribe + L.logTrace_ $ "Listening to " <> show env.subEps + L.logTrace_ $ "Subscribed to " <> show env.topics + +data Server :: Effect + +type instance DispatchOf Server = Static S.WithSideEffects + +newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub) + +serverDomain :: Text +serverDomain = "server" + +send + :: forall a e es + . ( Binary a + , HasField "name" e Text + , L.Log :> es + , Reader e :> es + , Server :> es + , Show a + ) + => a + -> Eff es () +send payload = do + Server s <- S.getStaticRep + env <- ask @e + S.unsafeEff_ $ Z.send s [] $ message env.name payload + L.localDomain serverDomain $ L.logTrace_ $ "Message sent: " <> show payload + +runServer + :: forall e es a + . ( HasField "pubEp" e Text + , Reader e :> es + , ZmqConstraint es + ) + => Eff (Server : es) a + -> Eff es a +runServer action = withSocket Z.Pub >>= runAction + where + runAction :: Z.Socket Z.Pub -> Eff es a + runAction socket = S.evalStaticRep (Server socket) $ initialize >> action + where + bind :: Text -> Eff (Server : es) () + bind = S.unsafeEff_ . Z.bind socket . unpack + + initialize :: Eff (Server : es) () + initialize = + L.localDomain serverDomain $ do + L.logInfo_ "Initializing ZMQ server" + env <- ask @e + bind env.pubEp + L.logTrace_ $ "Publishing to " <> env.pubEp -- cgit v1.2.1