service config, integrate service launch into DHT launch
TODO: hold a reference from DHT to service
This commit is contained in:
parent
da47f8062f
commit
98ca0ff13e
|
@ -55,7 +55,7 @@ library
|
||||||
import: deps
|
import: deps
|
||||||
|
|
||||||
-- Modules exported by the library.
|
-- Modules exported by the library.
|
||||||
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap
|
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
|
||||||
|
|
||||||
-- Modules included in this library but not exported.
|
-- Modules included in this library but not exported.
|
||||||
other-modules: Hash2Pub.Utils
|
other-modules: Hash2Pub.Utils
|
||||||
|
|
20
app/Main.hs
20
app/Main.hs
|
@ -10,15 +10,17 @@ import Data.IP (IPv6, toHostAddress6)
|
||||||
import System.Environment
|
import System.Environment
|
||||||
|
|
||||||
import Hash2Pub.FediChord
|
import Hash2Pub.FediChord
|
||||||
|
import Hash2Pub.FediChordTypes
|
||||||
|
import Hash2Pub.PostService (PostService (..))
|
||||||
|
|
||||||
main :: IO ()
|
main :: IO ()
|
||||||
main = do
|
main = do
|
||||||
-- ToDo: parse and pass config
|
-- ToDo: parse and pass config
|
||||||
-- probably use `tomland` for that
|
-- probably use `tomland` for that
|
||||||
conf <- readConfig
|
(fConf, sConf) <- readConfig
|
||||||
-- TODO: first initialise 'RealNode', then the vservers
|
-- TODO: first initialise 'RealNode', then the vservers
|
||||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||||
(serverSock, thisNode) <- fediChordInit conf
|
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||||
|
@ -41,10 +43,11 @@ main = do
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
readConfig :: IO FediChordConf
|
readConfig :: IO (FediChordConf, ServiceConf)
|
||||||
readConfig = do
|
readConfig = do
|
||||||
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
|
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
|
||||||
pure $ FediChordConf {
|
let
|
||||||
|
fConf = FediChordConf {
|
||||||
confDomain = confDomainString
|
confDomain = confDomainString
|
||||||
, confIP = toHostAddress6 . read $ ipString
|
, confIP = toHostAddress6 . read $ ipString
|
||||||
, confDhtPort = read portString
|
, confDhtPort = read portString
|
||||||
|
@ -53,3 +56,10 @@ readConfig = do
|
||||||
, confBootstrapSamplingInterval = 180
|
, confBootstrapSamplingInterval = 180
|
||||||
, confMaxLookupCacheAge = 300
|
, confMaxLookupCacheAge = 300
|
||||||
}
|
}
|
||||||
|
sConf = ServiceConf {
|
||||||
|
confSubscriptionExpiryTime = 2*3600 `div` read speedup
|
||||||
|
, confServicePort = read servicePortString
|
||||||
|
, confServiceHost = confDomainString
|
||||||
|
}
|
||||||
|
pure (fConf, sConf)
|
||||||
|
|
||||||
|
|
|
@ -95,16 +95,23 @@ import Debug.Trace (trace)
|
||||||
|
|
||||||
-- | initialise data structures, compute own IDs and bind to listening socket
|
-- | initialise data structures, compute own IDs and bind to listening socket
|
||||||
-- ToDo: load persisted state, thus this function already operates in IO
|
-- ToDo: load persisted state, thus this function already operates in IO
|
||||||
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
|
--fediChordInit :: (DHT d, Service s d)
|
||||||
fediChordInit initConf = do
|
-- => FediChordConf
|
||||||
|
-- -> (d -> s d) -- ^ runner function for service
|
||||||
|
-- -> IO (Socket, LocalNodeStateSTM)
|
||||||
|
fediChordInit initConf serviceRunner = do
|
||||||
emptyLookupCache <- newTVarIO Map.empty
|
emptyLookupCache <- newTVarIO Map.empty
|
||||||
let realNode = RealNode {
|
let realNode = RealNode {
|
||||||
vservers = []
|
vservers = []
|
||||||
, nodeConfig = initConf
|
, nodeConfig = initConf
|
||||||
, bootstrapNodes = confBootstrapNodes initConf
|
, bootstrapNodes = confBootstrapNodes initConf
|
||||||
, lookupCacheSTM = emptyLookupCache
|
, lookupCacheSTM = emptyLookupCache
|
||||||
|
--, service = undefined
|
||||||
}
|
}
|
||||||
realNodeSTM <- newTVarIO realNode
|
realNodeSTM <- newTVarIO realNode
|
||||||
|
-- launch service and set the reference in the RealNode
|
||||||
|
serv <- serviceRunner realNodeSTM
|
||||||
|
--atomically . writeTVar $ realNode { service = serv }
|
||||||
initialState <- nodeStateInit realNodeSTM
|
initialState <- nodeStateInit realNodeSTM
|
||||||
initialStateSTM <- newTVarIO initialState
|
initialStateSTM <- newTVarIO initialState
|
||||||
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
|
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
|
||||||
|
|
|
@ -58,11 +58,14 @@ module Hash2Pub.FediChordTypes (
|
||||||
, bsAsIpAddr
|
, bsAsIpAddr
|
||||||
, FediChordConf(..)
|
, FediChordConf(..)
|
||||||
, DHT(..)
|
, DHT(..)
|
||||||
|
, Service(..)
|
||||||
|
, ServiceConf(..)
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import Data.Function (on)
|
import Data.Function (on)
|
||||||
|
import qualified Data.Hashable as Hashable
|
||||||
import Data.List (delete, nub, sortBy)
|
import Data.List (delete, nub, sortBy)
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
import Data.Maybe (fromJust, fromMaybe, isJust,
|
||||||
|
@ -144,6 +147,7 @@ a `localCompare` b
|
||||||
-- | Data for managing the virtual server nodes of this real node.
|
-- | Data for managing the virtual server nodes of this real node.
|
||||||
-- Also contains shared data and config values.
|
-- Also contains shared data and config values.
|
||||||
-- TODO: more data structures for k-choices bookkeeping
|
-- TODO: more data structures for k-choices bookkeeping
|
||||||
|
--data RealNode s = RealNode
|
||||||
data RealNode = RealNode
|
data RealNode = RealNode
|
||||||
{ vservers :: [LocalNodeStateSTM]
|
{ vservers :: [LocalNodeStateSTM]
|
||||||
-- ^ references to all active versers
|
-- ^ references to all active versers
|
||||||
|
@ -155,6 +159,7 @@ data RealNode = RealNode
|
||||||
-- ^ a global cache of looked up keys and their associated nodes
|
-- ^ a global cache of looked up keys and their associated nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
--type RealNodeSTM s = TVar (RealNode s)
|
||||||
type RealNodeSTM = TVar RealNode
|
type RealNodeSTM = TVar RealNode
|
||||||
|
|
||||||
-- | represents a node and all its important state
|
-- | represents a node and all its important state
|
||||||
|
@ -411,6 +416,26 @@ data FediChordConf = FediChordConf
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
-- ====== Service Types ============
|
||||||
|
|
||||||
|
class Service s d where
|
||||||
|
-- | run the service
|
||||||
|
runService :: ServiceConf -> d -> IO (s d)
|
||||||
|
getServicePort' :: (Integral i) => s d -> i
|
||||||
|
|
||||||
|
instance Hashable.Hashable NodeID where
|
||||||
|
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
||||||
|
hash = Hashable.hash . getNodeID
|
||||||
|
|
||||||
|
data ServiceConf = ServiceConf
|
||||||
|
{ confSubscriptionExpiryTime :: Integer
|
||||||
|
-- ^ subscription lease expiration in seconds
|
||||||
|
, confServicePort :: Int
|
||||||
|
-- ^ listening port for service
|
||||||
|
, confServiceHost :: String
|
||||||
|
-- ^ hostname of service
|
||||||
|
}
|
||||||
|
|
||||||
class DHT d where
|
class DHT d where
|
||||||
-- | lookup the responsible host handling a given key string,
|
-- | lookup the responsible host handling a given key string,
|
||||||
-- possiblggy from a lookup cache
|
-- possiblggy from a lookup cache
|
||||||
|
|
|
@ -32,12 +32,10 @@ import Servant
|
||||||
|
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
import Hash2Pub.RingMap
|
import Hash2Pub.RingMap
|
||||||
import Hash2Pub.ServiceTypes
|
|
||||||
|
|
||||||
|
|
||||||
data PostService d = PostService
|
data PostService d = PostService
|
||||||
{ psPort :: Warp.Port
|
{ serviceConf :: ServiceConf
|
||||||
, psHost :: String
|
|
||||||
-- queues, other data structures
|
-- queues, other data structures
|
||||||
, baseDHT :: (DHT d) => d
|
, baseDHT :: (DHT d) => d
|
||||||
, serviceThread :: TVar ThreadId
|
, serviceThread :: TVar ThreadId
|
||||||
|
@ -66,7 +64,7 @@ type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
|
||||||
|
|
||||||
instance DHT d => Service PostService d where
|
instance DHT d => Service PostService d where
|
||||||
-- | initialise 'PostService' data structures and run server
|
-- | initialise 'PostService' data structures and run server
|
||||||
runService dht host port = do
|
runService conf dht = do
|
||||||
-- create necessary TVars
|
-- create necessary TVars
|
||||||
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
|
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
|
||||||
subscriberVar <- newTVarIO emptyRMap
|
subscriberVar <- newTVarIO emptyRMap
|
||||||
|
@ -75,8 +73,7 @@ instance DHT d => Service PostService d where
|
||||||
relayInQueue' <- newTQueueIO
|
relayInQueue' <- newTQueueIO
|
||||||
let
|
let
|
||||||
thisService = PostService {
|
thisService = PostService {
|
||||||
psPort = port'
|
serviceConf = conf
|
||||||
, psHost = host
|
|
||||||
, baseDHT = dht
|
, baseDHT = dht
|
||||||
, serviceThread = threadVar
|
, serviceThread = threadVar
|
||||||
, subscribers = subscriberVar
|
, subscribers = subscriberVar
|
||||||
|
@ -84,8 +81,8 @@ instance DHT d => Service PostService d where
|
||||||
, ownPosts = ownPostVar
|
, ownPosts = ownPostVar
|
||||||
, relayInQueue = relayInQueue'
|
, relayInQueue = relayInQueue'
|
||||||
}
|
}
|
||||||
port' = fromIntegral port
|
port' = fromIntegral (confServicePort conf)
|
||||||
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
|
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
||||||
-- Run 'concurrently_' from another thread to be able to return the
|
-- Run 'concurrently_' from another thread to be able to return the
|
||||||
-- 'PostService'.
|
-- 'PostService'.
|
||||||
-- Terminating that parent thread will make all child threads terminate as well.
|
-- Terminating that parent thread will make all child threads terminate as well.
|
||||||
|
@ -98,7 +95,7 @@ instance DHT d => Service PostService d where
|
||||||
atomically $ writeTVar threadVar servThreadID
|
atomically $ writeTVar threadVar servThreadID
|
||||||
pure thisService
|
pure thisService
|
||||||
|
|
||||||
getServicePort s = fromIntegral $ psPort s
|
getServicePort' = fromIntegral . confServicePort . serviceConf
|
||||||
|
|
||||||
|
|
||||||
-- | return a WAI application
|
-- | return a WAI application
|
||||||
|
|
|
@ -1,15 +0,0 @@
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
|
||||||
module Hash2Pub.ServiceTypes where
|
|
||||||
|
|
||||||
import Data.Hashable (Hashable (..))
|
|
||||||
|
|
||||||
import Hash2Pub.FediChord (DHT (..), NodeID (..))
|
|
||||||
|
|
||||||
class Service s d where
|
|
||||||
-- | run the service
|
|
||||||
runService :: (Integral i) => d -> String -> i -> IO (s d)
|
|
||||||
getServicePort :: (Integral i) => s d -> i
|
|
||||||
|
|
||||||
instance Hashable NodeID where
|
|
||||||
hashWithSalt salt = hashWithSalt salt . getNodeID
|
|
||||||
hash = hash . getNodeID
|
|
Loading…
Reference in a new issue