diff options
Diffstat (limited to 'hsm-stream/Hsm/Stream.hs')
| -rw-r--r-- | hsm-stream/Hsm/Stream.hs | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/hsm-stream/Hsm/Stream.hs b/hsm-stream/Hsm/Stream.hs new file mode 100644 index 0000000..a01eb4b --- /dev/null +++ b/hsm-stream/Hsm/Stream.hs @@ -0,0 +1,87 @@ +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Stream + ( Stream + , startStream + , stopStream + , isStreaming + , runStream + ) +where + +import Control.Monad (forever) +import Control.Monad.Extra (unlessM, whenM) +import Data.Maybe (fromJust, isJust) +import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Concurrent (Concurrent, ThreadId, forkIO, killThread) +import Effectful.Dispatch.Static + ( SideEffects (WithSideEffects) + , StaticRep + , evalStaticRep + , getStaticRep + , putStaticRep + , unsafeEff_ + ) +import Effectful.Exception (IOException, catch, finally) +import Effectful.Fail (Fail) +import GHC.IO.Handle (Handle, hGetLine) +import Hsm.Log (Log, Logs, Severity (Info, Trace), logMsg) +import System.Process (ProcessHandle, StdStream (CreatePipe), cleanupProcess, createProcess, proc, std_err, std_out) + +data Stream (a :: * -> *) (b :: *) + +type instance DispatchOf Stream = Static WithSideEffects + +data StreamRep = StreamRep + { phdl :: ProcessHandle + , hout :: Handle + , herr :: Handle + , tout :: ThreadId + , terr :: ThreadId + } + +newtype instance StaticRep Stream + = Stream (Maybe StreamRep) + +-- The following functions manage the GStreamer pipeline as a subprocess. +-- This ensures: +-- - Clean resource cleanup on stream restart +-- - Proper WebSocket connection teardown (prevents browser-side lingering) +-- - Reliable browser disconnect/reconnect cycles +-- +-- Direct library integration proved problematic due to resource lifecycle +-- issues, particularly with `webrtcsink` WebSocket persistence. +isStreaming :: (Log "stream" :> es, Stream :> es) => Eff es Bool +isStreaming = do + Stream rep <- getStaticRep + return $ isJust rep + +startStream :: (Concurrent :> es, Fail :> es, Logs '["gst", "stream"] es, Stream :> es) => Eff es () +startStream = + unlessM isStreaming $ do + logMsg @"stream" Info "Initializing gstreamer pipeline" + (_, Just hout, Just herr, phdl) <- unsafeEff_ $ createProcess spDecl + tout <- spEcho hout + terr <- spEcho herr + putStaticRep . Stream $ Just StreamRep{..} + where + spFlags = words "--quiet --no-position" + pipeline = words "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! queue ! webrtcsink run-signalling-server=true" + spArgs = spFlags <> pipeline + spDecl = (proc "gst-launch-1.0" spArgs){std_out = CreatePipe, std_err = CreatePipe} + spEcho hdl = forkIO . catch @IOException (forever $ unsafeEff_ (hGetLine hdl) >>= logMsg @"gst" Trace) . const $ return () + +stopStream :: (Concurrent :> es, Log "stream" :> es, Stream :> es) => Eff es () +stopStream = + whenM isStreaming $ do + Stream rep <- getStaticRep + logMsg Info "Stopping stream" + let StreamRep{..} = fromJust rep + unsafeEff_ $ cleanupProcess (Nothing, Just hout, Just herr, phdl) + killThread tout + killThread terr + putStaticRep $ Stream Nothing + +runStream :: (Concurrent :> es, IOE :> es, Log "stream" :> es) => Eff (Stream : es) a -> Eff es a +runStream action = evalStaticRep (Stream Nothing) $ finally action stopStream |
