1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}
-- | ZeroMQ messaging effects.
--
-- Exposes two static effects: 'Client', which wraps a ZMQ @Sub@ socket and is
-- read with 'receive', and 'Server', which wraps a ZMQ @Pub@ socket and is
-- written with 'send'. 'runClient' and 'runServer' set up the sockets from
-- fields of the 'Reader' environment and interpret the effects.
module Hsm.Core.Zmq
( Client
, receive
, runClient
, Server
, send
, runServer
)
where
import Control.Monad (forM_)
import Data.Aeson.Types (Value, emptyObject)
import Data.Binary (Binary)
import Data.Function ((&))
import Data.Text (Text, show, unpack)
import Data.Text.Encoding (encodeUtf8)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>))
import Effectful.Dispatch.Static qualified as S
import Effectful.Log qualified as L
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource, allocate)
import GHC.Records (HasField)
import Hsm.Core.Message (body, message, topic)
import System.ZMQ4 qualified as Z
import Prelude hiding (show)
-- | Shape of the logging callback obtained from 'L.getLoggerIO' in
-- 'withSocket': timestamp, level, message text and a JSON payload.
type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO ()
-- | Reduced logging callback that only needs a level and a message; the
-- timestamp and payload are filled in by the adapter inside 'withSocket'.
type LoggerIO = L.LogLevel -> Text -> IO ()
-- | A ZMQ context paired with a socket of type @t@ created from it.
type ZmqResource t = (Z.Context, Z.Socket t)
-- | Effects every socket-managing function in this module requires:
-- 'IOE', 'L.Log' and 'Resource'.
type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es)
-- | Create a ZMQ context and a socket of type @t@ on it, registering both
-- with the 'Resource' effect through 'allocate' so that the matching
-- release action closes the socket and shuts the context down.
--
-- Acquisition and release are each announced with a trace-level log entry.
-- Only the socket is handed back to the caller.
withSocket
  :: forall t es
   . ( Z.SocketType t
     , ZmqConstraint es
     )
  => t
  -> Eff es (Z.Socket t)
withSocket stype = do
  defaultLogger <- L.getLoggerIO
  let logger = timestamped defaultLogger
  -- 'allocate' yields (release key, (context, socket)); keep the socket only.
  (_, (_, sock)) <- allocate (open logger) (close logger)
  pure sock
  where
    -- Adapt the full logger to 'LoggerIO' by stamping each entry with the
    -- current time and attaching an empty JSON object as payload.
    timestamped :: DefaultLoggerIO -> LoggerIO
    timestamped sink level msg =
      getCurrentTime >>= \now -> sink now level msg emptyObject

    open :: LoggerIO -> IO (ZmqResource t)
    open logger = do
      logger L.LogTrace "Acquiring ZMQ context"
      ctx <- Z.context
      sock <- Z.socket ctx stype
      pure (ctx, sock)

    -- NOTE(review): the context is only passed to 'Z.shutdown'; libzmq
    -- documents zmq_ctx_shutdown as not replacing zmq_ctx_term. Confirm
    -- whether omitting 'Z.term' here is intentional.
    close :: LoggerIO -> ZmqResource t -> IO ()
    close logger (ctx, sock) =
      logger L.LogTrace "Releasing ZMQ context"
        >> Z.close sock
        >> Z.shutdown ctx
-- | Subscriber-side effect. It is statically dispatched (with side effects)
-- and its representation is a single ZMQ @Sub@ socket.
data Client :: Effect
type instance DispatchOf Client = Static S.WithSideEffects
newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub)
-- | Log domain applied (via 'L.localDomain') to every log entry emitted by
-- 'receive' and by the client initialisation in 'runClient'.
clientDomain :: Text
clientDomain = "client"
-- | Receive one message from the 'Client' socket and return its body
-- interpreted as a value of type @a@.
--
-- The message topic and the shown body are written as a trace-level log
-- entry under the 'clientDomain' domain.
--
-- The body is bound once and shared between the log entry and the returned
-- value, so the raw message is not decoded a second time.
receive
  :: forall a es
   . ( Binary a
     , Client :> es
     , L.Log :> es
     , Show a
     )
  => Eff es a
receive = do
  Client socket <- S.getStaticRep
  msg <- S.unsafeEff_ $ Z.receive socket
  -- Single shared thunk: whichever of the logger or the caller forces it
  -- first pays for the decode, the other reuses the result.
  let payload = body @a msg
  "Message received [" <> topic msg <> "]: " <> show payload
    & L.logTrace_
    & L.localDomain clientDomain
  return payload
-- | Interpret the 'Client' effect.
--
-- A @Sub@ socket is obtained from 'withSocket'. Before the supplied action
-- runs, the socket is connected to every endpoint in the environment's
-- @subEps@ field and subscribed to every entry of its @topics@ field; the
-- setup is logged under the 'clientDomain' domain.
runClient
  :: forall e es a
   . ( HasField "subEps" e [Text]
     , HasField "topics" e [Text]
     , Reader e :> es
     , ZmqConstraint es
     )
  => Eff (Client : es) a
  -> Eff es a
runClient action = do
  socket <- withSocket Z.Sub
  S.evalStaticRep (Client socket) $ do
    L.localDomain clientDomain $ setup socket
    action
  where
    -- Connect and subscribe first, then report what was configured.
    setup :: Z.Socket Z.Sub -> Eff (Client : es) ()
    setup socket = do
      L.logInfo_ "Initializing ZMQ client"
      env <- ask @e
      let endpoints = env.subEps
          subscriptions = env.topics
      forM_ endpoints $ \ep ->
        S.unsafeEff_ $ Z.connect socket (unpack ep)
      forM_ subscriptions $ \tp ->
        S.unsafeEff_ $ Z.subscribe socket (encodeUtf8 tp)
      L.logTrace_ $ "Listening to " <> show endpoints
      L.logTrace_ $ "Subscribed to " <> show subscriptions
-- | Publisher-side effect. It is statically dispatched (with side effects)
-- and its representation is a single ZMQ @Pub@ socket.
data Server :: Effect
type instance DispatchOf Server = Static S.WithSideEffects
newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub)
-- | Log domain applied (via 'L.localDomain') to every log entry emitted by
-- 'send' and by the server initialisation in 'runServer'.
serverDomain :: Text
serverDomain = "server"
-- | Publish a payload on the 'Server' socket.
--
-- The outgoing message is built by 'message' from the environment's @name@
-- field and the payload, then sent with no ZMQ flags. Afterwards the shown
-- payload is written as a trace-level log entry under 'serverDomain'.
send
  :: forall a e es
   . ( Binary a
     , HasField "name" e Text
     , L.Log :> es
     , Reader e :> es
     , Server :> es
     , Show a
     )
  => a
  -> Eff es ()
send payload = do
  Server socket <- S.getStaticRep
  env <- ask @e
  let frame = message env.name payload
  S.unsafeEff_ (Z.send socket [] frame)
  "Message sent: " <> show payload
    & L.logTrace_
    & L.localDomain serverDomain
-- | Interpret the 'Server' effect.
--
-- A @Pub@ socket is obtained from 'withSocket' and bound to the endpoint in
-- the environment's @pubEp@ field before the supplied action runs; the setup
-- is logged under the 'serverDomain' domain.
runServer
  :: forall e es a
   . ( HasField "pubEp" e Text
     , Reader e :> es
     , ZmqConstraint es
     )
  => Eff (Server : es) a
  -> Eff es a
runServer action = do
  socket <- withSocket Z.Pub
  S.evalStaticRep (Server socket) $ do
    L.localDomain serverDomain $ setup socket
    action
  where
    -- Bind the publisher socket, then report the endpoint in use.
    setup :: Z.Socket Z.Pub -> Eff (Server : es) ()
    setup socket = do
      L.logInfo_ "Initializing ZMQ server"
      env <- ask @e
      let endpoint = env.pubEp
      S.unsafeEff_ $ Z.bind socket (unpack endpoint)
      L.logTrace_ $ "Publishing to " <> endpoint
|