From e3bfa26ddba3dc011bfdc414960d278fca0ca5a1 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 25 May 2020 22:00:22 +0200 Subject: [PATCH] join request + large FediChord refactoring - implement sending of initial join request sending, response parsing and cache population (untested but compiles) - refactor basic types and their functions into Hash2Pub.FediChordTypes to prevent import loops, leaving Hash2Pub.FediChord to contain the high level actions called from Main --- Hash2Pub.cabal | 2 +- src/Hash2Pub/ASN1Coding.hs | 2 +- src/Hash2Pub/DHTProtocol.hs | 171 ++++++++---- src/Hash2Pub/FediChord.hs | 461 +++------------------------------ src/Hash2Pub/FediChordTypes.hs | 428 ++++++++++++++++++++++++++++++ src/Hash2Pub/ProtocolTypes.hs | 10 +- 6 files changed, 607 insertions(+), 467 deletions(-) create mode 100644 src/Hash2Pub/FediChordTypes.hs diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index f1533f4..2286385 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -55,7 +55,7 @@ library import: deps -- Modules exported by the library. - exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes + exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes -- Modules included in this library but not exported. other-modules: Hash2Pub.Utils diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index 47492ef..abf749b 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -16,7 +16,7 @@ import qualified Data.Set as Set import Data.Time.Clock.POSIX () import Safe -import Hash2Pub.FediChord +import Hash2Pub.FediChordTypes import Hash2Pub.ProtocolTypes import Hash2Pub.Utils diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index d59ec06..7b07785 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -14,6 +14,11 @@ module Hash2Pub.DHTProtocol , maximumParts , sendQueryIdMessage , requestQueryID + , requestJoin + , queryIdLookupLoop + , resolve + , mkSendSocket + , mkServerSocket ) where @@ -24,8 +29,11 @@ import Control.Concurrent.STM.TQueue import Control.Monad (foldM, forM, forM_) import qualified Data.ByteString as BS import Data.Either (rights) -import Data.Foldable (foldl') +import Data.Foldable (foldl', foldr') import Data.IORef +import Data.IP (IPv6, fromHostAddress6, + toHostAddress6) +import Data.List (sortBy) import qualified Data.Map as Map import Data.Maybe (fromJust, fromMaybe, mapMaybe, maybe) @@ -39,15 +47,15 @@ import System.Random import System.Timeout import Hash2Pub.ASN1Coding -import Hash2Pub.FediChord (CacheEntry (..), +import Hash2Pub.FediChordTypes (CacheEntry (..), LocalNodeState (..), NodeCache, NodeID, NodeState (..), RemoteNodeState (..), cacheGetNodeStateUnvalidated, cacheLookup, cacheLookupPred, cacheLookupSucc, localCompare, - mkSendSocket, mkServerSocket, - setPredecessors, setSuccessors) + localCompare, setPredecessors, + setSuccessors) import Hash2Pub.ProtocolTypes import Debug.Trace (trace) @@ -134,52 +142,85 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc -- ====== message send and receive operations ====== -requestQueryID :: LocalNodeState -> NodeID -> IO RemoteNodeState +-- | send a join request and return the joined 'LocalNodeState' including neighbours +requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted + -> LocalNodeState -- ^ joining NodeState + -> IO (Maybe LocalNodeState) -- ^ node after join with all its new information +requestJoin toJoinOn ownState = do + sock <- mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn) + responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock + joinedStateUnsorted <- foldM + (\nsAcc msg -> case payload msg of + Nothing -> pure nsAcc + Just msgPl -> do + -- add transfered cache entries to global NodeCache + queueAddEntries (joinCache msgPl) nsAcc + -- add received predecessors and successors + let + addPreds ns' = setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns' + addSuccs ns' = setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns' + pure $ addSuccs . addPreds $ nsAcc + ) + -- reset predecessors and successors + (setPredecessors [] . setSuccessors [] $ ownState) + responses + if responses == Set.empty + then pure Nothing + -- sort successors and predecessors + else pure . Just . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted + + +-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. +requestQueryID :: LocalNodeState -- ^ NodeState of the querying node + -> NodeID -- ^ target key ID to look up + -> IO RemoteNodeState -- ^ the node responsible for handling that key -- 1. do a local lookup for the l closest nodes -- 2. create l sockets -- 3. send a message async concurrently to all l nodes -- 4. collect the results, insert them into cache -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) +-- TODO: deal with lookup failures requestQueryID ns targetID = do firstCacheSnapshot <- readIORef . nodeCacheRef $ ns - lookupLoop firstCacheSnapshot - where - lookupLoop :: NodeCache -> IO RemoteNodeState - lookupLoop cacheSnapshot = do - let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID - -- FOUND can only be returned if targetID is owned by local node - case localResult of - FOUND thisNode -> pure thisNode - FORWARD nodeSet -> do - -- create connected sockets to all query targets - sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet - -- ToDo: make attempts and timeout configurable - queryThreads <- mapM (async . sendQueryIdMessage targetID ns) sockets - -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 - -- ToDo: exception handling, maybe log them - responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads - -- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup - now <- getPOSIXTime - newLCache <- foldM (\oldCache resp -> do - let entriesToInsert = case queryResult <$> payload resp of - Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] - Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset - _ -> [] - -- forward entries to global cache - forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry - -- insert entries into local cache copy - pure $ foldl' ( - \oldLCache insertFunc -> insertFunc oldLCache - ) oldCache entriesToInsert - ) cacheSnapshot responses + queryIdLookupLoop firstCacheSnapshot ns targetID - -- check for a FOUND and return it - let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of - Just (FOUND ns') -> Just ns' - _ -> Nothing - ) $ responses - -- if no FOUND, recursively call lookup again - maybe (lookupLoop newLCache) pure foundResp +-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining +queryIdLookupLoop :: NodeCache -> LocalNodeState -> NodeID -> IO RemoteNodeState +queryIdLookupLoop cacheSnapshot ns targetID = do + let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID + -- FOUND can only be returned if targetID is owned by local node + case localResult of + FOUND thisNode -> pure thisNode + FORWARD nodeSet -> do + -- create connected sockets to all query targets + sockets <- mapM (\resultNode -> mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet + -- ToDo: make attempts and timeout configurable + queryThreads <- mapM (async . sendQueryIdMessage targetID ns) sockets + -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 + -- ToDo: exception handling, maybe log them + responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads + -- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup + now <- getPOSIXTime + newLCache <- foldM (\oldCache resp -> do + let entriesToInsert = case queryResult <$> payload resp of + Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] + Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset + _ -> [] + -- forward entries to global cache + forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry + -- insert entries into local cache copy + pure $ foldl' ( + \oldLCache insertFunc -> insertFunc oldLCache + ) oldCache entriesToInsert + ) cacheSnapshot responses + + -- check for a FOUND and return it + let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of + Just (FOUND ns') -> Just ns' + _ -> Nothing + ) $ responses + -- if no FOUND, recursively call lookup again + maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp sendQueryIdMessage :: NodeID -- ^ target key ID to look up @@ -208,8 +249,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. recvdParts <- atomically $ flushTBQueue responseQ - -- PLACEHOLDER - pure Set.empty + pure $ Set.fromList recvdParts where -- state reingeben: state = noch nicht geackte messages, result = responses sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses @@ -242,6 +282,14 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do else recvLoop responseQueue newRemaining receivedPartNums +-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache +queueAddEntries :: [RemoteCacheEntry] + -> LocalNodeState + -> IO () +queueAddEntries entries ns = do + now <- getPOSIXTime + forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry + -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry @@ -252,3 +300,38 @@ attempts i action = do case actionResult of Nothing -> attempts (i-1) action Just res -> pure $ Just res + +-- ====== network socket operations ====== + +-- | resolve a specified host and return the 'AddrInfo' for it. +-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all +-- addresses; +-- if no port is specified an arbitrary free port is selected. +resolve :: Maybe String -- ^ hostname or IP address to be resolved + -> Maybe PortNumber -- ^ port number of either local bind or remote + -> IO AddrInfo +resolve host port = let + hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram + , addrFlags = [AI_PASSIVE] } + in + head <$> getAddrInfo (Just hints) host (show <$> port) + +-- | create an unconnected UDP Datagram 'Socket' bound to the specified address +mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket +mkServerSocket ip port = do + sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) + sock <- socket AF_INET6 Datagram defaultProtocol + setSocketOption sock IPv6Only 1 + bind sock sockAddr + pure sock + +-- | create a UDP datagram socket, connected to a destination. +-- The socket gets an arbitrary free local port assigned. +mkSendSocket :: String -- ^ destination hostname or IP + -> PortNumber -- ^ destination port + -> IO Socket -- ^ a socket with an arbitrary source port +mkSendSocket dest destPort = do + destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) + sendSock <- socket AF_INET6 Datagram defaultProtocol + setSocketOption sendSock IPv6Only 1 + pure sendSock diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 565d09b..540267c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -1,7 +1,6 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE OverloadedStrings #-} {- | Module : FediChord Description : An opinionated implementation of the EpiChord DHT by Leong et al. @@ -46,8 +45,10 @@ module Hash2Pub.FediChord ( ) where import Control.Exception +import Data.Foldable (foldr') import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isJust, mapMaybe) +import qualified Data.Set as Set import Data.Time.Clock.POSIX import Network.Socket @@ -66,379 +67,12 @@ import Data.Typeable (Typeable (..), typeOf) import Data.Word import qualified Network.ByteOrder as NetworkBytes +import Hash2Pub.DHTProtocol +import Hash2Pub.FediChordTypes import Hash2Pub.Utils import Debug.Trace (trace) --- define protocol constants --- | static definition of ID length in bits -idBits :: Integer -idBits = 256 - --- |NodeIDs are Integers wrapped in a newtype, to be able to redefine --- their instance behaviour --- --- for being able to check value bounds, the constructor should not be used directly --- and new values are created via @toNodeID@ (newtype constructors cannot be hidden) -newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum - --- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values. --- When needing a runtime-safe constructor with drawbacks, try @fromInteger@ -toNodeID :: Integer -> NodeID -toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i - --- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits' -instance Bounded NodeID where - minBound = NodeID 0 - maxBound = NodeID (2^idBits - 1) - --- |calculations with NodeIDs are modular arithmetic operations -instance Num NodeID where - a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1) - a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1) - a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1) - -- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around - -- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@. - fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1) - signum = NodeID . signum . getNodeID - abs = NodeID . abs . getNodeID -- ToDo: make sure that at creation time only IDs within the range are used - --- | use normal strict monotonic ordering of integers, realising the ring structure --- is done in the @NodeCache@ implementation -instance Ord NodeID where - a `compare` b = getNodeID a `compare` getNodeID b - --- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes -localCompare :: NodeID -> NodeID -> Ordering -a `localCompare` b - | getNodeID a == getNodeID b = EQ - | wayForwards > wayBackwards = GT - | otherwise = LT - where - wayForwards = getNodeID (b - a) - wayBackwards = getNodeID (a - b) - - --- | represents a node and all its important state -data RemoteNodeState = RemoteNodeState - { nid :: NodeID - , domain :: String - -- ^ full public domain name the node is reachable under - , ipAddr :: HostAddress6 - -- the node's public IPv6 address - , dhtPort :: PortNumber - -- ^ port of the DHT itself - , servicePort :: PortNumber - -- ^ port of the service provided on top of the DHT - , vServerID :: Integer - -- ^ ID of this vserver - } - deriving (Show, Eq) - --- | represents a node and encapsulates all data and parameters that are not present for remote nodes -data LocalNodeState = LocalNodeState - { nodeState :: RemoteNodeState - -- ^ represents common data present both in remote and local node representations - , nodeCacheRef :: IORef NodeCache - -- ^ EpiChord node cache with expiry times for nodes - , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) - -- ^ cache updates are not written directly to the 'nodeCache' but queued and - , successors :: [NodeID] -- could be a set instead as these are ordered as well - -- ^ successor nodes in ascending order by distance - , predecessors :: [NodeID] - -- ^ predecessor nodes in ascending order by distance - , kNeighbours :: Int - -- ^ desired length of predecessor and successor list - , lNumBestNodes :: Int - -- ^ number of best next hops to provide - , pNumParallelQueries :: Int - -- ^ number of parallel sent queries - , jEntriesPerSlice :: Int - -- ^ number of desired entries per cache slice - } - deriving (Show, Eq) - --- | class for various NodeState representations, providing --- getters and setters for common values -class NodeState a where - -- getters for common properties - getNid :: a -> NodeID - getDomain :: a -> String - getIpAddr :: a -> HostAddress6 - getDhtPort :: a -> PortNumber - getServicePort :: a -> PortNumber - getVServerID :: a -> Integer - -- setters for common properties - setNid :: NodeID -> a -> a - setDomain :: String -> a -> a - setIpAddr :: HostAddress6 -> a -> a - setDhtPort :: PortNumber -> a -> a - setServicePort :: PortNumber -> a -> a - setVServerID :: Integer -> a -> a - toRemoteNodeState :: a -> RemoteNodeState - -instance NodeState RemoteNodeState where - getNid = nid - getDomain = domain - getIpAddr = ipAddr - getDhtPort = dhtPort - getServicePort = servicePort - getVServerID = vServerID - setNid nid' ns = ns {nid = nid'} - setDomain domain' ns = ns {domain = domain'} - setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'} - setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'} - setServicePort servicePort' ns = ns {servicePort = servicePort'} - setVServerID vServerID' ns = ns {vServerID = vServerID'} - toRemoteNodeState = id - --- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState' -propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState -propagateNodeStateSet_ func ns = let - newNs = func $ nodeState ns - in - ns {nodeState = newNs} - - -instance NodeState LocalNodeState where - getNid = getNid . nodeState - getDomain = getDomain . nodeState - getIpAddr = getIpAddr . nodeState - getDhtPort = getDhtPort . nodeState - getServicePort = getServicePort . nodeState - getVServerID = getVServerID . nodeState - setNid nid' = propagateNodeStateSet_ $ setNid nid' - setDomain domain' = propagateNodeStateSet_ $ setDomain domain' - setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr' - setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort' - setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort' - setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID' - toRemoteNodeState = nodeState - --- | defining Show instances to be able to print NodeState for debug purposes -instance Typeable a => Show (IORef a) where - show x = show (typeOf x) - -instance Typeable a => Show (TQueue a) where - show x = show (typeOf x) - --- | convenience function that updates the successors of a 'LocalNodeState' -setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState -setSuccessors succ' ns = ns {successors = succ'} - --- | convenience function that updates the predecessors of a 'LocalNodeState' -setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState -setPredecessors pred' ns = ns {predecessors = pred'} - -type NodeCache = Map.Map NodeID CacheEntry - --- | An entry of the 'nodeCache' can hold 2 different kinds of data. --- Type variable @a@ should be of type class 'NodeState', but I do not want to use GADTs here. -data CacheEntry = NodeEntry Bool RemoteNodeState POSIXTime - | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) - deriving (Show, Eq) - --- | as a compromise, only NodeEntry components are ordered by their NodeID --- while ProxyEntry components should never be tried to be ordered. -instance Ord CacheEntry where - - a `compare` b = compare (extractID a) (extractID b) - where - extractID (NodeEntry _ eState _) = getNid eState - extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" - -data ProxyDirection = Backwards - | Forwards - deriving (Show, Eq) - -instance Enum ProxyDirection where - toEnum (-1) = Backwards - toEnum 1 = Forwards - toEnum _ = error "no such ProxyDirection" - fromEnum Backwards = - 1 - fromEnum Forwards = 1 - ---- useful function for getting entries for a full cache transfer -cacheEntries :: NodeCache -> [CacheEntry] -cacheEntries ncache = mapMaybe extractNodeEntries $ Map.elems ncache - where - extractNodeEntries (ProxyEntry _ possibleEntry) = possibleEntry - --- | An empty @NodeCache@ needs to be initialised with 2 proxy entries, --- linking the modular name space together by connecting @minBound@ and @maxBound@ -initCache :: NodeCache -initCache = Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))] - where - proxyEntry (from,to) = (from, ProxyEntry to Nothing) - --- | Maybe returns the cache entry stored at given key -cacheLookup :: NodeID -- ^lookup key - -> NodeCache -- ^lookup cache - -> Maybe CacheEntry -cacheLookup key cache = case Map.lookup key cache of - Just (ProxyEntry _ result) -> result - res -> res - --- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ --- to simulate a modular ring -lookupWrapper :: (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> ProxyDirection -> NodeID -> NodeCache -> Maybe CacheEntry -lookupWrapper f fRepeat direction key cache = - case f key cache of - -- the proxy entry found holds a - Just (_, ProxyEntry _ (Just entry@NodeEntry{})) -> Just entry - -- proxy entry holds another proxy entry, this should not happen - Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing - -- proxy entry without own entry is a pointer on where to continue - -- if lookup direction is the same as pointer direction: follow pointer - Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) -> - let newKey = if pointerDirection == direction - then pointerID - else foundKey + (fromInteger . toInteger . fromEnum $ direction) - in if cacheNotEmpty cache - then lookupWrapper fRepeat fRepeat direction newKey cache - else Nothing - -- normal entries are returned - Just (_, entry@NodeEntry{}) -> Just entry - Nothing -> Nothing - where - cacheNotEmpty :: NodeCache -> Bool - cacheNotEmpty cache' = (Map.size cache' > 2) -- there are more than the 2 ProxyEntries - || isJust ( cacheLookup minBound cache') -- or one of the ProxyEntries holds a node - || isJust (cacheLookup maxBound cache') - --- | find the successor node to a given key on a modular EpiChord ring cache. --- Note: The EpiChord definition of "successor" includes the node at the key itself, --- if existing. -cacheLookupSucc :: NodeID -- ^lookup key - -> NodeCache -- ^ring cache - -> Maybe CacheEntry -cacheLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards - --- | find the predecessor node to a given key on a modular EpiChord ring cache. -cacheLookupPred :: NodeID -- ^lookup key - -> NodeCache -- ^ring cache - -> Maybe CacheEntry -cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards - --- clean up cache entries: once now - entry > maxAge --- transfer difference now - entry to other node - --- | return the @NodeState@ data from a cache entry without checking its validation status -cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState -cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState -cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry -cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug" - --- | converts a 'HostAddress6' IP address to a big-endian strict ByteString -ipAddrAsBS :: HostAddress6 -> BS.ByteString -ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d] - --- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6' -bsAsIpAddr :: BS.ByteString -> HostAddress6 -bsAsIpAddr bytes = (a,b,c,d) - where - a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes - - --- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString -genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address - -> String -- ^a node's 1st and 2nd level domain name - -> Word8 -- ^the used vserver ID - -> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer -genNodeIDBS ip nodeDomain vserver = - hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower - where - vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255 - ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS - nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS - hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128)) - (hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet - - --- | generates a 256 bit long @NodeID@ using SHAKE128 -genNodeID :: HostAddress6 -- ^a node's IPv6 address - -> String -- ^a node's 1st and 2nd level domain name - -> Word8 -- ^the used vserver ID - -> NodeID -- ^the generated @NodeID@ -genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs - --- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT -genKeyIDBS :: String -- ^the key string - -> BS.ByteString -- ^the key ID represented as a @ByteString@ -genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256) - --- | generates a 256 bit long key identifier for looking up its data on the DHT -genKeyID :: String -- ^the key string - -> NodeID -- ^the key ID -genKeyID = NodeID . byteStringToUInteger . genKeyIDBS - - --- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order --- by iterating it byte-wise from the back and shifting the byte values according to their offset -byteStringToUInteger :: BS.ByteString -> Integer -byteStringToUInteger bs = sum $ parsedBytes 0 bs - where - parsedBytes :: Integer -> BS.ByteString -> [ Integer ] - parsedBytes offset uintBs = case BS.unsnoc uintBs of - Nothing -> [] - Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs' - - parseWithOffset :: Integer -> Word8 -> Integer - parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0 - parseWithOffset offset word = toInteger word * 2^(8 * offset) - - - --- TODO: complete rewrite --- |checks wether the cache entries fulfill the logarithmic EpiChord invariant --- of having j entries per slice, and creates a list of necessary lookup actions. --- Should be invoked periodically. ---checkCacheSlices :: NodeState -> IO [()] ---checkCacheSlices state = case getNodeCache state of --- -- don't do anything on nodes without a cache --- Nothing -> pure [()] --- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache' --- -- TODO: do the same for predecessors --- where --- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state --- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state) --- startBound = NodeID 2^(255::Integer) + nid state --- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()] --- checkSlice _ _ _ Nothing _ = [] --- checkSlice j ownID upperBound (Just lastSuccNode) cache --- | upperBound < lastSuccNode = [] --- | otherwise = --- -- continuously half the DHT namespace, take the upper part as a slice, --- -- check for existing entries in that slice and create a lookup action --- -- and recursively do this on the lower half. --- -- recursion edge case: all successors/ predecessors need to be in the --- -- first slice. --- let --- diff = getNodeID $ upperBound - ownID --- lowerBound = ownID + NodeID (diff `div` 2) --- in --- -- TODO: replace empty IO actions with actual lookups to middle of slice --- -- TODO: validate ID before adding to cache --- case Map.lookupLT upperBound cache of --- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache --- Just (matchID, _) -> --- if --- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache --- else --- checkSlice j ownID lowerBound (Just lastSuccNode) cache - - --- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages --- persist them on disk so they can be used for all following bootstraps - --- | configuration values used for initialising the FediChord DHT -data FediChordConf = FediChordConf - { confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int - } - deriving (Show, Eq) - -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO fediChordInit :: FediChordConf -> IO (Socket, LocalNodeState) @@ -475,16 +109,42 @@ nodeStateInit conf = do } pure initialState ---fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' --- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node --- -> Socket -- ^ socket used for sending and receiving the join message --- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful --- -- join, otherwise an error message ---fediChordJoin ns (joinHost, joinPort) sock = do --- -- 1. get routed to destination until FOUND --- -- 2. then send a join to the currently responsible node --- -- ToDo: implement cache management, as already all received replies should be stored in cache --- +fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a + -- successful join, otherwise an error message +fediChordJoin ns (joinHost, joinPort) = do + -- can be invoked multiple times with all known bootstrapping nodes until successfully joined + sock <- mkSendSocket joinHost joinPort + -- 1. get routed to placement of own ID until FOUND: + -- Initialise an empty cache only with the responses from a bootstrapping node + bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock + if bootstrapResponse == Set.empty + then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response." + else do + now <- getPOSIXTime + -- create new cache with all returned node responses + let bootstrapCache = + -- traverse response parts + foldr' (\resp cacheAcc -> case queryResult <$> payload resp of + Nothing -> cacheAcc + Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc + Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset + ) + initCache bootstrapResponse + -- get routed to the currently responsible node, based on the response + -- from the bootstrapping node + currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns + -- do actual join + joinResult <- requestJoin currentlyResponsible ns + case joinResult of + Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible + Just joinedNS -> pure . Right $ joinedNS + + + -- 2. then send a join to the currently responsible node + -- after successful join, finally add own node to the cache + -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. @@ -497,38 +157,3 @@ cacheWriter ns = do refModifier :: NodeCache -> (NodeCache, ()) refModifier nc = (f nc, ()) atomicModifyIORef' (nodeCacheRef ns) refModifier - --- ====== network socket operations ====== - --- | resolve a specified host and return the 'AddrInfo' for it. --- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all --- addresses; --- if no port is specified an arbitrary free port is selected. -resolve :: Maybe String -- ^ hostname or IP address to be resolved - -> Maybe PortNumber -- ^ port number of either local bind or remote - -> IO AddrInfo -resolve host port = let - hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram - , addrFlags = [AI_PASSIVE] } - in - head <$> getAddrInfo (Just hints) host (show <$> port) - --- | create an unconnected UDP Datagram 'Socket' bound to the specified address -mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket -mkServerSocket ip port = do - sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) - sock <- socket AF_INET6 Datagram defaultProtocol - setSocketOption sock IPv6Only 1 - bind sock sockAddr - pure sock - --- | create a UDP datagram socket, connected to a destination. --- The socket gets an arbitrary free local port assigned. -mkSendSocket :: String -- ^ destination hostname or IP - -> PortNumber -- ^ destination port - -> IO Socket -- ^ a socket with an arbitrary source port -mkSendSocket dest destPort = do - destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) - sendSock <- socket AF_INET6 Datagram defaultProtocol - setSocketOption sendSock IPv6Only 1 - pure sendSock diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs new file mode 100644 index 0000000..7ad09a9 --- /dev/null +++ b/src/Hash2Pub/FediChordTypes.hs @@ -0,0 +1,428 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hash2Pub.FediChordTypes ( + NodeID -- abstract, but newtype constructors cannot be hidden + , getNodeID + , toNodeID + , NodeState (..) + , LocalNodeState (..) + , RemoteNodeState (..) + , setSuccessors + , setPredecessors + , NodeCache + , CacheEntry(..) + , cacheGetNodeStateUnvalidated + , initCache + , cacheLookup + , cacheLookupSucc + , cacheLookupPred + , localCompare + , genNodeID + , genNodeIDBS + , genKeyID + , genKeyIDBS + , byteStringToUInteger + , ipAddrAsBS + , bsAsIpAddr + , FediChordConf(..) + ) where + +import Control.Exception +import qualified Data.Map.Strict as Map +import Data.Maybe (fromMaybe, isJust, mapMaybe) +import qualified Data.Set as Set +import Data.Time.Clock.POSIX +import Network.Socket + +-- for hashing and ID conversion +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Control.Monad (forever) +import Crypto.Hash +import qualified Data.ByteArray as BA +import qualified Data.ByteString as BS +import qualified Data.ByteString.UTF8 as BSU +import Data.IORef +import Data.IP (IPv6, fromHostAddress6, + toHostAddress6) +import Data.Typeable (Typeable (..), typeOf) +import Data.Word +import qualified Network.ByteOrder as NetworkBytes + +import Hash2Pub.Utils + + + +-- define protocol constants +-- | static definition of ID length in bits +idBits :: Integer +idBits = 256 + +-- |NodeIDs are Integers wrapped in a newtype, to be able to redefine +-- their instance behaviour +-- +-- for being able to check value bounds, the constructor should not be used directly +-- and new values are created via @toNodeID@ (newtype constructors cannot be hidden) +newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum + +-- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values. +-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@ +toNodeID :: Integer -> NodeID +toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i + +-- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits' +instance Bounded NodeID where + minBound = NodeID 0 + maxBound = NodeID (2^idBits - 1) + +-- |calculations with NodeIDs are modular arithmetic operations +instance Num NodeID where + a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1) + a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1) + a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1) + -- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around + -- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@. + fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1) + signum = NodeID . signum . getNodeID + abs = NodeID . abs . getNodeID -- ToDo: make sure that at creation time only IDs within the range are used + +-- | use normal strict monotonic ordering of integers, realising the ring structure +-- is done in the @NodeCache@ implementation +instance Ord NodeID where + a `compare` b = getNodeID a `compare` getNodeID b + +-- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes +localCompare :: NodeID -> NodeID -> Ordering +a `localCompare` b + | getNodeID a == getNodeID b = EQ + | wayForwards > wayBackwards = GT + | otherwise = LT + where + wayForwards = getNodeID (b - a) + wayBackwards = getNodeID (a - b) + + +-- | represents a node and all its important state +data RemoteNodeState = RemoteNodeState + { nid :: NodeID + , domain :: String + -- ^ full public domain name the node is reachable under + , ipAddr :: HostAddress6 + -- the node's public IPv6 address + , dhtPort :: PortNumber + -- ^ port of the DHT itself + , servicePort :: PortNumber + -- ^ port of the service provided on top of the DHT + , vServerID :: Integer + -- ^ ID of this vserver + } + deriving (Show, Eq) + +-- | represents a node and encapsulates all data and parameters that are not present for remote nodes +data LocalNodeState = LocalNodeState + { nodeState :: RemoteNodeState + -- ^ represents common data present both in remote and local node representations + , nodeCacheRef :: IORef NodeCache + -- ^ EpiChord node cache with expiry times for nodes + , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) + -- ^ cache updates are not written directly to the 'nodeCache' but queued and + , successors :: [NodeID] -- could be a set instead as these are ordered as well + -- ^ successor nodes in ascending order by distance + , predecessors :: [NodeID] + -- ^ predecessor nodes in ascending order by distance + , kNeighbours :: Int + -- ^ desired length of predecessor and successor list + , lNumBestNodes :: Int + -- ^ number of best next hops to provide + , pNumParallelQueries :: Int + -- ^ number of parallel sent queries + , jEntriesPerSlice :: Int + -- ^ number of desired entries per cache slice + } + deriving (Show, Eq) + +-- | class for various NodeState representations, providing +-- getters and setters for common values +class NodeState a where + -- getters for common properties + getNid :: a -> NodeID + getDomain :: a -> String + getIpAddr :: a -> HostAddress6 + getDhtPort :: a -> PortNumber + getServicePort :: a -> PortNumber + getVServerID :: a -> Integer + -- setters for common properties + setNid :: NodeID -> a -> a + setDomain :: String -> a -> a + setIpAddr :: HostAddress6 -> a -> a + setDhtPort :: PortNumber -> a -> a + setServicePort :: PortNumber -> a -> a + setVServerID :: Integer -> a -> a + toRemoteNodeState :: a -> RemoteNodeState + +instance NodeState RemoteNodeState where + getNid = nid + getDomain = domain + getIpAddr = ipAddr + getDhtPort = dhtPort + getServicePort = servicePort + getVServerID = vServerID + setNid nid' ns = ns {nid = nid'} + setDomain domain' ns = ns {domain = domain'} + setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'} + setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'} + setServicePort servicePort' ns = ns {servicePort = servicePort'} + setVServerID vServerID' ns = ns {vServerID = vServerID'} + toRemoteNodeState = id + +-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState' +propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState +propagateNodeStateSet_ func ns = let + newNs = func $ nodeState ns + in + ns {nodeState = newNs} + + +instance NodeState LocalNodeState where + getNid = getNid . nodeState + getDomain = getDomain . nodeState + getIpAddr = getIpAddr . nodeState + getDhtPort = getDhtPort . nodeState + getServicePort = getServicePort . nodeState + getVServerID = getVServerID . nodeState + setNid nid' = propagateNodeStateSet_ $ setNid nid' + setDomain domain' = propagateNodeStateSet_ $ setDomain domain' + setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr' + setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort' + setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort' + setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID' + toRemoteNodeState = nodeState + +-- | defining Show instances to be able to print NodeState for debug purposes +instance Typeable a => Show (IORef a) where + show x = show (typeOf x) + +instance Typeable a => Show (TQueue a) where + show x = show (typeOf x) + +-- | convenience function that updates the successors of a 'LocalNodeState' +setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState +setSuccessors succ' ns = ns {successors = succ'} + +-- | convenience function that updates the predecessors of a 'LocalNodeState' +setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState +setPredecessors pred' ns = ns {predecessors = pred'} + +type NodeCache = Map.Map NodeID CacheEntry + +-- | An entry of the 'nodeCache' can hold 2 different kinds of data. +-- Type variable @a@ should be of type class 'NodeState', but I do not want to use GADTs here. +data CacheEntry = NodeEntry Bool RemoteNodeState POSIXTime + | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) + deriving (Show, Eq) + +-- | as a compromise, only NodeEntry components are ordered by their NodeID +-- while ProxyEntry components should never be tried to be ordered. +instance Ord CacheEntry where + + a `compare` b = compare (extractID a) (extractID b) + where + extractID (NodeEntry _ eState _) = getNid eState + extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" + +data ProxyDirection = Backwards + | Forwards + deriving (Show, Eq) + +instance Enum ProxyDirection where + toEnum (-1) = Backwards + toEnum 1 = Forwards + toEnum _ = error "no such ProxyDirection" + fromEnum Backwards = - 1 + fromEnum Forwards = 1 + +--- useful function for getting entries for a full cache transfer +cacheEntries :: NodeCache -> [CacheEntry] +cacheEntries ncache = mapMaybe extractNodeEntries $ Map.elems ncache + where + extractNodeEntries (ProxyEntry _ possibleEntry) = possibleEntry + +-- | An empty @NodeCache@ needs to be initialised with 2 proxy entries, +-- linking the modular name space together by connecting @minBound@ and @maxBound@ +initCache :: NodeCache +initCache = Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))] + where + proxyEntry (from,to) = (from, ProxyEntry to Nothing) + +-- | Maybe returns the cache entry stored at given key +cacheLookup :: NodeID -- ^lookup key + -> NodeCache -- ^lookup cache + -> Maybe CacheEntry +cacheLookup key cache = case Map.lookup key cache of + Just (ProxyEntry _ result) -> result + res -> res + +-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@ +-- to simulate a modular ring +lookupWrapper :: (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> ProxyDirection -> NodeID -> NodeCache -> Maybe CacheEntry +lookupWrapper f fRepeat direction key cache = + case f key cache of + -- the proxy entry found holds a + Just (_, ProxyEntry _ (Just entry@NodeEntry{})) -> Just entry + -- proxy entry holds another proxy entry, this should not happen + Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing + -- proxy entry without own entry is a pointer on where to continue + -- if lookup direction is the same as pointer direction: follow pointer + Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) -> + let newKey = if pointerDirection == direction + then pointerID + else foundKey + (fromInteger . toInteger . fromEnum $ direction) + in if cacheNotEmpty cache + then lookupWrapper fRepeat fRepeat direction newKey cache + else Nothing + -- normal entries are returned + Just (_, entry@NodeEntry{}) -> Just entry + Nothing -> Nothing + where + cacheNotEmpty :: NodeCache -> Bool + cacheNotEmpty cache' = (Map.size cache' > 2) -- there are more than the 2 ProxyEntries + || isJust ( cacheLookup minBound cache') -- or one of the ProxyEntries holds a node + || isJust (cacheLookup maxBound cache') + +-- | find the successor node to a given key on a modular EpiChord ring cache. +-- Note: The EpiChord definition of "successor" includes the node at the key itself, +-- if existing. +cacheLookupSucc :: NodeID -- ^lookup key + -> NodeCache -- ^ring cache + -> Maybe CacheEntry +cacheLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards + +-- | find the predecessor node to a given key on a modular EpiChord ring cache. +cacheLookupPred :: NodeID -- ^lookup key + -> NodeCache -- ^ring cache + -> Maybe CacheEntry +cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards + +-- clean up cache entries: once now - entry > maxAge +-- transfer difference now - entry to other node + +-- | return the @NodeState@ data from a cache entry without checking its validation status +cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState +cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState +cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry +cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug" + +-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString +ipAddrAsBS :: HostAddress6 -> BS.ByteString +ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d] + +-- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6' +bsAsIpAddr :: BS.ByteString -> HostAddress6 +bsAsIpAddr bytes = (a,b,c,d) + where + a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes + + +-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString +genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address + -> String -- ^a node's 1st and 2nd level domain name + -> Word8 -- ^the used vserver ID + -> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer +genNodeIDBS ip nodeDomain vserver = + hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower + where + vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255 + ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS + nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS + hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128)) + (hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet + + +-- | generates a 256 bit long @NodeID@ using SHAKE128 +genNodeID :: HostAddress6 -- ^a node's IPv6 address + -> String -- ^a node's 1st and 2nd level domain name + -> Word8 -- ^the used vserver ID + -> NodeID -- ^the generated @NodeID@ +genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs + +-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT +genKeyIDBS :: String -- ^the key string + -> BS.ByteString -- ^the key ID represented as a @ByteString@ +genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256) + +-- | generates a 256 bit long key identifier for looking up its data on the DHT +genKeyID :: String -- ^the key string + -> NodeID -- ^the key ID +genKeyID = NodeID . byteStringToUInteger . genKeyIDBS + + +-- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order +-- by iterating it byte-wise from the back and shifting the byte values according to their offset +byteStringToUInteger :: BS.ByteString -> Integer +byteStringToUInteger bs = sum $ parsedBytes 0 bs + where + parsedBytes :: Integer -> BS.ByteString -> [ Integer ] + parsedBytes offset uintBs = case BS.unsnoc uintBs of + Nothing -> [] + Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs' + + parseWithOffset :: Integer -> Word8 -> Integer + parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0 + parseWithOffset offset word = toInteger word * 2^(8 * offset) + + + +-- TODO: complete rewrite +-- |checks wether the cache entries fulfill the logarithmic EpiChord invariant +-- of having j entries per slice, and creates a list of necessary lookup actions. +-- Should be invoked periodically. +--checkCacheSlices :: NodeState -> IO [()] +--checkCacheSlices state = case getNodeCache state of +-- -- don't do anything on nodes without a cache +-- Nothing -> pure [()] +-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache' +-- -- TODO: do the same for predecessors +-- where +-- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state +-- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state) +-- startBound = NodeID 2^(255::Integer) + nid state +-- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()] +-- checkSlice _ _ _ Nothing _ = [] +-- checkSlice j ownID upperBound (Just lastSuccNode) cache +-- | upperBound < lastSuccNode = [] +-- | otherwise = +-- -- continuously half the DHT namespace, take the upper part as a slice, +-- -- check for existing entries in that slice and create a lookup action +-- -- and recursively do this on the lower half. +-- -- recursion edge case: all successors/ predecessors need to be in the +-- -- first slice. +-- let +-- diff = getNodeID $ upperBound - ownID +-- lowerBound = ownID + NodeID (diff `div` 2) +-- in +-- -- TODO: replace empty IO actions with actual lookups to middle of slice +-- -- TODO: validate ID before adding to cache +-- case Map.lookupLT upperBound cache of +-- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache +-- Just (matchID, _) -> +-- if +-- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache +-- else +-- checkSlice j ownID lowerBound (Just lastSuccNode) cache + + +-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages +-- persist them on disk so they can be used for all following bootstraps + +-- | configuration values used for initialising the FediChord DHT +data FediChordConf = FediChordConf + { confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int + } + deriving (Show, Eq) + + diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index 275d58f..9203bdd 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -1,9 +1,9 @@ module Hash2Pub.ProtocolTypes where -import qualified Data.Set as Set -import Data.Time.Clock.POSIX (POSIXTime) +import qualified Data.Set as Set +import Data.Time.Clock.POSIX (POSIXTime) -import Hash2Pub.FediChord +import Hash2Pub.FediChordTypes data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) | FOUND RemoteNodeState @@ -37,6 +37,10 @@ data FediChordMessage = Request } deriving (Show, Eq) +instance Ord FediChordMessage where + compare a b | requestID a == requestID b = part a `compare` part b + | otherwise = requestID a `compare` requestID b + data ActionPayload = QueryIDRequestPayload { queryTargetID :: NodeID , queryLBestNodes :: Integer