565 lines
23 KiB
Haskell
565 lines
23 KiB
Haskell
{-# LANGUAGE DataKinds #-}
|
|
{-# LANGUAGE DerivingStrategies #-}
|
|
{-# LANGUAGE FlexibleContexts #-}
|
|
{-# LANGUAGE FlexibleInstances #-}
|
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
|
{-# LANGUAGE MultiParamTypeClasses #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE RankNTypes #-}
|
|
|
|
module Hash2Pub.FediChordTypes
|
|
( NodeID -- abstract, but newtype constructors cannot be hidden
|
|
, idBits
|
|
, getNodeID
|
|
, toNodeID
|
|
, NodeState (..)
|
|
, LocalNodeState (..)
|
|
, LocalNodeStateSTM
|
|
, RemoteNodeState (..)
|
|
, RealNode (..)
|
|
, RealNodeSTM
|
|
, VSMap
|
|
, LoadStats (..)
|
|
, emptyLoadStats
|
|
, remainingLoadTarget
|
|
, loadSliceSum
|
|
, addVserver
|
|
, SegmentLoadStats (..)
|
|
, setSuccessors
|
|
, setPredecessors
|
|
, NodeCache
|
|
, NodeCacheEntry
|
|
, LookupCache
|
|
, LookupCacheEntry
|
|
, CacheEntry(..)
|
|
, RingEntry(..)
|
|
, RingMap(..)
|
|
, HasKeyID(..)
|
|
, rMapSize
|
|
, rMapLookup
|
|
, rMapLookupPred
|
|
, rMapLookupSucc
|
|
, addRMapEntry
|
|
, addRMapEntryWith
|
|
, addPredecessors
|
|
, addSuccessors
|
|
, takeRMapPredecessors
|
|
, takeRMapSuccessors
|
|
, deleteRMapEntry
|
|
, setRMapEntries
|
|
, rMapFromList
|
|
, rMapToList
|
|
, cacheGetNodeStateUnvalidated
|
|
, initCache
|
|
, nodeCacheEntries
|
|
, cacheLookup
|
|
, cacheLookupSucc
|
|
, cacheLookupPred
|
|
, localCompare
|
|
, genNodeID
|
|
, genNodeIDBS
|
|
, hasValidNodeId
|
|
, genKeyID
|
|
, genKeyIDBS
|
|
, byteStringToUInteger
|
|
, ipAddrAsBS
|
|
, bsAsIpAddr
|
|
, FediChordConf(..)
|
|
, DHT(..)
|
|
, Service(..)
|
|
, ServiceConf(..)
|
|
) where
|
|
|
|
import Control.Exception
|
|
import Data.Foldable (foldr')
|
|
import Data.Function (on)
|
|
import qualified Data.Hashable as Hashable
|
|
import Data.HashMap.Strict (HashMap)
|
|
import qualified Data.HashMap.Strict as HMap
|
|
import Data.List (delete, nub, sortBy)
|
|
import qualified Data.Map.Strict as Map
|
|
import Data.Maybe (fromJust, fromMaybe, isJust,
|
|
isNothing, mapMaybe)
|
|
import qualified Data.Set as Set
|
|
import Data.Time.Clock.POSIX
|
|
import Network.Socket
|
|
|
|
-- for hashing and ID conversion
|
|
import Control.Concurrent.STM
|
|
import Control.Concurrent.STM.TQueue
|
|
import Control.Concurrent.STM.TVar
|
|
import Control.Monad (forever)
|
|
import Crypto.Hash
|
|
import qualified Data.ByteArray as BA
|
|
import qualified Data.ByteString as BS
|
|
import qualified Data.ByteString.UTF8 as BSU
|
|
import Data.IP (IPv6, fromHostAddress6,
|
|
toHostAddress6)
|
|
import Data.Typeable (Typeable (..), typeOf)
|
|
import Data.Word
|
|
import qualified Network.ByteOrder as NetworkBytes
|
|
|
|
import Hash2Pub.RingMap
|
|
import Hash2Pub.Utils
|
|
|
|
import Debug.Trace (trace)
|
|
|
|
|
|
|
|
-- define protocol constants
|
|
-- | static definition of ID length in bits
|
|
idBits :: Integer
|
|
idBits = 256
|
|
|
|
-- |NodeIDs are Integers wrapped in a newtype, to be able to redefine
|
|
-- their instance behaviour
|
|
--
|
|
-- for being able to check value bounds, the constructor should not be used directly
|
|
-- and new values are created via @toNodeID@ (newtype constructors cannot be hidden)
|
|
newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum
|
|
|
|
-- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values.
|
|
-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@
|
|
toNodeID :: Integer -> NodeID
|
|
toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i
|
|
|
|
-- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits'
|
|
instance Bounded NodeID where
|
|
minBound = NodeID 0
|
|
maxBound = NodeID (2^idBits - 1)
|
|
|
|
-- |calculations with NodeIDs are modular arithmetic operations
|
|
instance Num NodeID where
|
|
a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1)
|
|
a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1)
|
|
a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1)
|
|
-- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around
|
|
-- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@.
|
|
fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1)
|
|
signum = NodeID . signum . getNodeID
|
|
abs = NodeID . abs . getNodeID -- ToDo: make sure that at creation time only IDs within the range are used
|
|
|
|
-- | use normal strict monotonic ordering of integers, realising the ring structure
|
|
-- is done in the @NodeCache@ implementation
|
|
instance Ord NodeID where
|
|
a `compare` b = getNodeID a `compare` getNodeID b
|
|
|
|
-- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes
|
|
localCompare :: NodeID -> NodeID -> Ordering
|
|
a `localCompare` b
|
|
| getNodeID a == getNodeID b = EQ
|
|
| wayForwards > wayBackwards = GT
|
|
| otherwise = LT
|
|
where
|
|
wayForwards = getNodeID (b - a)
|
|
wayBackwards = getNodeID (a - b)
|
|
|
|
-- | Data for managing the virtual server nodes of this real node.
|
|
-- Also contains shared data and config values.
|
|
-- TODO: more data structures for k-choices bookkeeping
|
|
data RealNode s = RealNode
|
|
{ vservers :: VSMap s
|
|
-- ^ map of all active VServer node IDs to their node state
|
|
, nodeConfig :: FediChordConf
|
|
-- ^ holds the initial configuration read at program start
|
|
, bootstrapNodes :: [(String, PortNumber)]
|
|
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
|
|
, lookupCacheSTM :: TVar LookupCache
|
|
-- ^ a global cache of looked up keys and their associated nodes
|
|
, globalNodeCacheSTM :: TVar NodeCache
|
|
-- ^ EpiChord node cache with expiry times for nodes.
|
|
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
|
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
|
|
, nodeService :: s (RealNodeSTM s)
|
|
}
|
|
|
|
-- | insert a new vserver mapping into a node
|
|
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
|
|
addVserver (key, nstate) node = node
|
|
{ vservers = addRMapEntry key nstate (vservers node) }
|
|
|
|
type VSMap s = RingMap NodeID (LocalNodeStateSTM s)
|
|
type RealNodeSTM s = TVar (RealNode s)
|
|
|
|
-- | represents a node and all its important state
|
|
data RemoteNodeState = RemoteNodeState
|
|
{ nid :: NodeID
|
|
, domain :: String
|
|
-- ^ full public domain name the node is reachable under
|
|
, ipAddr :: HostAddress6
|
|
-- the node's public IPv6 address
|
|
, dhtPort :: PortNumber
|
|
-- ^ port of the DHT itself
|
|
, servicePort :: PortNumber
|
|
-- ^ port of the service provided on top of the DHT
|
|
, vServerID :: Word8
|
|
-- ^ ID of this vserver
|
|
}
|
|
deriving (Show, Eq)
|
|
|
|
instance Ord RemoteNodeState where
|
|
a `compare` b = nid a `compare` nid b
|
|
|
|
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
|
|
data LocalNodeState s = LocalNodeState
|
|
{ nodeState :: RemoteNodeState
|
|
-- ^ represents common data present both in remote and local node representations
|
|
, nodeCacheSTM :: TVar NodeCache
|
|
-- ^ reference to the 'globalNodeCacheSTM'
|
|
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
|
-- ^ reference to the 'globalCacheWriteQueue
|
|
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
|
|
-- ^ successor nodes in ascending order by distance
|
|
, predecessors :: [RemoteNodeState]
|
|
-- ^ predecessor nodes in ascending order by distance
|
|
, kNeighbours :: Int
|
|
-- ^ desired length of predecessor and successor list
|
|
, lNumBestNodes :: Int
|
|
-- ^ number of best next hops to provide
|
|
, pNumParallelQueries :: Int
|
|
-- ^ number of parallel sent queries
|
|
, jEntriesPerSlice :: Int
|
|
-- ^ number of desired entries per cache slice
|
|
, parentRealNode :: RealNodeSTM s
|
|
-- ^ the parent node managing this vserver instance
|
|
}
|
|
deriving (Show, Eq)
|
|
|
|
-- | for concurrent access, LocalNodeState is wrapped in a TVar
|
|
type LocalNodeStateSTM s = TVar (LocalNodeState s)
|
|
|
|
-- | class for various NodeState representations, providing
|
|
-- getters and setters for common values
|
|
class NodeState a where
|
|
-- getters for common properties
|
|
getNid :: a -> NodeID
|
|
getDomain :: a -> String
|
|
getIpAddr :: a -> HostAddress6
|
|
getDhtPort :: a -> PortNumber
|
|
getServicePort :: a -> PortNumber
|
|
getVServerID :: a -> Word8
|
|
-- setters for common properties
|
|
setNid :: NodeID -> a -> a
|
|
setDomain :: String -> a -> a
|
|
setIpAddr :: HostAddress6 -> a -> a
|
|
setDhtPort :: PortNumber -> a -> a
|
|
setServicePort :: PortNumber -> a -> a
|
|
setVServerID :: Word8 -> a -> a
|
|
toRemoteNodeState :: a -> RemoteNodeState
|
|
|
|
instance NodeState RemoteNodeState where
|
|
getNid = nid
|
|
getDomain = domain
|
|
getIpAddr = ipAddr
|
|
getDhtPort = dhtPort
|
|
getServicePort = servicePort
|
|
getVServerID = vServerID
|
|
setNid nid' ns = ns {nid = nid'}
|
|
setDomain domain' ns = ns {domain = domain'}
|
|
setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'}
|
|
setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'}
|
|
setServicePort servicePort' ns = ns {servicePort = servicePort'}
|
|
setVServerID vServerID' ns = ns {vServerID = vServerID'}
|
|
toRemoteNodeState = id
|
|
|
|
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
|
|
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState s -> LocalNodeState s
|
|
propagateNodeStateSet_ func ns = let
|
|
newNs = func $ nodeState ns
|
|
in
|
|
ns {nodeState = newNs}
|
|
|
|
|
|
instance NodeState (LocalNodeState s) where
|
|
getNid = getNid . nodeState
|
|
getDomain = getDomain . nodeState
|
|
getIpAddr = getIpAddr . nodeState
|
|
getDhtPort = getDhtPort . nodeState
|
|
getServicePort = getServicePort . nodeState
|
|
getVServerID = getVServerID . nodeState
|
|
setNid nid' = propagateNodeStateSet_ $ setNid nid'
|
|
setDomain domain' = propagateNodeStateSet_ $ setDomain domain'
|
|
setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr'
|
|
setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort'
|
|
setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort'
|
|
setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID'
|
|
toRemoteNodeState = nodeState
|
|
|
|
-- | defining Show instances to be able to print NodeState for debug purposes
|
|
instance Typeable a => Show (TVar a) where
|
|
show x = show (typeOf x)
|
|
|
|
instance Typeable a => Show (TQueue a) where
|
|
show x = show (typeOf x)
|
|
|
|
instance Typeable a => Show (TChan a) where
|
|
show x = show (typeOf x)
|
|
|
|
|
|
-- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list
|
|
setPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
|
|
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds}
|
|
|
|
-- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list
|
|
setSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
|
|
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs}
|
|
|
|
-- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined
|
|
addPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
|
|
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns}
|
|
|
|
-- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined
|
|
addSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
|
|
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns}
|
|
|
|
instance HasKeyID NodeID RemoteNodeState where
|
|
getKeyID = getNid
|
|
|
|
instance HasKeyID k a => HasKeyID k (CacheEntry a) where
|
|
getKeyID (CacheEntry _ obj _) = getKeyID obj
|
|
|
|
instance HasKeyID NodeID NodeID where
|
|
getKeyID = id
|
|
|
|
type NodeCacheEntry = CacheEntry RemoteNodeState
|
|
type NodeCache = RingMap NodeID NodeCacheEntry
|
|
|
|
type LookupCacheEntry = CacheEntry (String, PortNumber)
|
|
type LookupCache = Map.Map NodeID LookupCacheEntry
|
|
|
|
-- | 'RingEntry' type for usage as a node cache
|
|
data CacheEntry a = CacheEntry Bool a POSIXTime
|
|
deriving (Show, Eq)
|
|
|
|
--- useful function for getting entries for a full cache transfer
|
|
nodeCacheEntries :: NodeCache -> [NodeCacheEntry]
|
|
nodeCacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
|
|
|
|
initCache :: NodeCache
|
|
initCache = emptyRMap
|
|
|
|
cacheLookup :: NodeID -- ^lookup key
|
|
-> NodeCache -- ^lookup cache
|
|
-> Maybe NodeCacheEntry
|
|
cacheLookup = rMapLookup
|
|
|
|
cacheLookupSucc :: NodeID -- ^lookup key
|
|
-> NodeCache -- ^ring cache
|
|
-> Maybe NodeCacheEntry
|
|
cacheLookupSucc key cache = snd <$> rMapLookupSucc key cache
|
|
|
|
cacheLookupPred :: NodeID -- ^lookup key
|
|
-> NodeCache -- ^ring cache
|
|
-> Maybe NodeCacheEntry
|
|
cacheLookupPred key cache = snd <$> rMapLookupPred key cache
|
|
|
|
-- clean up cache entries: once now - entry > maxAge
|
|
-- transfer difference now - entry to other node
|
|
|
|
-- | return the @NodeState@ data from a cache entry without checking its validation status
|
|
cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState
|
|
cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
|
|
|
|
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
|
|
ipAddrAsBS :: HostAddress6 -> BS.ByteString
|
|
ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d]
|
|
|
|
-- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6'
|
|
bsAsIpAddr :: BS.ByteString -> HostAddress6
|
|
bsAsIpAddr bytes = (a,b,c,d)
|
|
where
|
|
a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes
|
|
|
|
|
|
-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString
|
|
genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address
|
|
-> String -- ^a node's 1st and 2nd level domain name
|
|
-> Word8 -- ^the used vserver ID
|
|
-> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer
|
|
genNodeIDBS ip nodeDomain vserver =
|
|
hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower
|
|
where
|
|
vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255
|
|
ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS
|
|
nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS
|
|
hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128))
|
|
(hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet
|
|
|
|
|
|
-- | generates a 256 bit long @NodeID@ using SHAKE128
|
|
genNodeID :: HostAddress6 -- ^a node's IPv6 address
|
|
-> String -- ^a node's 1st and 2nd level domain name
|
|
-> Word8 -- ^the used vserver ID
|
|
-> NodeID -- ^the generated @NodeID@
|
|
genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
|
|
|
|
|
|
hasValidNodeId :: Word8 -> RemoteNodeState -> HostAddress6 -> Bool
|
|
hasValidNodeId numVs rns addr = getVServerID rns < numVs && getNid rns == genNodeID addr (getDomain rns) (getVServerID rns)
|
|
|
|
|
|
-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
|
|
genKeyIDBS :: String -- ^the key string
|
|
-> BS.ByteString -- ^the key ID represented as a @ByteString@
|
|
genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256)
|
|
|
|
-- | generates a 256 bit long key identifier for looking up its data on the DHT
|
|
genKeyID :: String -- ^the key string
|
|
-> NodeID -- ^the key ID
|
|
genKeyID = NodeID . byteStringToUInteger . genKeyIDBS
|
|
|
|
|
|
-- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order
|
|
-- by iterating it byte-wise from the back and shifting the byte values according to their offset
|
|
byteStringToUInteger :: BS.ByteString -> Integer
|
|
byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
|
where
|
|
parsedBytes :: Integer -> BS.ByteString -> [ Integer ]
|
|
parsedBytes offset uintBs = case BS.unsnoc uintBs of
|
|
Nothing -> []
|
|
Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs'
|
|
|
|
parseWithOffset :: Integer -> Word8 -> Integer
|
|
parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
|
|
parseWithOffset offset word = toInteger word * 2^(8 * offset)
|
|
|
|
-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
|
|
-- persist them on disk so they can be used for all following bootstraps
|
|
|
|
-- | configuration values used for initialising the FediChord DHT
|
|
data FediChordConf = FediChordConf
|
|
{ confDomain :: String
|
|
-- ^ the domain/ hostname the node is reachable under
|
|
, confIP :: HostAddress6
|
|
-- ^ IP address of outgoing packets
|
|
, confDhtPort :: Int
|
|
-- ^ listening port for the FediChord DHT
|
|
, confBootstrapNodes :: [(String, PortNumber)]
|
|
-- ^ list of potential bootstrapping nodes
|
|
, confStabiliseInterval :: Int
|
|
-- ^ pause between stabilise runs, in milliseconds
|
|
, confBootstrapSamplingInterval :: Int
|
|
-- ^ pause between sampling the own ID through bootstrap nodes, in milliseconds
|
|
, confMaxLookupCacheAge :: POSIXTime
|
|
-- ^ maximum age of key lookup cache entries in seconds
|
|
, confJoinAttemptsInterval :: Int
|
|
-- ^ interval between join attempts on newly learned nodes, in milliseconds
|
|
, confMaxNodeCacheAge :: POSIXTime
|
|
-- ^ maximum age of entries in the node cache, in milliseconds
|
|
, confResponsePurgeAge :: POSIXTime
|
|
-- ^ maximum age of message parts in response part cache, in seconds
|
|
, confRequestTimeout :: Int
|
|
-- ^ how long to wait until response has arrived, in milliseconds
|
|
, confRequestRetries :: Int
|
|
-- ^ how often re-sending a timed-out request can be retried
|
|
, confEnableKChoices :: Bool
|
|
-- ^ whether to enable k-choices load balancing
|
|
, confKChoicesOverload :: Double
|
|
-- ^ fraction of capacity above which a node considers itself overloaded
|
|
, confKChoicesUnderload :: Double
|
|
-- ^ fraction of capacity below which a node considers itself underloaded
|
|
, confKChoicesMaxVS :: Word8
|
|
-- ^ upper limit of vserver index κ
|
|
, confKChoicesRebalanceInterval :: Int
|
|
-- ^ interval between vserver rebalance attempts
|
|
}
|
|
deriving (Show, Eq)
|
|
|
|
-- ====== k-choices load balancing types ======
|
|
|
|
data LoadStats = LoadStats
|
|
{ loadPerTag :: RingMap NodeID Double
|
|
-- ^ map of loads for each handled tag
|
|
, totalCapacity :: Double
|
|
-- ^ total designated capacity of the service
|
|
, compensatedLoadSum :: Double
|
|
-- ^ effective load reevant for load balancing after compensating for
|
|
}
|
|
deriving (Show, Eq)
|
|
|
|
-- | calculates the mismatch from the target load by taking into account the
|
|
-- underload and overload limits
|
|
remainingLoadTarget :: FediChordConf -> LoadStats -> Double
|
|
remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
|
|
where
|
|
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
|
|
|
|
|
|
-- | calculates the sum of tag load in a contiguous slice between to keys
|
|
loadSliceSum :: LoadStats
|
|
-> NodeID -- ^ lower segment bound
|
|
-> NodeID -- ^ upper segment bound
|
|
-> Double -- ^ sum of all tag loads within that segment
|
|
loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
|
|
|
|
|
|
data SegmentLoadStats = SegmentLoadStats
|
|
{ segmentLowerKeyBound :: NodeID
|
|
-- ^ segment start key
|
|
, segmentUpperKeyBound :: NodeID
|
|
-- ^ segment end key
|
|
, segmentLoad :: Double
|
|
-- ^ sum of load of all keys in the segment
|
|
, segmentOwnerRemainingLoadTarget :: Double
|
|
-- ^ remaining load target of the current segment handler:
|
|
, segmentOwnerCapacity :: Double
|
|
-- ^ total capacity of the current segment handler node, used for normalisation
|
|
, segmentCurrentOwner :: RemoteNodeState
|
|
-- ^ the current owner of the segment that needs to be joined on
|
|
}
|
|
|
|
-- TODO: figure out a better way of initialising
|
|
emptyLoadStats :: LoadStats
|
|
emptyLoadStats = LoadStats
|
|
{ loadPerTag = emptyRMap
|
|
, totalCapacity = 0
|
|
, compensatedLoadSum = 0
|
|
}
|
|
|
|
-- ====== Service Types ============
|
|
|
|
class Service s d where
|
|
-- | run the service
|
|
runService :: ServiceConf -> d -> IO (s d)
|
|
getListeningPortFromService :: (Integral i) => s d -> i
|
|
-- | trigger a service data migration of data between the two given keys
|
|
migrateData :: s d
|
|
-> NodeID -- ^ source/ sender node ID
|
|
-> NodeID -- ^ start key
|
|
-> NodeID -- ^ end key
|
|
-> (String, Int) -- ^ hostname and port of target service
|
|
-> IO (Either String ()) -- ^ success or failure
|
|
-- | Wait for an incoming migration from a given node to succeed, may block forever
|
|
waitForMigrationFrom :: s d -> NodeID -> IO ()
|
|
getServiceLoadStats :: s d -> IO LoadStats
|
|
|
|
instance Hashable.Hashable NodeID where
|
|
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
|
hash = Hashable.hash . getNodeID
|
|
|
|
data ServiceConf = ServiceConf
|
|
{ confSubscriptionExpiryTime :: POSIXTime
|
|
-- ^ subscription lease expiration in seconds
|
|
, confServicePort :: Int
|
|
-- ^ listening port for service
|
|
, confServiceHost :: String
|
|
-- ^ hostname of service
|
|
, confLogfilePath :: String
|
|
-- ^ where to store the (measurement) log file
|
|
, confStatsEvalDelay :: Int
|
|
-- ^ delay between statistic rate measurement samplings, in microseconds
|
|
, confSpeedupFactor :: Int
|
|
-- While the speedup factor needs to be already included in all
|
|
}
|
|
|
|
class DHT d where
|
|
-- | lookup the responsible host handling a given key string,
|
|
-- possiblggy from a lookup cache
|
|
lookupKey :: d -> String -> IO (Maybe (String, PortNumber))
|
|
-- | lookup the responsible host handling a given key string,
|
|
-- but force the DHT to do a fresh lookup instead of returning a cached result.
|
|
-- Also invalidates old cache entries.
|
|
forceLookupKey :: d -> String -> IO (Maybe (String, PortNumber))
|
|
isResponsibleFor :: d -> NodeID -> IO Bool
|
|
isResponsibleForSTM :: d -> NodeID -> STM Bool
|