From 98ca0ff13e2996aa45d7bcfab695143689ae8650 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 30 Jul 2020 01:21:56 +0200 Subject: [PATCH] service config, integrate service launch into DHT launch TODO: hold a reference from DHT to service --- Hash2Pub.cabal | 2 +- app/Main.hs | 20 +++++++++++++++----- src/Hash2Pub/FediChord.hs | 11 +++++++++-- src/Hash2Pub/FediChordTypes.hs | 25 +++++++++++++++++++++++++ src/Hash2Pub/PostService.hs | 15 ++++++--------- src/Hash2Pub/ServiceTypes.hs | 15 --------------- 6 files changed, 56 insertions(+), 32 deletions(-) delete mode 100644 src/Hash2Pub/ServiceTypes.hs diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 56441ad..54cb29d 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -55,7 +55,7 @@ library import: deps -- Modules exported by the library. - exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap + exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap -- Modules included in this library but not exported. other-modules: Hash2Pub.Utils diff --git a/app/Main.hs b/app/Main.hs index 8887ee8..98961c0 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -10,15 +10,17 @@ import Data.IP (IPv6, toHostAddress6) import System.Environment import Hash2Pub.FediChord +import Hash2Pub.FediChordTypes +import Hash2Pub.PostService (PostService (..)) main :: IO () main = do -- ToDo: parse and pass config -- probably use `tomland` for that - conf <- readConfig + (fConf, sConf) <- readConfig -- TODO: first initialise 'RealNode', then the vservers -- ToDo: load persisted caches, bootstrapping nodes … - (serverSock, thisNode) <- fediChordInit conf + (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) -- currently no masking is necessary, as there is nothing to clean up nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode -- try joining the DHT using one of the provided bootstrapping nodes @@ -41,10 +43,11 @@ main = do pure () -readConfig :: IO FediChordConf +readConfig :: IO (FediChordConf, ServiceConf) readConfig = do - confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs - pure $ FediChordConf { + confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs + let + fConf = FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . read $ ipString , confDhtPort = read portString @@ -53,3 +56,10 @@ readConfig = do , confBootstrapSamplingInterval = 180 , confMaxLookupCacheAge = 300 } + sConf = ServiceConf { + confSubscriptionExpiryTime = 2*3600 `div` read speedup + , confServicePort = read servicePortString + , confServiceHost = confDomainString + } + pure (fConf, sConf) + diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 26a373c..7a5abb0 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -95,16 +95,23 @@ import Debug.Trace (trace) -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO -fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM) -fediChordInit initConf = do +--fediChordInit :: (DHT d, Service s d) +-- => FediChordConf +-- -> (d -> s d) -- ^ runner function for service +-- -> IO (Socket, LocalNodeStateSTM) +fediChordInit initConf serviceRunner = do emptyLookupCache <- newTVarIO Map.empty let realNode = RealNode { vservers = [] , nodeConfig = initConf , bootstrapNodes = confBootstrapNodes initConf , lookupCacheSTM = emptyLookupCache + --, service = undefined } realNodeSTM <- newTVarIO realNode + -- launch service and set the reference in the RealNode + serv <- serviceRunner realNodeSTM + --atomically . writeTVar $ realNode { service = serv } initialState <- nodeStateInit realNodeSTM initialStateSTM <- newTVarIO initialState serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index d764b71..604519e 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -58,11 +58,14 @@ module Hash2Pub.FediChordTypes ( , bsAsIpAddr , FediChordConf(..) , DHT(..) + , Service(..) + , ServiceConf(..) ) where import Control.Exception import Data.Foldable (foldr') import Data.Function (on) +import qualified Data.Hashable as Hashable import Data.List (delete, nub, sortBy) import qualified Data.Map.Strict as Map import Data.Maybe (fromJust, fromMaybe, isJust, @@ -144,6 +147,7 @@ a `localCompare` b -- | Data for managing the virtual server nodes of this real node. -- Also contains shared data and config values. -- TODO: more data structures for k-choices bookkeeping +--data RealNode s = RealNode data RealNode = RealNode { vservers :: [LocalNodeStateSTM] -- ^ references to all active versers @@ -155,6 +159,7 @@ data RealNode = RealNode -- ^ a global cache of looked up keys and their associated nodes } +--type RealNodeSTM s = TVar (RealNode s) type RealNodeSTM = TVar RealNode -- | represents a node and all its important state @@ -411,6 +416,26 @@ data FediChordConf = FediChordConf } deriving (Show, Eq) +-- ====== Service Types ============ + +class Service s d where + -- | run the service + runService :: ServiceConf -> d -> IO (s d) + getServicePort' :: (Integral i) => s d -> i + +instance Hashable.Hashable NodeID where + hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID + hash = Hashable.hash . getNodeID + +data ServiceConf = ServiceConf + { confSubscriptionExpiryTime :: Integer + -- ^ subscription lease expiration in seconds + , confServicePort :: Int + -- ^ listening port for service + , confServiceHost :: String + -- ^ hostname of service + } + class DHT d where -- | lookup the responsible host handling a given key string, -- possiblggy from a lookup cache diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 81b00a3..264bccb 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -32,12 +32,10 @@ import Servant import Hash2Pub.FediChordTypes import Hash2Pub.RingMap -import Hash2Pub.ServiceTypes data PostService d = PostService - { psPort :: Warp.Port - , psHost :: String + { serviceConf :: ServiceConf -- queues, other data structures , baseDHT :: (DHT d) => d , serviceThread :: TVar ThreadId @@ -66,7 +64,7 @@ type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime)) instance DHT d => Service PostService d where -- | initialise 'PostService' data structures and run server - runService dht host port = do + runService conf dht = do -- create necessary TVars threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder subscriberVar <- newTVarIO emptyRMap @@ -75,8 +73,7 @@ instance DHT d => Service PostService d where relayInQueue' <- newTQueueIO let thisService = PostService { - psPort = port' - , psHost = host + serviceConf = conf , baseDHT = dht , serviceThread = threadVar , subscribers = subscriberVar @@ -84,8 +81,8 @@ instance DHT d => Service PostService d where , ownPosts = ownPostVar , relayInQueue = relayInQueue' } - port' = fromIntegral port - warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings + port' = fromIntegral (confServicePort conf) + warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings -- Run 'concurrently_' from another thread to be able to return the -- 'PostService'. -- Terminating that parent thread will make all child threads terminate as well. @@ -98,7 +95,7 @@ instance DHT d => Service PostService d where atomically $ writeTVar threadVar servThreadID pure thisService - getServicePort s = fromIntegral $ psPort s + getServicePort' = fromIntegral . confServicePort . serviceConf -- | return a WAI application diff --git a/src/Hash2Pub/ServiceTypes.hs b/src/Hash2Pub/ServiceTypes.hs deleted file mode 100644 index 5e2b37c..0000000 --- a/src/Hash2Pub/ServiceTypes.hs +++ /dev/null @@ -1,15 +0,0 @@ -{-# LANGUAGE MultiParamTypeClasses #-} -module Hash2Pub.ServiceTypes where - -import Data.Hashable (Hashable (..)) - -import Hash2Pub.FediChord (DHT (..), NodeID (..)) - -class Service s d where - -- | run the service - runService :: (Integral i) => d -> String -> i -> IO (s d) - getServicePort :: (Integral i) => s d -> i - -instance Hashable NodeID where - hashWithSalt salt = hashWithSalt salt . getNodeID - hash = hash . getNodeID