From 864a1d2a22580a33b5e928734fd256c2133fb672 Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sat, 3 Jan 2026 00:26:51 +0000 Subject: Adds camera streaming to frontend --- hsm-stream/Hsm/Stream.hs | 116 +++++++++++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 49 deletions(-) (limited to 'hsm-stream/Hsm/Stream.hs') diff --git a/hsm-stream/Hsm/Stream.hs b/hsm-stream/Hsm/Stream.hs index e0b2b5b..a01eb4b 100644 --- a/hsm-stream/Hsm/Stream.hs +++ b/hsm-stream/Hsm/Stream.hs @@ -1,69 +1,87 @@ -{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} module Hsm.Stream ( Stream , startStream , stopStream + , isStreaming , runStream ) where -import Control.Monad (void, when) -import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, liftIO, (:>)) -import Effectful.Dispatch.Static (SideEffects (WithSideEffects), StaticRep, evalStaticRep, getStaticRep, unsafeEff_) -import Effectful.Exception (finally) -import Foreign.C.String (withCString) -import Foreign.Ptr (Ptr, nullPtr) -import Hsm.Log (Log, Severity (Info), logMsg) -import Hsm.Stream.FFI - ( GstElement - , gstDeinit - , gstElementSetState - , gstInit - , gstObjectUnref - , gstParseLaunch - , gstStateNull - , gstStatePlaying +import Control.Monad (forever) +import Control.Monad.Extra (unlessM, whenM) +import Data.Maybe (fromJust, isJust) +import Effectful (Dispatch (Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Concurrent (Concurrent, ThreadId, forkIO, killThread) +import Effectful.Dispatch.Static + ( SideEffects (WithSideEffects) + , StaticRep + , evalStaticRep + , getStaticRep + , putStaticRep + , unsafeEff_ ) -import System.Environment (setEnv) +import Effectful.Exception (IOException, catch, finally) +import Effectful.Fail (Fail) +import GHC.IO.Handle (Handle, hGetLine) +import Hsm.Log (Log, Logs, Severity (Info, Trace), logMsg) +import System.Process (ProcessHandle, StdStream (CreatePipe), cleanupProcess, createProcess, proc, std_err, std_out) data Stream (a :: * -> *) (b :: *) type instance DispatchOf Stream = Static WithSideEffects -newtype instance StaticRep Stream - = Stream (Ptr GstElement) +data StreamRep = StreamRep + { phdl :: ProcessHandle + , hout :: Handle + , herr :: Handle + , tout :: ThreadId + , terr :: ThreadId + } -startStream :: (Log "stream" :> es, Stream :> es) => Eff es () -startStream = do - Stream pipeline <- getStaticRep - logMsg Info "Starting stream" - unsafeEff_ . void $ gstElementSetState pipeline gstStatePlaying +newtype instance StaticRep Stream + = Stream (Maybe StreamRep) -stopStream :: (Log "stream" :> es, Stream :> es) => Eff es () -stopStream = do - Stream pipeline <- getStaticRep - logMsg Info "Stopping stream" - unsafeEff_ . void $ gstElementSetState pipeline gstStateNull +-- The following functions manage the GStreamer pipeline as a subprocess. +-- This ensures: +-- - Clean resource cleanup on stream restart +-- - Proper WebSocket connection teardown (prevents browser-side lingering) +-- - Reliable browser disconnect/reconnect cycles +-- +-- Direct library integration proved problematic due to resource lifecycle +-- issues, particularly with `webrtcsink` WebSocket persistence. +isStreaming :: (Log "stream" :> es, Stream :> es) => Eff es Bool +isStreaming = do + Stream rep <- getStaticRep + return $ isJust rep -runStream :: (IOE :> es, Log "stream" :> es) => Bool -> Eff (Stream : es) a -> Eff es a -runStream suppressXLogs action = do - when suppressXLogs $ do - logMsg Info "Suppressing external loggers" - liftIO $ setEnv "GST_DEBUG" "none" - liftIO $ setEnv "LIBCAMERA_LOG_LEVELS" "FATAL" - liftIO $ setEnv "WEBRTCSINK_SIGNALLING_SERVER_LOG" "none" - logMsg Info "Initializing gstreamer library" - liftIO $ gstInit nullPtr nullPtr - logMsg Info $ "Parsing gstreamer pipeline: " <> pipelineStr - pipeline <- liftIO . withCString pipelineStr $ \cStr -> gstParseLaunch cStr nullPtr - evalStaticRep (Stream pipeline) . finally action $ stopStream >> endStream +startStream :: (Concurrent :> es, Fail :> es, Logs '["gst", "stream"] es, Stream :> es) => Eff es () +startStream = + unlessM isStreaming $ do + logMsg @"stream" Info "Initializing gstreamer pipeline" + (_, Just hout, Just herr, phdl) <- unsafeEff_ $ createProcess spDecl + tout <- spEcho hout + terr <- spEcho herr + putStaticRep . Stream $ Just StreamRep{..} where - pipelineStr = "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! webrtcsink run-signalling-server=true" - endStream = do - Stream pipeline <- getStaticRep - logMsg Info "Unrefing gstreamer pipeline" - liftIO $ gstObjectUnref pipeline - logMsg Info "De-initializing gstreamer library" - liftIO gstDeinit + spFlags = words "--quiet --no-position" + pipeline = words "libcamerasrc ! videoconvert ! vp8enc deadline=1 ! queue ! webrtcsink run-signalling-server=true" + spArgs = spFlags <> pipeline + spDecl = (proc "gst-launch-1.0" spArgs){std_out = CreatePipe, std_err = CreatePipe} + spEcho hdl = forkIO . catch @IOException (forever $ unsafeEff_ (hGetLine hdl) >>= logMsg @"gst" Trace) . const $ return () + +stopStream :: (Concurrent :> es, Log "stream" :> es, Stream :> es) => Eff es () +stopStream = + whenM isStreaming $ do + Stream rep <- getStaticRep + logMsg Info "Stopping stream" + let StreamRep{..} = fromJust rep + unsafeEff_ $ cleanupProcess (Nothing, Just hout, Just herr, phdl) + killThread tout + killThread terr + putStaticRep $ Stream Nothing + +runStream :: (Concurrent :> es, IOE :> es, Log "stream" :> es) => Eff (Stream : es) a -> Eff es a +runStream action = evalStaticRep (Stream Nothing) $ finally action stopStream -- cgit v1.2.1