Hash2Pub/src/Hash2Pub/FediChord.hs

536 lines
23 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{- |
Module : FediChord
Description : An opinionated implementation of the EpiChord DHT by Leong et al.
Copyright : (c) schmittlauch, 2019-2020
License : AGPL-3
Stability : experimental
Modernised EpiChord + k-choices load balancing
-}
module Hash2Pub.FediChord (
NodeID -- abstract, but newtype constructors cannot be hidden
, getNodeID
, toNodeID
, NodeState (..)
, InternalNodeState (..)
, getNodeCacheRef
, putNodeCache
, getSuccessors
, putSuccessors
, getPredecessors
, putPredecessors
, getLNumBestNodes
, NodeCache
, CacheEntry(..)
, cacheGetNodeStateUnvalidated
, initCache
, cacheLookup
, cacheLookupSucc
, cacheLookupPred
, localCompare
, genNodeID
, genNodeIDBS
, genKeyID
, genKeyIDBS
, byteStringToUInteger
, ipAddrAsBS
, bsAsIpAddr
, FediChordConf(..)
, fediChordInit
, nodeStateInit
, mkServerSocket
, resolve
, cacheWriter
) where
import Control.Exception
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe)
import Data.Time.Clock.POSIX
import Network.Socket
-- for hashing and ID conversion
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Monad (forever)
import Crypto.Hash
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import Data.IORef
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Hash2Pub.Utils
import Debug.Trace (trace)
-- define protocol constants
-- | Static definition of the ID length in bits.
-- The 'Bounded' instance of 'NodeID' uses this constant to compute the upper
-- end of the ID space as @2^idBits - 1@.
idBits :: Integer
idBits = 256
-- | NodeIDs are Integers wrapped in a newtype, to be able to redefine
-- their instance behaviour (this module provides hand-written 'Bounded',
-- 'Num' and 'Ord' instances for it).
--
-- To be able to check value bounds, the constructor should not be used directly;
-- new values are created via 'toNodeID' instead (newtype constructors cannot be hidden).
newtype NodeID = NodeID { getNodeID :: Integer } deriving (Eq, Show, Enum)
-- | Smart data constructor for 'NodeID' that throws a runtime exception for
-- out-of-bounds values.
--
-- The bounds check is performed explicitly instead of via 'assert': GHC drops
-- assertions when optimisations (@-O@) or @-fignore-asserts@ are enabled, which
-- would silently let out-of-range IDs through. The exception thrown is still
-- an 'AssertionFailed', so handlers written against the 'assert' based
-- behaviour keep working.
--
-- When needing a runtime-safe constructor that wraps out-of-bound values
-- around instead of throwing, try 'fromInteger'.
toNodeID :: Integer -> NodeID
toNodeID i
    | i >= getNodeID minBound && i <= getNodeID maxBound = NodeID i
    | otherwise = throw . AssertionFailed $
        "toNodeID: value " ++ show i ++ " is outside of the valid NodeID range"
-- | The 'NodeID' value space is that of an unsigned integer of 'idBits' bits,
-- i.e. it ranges from @0@ up to and including @2^idBits - 1@.
instance Bounded NodeID where
    minBound = NodeID 0
    maxBound = NodeID (pred (2 ^ idBits))
-- | Calculations with NodeIDs are modular arithmetic operations: results of
-- addition, multiplication and subtraction are wrapped into the ID space
-- of size @getNodeID maxBound + 1@.
instance Num NodeID where
    NodeID x + NodeID y = fromInteger (x + y)
    NodeID x * NodeID y = fromInteger (x * y)
    NodeID x - NodeID y = fromInteger (x - y)
    -- safe constructor for NodeID values with the drawback that out-of-bound
    -- values are wrapped around with modulo to fit into the allowed value
    -- space. For runtime checking, look at @toNodeID@.
    fromInteger i = NodeID (i `mod` ringSize)
      where
        ringSize = succ (getNodeID maxBound)
    signum (NodeID x) = NodeID (signum x)
    -- ToDo: make sure that at creation time only IDs within the range are used
    abs (NodeID x) = NodeID (abs x)
-- | NodeIDs are ordered like their underlying Integers (plain, non-modular
-- ordering). Realising the ring structure is done in the @NodeCache@
-- implementation.
instance Ord NodeID where
    compare (NodeID x) (NodeID y) = compare x y
-- | Local comparison of 2 node IDs on the modular ring, only relevant for
-- determining a successor or predecessor on caches with just 2 nodes.
--
-- Yields 'EQ' for equal IDs, 'GT' if the modular distance @b - a@ is larger
-- than the modular distance @a - b@, and 'LT' otherwise (including the case
-- where both distances are equal).
localCompare :: NodeID -> NodeID -> Ordering
localCompare a b
    | a == b = EQ
    | distForwards > distBackwards = GT
    | otherwise = LT
  where
    -- both subtractions wrap around via the modular 'Num' instance of NodeID
    distForwards = getNodeID (b - a)
    distBackwards = getNodeID (a - b)
-- | Represents a node and all its important state.
-- Used both for the local node (which carries 'internals') and for remote nodes.
data NodeState = NodeState {
    nid :: NodeID
    -- ^ the ID of the node; for the local node it is computed by 'genNodeID'
    -- from IP address, domain and vserver ID
    , domain :: String
    -- ^ full public domain name the node is reachable under
    , ipAddr :: HostAddress6
    -- ^ the node's public IPv6 address
    , dhtPort :: PortNumber
    -- ^ port of the DHT itself
    , apPort :: Maybe PortNumber
    -- ^ port of the ActivityPub relay and storage service
    -- might have to be queried first
    , vServerID :: Integer
    -- ^ ID of this vserver
    -- ==== internal state ====
    , internals :: Maybe InternalNodeState
    -- ^ data not present in the representation of remote nodes
    -- is put into its own type.
    -- This is usually @Nothing@ for all remote nodes.
    } deriving (Show, Eq)
-- | Encapsulates all data and parameters that are not present for remote nodes.
data InternalNodeState = InternalNodeState {
    nodeCache :: IORef NodeCache
    -- ^ EpiChord node cache with expiry times for nodes
    -- as the map is ordered, lookups for the closest preceding node can be done using @lookupLT@.
    -- encapsulated into an IORef for allowing concurrent reads without locking
    , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
    -- only processed by a single writer thread to prevent lost updates.
    -- All nodeCache modifying functions have to be partially applied enough before
    -- being put into the queue.
    --
    , successors :: [NodeID] -- could be a set instead as these are ordered as well
    -- ^ successor nodes in ascending order by distance
    , predecessors :: [NodeID]
    -- ^ predecessor nodes in ascending order by distance
    ----- protocol parameters -----
    -- TODO: evaluate moving these somewhere else
    , kNeighbours :: Int
    -- ^ desired length of predecessor and successor list
    -- needs to be parameterisable for simulation purposes
    , lNumBestNodes :: Int
    -- ^ number of best next hops to provide
    -- needs to be parameterisable for simulation purposes
    , pNumParallelQueries :: Int
    -- ^ number of parallel sent queries
    -- needs to be parameterisable for simulation purposes
    , jEntriesPerSlice :: Int
    -- ^ number of desired entries per cache slice
    -- needs to be parameterisable for simulation purposes
    } deriving (Show, Eq)
-- | 'Show' instances for the mutable containers held by a 'NodeState', so that
-- it can be printed for debug purposes. Only the type of the container is
-- rendered, never its contents.
instance Typeable a => Show (IORef a) where
    show = show . typeOf

instance Typeable a => Show (TQueue a) where
    show = show . typeOf
-- | Apply an accessor to the 'internals' of a 'NodeState'.
-- Yields @Nothing@ for node states that do not carry any internals.
getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a
getInternals_ func = fmap func . internals
-- could be done better with lenses
-- | Convenience function that applies an update function to the 'internals'
-- of a 'NodeState'. Node states without internals are returned unchanged.
putInternals_ :: (InternalNodeState -> InternalNodeState) -> NodeState -> NodeState
putInternals_ func ns = ns { internals = fmap func (internals ns) }
-- | Convenience function for extracting the reference to the 'NodeCache'
-- from a 'NodeState'; @Nothing@ if the node state has no internals.
getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache)
getNodeCacheRef ns = getInternals_ nodeCache ns
-- | Convenience function for replacing the 'NodeCache' reference on
-- 'NodeState's that have internals.
-- NodeStates without a cache (without internals) are returned unchanged.
putNodeCache :: IORef NodeCache -> NodeState -> NodeState
putNodeCache nc = putInternals_ replaceCache
  where
    replaceCache intern = intern { nodeCache = nc }
-- | Convenience function for extracting the @cacheWriteQueue@ from a
-- 'NodeState'; @Nothing@ if the node state has no internals.
getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache))
getCacheWriteQueue ns = getInternals_ cacheWriteQueue ns
-- | Convenience function for extracting the @successors@ from a 'NodeState';
-- @Nothing@ if the node state has no internals.
getSuccessors :: NodeState -> Maybe [NodeID]
getSuccessors ns = getInternals_ successors ns
-- | Convenience function that replaces the successors of a 'NodeState'.
-- Node states without internals are returned unchanged.
putSuccessors :: [NodeID] -> NodeState -> NodeState
putSuccessors succ' = putInternals_ replaceSuccessors
  where
    replaceSuccessors intern = intern { successors = succ' }
-- | Convenience function for extracting the @predecessors@ from a 'NodeState';
-- @Nothing@ if the node state has no internals.
getPredecessors :: NodeState -> Maybe [NodeID]
getPredecessors ns = getInternals_ predecessors ns
-- | Convenience function that replaces the predecessors of a 'NodeState'.
-- Node states without internals are returned unchanged.
putPredecessors :: [NodeID] -> NodeState -> NodeState
putPredecessors pred' = putInternals_ replacePredecessors
  where
    replacePredecessors intern = intern { predecessors = pred' }
-- | Convenience function for extracting the @lNumBestNodes@ parameter from a
-- 'NodeState'; @Nothing@ if the node state has no internals.
getLNumBestNodes :: NodeState -> Maybe Int
getLNumBestNodes ns = getInternals_ lNumBestNodes ns
-- | The node cache is an ordered map from a 'NodeID' to the 'CacheEntry'
-- stored for that ID.
type NodeCache = Map.Map NodeID CacheEntry
-- | An entry of the 'nodeCache' can hold 2 different kinds of data:
-- either an actual node or a proxy pointer used for closing the ring.
data CacheEntry =
    -- | an entry representing its validation status, the node state and its timestamp
    NodeEntry Bool NodeState POSIXTime
    -- | a proxy field for closing the ring structure, indicating the lookup shall be
    -- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@
    | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
    deriving (Show, Eq)
-- | As a compromise, only @NodeEntry@ values are ordered (by the 'nid' of the
-- node state they hold). Comparing a @ProxyEntry@ is a programming error and
-- raises an exception, so proxy entries must never be tried to be ordered.
instance Ord CacheEntry where
    compare lhs rhs = compare (entryID lhs) (entryID rhs)
      where
        entryID entry = case entry of
            NodeEntry _ eState _ -> nid eState
            ProxyEntry _ _ -> error "proxy entries should never appear outside of the NodeCache"
-- | Direction a @ProxyEntry@ pointer or a cache lookup is heading to.
data ProxyDirection = Backwards | Forwards deriving (Show, Eq)

-- | The numeric representation is the step taken on the ring:
-- @-1@ for 'Backwards' and @1@ for 'Forwards'. Any other number is rejected
-- by 'toEnum' with an error.
instance Enum ProxyDirection where
    toEnum n = case n of
        (-1) -> Backwards
        1 -> Forwards
        _ -> error "no such ProxyDirection"
    fromEnum dir = case dir of
        Backwards -> negate 1
        Forwards -> 1
-- | Useful function for getting all entries for a full cache transfer.
--
-- Plain @NodeEntry@ values are returned as they are, a @ProxyEntry@
-- contributes the entry it holds (if any); empty proxy entries are skipped.
-- The @NodeEntry@ case is handled explicitly, as matching only on
-- @ProxyEntry@ makes the function fail with a pattern match error as soon as
-- the cache contains a regular node.
cacheEntries :: NodeCache -> [CacheEntry]
cacheEntries = mapMaybe extractNodeEntries . Map.elems
  where
    extractNodeEntries :: CacheEntry -> Maybe CacheEntry
    extractNodeEntries (ProxyEntry _ possibleEntry) = possibleEntry
    extractNodeEntries entry@NodeEntry{} = Just entry
-- | An empty @NodeCache@ needs to be initialised with 2 proxy entries,
-- linking the modular name space together by connecting @minBound@ and @maxBound@:
-- the entry at @maxBound@ points forwards to @minBound@ and the entry at
-- @minBound@ points backwards to @maxBound@. Neither holds a node yet.
initCache :: NodeCache
initCache = Map.fromList
    [ (maxBound, ProxyEntry (minBound, Forwards) Nothing)
    , (minBound, ProxyEntry (maxBound, Backwards) Nothing)
    ]
-- | Maybe returns the cache entry stored at the given key.
-- If a @ProxyEntry@ is stored at the key, the entry held by the proxy
-- (if any) is returned instead of the proxy itself.
cacheLookup :: NodeID -- ^lookup key
            -> NodeCache -- ^lookup cache
            -> Maybe CacheEntry
cacheLookup key cache = Map.lookup key cache >>= unwrapProxy
  where
    unwrapProxy (ProxyEntry _ held) = held
    unwrapProxy entry = Just entry
-- | A wrapper around lookup functions, making the lookup redirectable by a
-- @ProxyEntry@ to simulate a modular ring.
--
-- The first lookup function @f@ is used for the initial lookup at @key@; if an
-- empty proxy entry is hit, the lookup is continued with @fRepeat@ at a new key.
lookupWrapper :: (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> ProxyDirection -> NodeID -> NodeCache -> Maybe CacheEntry
lookupWrapper f fRepeat direction key cache = f key cache >>= uncurry resolveHit
  where
    resolveHit :: NodeID -> CacheEntry -> Maybe CacheEntry
    resolveHit hitKey hit = case hit of
        -- normal entries are returned
        NodeEntry{} -> Just hit
        -- the proxy entry found holds a node entry: return that one
        ProxyEntry _ (Just held@NodeEntry{}) -> Just held
        -- proxy entry holds another proxy entry, this should not happen
        ProxyEntry _ (Just ProxyEntry{}) -> Nothing
        -- proxy entry without own entry is a pointer on where to continue
        ProxyEntry (pointerID, pointerDirection) Nothing
            -- nothing but empty proxies in the cache: stop instead of looping
            | not cacheHasNodes -> Nothing
            -- lookup direction is the same as pointer direction: follow pointer
            | pointerDirection == direction -> continueAt pointerID
            -- otherwise step over the proxy entry in lookup direction
            | otherwise -> continueAt (hitKey + directionStep)

    continueAt :: NodeID -> Maybe CacheEntry
    continueAt nextKey = lookupWrapper fRepeat fRepeat direction nextKey cache

    -- the lookup direction as a (modular) offset of +1 or -1
    directionStep :: NodeID
    directionStep = fromInteger . toInteger . fromEnum $ direction

    cacheHasNodes :: Bool
    cacheHasNodes =
        Map.size cache > 2 -- there are more than the 2 ProxyEntries
            || isJust (cacheLookup minBound cache) -- or one of the ProxyEntries holds a node
            || isJust (cacheLookup maxBound cache)
-- | Find the successor node to a given key on a modular EpiChord ring cache.
-- Note: The EpiChord definition of "successor" includes the node at the key itself,
-- if existing, which is why the inclusive @lookupGE@ is used for every step.
cacheLookupSucc :: NodeID -- ^lookup key
                -> NodeCache -- ^ring cache
                -> Maybe CacheEntry
cacheLookupSucc key cache = lookupWrapper Map.lookupGE Map.lookupGE Forwards key cache
-- | Find the predecessor node to a given key on a modular EpiChord ring cache.
-- The initial lookup excludes the key itself (@lookupLT@), continued lookups
-- after a proxy redirection are inclusive (@lookupLE@).
cacheLookupPred :: NodeID -- ^lookup key
                -> NodeCache -- ^ring cache
                -> Maybe CacheEntry
cacheLookupPred key cache = lookupWrapper Map.lookupLT Map.lookupLE Backwards key cache
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
-- | Return the @NodeState@ data from a cache entry without checking its
-- validation status. Proxy entries are unwrapped recursively; an empty proxy
-- entry has no node state and raises an error.
cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState
cacheGetNodeStateUnvalidated entry = case entry of
    NodeEntry _ nState _ -> nState
    ProxyEntry _ (Just held) -> cacheGetNodeStateUnvalidated held
    ProxyEntry _ Nothing -> error "trying to pure empty node state, please report a bug"
-- | Converts a 'HostAddress6' IP address to a big-endian strict ByteString
-- by serialising its 4 32-bit words in order and concatenating them.
ipAddrAsBS :: HostAddress6 -> BS.ByteString
ipAddrAsBS (w0, w1, w2, w3) = BS.concat (map NetworkBytes.bytestring32 [w0, w1, w2, w3])
-- | Converts a ByteString in big endian order to an IPv6 address 'HostAddress6'.
--
-- The input is cut into 4 byte chunks of which the first 4 are used; any
-- further chunks are ignored. The function is partial: if fewer than 4 chunks
-- are produced, forcing a component of the result fails with a pattern match
-- error.
bsAsIpAddr :: BS.ByteString -> HostAddress6
bsAsIpAddr bytes = (w0, w1, w2, w3)
  where
    w0 : w1 : w2 : w3 : _ = NetworkBytes.word32 <$> chunkBytes 4 bytes
-- | Generates a 256 bit long NodeID using SHAKE128, represented as ByteString.
--
-- Two hashes are computed: one over the first 8 bytes of the IP address plus
-- the vserver ID, and one over the domain name plus the vserver ID. The result
-- is the upper part of the IP hash, followed by the domain hash, followed by
-- the lower part of the IP hash.
--
-- NOTE(review): 'BS.splitAt' counts bytes, not bits. If the @SHAKE128 128@
-- digest is 16 bytes long (as its type parameter suggests), splitting at 64
-- leaves the lower part empty, so the ID is just IP hash followed by domain
-- hash. Confirm whether a split at 8 bytes (64 bits) was intended; note that
-- changing it alters every generated NodeID.
genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address
            -> String -- ^a node's 1st and 2nd level domain name
            -> Word8 -- ^the used vserver ID
            -> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer
genNodeIDBS ip nodeDomain vserver =
    BS.concat [ipHashUpper, hashID domainInput, ipHashLower]
  where
    -- attention: only works for vserver IDs up to 255
    vsByte = BS.singleton vserver
    ipInput = BS.take 8 (ipAddrAsBS ip) `BS.append` vsByte
    domainInput = BSU.fromString nodeDomain `BS.append` vsByte
    hashID :: BS.ByteString -> BS.ByteString
    hashID bstr = BS.pack (BA.unpack (hash bstr :: Digest (SHAKE128 128)))
    (ipHashUpper, ipHashLower) = BS.splitAt 64 (hashID ipInput)
-- | Generates a 256 bit long @NodeID@ using SHAKE128, by interpreting the
-- output of 'genNodeIDBS' as a big-endian unsigned integer.
genNodeID :: HostAddress6 -- ^a node's IPv6 address
          -> String -- ^a node's 1st and 2nd level domain name
          -> Word8 -- ^the used vserver ID
          -> NodeID -- ^the generated @NodeID@
genNodeID ip nodeDomain vs = NodeID (byteStringToUInteger idBytes)
  where
    idBytes = genNodeIDBS ip nodeDomain vs
-- | Generates a 256 bit long key identifier, represented as ByteString, for
-- looking up its data on the DHT. The identifier is the SHA3-256 digest of the
-- key string converted to bytes via 'BSU.fromString'.
genKeyIDBS :: String -- ^the key string
           -> BS.ByteString -- ^the key ID represented as a @ByteString@
genKeyIDBS key = BS.pack (BA.unpack keyDigest)
  where
    keyDigest :: Digest SHA3_256
    keyDigest = hash (BSU.fromString key)
-- | Generates a 256 bit long key identifier for looking up its data on the
-- DHT, by interpreting the output of 'genKeyIDBS' as a big-endian unsigned
-- integer.
genKeyID :: String -- ^the key string
         -> NodeID -- ^the key ID
genKeyID key = NodeID (byteStringToUInteger (genKeyIDBS key))
-- | Parses the bit pattern of a ByteString as an unsigned Integer in Big
-- Endian order. The bytes are consumed front to back; each step shifts the
-- value accumulated so far by one byte and adds the next byte.
-- An empty ByteString yields 0.
byteStringToUInteger :: BS.ByteString -> Integer
byteStringToUInteger = BS.foldl' shiftInByte 0
  where
    shiftInByte :: Integer -> Word8 -> Integer
    shiftInByte acc byte = acc * 256 + toInteger byte
-- TODO: complete rewrite
-- |checks whether the cache entries fulfill the logarithmic EpiChord invariant
-- of having j entries per slice, and creates a list of necessary lookup actions.
-- Should be invoked periodically.
--checkCacheSlices :: NodeState -> IO [()]
--checkCacheSlices state = case getNodeCache state of
-- -- don't do anything on nodes without a cache
-- Nothing -> pure [()]
-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache'
-- -- TODO: do the same for predecessors
-- where
-- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state
-- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state)
-- startBound = NodeID 2^(255::Integer) + nid state
-- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()]
-- checkSlice _ _ _ Nothing _ = []
-- checkSlice j ownID upperBound (Just lastSuccNode) cache
-- | upperBound < lastSuccNode = []
-- | otherwise =
-- -- continuously half the DHT namespace, take the upper part as a slice,
-- -- check for existing entries in that slice and create a lookup action
-- -- and recursively do this on the lower half.
-- -- recursion edge case: all successors/ predecessors need to be in the
-- -- first slice.
-- let
-- diff = getNodeID $ upperBound - ownID
-- lowerBound = ownID + NodeID (diff `div` 2)
-- in
-- -- TODO: replace empty IO actions with actual lookups to middle of slice
-- -- TODO: validate ID before adding to cache
-- case Map.lookupLT upperBound cache of
-- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- Just (matchID, _) ->
-- if
-- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- else
-- checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
-- persist them on disk so they can be used for all following bootstraps
-- | Configuration values used for initialising the FediChord DHT.
-- 'nodeStateInit' copies these values into the local 'NodeState'.
data FediChordConf = FediChordConf {
    confDomain :: String
    -- ^ domain name of the local node, becomes its 'domain'
    , confIP :: HostAddress6
    -- ^ IPv6 address of the local node, becomes its 'ipAddr'
    , confDhtPort :: Int
    -- ^ port number of the DHT, converted to the 'PortNumber' stored as 'dhtPort'
    } deriving (Show, Eq)
-- | Initialise data structures, compute own IDs and bind to listening socket.
-- Returns the bound server socket together with the initial local 'NodeState'.
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, NodeState)
fediChordInit conf =
    nodeStateInit conf >>= \ownState ->
        -- bind to the address and DHT port stored in the freshly created state
        (\listenSock -> (listenSock, ownState))
            <$> mkServerSocket (ipAddr ownState) (dhtPort ownState)
-- | Initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
--
-- Creates a fresh node cache reference (holding 'initCache') and an empty
-- cache write queue. The node ID is generated for vserver 0, neighbour lists
-- start out empty and the protocol parameters are set to fixed defaults.
nodeStateInit :: FediChordConf -> IO NodeState
nodeStateInit conf = do
    cacheRef <- newIORef initCache
    writeQueue <- atomically newTQueue
    let ownIP = confIP conf
        ownDomain = confDomain conf
    pure NodeState
        { nid = genNodeID ownIP ownDomain 0
        , domain = ownDomain
        , ipAddr = ownIP
        , dhtPort = toEnum (confDhtPort conf)
        , apPort = Nothing
        , vServerID = 0
        , internals = Just InternalNodeState
            { nodeCache = cacheRef
            , cacheWriteQueue = writeQueue
            , successors = []
            , predecessors = []
            , kNeighbours = 3
            , lNumBestNodes = 3
            , pNumParallelQueries = 2
            , jEntriesPerSlice = 2
            }
        }
--fediChordJoin :: NodeState -- ^ the local 'NodeState'
-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-- -> Socket -- ^ socket used for sending and receiving the join message
-- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful
-- -- join, otherwise an error message
--fediChordJoin ns (joinHost, joinPort) sock = do
-- -- 1. get routed to destination until FOUND
-- -- 2. then send a join to the currently responsible node
-- -- ToDo: implement cache management, as already all received replies should be stored in cache
--
-- | Cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
--
-- Returns immediately for node states without a write queue (i.e. without
-- internals); otherwise it loops forever, blocking on the queue and applying
-- each received update function atomically to the cache reference.
cacheWriter :: NodeState -> IO ()
cacheWriter ns = case getCacheWriteQueue ns of
    Nothing -> pure ()
    Just writeQueue -> forever $ do
        update <- atomically (readTQueue writeQueue)
        case getNodeCacheRef ns of
            Nothing -> pure ()
            Just ref -> atomicModifyIORef' ref (\nc -> (update nc, ()))
-- ====== network socket operations ======
-- | Resolve a specified host and return the 'AddrInfo' for it.
-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all
-- addresses;
-- if no port is specified an arbitrary free port is selected.
--
-- Lookups are restricted to IPv6 datagram addresses and only the first result
-- is returned.
-- NOTE(review): the use of 'head' assumes 'getAddrInfo' never returns an
-- empty list (failing with an exception instead) - confirm against the
-- documentation of the used @network@ version.
resolve :: Maybe String -- ^ hostname or IP address to be resolved
        -> Maybe PortNumber -- ^ port number of either local bind or remote
        -> IO AddrInfo
resolve host port = head <$> getAddrInfo (Just hints) host (fmap show port)
  where
    hints = defaultHints
        { addrFlags = [AI_PASSIVE]
        , addrFamily = AF_INET6
        , addrSocketType = Datagram
        }
-- | Create an unconnected UDP Datagram 'Socket' bound to the specified address.
-- The socket is IPv6-only.
mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket
mkServerSocket ip port = do
    -- resolve the textual representation of the IP to get a bindable address
    bindInfo <- resolve (Just (show (fromHostAddress6 ip))) (Just port)
    sock <- socket AF_INET6 Datagram defaultProtocol
    setSocketOption sock IPv6Only 1
    bind sock (addrAddress bindInfo)
    pure sock
-- | Create a UDP datagram socket, connected to a destination.
-- The socket gets an arbitrary free local port assigned.
--
-- The destination is resolved first and the IPv6-only socket is then
-- connected to the resolved address, so that it can be used for sending to
-- and receiving from that destination without specifying it again.
-- If setting the socket option or connecting fails, the socket is closed
-- again before the exception is propagated.
mkSendSocket :: String -- ^ destination hostname or IP
             -> PortNumber -- ^ destination port
             -> IO Socket -- ^ a socket with an arbitrary source port
mkSendSocket dest destPort = do
    destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
    bracketOnError (socket AF_INET6 Datagram defaultProtocol) close $ \sendSock -> do
        setSocketOption sendSock IPv6Only 1
        -- connecting the socket fixes its destination address
        connect sendSock destAddr
        pure sendSock