{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Bcc.Node.Handlers.Shutdown
  (
  -- * Generalised shutdown handling
    ShutdownFDs
  , withShutdownHandling

  -- * Requesting shutdown
  , ShutdownDoorbell
  , getShutdownDoorbell
  , triggerShutdown

  -- * Watch ChainDB for passing a configured slot sync limit threshold,
  --   translating it to a graceful shutdown.
  , maybeSpawnOnSlotSyncedShutdownHandler
  )
where

import           Bcc.Prelude hiding (ByteString, atomically, take, trace)

import qualified Control.Concurrent.Async as Async
import           Data.Text (pack)

import qualified GHC.IO.Handle.FD as IO (fdToHandle)
import qualified System.IO as IO
import qualified System.IO.Error as IO
import           System.Posix.Types (Fd (Fd))
import qualified System.Process as IO (createPipeFd)

import           Bcc.BM.Data.Tracer (TracingVerbosity (..), severityNotice, trTransformer)
import           Bcc.BM.Trace
import           Bcc.Slotting.Slot (WithOrigin (..))
import           Control.Tracer
import qualified Shardagnostic.Consensus.Storage.ChainDB as ChainDB
import           Shardagnostic.Consensus.Util.ResourceRegistry (ResourceRegistry)
import           Shardagnostic.Consensus.Util.STM (Watcher(..), forkLinkedWatcher)
import           Shardagnostic.Network.Block (MaxSlotNo (..), SlotNo, pointSlot)

import           Bcc.Node.Configuration.POM (NodeConfiguration (..))

-- | 'ShutdownFDs' mediate the graceful shutdown requests,
-- either external or internal to the process.
--
-- In the external mediation case, the parent process passes us the file descriptor
-- number of the read end of a pipe, via the CLI with @--shutdown-ipc FD@.
-- In the internal mediation case, we create our own pipe.
-- In both cases we store the accessible ends in 'ShutdownFDs'.
--
-- In either case, if the write end gets closed, either deliberately:
--   - by internal call of 'triggerShutdown' on 'ShutdownFDs', or
--   - by parent process
-- ..or automatically, because the parent process itself terminated,
-- then we initiate a clean shutdown.
data ShutdownFDs
  = NoShutdownFDs
  | ExternalShutdown !ShutdownListener
  -- ^ Shutdown to be provided by external process.
  | InternalShutdown !ShutdownListener !ShutdownDoorbell
  -- ^ Shutdown to be provided from within the process.

-- | FD used to send an EOF-based request for shutdown.
newtype ShutdownDoorbell = ShutdownDoorbell { ShutdownDoorbell -> Fd
_doorbellFd :: Fd }

-- | FD we're listening on for the EOF signalling the shutdown.
newtype ShutdownListener = ShutdownListener { ShutdownListener -> Fd
_listenerFd :: Fd }

-- | Gracefully handle shutdown requests, if requested by 'ShutdownFDs'.
--
-- The file descriptor wrapped in a 'ShutdownListener' designates the
-- receiving end of the shutdown signalling communication channel.
-- The opposite end might be either internal or external to the node process.
withShutdownHandler :: Maybe ShutdownListener -> Trace IO Text -> IO () -> IO ()
withShutdownHandler :: Maybe ShutdownListener -> Trace IO Text -> IO () -> IO ()
withShutdownHandler Maybe ShutdownListener
listener Trace IO Text
trace IO ()
action
  | Just (ShutdownListener Fd
fd) <- Maybe ShutdownListener
listener =
      IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
Async.race_ (IO () -> IO ()
forall a. IO a -> IO a
wrapUninterruptableIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Fd -> IO ()
waitForEOF Fd
fd) IO ()
action
  | Bool
otherwise = IO ()
action
  where
    waitForEOF :: Fd -> IO ()
    waitForEOF :: Fd -> IO ()
waitForEOF (Fd CInt
fd) = do
      Handle
hnd <- CInt -> IO Handle
IO.fdToHandle CInt
fd
      Either IOError Char
r   <- IO Char -> IO (Either IOError Char)
forall e a. Exception e => IO a -> IO (Either e a)
try (IO Char -> IO (Either IOError Char))
-> IO Char -> IO (Either IOError Char)
forall a b. (a -> b) -> a -> b
$ Handle -> IO Char
IO.hGetChar Handle
hnd
      case Either IOError Char
r of
        Left IOError
e
          | IOError -> Bool
IO.isEOFError IOError
e -> Tracer IO Text -> Text -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO Text
tracer Text
"received shutdown request"
          | Bool
otherwise       -> IOError -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO IOError
e

        Right Char
_  ->
          IOError -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO (IOError -> IO ()) -> IOError -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IOError
IO.userError String
"--shutdown-ipc FD does not expect input"

    tracer :: Tracer IO Text
    tracer :: Tracer IO Text
tracer = TracingVerbosity -> Trace IO Text -> Tracer IO Text
forall a (m :: * -> *) b.
Transformable a m b =>
TracingVerbosity -> Trace m a -> Tracer m b
trTransformer TracingVerbosity
MaximalVerbosity (Trace IO Text -> Trace IO Text
forall (m :: * -> *) a. Trace m a -> Trace m a
severityNotice Trace IO Text
trace)

sfdsListener :: ShutdownFDs -> Maybe ShutdownListener
sfdsListener :: ShutdownFDs -> Maybe ShutdownListener
sfdsListener = \case
  ExternalShutdown ShutdownListener
r -> ShutdownListener -> Maybe ShutdownListener
forall a. a -> Maybe a
Just ShutdownListener
r
  InternalShutdown ShutdownListener
r ShutdownDoorbell
_w -> ShutdownListener -> Maybe ShutdownListener
forall a. a -> Maybe a
Just ShutdownListener
r
  ShutdownFDs
_ -> Maybe ShutdownListener
forall a. Maybe a
Nothing

-- | Windows blocking file IO calls like 'hGetChar' are not interruptable by
-- asynchronous exceptions, as used by async 'cancel' (as of base-4.12).
--
-- This wrapper works around that problem by running the blocking IO in a
-- separate thread. If the parent thread receives an async cancel then it
-- will return. Note however that in this circumstance the child thread may
-- continue and remain blocked, leading to a leak of the thread. As such this
-- is only reasonable to use a fixed number of times for the whole process.
--
wrapUninterruptableIO :: IO a -> IO a
wrapUninterruptableIO :: IO a -> IO a
wrapUninterruptableIO IO a
action = IO a -> IO (Async a)
forall a. IO a -> IO (Async a)
async IO a
action IO (Async a) -> (Async a -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Async a -> IO a
forall a. Async a -> IO a
wait

-- | If 'ShutdownFDs' supports internal shutdown requests,
-- return its shutdown doorbell.
getShutdownDoorbell :: ShutdownFDs -> Maybe ShutdownDoorbell
getShutdownDoorbell :: ShutdownFDs -> Maybe ShutdownDoorbell
getShutdownDoorbell (InternalShutdown ShutdownListener
_l ShutdownDoorbell
doorbell) = ShutdownDoorbell -> Maybe ShutdownDoorbell
forall a. a -> Maybe a
Just ShutdownDoorbell
doorbell
getShutdownDoorbell ShutdownFDs
_ = Maybe ShutdownDoorbell
forall a. Maybe a
Nothing

-- | Given the 'ShutdownDoorbell' component of 'ShutdownFDs',
--   and an explanation of the reason, request a graceful shutdown.
triggerShutdown :: ShutdownDoorbell -> Trace IO Text -> Text -> IO ()
triggerShutdown :: ShutdownDoorbell -> Trace IO Text -> Text -> IO ()
triggerShutdown (ShutdownDoorbell (Fd CInt
shutFd)) Trace IO Text
trace Text
reason = do
  Tracer IO Text -> Text -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith (TracingVerbosity -> Trace IO Text -> Tracer IO Text
forall a (m :: * -> *) b.
Transformable a m b =>
TracingVerbosity -> Trace m a -> Tracer m b
trTransformer TracingVerbosity
MaximalVerbosity (Trace IO Text -> Tracer IO Text)
-> Trace IO Text -> Tracer IO Text
forall a b. (a -> b) -> a -> b
$ Trace IO Text -> Trace IO Text
forall (m :: * -> *) a. Trace m a -> Trace m a
severityNotice Trace IO Text
trace)
    (Text
"Ringing the node shutdown doorbell:  " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
reason)
  Handle -> IO ()
IO.hClose (Handle -> IO ()) -> IO Handle -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< CInt -> IO Handle
IO.fdToHandle CInt
shutFd

-- | We provide an optional cross-platform method to politely request shut down.
--
-- For the duration of 'action', we gracefully handle shutdown requests,
-- external or internal, as requested by configuration in 'NodeCLI',
-- while allocating corresponding 'ShutdownFDs', and providing them to the 'action'.
withShutdownHandling
  :: NodeConfiguration
  -> Trace IO Text
  -> (ShutdownFDs -> IO ())
  -> IO ()
withShutdownHandling :: NodeConfiguration
-> Trace IO Text -> (ShutdownFDs -> IO ()) -> IO ()
withShutdownHandling NodeConfiguration
nc Trace IO Text
trace ShutdownFDs -> IO ()
action = do
  ShutdownFDs
sfds <- NodeConfiguration -> IO ShutdownFDs
decideShutdownFds NodeConfiguration
nc
  Maybe ShutdownListener -> Trace IO Text -> IO () -> IO ()
withShutdownHandler (ShutdownFDs -> Maybe ShutdownListener
sfdsListener ShutdownFDs
sfds) Trace IO Text
trace (ShutdownFDs -> IO ()
action ShutdownFDs
sfds)
 where
   decideShutdownFds :: NodeConfiguration -> IO ShutdownFDs
   decideShutdownFds :: NodeConfiguration -> IO ShutdownFDs
decideShutdownFds NodeConfiguration{ncShutdownIPC :: NodeConfiguration -> Maybe Fd
ncShutdownIPC = Just Fd
fd} =
     ShutdownFDs -> IO ShutdownFDs
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ShutdownFDs -> IO ShutdownFDs) -> ShutdownFDs -> IO ShutdownFDs
forall a b. (a -> b) -> a -> b
$ ShutdownListener -> ShutdownFDs
ExternalShutdown (Fd -> ShutdownListener
ShutdownListener Fd
fd)
   decideShutdownFds NodeConfiguration{ncShutdownOnSlotSynced :: NodeConfiguration -> MaxSlotNo
ncShutdownOnSlotSynced = MaxSlotNo{}} =
     IO ShutdownFDs
mkInternalShutdown
   decideShutdownFds NodeConfiguration
_ = ShutdownFDs -> IO ShutdownFDs
forall (f :: * -> *) a. Applicative f => a -> f a
pure ShutdownFDs
NoShutdownFDs

   mkInternalShutdown :: IO ShutdownFDs
   mkInternalShutdown :: IO ShutdownFDs
mkInternalShutdown = do
     (CInt
r, CInt
w) <- IO (CInt, CInt)
IO.createPipeFd
     ShutdownFDs -> IO ShutdownFDs
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ShutdownFDs -> IO ShutdownFDs) -> ShutdownFDs -> IO ShutdownFDs
forall a b. (a -> b) -> a -> b
$ ShutdownListener -> ShutdownDoorbell -> ShutdownFDs
InternalShutdown (Fd -> ShutdownListener
ShutdownListener (Fd -> ShutdownListener) -> Fd -> ShutdownListener
forall a b. (a -> b) -> a -> b
$ CInt -> Fd
Fd CInt
r) (Fd -> ShutdownDoorbell
ShutdownDoorbell (Fd -> ShutdownDoorbell) -> Fd -> ShutdownDoorbell
forall a b. (a -> b) -> a -> b
$ CInt -> Fd
Fd CInt
w)

-- | If configuration in 'NodeCLI' and 'ShutdownFDs' agree,
-- spawn a thread that would cause node to shutdown upon ChainDB reaching the
-- configuration-defined slot.
maybeSpawnOnSlotSyncedShutdownHandler
  :: NodeConfiguration
  -> ShutdownFDs
  -> Trace IO Text
  -> ResourceRegistry IO
  -> ChainDB.ChainDB IO blk
  -> IO ()
maybeSpawnOnSlotSyncedShutdownHandler :: NodeConfiguration
-> ShutdownFDs
-> Trace IO Text
-> ResourceRegistry IO
-> ChainDB IO blk
-> IO ()
maybeSpawnOnSlotSyncedShutdownHandler NodeConfiguration
nc ShutdownFDs
sfds Trace IO Text
trace ResourceRegistry IO
registry ChainDB IO blk
chaindb =
  case (NodeConfiguration -> MaxSlotNo
ncShutdownOnSlotSynced NodeConfiguration
nc, ShutdownFDs
sfds) of
    (MaxSlotNo SlotNo
maxSlot, InternalShutdown ShutdownListener
_sl ShutdownDoorbell
sd) -> do
      Tracer IO Text -> Text -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith (TracingVerbosity -> Trace IO Text -> Tracer IO Text
forall a (m :: * -> *) b.
Transformable a m b =>
TracingVerbosity -> Trace m a -> Tracer m b
trTransformer TracingVerbosity
MaximalVerbosity (Trace IO Text -> Tracer IO Text)
-> Trace IO Text -> Tracer IO Text
forall a b. (a -> b) -> a -> b
$ Trace IO Text -> Trace IO Text
forall (m :: * -> *) a. Trace m a -> Trace m a
severityNotice Trace IO Text
trace)
        (Text
"will terminate upon reaching " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
pack (SlotNo -> String
forall a b. (Show a, ConvertText String b) => a -> b
show SlotNo
maxSlot))
      SlotNo -> ShutdownDoorbell -> IO ()
spawnSlotLimitTerminator SlotNo
maxSlot ShutdownDoorbell
sd
    (MaxSlotNo{}, ShutdownFDs
_) -> Text -> IO ()
forall a. HasCallStack => Text -> a
panic
      Text
"internal error: slot-limited shutdown requested, but no proper ShutdownFDs passed."
    (MaxSlotNo, ShutdownFDs)
_ -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
 where
  spawnSlotLimitTerminator :: SlotNo -> ShutdownDoorbell -> IO ()
  spawnSlotLimitTerminator :: SlotNo -> ShutdownDoorbell -> IO ()
spawnSlotLimitTerminator SlotNo
maxSlot ShutdownDoorbell
sd =
    IO (Thread IO Void) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Thread IO Void) -> IO ()) -> IO (Thread IO Void) -> IO ()
forall a b. (a -> b) -> a -> b
$ ResourceRegistry IO
-> String
-> Watcher IO (WithOrigin SlotNo) (WithOrigin SlotNo)
-> IO (Thread IO Void)
forall (m :: * -> *) a fp.
(IOLike m, Eq fp, HasCallStack) =>
ResourceRegistry m -> String -> Watcher m a fp -> m (Thread m Void)
forkLinkedWatcher ResourceRegistry IO
registry String
"slotLimitTerminator" Watcher :: forall (m :: * -> *) a fp.
(a -> fp) -> Maybe fp -> (a -> m ()) -> STM m a -> Watcher m a fp
Watcher {
        wFingerprint :: WithOrigin SlotNo -> WithOrigin SlotNo
wFingerprint = WithOrigin SlotNo -> WithOrigin SlotNo
forall (cat :: * -> * -> *) a. Category cat => cat a a
identity
      , wInitial :: Maybe (WithOrigin SlotNo)
wInitial     = Maybe (WithOrigin SlotNo)
forall a. Maybe a
Nothing
      , wNotify :: WithOrigin SlotNo -> IO ()
wNotify      = \case
          WithOrigin SlotNo
Origin -> () -> IO ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
          At SlotNo
cur -> Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SlotNo
cur SlotNo -> SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
>= SlotNo
maxSlot) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
            ShutdownDoorbell -> Trace IO Text -> Text -> IO ()
triggerShutdown ShutdownDoorbell
sd Trace IO Text
trace
            (Text
"spawnSlotLimitTerminator: reached target " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> SlotNo -> Text
forall a b. (Show a, ConvertText String b) => a -> b
show SlotNo
cur)
      , wReader :: STM IO (WithOrigin SlotNo)
wReader      = Point blk -> WithOrigin SlotNo
forall block. Point block -> WithOrigin SlotNo
pointSlot (Point blk -> WithOrigin SlotNo)
-> STM (Point blk) -> STM (WithOrigin SlotNo)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ChainDB IO blk -> STM IO (Point blk)
forall (m :: * -> *) blk. ChainDB m blk -> STM m (Point blk)
ChainDB.getTipPoint ChainDB IO blk
chaindb
      }