1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
-- Module : Hsm.Core.Zmq
-- Maintainer : contact@pauloliver.dev
module Hsm.Core.Zmq
( client
, server
)
where
import Control.Monad (forM_, forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import Data.Binary (Binary)
import Data.Text (Text, pack, unpack)
import Data.Text.Encoding (encodeUtf8)
import Effectful (IOE, (:>))
import Effectful.Log (Log, LogLevel (LogTrace), localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader)
import Effectful.Resource (Resource, allocate)
import GHC.Records (HasField)
import Hsm.Core.LogIO (LoggerIO, getLoggerIO)
import Hsm.Core.Message (body, message)
import Hsm.Core.Pipes (Consumer, Producer, Proxy, await, yield)
import System.ZMQ4 qualified as Z
data ZmqResource t = ZmqResource Z.Context (Z.Socket t)
type ZmqC e es = (IOE :> es, Log :> es, Reader e :> es, Resource :> es)
type UsingC t e es = (Z.SocketType t, ZmqC e es)
type ClientC a e es = (Binary a, HasField "name" e Text, HasField "subEps" e [Text], HasField "topics" e [Text], ZmqC e es)
type ServerC a e es = (Binary a, HasField "name" e Text, HasField "pubEp" e Text, ZmqC e es)
type Action t a' a b' b e s es = Z.Socket t -> e -> Proxy a' a b' b e s es ()
using :: forall t a' a b' b e s es. UsingC t e es => t -> Action t a' a b' b e s es -> Proxy a' a b' b e s es ()
using stype action = getLoggerIO >>= safely
where
safely :: LoggerIO -> Proxy a' a b' b e s es ()
safely logger =
allocate acquire release >>= \(_, ZmqResource _ socket) ->
ask >>= action socket
where
acquire :: IO (ZmqResource t)
acquire = do
logger LogTrace "Acquiring ZMQ context"
context <- Z.context
socket <- Z.socket context stype
return $ ZmqResource context socket
release :: ZmqResource t -> IO ()
release (ZmqResource context socket) = do
logger LogTrace "Releasing ZMQ context"
Z.close socket
Z.shutdown context
client :: ClientC a e es => Producer a e s es ()
client =
using Z.Sub $ \socket e ->
localDomain "client" $ do
logInfo_ "Initializing ZMQ client"
liftIO $ forM_ e.subEps $ Z.connect socket . unpack
liftIO $ forM_ e.topics $ Z.subscribe socket . encodeUtf8
logTrace_ $ "Listening to " <> pack (show e.subEps)
logTrace_ $ "Subscribed to " <> pack (show e.topics)
forever $ liftIO (Z.receive socket) >>= yield . body
server :: ServerC a e es => Consumer a e s es ()
server =
using Z.Pub $ \socket e ->
localDomain "server" $ do
logInfo_ "Initializing ZMQ server"
liftIO $ Z.bind socket $ unpack e.pubEp
logTrace_ $ "Publishing to " <> e.pubEp
forever $ await >>= liftIO . Z.send socket [] . message e.name
|