Skip to content

Commit

Permalink
Merge pull request #214 from lolepezy/once-mode
Browse files Browse the repository at this point in the history
Once mode
  • Loading branch information
lolepezy authored Jun 30, 2024
2 parents 8beaf18 + dff72cd commit babb203
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 68 deletions.
75 changes: 54 additions & 21 deletions app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import qualified Network.Wai.Handler.Warp as Warp
import System.Directory
import System.Environment
import System.FilePath ((</>))
import System.IO (hPutStrLn, stderr)
import System.IO
import System.Exit

import Options.Generic

Expand Down Expand Up @@ -76,6 +77,7 @@ import Network.HTTP.Client.TLS
-- import Network.HTTP.Simple
import Network.Connection


main :: IO ()
main = do
cliOptions@CLIOptions{..} <- unwrapRecord $
Expand All @@ -98,7 +100,7 @@ main = do


executeMainProcess :: CLIOptions Unwrapped -> IO ()
executeMainProcess cliOptions = do
executeMainProcess cliOptions@CLIOptions{..} = do
-- TODO This doesn't look pretty, come up with something better.
appStateHolder <- newTVarIO Nothing

Expand All @@ -110,26 +112,36 @@ executeMainProcess cliOptions = do
for_ z $ mergeSystemMetrics sm

withLogger logConfig bumpSysMetric $ \logger -> do
logDebug logger [i|Starting #{rpkiProverVersion}.|]
logDebug logger $ if once
then [i|Starting #{rpkiProverVersion} in one-off mode.|]
else [i|Starting #{rpkiProverVersion} as a server.|]

if cliOptions ^. #initialise
then
-- init the FS layout and download TALs
void $ liftIO $ initialiseFS cliOptions logger
else do
-- run the validator
(appContext, validations) <- do
runValidatorT (newScopes "initialise") $ do
runValidatorT (newScopes "Initialise") $ do
checkPreconditions cliOptions
createAppContext cliOptions logger (logConfig ^. #logLevel)
case appContext of
Left _ ->
logError logger [i|Couldn't initialise, problems: #{validations}.|]
Left _ -> do
logError logger [i|Failure:
#{formatValidations (validations ^. typed)}|]
drainLog logger
hFlush stdout
hFlush stderr
exitFailure
Right appContext' -> do
-- now we have the appState, set appStateHolder
atomically $ writeTVar appStateHolder $ Just $ appContext' ^. #appState
void $ race
(runHttpApi appContext')
(runValidatorServer appContext')
if once
then runValidatorServer appContext'
else void $ race
(runHttpApi appContext')
(runValidatorServer appContext')

executeWorkerProcess :: IO ()
executeWorkerProcess = do
Expand Down Expand Up @@ -176,6 +188,7 @@ turnOffTlsValidation = do
manager <- newManager $ mkManagerSettings (TLSSettingsSimple True True True) Nothing
setGlobalManager manager


runValidatorServer :: (Storage s, MaintainableStorage s) => AppContext s -> IO ()
runValidatorServer appContext@AppContext {..} = do

Expand Down Expand Up @@ -220,18 +233,21 @@ createAppContext cliOptions@CLIOptions{..} logger derivedLogLevel = do

programPath <- liftIO getExecutablePath

(root, tald, rsyncd, tmpd, cached) <- fsLayout cliOptions logger CheckTALsExists

let defaults = defaultConfig
let lmdbRealSize = (Size <$> lmdbSize) `orDefault` (defaults ^. #lmdbSizeMb)

-- clean up tmp directory if it's not empty
cleanDir tmpd
let lmdbRealSize = (Size <$> lmdbSize) `orDefault` (defaults ^. #lmdbSizeMb)

let cpuCount' = fromMaybe getRtsCpuCount cpuCount
(root, tald, rsyncd, tmpd, cached) <-
fromTryM
(\e -> UnspecifiedE "Error verifying/creating FS layout: " (fmtEx e))
$ do
z@(_, _, _, tmpd, _) <- fsLayout cliOptions logger CheckTALsExists
-- clean up tmp directory if it's not empty
cleanDir tmpd
pure z

-- Set capabilities to the values from the CLI or to all available CPUs,
-- (disregard the HT issue for now it needs more testing).
let cpuCount' = fromMaybe getRtsCpuCount cpuCount
liftIO $ setCpuCount cpuCount'
let parallelism =
case fetcherCount of
Expand All @@ -243,9 +259,10 @@ createAppContext cliOptions@CLIOptions{..} logger derivedLogLevel = do
& maybeSet #rtrPort rtrPort
& maybeSet #rtrAddress rtrAddress
& #rtrLogFile .~ rtrLogFile
else Nothing
else Nothing

rsyncPrefetchUrls <- rsyncPrefetches cliOptions
proverRunMode <- deriveProverRunMode cliOptions
rsyncPrefetchUrls <- rsyncPrefetches cliOptions

let config = defaults
& #programBinaryPath .~ programPath
Expand All @@ -254,6 +271,7 @@ createAppContext cliOptions@CLIOptions{..} logger derivedLogLevel = do
& #tmpDirectory .~ tmpd
& #cacheDirectory .~ cached
& #extraTalsDirectories .~ extraTalsDirectory
& #proverRunMode .~ proverRunMode
& #parallelism .~ parallelism
& #rsyncConf . #rsyncRoot .~ rsyncd
& #rsyncConf . #rsyncClientPath .~ rsyncClientPath
Expand Down Expand Up @@ -296,6 +314,7 @@ createAppContext cliOptions@CLIOptions{..} logger derivedLogLevel = do
& maybeSet (#systemConfig . #rrdpWorkerMemoryMb) maxRrdpFetchMemory
& maybeSet (#systemConfig . #validationWorkerMemoryMb) maxValidationMemory

-- Do some "common sense" adjustmnents to the config for correctness
let adjustedConfig = config
-- Cache must be cleaned up at least as often as the
-- lifetime of the objects in it
Expand All @@ -306,7 +325,7 @@ createAppContext cliOptions@CLIOptions{..} logger derivedLogLevel = do
readSlurmFiles files

-- Read the files first to fail fast
unless (null localExceptions) $ do
unless (null localExceptions) $
void $ readSlurms localExceptions

appState <- createAppState logger localExceptions
Expand Down Expand Up @@ -447,7 +466,7 @@ checkSubDirectory :: FilePath -> FilePath -> IO (Either Text FilePath)
checkSubDirectory root sub = do
let subDirectory = root </> sub
doesDirectoryExist subDirectory >>= \case
False -> pure $ Left [i|Directory #{subDirectory} doesn't exist.|]
False -> pure $ Left [i|Directory #{subDirectory} doesn't exist|]
True -> pure $ Right subDirectory

createSubDirectoryIfNeeded :: FilePath -> FilePath -> IO (Either Text FilePath)
Expand Down Expand Up @@ -509,6 +528,14 @@ createAppState logger localExceptions = do
checkPreconditions :: CLIOptions Unwrapped -> ValidatorT IO ()
checkPreconditions CLIOptions {..} = checkRsyncInPath rsyncClientPath

deriveProverRunMode :: CLIOptions Unwrapped -> ValidatorT IO ProverRunMode
deriveProverRunMode CLIOptions {..} =
case (once, vrpOutput) of
(False, Nothing) -> pure ServerMode
(True, Just vo) -> pure $ OneOffMode vo
_ -> appError $ UnspecifiedE "options"
[i|Options `--once` and `--vrp-output` must be either both set or both not set|]


-- | Run rpki-prover in a CLI mode for verifying RSC signature (*.sig file).
executeVerifier :: CLIOptions Unwrapped -> IO ()
Expand Down Expand Up @@ -565,6 +592,13 @@ data CLIOptions wrapped = CLIOptions {
initialise :: wrapped ::: Bool <?>
"If set, the FS layout will be created and TAL files will be downloaded.",

once :: wrapped ::: Bool <?>
("If set, will run one validation cycle and exit. Http API will not start, " +++
"result will be written to the file set by --vrp-output option (which must also be set)."),

vrpOutput :: wrapped ::: Maybe FilePath <?>
"Path of the file to write VRPs to. Only effectful when --once option is set.",

noRirTals :: wrapped ::: Bool <?>
"If set, RIR TAL files will not be downloaded.",

Expand Down Expand Up @@ -738,7 +772,6 @@ deriving instance Show (CLIOptions Unwrapped)

type (+++) (a :: Symbol) (b :: Symbol) = AppendSymbol a b


withLogConfig :: CLIOptions Unwrapped -> (LogConfig -> IO ()) -> IO ()
withLogConfig CLIOptions{..} f =
case logLevel of
Expand Down
5 changes: 4 additions & 1 deletion src/RPKI/AppMonad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ fromEither z =
fromValue z

fromEitherM :: Monad m => m (Either AppError r) -> ValidatorT m r
fromEitherM s = embedValidatorT $ (, mempty) <$> s
fromEitherM s =
appLift s >>= \case
Left e -> appError e
Right r -> pure r

vFromEither :: Either ValidationError r -> PureValidatorT r
vFromEither = fromEither . first ValidationE
Expand Down
8 changes: 6 additions & 2 deletions src/RPKI/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ data Config = Config {
extraTalsDirectories :: [FilePath],
tmpDirectory :: FilePath,
cacheDirectory :: FilePath,
proverRunMode :: ProverRunMode,
parallelism :: Parallelism,
rsyncConf :: RsyncConf,
rrdpConf :: RrdpConf,
Expand Down Expand Up @@ -101,7 +102,6 @@ data ManifestProcessing = RFC6486_Strict | RFC9286
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (TheBinary)


data ValidationAlgorithm = FullEveryIteration | Incremental
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (TheBinary)
Expand All @@ -110,11 +110,14 @@ data FetchTimingCalculation = Constant | Adaptive
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (TheBinary)


data FetchMethod = SyncOnly | SyncAndAsync
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (TheBinary)

data ProverRunMode = OneOffMode FilePath | ServerMode
deriving stock (Show, Eq, Ord, Generic)
deriving anyclass (TheBinary)

data ValidationConfig = ValidationConfig {
revalidationInterval :: Seconds,
rrdpRepositoryRefreshInterval :: Seconds,
Expand Down Expand Up @@ -213,6 +216,7 @@ defaultConfig = Config {
extraTalsDirectories = [],
tmpDirectory = "",
cacheDirectory = "",
proverRunMode = ServerMode,
parallelism = makeParallelism 2,
rsyncConf = RsyncConf {
rsyncClientPath = Nothing,
Expand Down
46 changes: 25 additions & 21 deletions src/RPKI/Fetch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ import RPKI.Parallel
Fall-back is disabled in the sync mode, so if primary repoistory doesn't
respond, we skip all the fall-backs and mark them all as ForAsyncFetch.
-}
fetchSync :: (MonadIO m, Storage s) =>
fetchQuickly :: (MonadIO m, Storage s) =>
AppContext s
-> RepositoryProcessing
-> WorldVersion
-> PublicationPointAccess
-> ValidatorT m (Maybe FetchResult)
fetchSync appContext@AppContext {..}
-> ValidatorT m [FetchResult]
fetchQuickly appContext@AppContext {..}
repositoryProcessing@RepositoryProcessing {..}
worldVersion
ppa = do
Expand All @@ -83,7 +83,7 @@ fetchSync appContext@AppContext {..}
-- It exists mainly for comparisons and measurements and
-- not very useful in practice.
let primaryRepo = getPrimaryRepository pps ppa
Just <$>
(:[]) <$>
fetchOnePp appContext (syncFetchConfig config)
repositoryProcessing worldVersion primaryRepo
(\meta _ -> pure meta)
Expand All @@ -95,7 +95,7 @@ fetchSync appContext@AppContext {..}
-- There's nothing to be fetched in the sync mode,
-- so just mark all of them for async fetching.
markForAsyncFetch repositoryProcessing asyncRepos
pure Nothing
pure []
Just syncPp_ -> do
-- In sync mode fetch only the first PP
fetchResult <-
Expand All @@ -112,7 +112,7 @@ fetchSync appContext@AppContext {..}
$ NonEmpty.toList $ unPublicationPointAccess ppa
markForAsyncFetch repositoryProcessing toMarkAsync

pure $ Just fetchResult
pure $! [fetchResult]
where
newMetaCallback syncPp_ pps newMeta fetchMoment = do
-- Set fetchType to ForAsyncFetch to all fall-back URLs,
Expand Down Expand Up @@ -140,26 +140,26 @@ fetchSync appContext@AppContext {..}


{-
Fetch repositories in the async mode (i.e. concurrently to top-down validation).
Fetch with fallback going through all (both RRDP and rsync) options.
-}
fetchAsync :: (MonadIO m, Storage s) =>
fetchWithFallback :: (MonadIO m, Storage s) =>
AppContext s
-> RepositoryProcessing
-> WorldVersion
-> FetchConfig
-> PublicationPointAccess
-> ValidatorT m [FetchResult]
fetchAsync
fetchWithFallback
appContext@AppContext {..}
repositoryProcessing
worldVersion
fetchConfig
ppa
= go True $ NonEmpty.toList $ unPublicationPointAccess ppa
where
go _ [] = pure []
go isPrimary (pp : rest) = do
fetchResult <- fetchOnePp appContext (asyncFetchConfig config)
fetchResult <- fetchOnePp appContext fetchConfig
repositoryProcessing worldVersion pp (newMetaCallback isPrimary)
case fetchResult of
FetchFailure _ _ -> do
Expand All @@ -183,19 +183,23 @@ fetchAsync
logWarn logger $ if nextOneNeedAFetch
then [i|Failed to fetch #{getURL pp}, will fall-back to the next one: #{getURL $ getRpkiURL ppNext}.|]
else [i|Failed to fetch #{getURL pp}, next one (#{getURL $ getRpkiURL ppNext}) is up-to-date.|]
go False rest

(fetchResult :) <$> go False rest

_ -> pure [fetchResult]

-- We are doing async fetch here, so we are not going to promote fall-back
-- repositories back to ForSyncFetch type. I.e. if a CA has publication
-- points as "repo_a - fall-back-to -> repo_b", repo_b is never going to
-- become ForSyncFetch, only repo_a can become sync and only after it is
-- back to normal state.
newMetaCallback isPrimary newMeta fetchMoment =
pure $ if isPrimary
then newMeta
else newMeta & #fetchType .~ ForAsyncFetch fetchMoment
case config ^. #proverRunMode of
OneOffMode {} -> pure newMeta
ServerMode ->
-- We are doing async fetch here, so we are not going to promote fall-back
-- repositories back to ForSyncFetch type. I.e. if a CA has publication
-- points as "repo_a - fall-back-to -> repo_b", repo_b is never going to
-- become ForSyncFetch, only repo_a can become sync and only after it is
-- back to normal state.
pure $ if isPrimary
then newMeta
else newMeta & #fetchType .~ ForAsyncFetch fetchMoment


fetchOnePp :: (MonadIO m, Storage s) =>
Expand Down Expand Up @@ -260,7 +264,7 @@ fetchOnePp
pure (rpkiUrl, fetchFreshness, f)

-- This is hacky but basically setting the "fetched/up-to-date" metric
-- without ValidatorT/PureValidatorT.
-- without ValidatorT/PureValidatorT (we can only run it in IO).
updateFetchMetric repoUrl fetchFreshness validations r elapsed = let
realFreshness = either (const FailedToFetch) (const fetchFreshness) r
repoScope = validatorSubScope' RepositoryFocus repoUrl parentScope
Expand Down
7 changes: 6 additions & 1 deletion src/RPKI/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import qualified Data.ByteString.Char8 as C8

import Data.Bifunctor
import Data.Foldable
import Data.Maybe
import Data.Text (Text, justifyLeft)

import Data.String.Interpolate.IsString
Expand Down Expand Up @@ -246,6 +245,12 @@ withLogger LogConfig {..} sysMetricCallback f = do
in [i|#{level} #{pid} #{timestamp} #{message}|]


drainLog :: MonadIO m => AppLogger -> m ()
drainLog (getQueue -> queue) =
liftIO $ atomically $ do
empty <- isEmptyCQueue queue
unless empty retry

eol :: Char
eol = '\n'

Expand Down
2 changes: 1 addition & 1 deletion src/RPKI/Messages.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ toMessage = \case
InternalE t -> toInternalErrorMessage t

UnspecifiedE context e ->
[i|Unspecified error #{context}, details: #{e}.|]
[i|Unspecified error in #{context}, details: #{e}.|]


toRsyncMessage :: RsyncError -> Text
Expand Down
1 change: 1 addition & 0 deletions src/RPKI/Orphans/Json.hs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ instance ToJSON ManifestProcessing
instance ToJSON ValidationAlgorithm
instance ToJSON FetchTimingCalculation
instance ToJSON FetchMethod
instance ToJSON ProverRunMode
instance ToJSON TAL
instance ToJSON HttpApiConfig
instance ToJSON ValidationConfig
Expand Down
Loading

0 comments on commit babb203

Please sign in to comment.