Merge branch 'dhtNetworking' into mainline

closes #1 #3
This commit is contained in:
Trolli Schmittlauch 2020-07-08 01:19:54 +02:00
commit 384be969b7
10 changed files with 2147 additions and 695 deletions

View file

@ -4,5 +4,5 @@
- error: { lhs: return, rhs: pure }
- ignore: {name: "Avoid lambda using `infix`"}
- ignore: {name: ["Avoid lambda using `infix`", "Use lambda-case"]}

View file

@ -4,14 +4,16 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039
Domain ::= VisibleString
Partnum ::= INTEGER (0..150)
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
Request ::= SEQUENCE {
action Action,
requestID INTEGER,
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
sender NodeState,
parts INTEGER (0..150), -- number of message parts
part INTEGER (0..150), -- part number of this message, starts at 1
part Partnum, -- part number of this message, starts at 1
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this request
actionPayload CHOICE {
queryIDRequestPayload QueryIDRequestPayload,
joinRequestPayload JoinRequestPayload,
@ -25,10 +27,11 @@ Request ::= SEQUENCE {
-- request and response instead of explicit flag
Response ::= SEQUENCE {
responseTo INTEGER,
-- requestID of the request responding to
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
senderID NodeID,
parts INTEGER (0..150),
part INTEGER (0..150),
part Partnum,
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
action Action,
actionPayload CHOICE {
queryIDResponsePayload QueryIDResponsePayload,
@ -44,7 +47,7 @@ NodeState ::= SEQUENCE {
domain Domain,
ipAddr OCTET STRING (SIZE(16)),
dhtPort INTEGER,
apPort INTEGER,
servicePort INTEGER,
vServerID INTEGER (0..255)
}
@ -59,8 +62,8 @@ NodeCache ::= SEQUENCE OF CacheEntry
JoinRequestPayload ::= NULL
JoinResponsePayload ::= SEQUENCE {
successors SEQUENCE OF NodeID,
predecessors SEQUENCE OF NodeID,
successors SEQUENCE OF NodeState,
predecessors SEQUENCE OF NodeState,
cache NodeCache
}
@ -79,14 +82,14 @@ QueryIDResponsePayload ::= SEQUENCE {
StabiliseRequestPayload ::= NULL
StabiliseResponsePayload ::= SEQUENCE {
successors SEQUENCE OF NodeID,
predecessors SEQUENCE OF NodeID
successors SEQUENCE OF NodeState,
predecessors SEQUENCE OF NodeState
-- ToDo: transfer of handled key data, if newly responsible for it
}
LeaveRequestPayload ::= SEQUENCE {
successors SEQUENCE OF NodeID,
predecessors SEQUENCE OF NodeID
successors SEQUENCE OF NodeState,
predecessors SEQUENCE OF NodeState
-- ToDo: transfer of own data to newly responsible node
}

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random
ghc-options: -Wall
@ -55,7 +55,7 @@ library
import: deps
-- Modules exported by the library.
exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes
-- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils

View file

@ -1,7 +1,12 @@
module Main where
import Control.Concurrent
import Data.IP (IPv6, toHostAddress6)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TVar
import Control.Exception
import Data.Either
import Data.IP (IPv6, toHostAddress6)
import System.Environment
import Hash2Pub.FediChord
@ -11,22 +16,39 @@ main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
conf <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf
print thisNode
print serverSock
-- currently no masking is necessary, as there is nothing to clean up
cacheWriterThread <- forkIO $ cacheWriter thisNode
-- idea: list of bootstrapping nodes, try joining within a timeout
-- stop main thread from terminating during development
getChar
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode
either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
print =<< readTVarIO thisNode
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread thisNode
wait =<< async (fediMainThreads serverSock thisNode)
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
wait =<< async (fediMainThreads serverSock thisNode)
)
joinedState
pure ()
readConfig :: IO FediChordConf
readConfig = do
confDomainString : ipString : portString : _ <- getArgs
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
pure $ FediChordConf {
confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
--, confStabiliseInterval = 60
, confBootstrapSamplingInterval = 180
}

View file

@ -16,8 +16,8 @@ import qualified Data.Set as Set
import Data.Time.Clock.POSIX ()
import Safe
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.ProtocolTypes
import Hash2Pub.Utils
import Debug.Trace
@ -77,6 +77,9 @@ chunkLength numParts totalSize = ceiling $ (realToFrac totalSize :: Double) / re
-- The number of parts per message is limited to 150 for DOS protection reasons.
-- The returned byte strings might exceed the desired maximum length, as only the payload (and not all of them)
-- can be split into multiple parts.
--
-- The return type is a Map from part number to encoded part, to be able to acknowledge
-- an encoded part without having to decode its number.
serialiseMessage :: Int -- maximum message size in bytes
-> FediChordMessage -- message to be serialised in preparation for sending
-> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing
@ -100,11 +103,11 @@ serialiseMessage maxBytesLength msg =
modifyMessage i (partNum, pl) pls = (partNum, msg {
part = partNum
, payload = Just pl
, parts = fromIntegral i
, isFinalPart = partNum == fromIntegral i
}):pls
-- part starts at 1
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
payloadParts i = zip [1..] . splitPayload i <$> actionPayload
payloadParts i = zip [(part msg)..] . splitPayload i <$> actionPayload
actionPayload = payload msg
encodedMsgs i = Map.map encodeMsg $ messageParts i
maxMsgLength = maximum . fmap BS.length . Map.elems
@ -127,20 +130,20 @@ encodePayload LeaveResponsePayload = [Null]
encodePayload payload'@LeaveRequestPayload{} =
Start Sequence
: Start Sequence
: fmap (IntVal . getNodeID) (leaveSuccessors payload')
: concatMap encodeNodeState (leaveSuccessors payload')
<> [End Sequence
, Start Sequence]
<> fmap (IntVal . getNodeID) (leavePredecessors payload')
<> concatMap encodeNodeState (leavePredecessors payload')
<> [End Sequence
, End Sequence]
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
encodePayload payload'@StabiliseResponsePayload{} =
Start Sequence
: Start Sequence
: fmap (IntVal . getNodeID) (stabiliseSuccessors payload')
: concatMap encodeNodeState (stabiliseSuccessors payload')
<> [End Sequence
, Start Sequence]
<> fmap (IntVal . getNodeID) (stabilisePredecessors payload')
<> concatMap encodeNodeState (stabilisePredecessors payload')
<> [End Sequence
, End Sequence]
encodePayload payload'@StabiliseRequestPayload = [Null]
@ -167,10 +170,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [
encodePayload payload'@JoinResponsePayload{} =
Start Sequence
: Start Sequence
: fmap (IntVal . getNodeID) (joinSuccessors payload')
: concatMap encodeNodeState (joinSuccessors payload')
<> [End Sequence
, Start Sequence]
<> fmap (IntVal . getNodeID) (joinPredecessors payload')
<> concatMap encodeNodeState (joinPredecessors payload')
<> [End Sequence
, Start Sequence]
<> concatMap encodeCacheEntry (joinCache payload')
@ -183,15 +186,15 @@ encodePayload payload'@PingResponsePayload{} =
: concatMap encodeNodeState (pingNodeStates payload')
<> [End Sequence]
encodeNodeState :: NodeState -> [ASN1]
encodeNodeState :: NodeState a => a -> [ASN1]
encodeNodeState ns = [
Start Sequence
, IntVal (getNodeID . nid $ ns)
, ASN1String . asn1CharacterString Visible $ domain ns
, OctetString (ipAddrAsBS $ ipAddr ns)
, IntVal (toInteger . dhtPort $ ns)
, IntVal (maybe 0 toInteger $ apPort ns)
, IntVal (vServerID ns)
, IntVal (getNodeID . getNid $ ns)
, ASN1String . asn1CharacterString Visible $ getDomain ns
, OctetString (ipAddrAsBS $ getIpAddr ns)
, IntVal (toInteger . getDhtPort $ ns)
, IntVal (toInteger . getServicePort $ ns)
, IntVal (getVServerID ns)
, End Sequence
]
@ -213,23 +216,22 @@ encodeQueryResult FORWARD{} = Enumerated 1
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
-> [ASN1]
encodeMessage
(Request requestID sender parts part action requestPayload) =
(Request requestID sender part isFinalPart action requestPayload) =
Start Sequence
: (Enumerated . fromIntegral . fromEnum $ action)
: IntVal requestID
: encodeNodeState sender
<> [
IntVal parts
, IntVal part ]
<> [IntVal part
, Boolean isFinalPart]
<> maybe [] encodePayload requestPayload
<> [End Sequence]
encodeMessage
(Response responseTo senderID parts part action responsePayload) = [
(Response requestID senderID part isFinalPart action responsePayload) = [
Start Sequence
, IntVal responseTo
, IntVal requestID
, IntVal . getNodeID $ senderID
, IntVal parts
, IntVal part
, Boolean isFinalPart
, Enumerated . fromIntegral . fromEnum $ action]
<> maybe [] encodePayload responsePayload
<> [End Sequence]
@ -262,8 +264,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage
parseRequest action = do
requestID <- parseInteger
sender <- parseNodeState
parts <- parseInteger
part <- parseInteger
isFinalPart <- parseBool
hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDRequest
@ -272,13 +274,13 @@ parseRequest action = do
Stabilise -> parseStabiliseRequest
Ping -> parsePingRequest
pure $ Request requestID sender parts part action payload
pure $ Request requestID sender part isFinalPart action payload
parseResponse :: Integer -> ParseASN1 FediChordMessage
parseResponse responseTo = do
parseResponse requestID = do
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
parts <- parseInteger
part <- parseInteger
isFinalPart <- parseBool
action <- parseEnum :: ParseASN1 Action
hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of
@ -288,7 +290,14 @@ parseResponse responseTo = do
Stabilise -> parseStabiliseResponse
Ping -> parsePingResponse
pure $ Response responseTo senderID parts part action payload
pure $ Response requestID senderID part isFinalPart action payload
-- | Consume the next ASN.1 element and return it as a 'Bool'.
-- Any element other than a 'Boolean' aborts the parse with an error message
-- that shows the offending element.
parseBool :: ParseASN1 Bool
parseBool = getNext >>= expectBoolean
  where
    expectBoolean (Boolean parsed) = pure parsed
    expectBoolean x = throwParseError $ "Expected Boolean but got " <> show x
parseInteger :: ParseASN1 Integer
parseInteger = do
@ -325,22 +334,21 @@ parseNull = do
Null -> pure ()
x -> throwParseError $ "Expected Null but got " <> show x
parseNodeState :: ParseASN1 NodeState
parseNodeState :: ParseASN1 RemoteNodeState
parseNodeState = onNextContainer Sequence $ do
nid' <- fromInteger <$> parseInteger
domain' <- parseString
ip' <- bsAsIpAddr <$> parseOctets
dhtPort' <- fromInteger <$> parseInteger
apPort' <- fromInteger <$> parseInteger
servicePort' <- fromInteger <$> parseInteger
vServer' <- parseInteger
pure NodeState {
pure RemoteNodeState {
nid = nid'
, domain = domain'
, dhtPort = dhtPort'
, apPort = if apPort' == 0 then Nothing else Just apPort'
, servicePort = servicePort'
, vServerID = vServer'
, ipAddr = ip'
, internals = Nothing
}
@ -360,8 +368,8 @@ parseJoinRequest = do
parseJoinResponse :: ParseASN1 ActionPayload
parseJoinResponse = onNextContainer Sequence $ do
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
cache <- parseNodeCache
pure $ JoinResponsePayload {
joinSuccessors = succ'
@ -396,8 +404,8 @@ parseStabiliseRequest = do
parseStabiliseResponse :: ParseASN1 ActionPayload
parseStabiliseResponse = onNextContainer Sequence $ do
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
pure $ StabiliseResponsePayload {
stabiliseSuccessors = succ'
, stabilisePredecessors = pred'
@ -405,8 +413,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
pure $ LeaveRequestPayload {
leaveSuccessors = succ'
, leavePredecessors = pred'

View file

@ -3,153 +3,146 @@ module Hash2Pub.DHTProtocol
, queryLocalCache
, addCacheEntry
, addCacheEntryPure
, addNodeAsVerified
, addNodeAsVerifiedPure
, deleteCacheEntry
, markCacheEntryAsVerified
, deserialiseMessage
, RemoteCacheEntry(..)
, toRemoteCacheEntry
, remoteNode_
, remoteNode
, Action(..)
, ActionPayload(..)
, FediChordMessage(..)
, maximumParts
, sendQueryIdMessages
, requestQueryID
, requestJoin
, requestPing
, requestStabilise
, lookupMessage
, sendRequestTo
, queryIdLookupLoop
, queueAddEntries
, queueDeleteEntries
, queueDeleteEntry
, resolve
, mkSendSocket
, mkServerSocket
, handleIncomingRequest
, ackRequest
, isPossibleSuccessor
, isPossiblePredecessor
, isJoined
, closestCachePredecessors
)
where
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, maybe)
import qualified Data.Set as Set
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_)
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr')
import Data.Functor.Identity
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.List (delete, nub, sortBy)
import qualified Data.Map as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe, maybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, sendTo)
import Network.Socket hiding (recv, recvFrom, send,
sendTo)
import Network.Socket.ByteString
import Safe
import System.Random
import System.Timeout
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
NodeState (..),
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, getPredecessors,
getSuccessors, localCompare,
putPredecessors, putSuccessors)
import Hash2Pub.ASN1Coding
import Hash2Pub.FediChordTypes (CacheEntry (..),
CacheEntry (..), HasKeyID (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RemoteNodeState (..),
RingEntry (..), RingMap (..),
addRMapEntry, addRMapEntryWith,
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
getKeyID, localCompare,
rMapFromList, rMapLookupPred,
rMapLookupSucc,
setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes
import Debug.Trace (trace)
import Debug.Trace (trace)
-- === queries ===
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^ return closest nodes from local cache.
-- whole cache entry is returned for making
-- the entry time stamp available to the
-- protocol serialiser
| FOUND NodeState -- ^node is the responsible node for queried ID
deriving (Show, Eq)
-- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node
| (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
-- This only makes sense if the node is part of the DHT by having joined.
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
where
preds = fromMaybe [] $ getPredecessors ownState
ownID = nid ownState
ownID = getNid ownState
preds = predecessors ownState
closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
closestPredecessors :: Set.Set RemoteCacheEntry
closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
closestPredecessor 0 _ = Set.empty
closestPredecessor remainingLookups lastID
| remainingLookups < 0 = Set.empty
| otherwise =
let result = cacheLookupPred lastID nCache
in
case toRemoteCacheEntry =<< result of
Nothing -> Set.empty
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
-- === protocol serialisation data types
-- | Look up the closest predecessor cache entries of a given ID.
--
-- Starting at the target ID, the cache is walked backwards with
-- 'cacheLookupPred': each found entry is collected and its node ID becomes the
-- starting point of the next lookup. The walk stops after the requested number
-- of lookups or as soon as a lookup yields no entry. A requested number of
-- zero or less yields the empty set.
closestCachePredecessors :: (Integral n)
                         => n           -- ^ number of entries to look up
                         -> NodeID      -- ^ target ID to get the predecessors of
                         -> NodeCache   -- ^ cache to use for lookup
                         -> Set.Set RemoteCacheEntry
closestCachePredecessors remainingLookups lastID nCache
    | remainingLookups <= 0 = Set.empty
    | otherwise =
        case toRemoteCacheEntry <$> cacheLookupPred lastID nCache of
            Nothing -> Set.empty
            -- the entry's timestamp is not needed for continuing the walk
            Just nPred@(RemoteCacheEntry ns _) ->
                Set.insert nPred $ closestCachePredecessors (remainingLookups - 1) (nid ns) nCache
data Action =
QueryID
| Join
| Leave
| Stabilise
| Ping
deriving (Show, Eq, Enum)
-- | Determines whether a lookup key is within the responsibility slice of a node,
-- as it falls between its first predecessor and the node itself.
--
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the node
-- itself, the key falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID a => a -> LocalNodeState -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs =
    (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs)
  where
    -- add the node itself to the RingMap representation, to distinguish between
    -- responsibility of the own node and of a predecessor
    predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) . rMapFromList $ predecessors ownNs
data FediChordMessage =
Request {
requestID :: Integer
, sender :: NodeState
, parts :: Integer
, part :: Integer
-- ^ part starts at 0
, action :: Action
, payload :: Maybe ActionPayload
}
| Response {
responseTo :: Integer
, senderID :: NodeID
, parts :: Integer
, part :: Integer
, action :: Action
, payload :: Maybe ActionPayload
} deriving (Show, Eq)
-- | Whether the given key holder could be a predecessor of the node.
-- This is exactly the check performed by 'isInOwnResponsibilitySlice'.
isPossiblePredecessor :: HasKeyID a => a -> LocalNodeState -> Bool
isPossiblePredecessor lookupTarget ownNs = lookupTarget `isInOwnResponsibilitySlice` ownNs
data ActionPayload =
QueryIDRequestPayload {
queryTargetID :: NodeID
, queryLBestNodes :: Integer
}
| JoinRequestPayload
| LeaveRequestPayload {
leaveSuccessors :: [NodeID]
, leavePredecessors :: [NodeID]
}
| StabiliseRequestPayload
| PingRequestPayload
| QueryIDResponsePayload {
queryResult :: QueryResponse
}
| JoinResponsePayload {
joinSuccessors :: [NodeID]
, joinPredecessors :: [NodeID]
, joinCache :: [RemoteCacheEntry]
}
| LeaveResponsePayload
| StabiliseResponsePayload {
stabiliseSuccessors :: [NodeID]
, stabilisePredecessors :: [NodeID]
}
| PingResponsePayload {
pingNodeStates :: [NodeState]
}
deriving (Show, Eq)
-- | global limit of parts per message used when (de)serialising messages.
-- Used to limit the impact of DOS attempts with partial messages.
maximumParts :: Num a => a
maximumParts = 150
-- | dedicated data type for cache entries sent to or received from the network,
-- as these have to be considered as unvalidated. Also helps with separation of trust.
data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime
deriving (Show, Eq)
instance Ord RemoteCacheEntry where
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry
toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
toRemoteCacheEntry _ = Nothing
-- helper function for use in tests
remoteNode_ :: RemoteCacheEntry -> NodeState
remoteNode_ (RemoteCacheEntry ns _) = ns
-- | Whether the given key holder could be a successor of the node.
--
-- Looks up the predecessor of the lookup key on a 'RingMap' representation of
-- the successor list with the node itself added. The check succeeds if that
-- predecessor is the node itself.
isPossibleSuccessor :: HasKeyID a => a -> LocalNodeState -> Bool
isPossibleSuccessor lookupTarget ownNs =
    (getKeyID <$> rMapLookupPred (getKeyID lookupTarget) successorRMap) == pure (getNid ownNs)
  where
    -- the node itself is part of the ring so it can be the lookup result
    successorRMap = addRMapEntry (toRemoteNodeState ownNs) . rMapFromList $ successors ownNs
-- cache operations
@ -164,18 +157,18 @@ addCacheEntry entry cache = do
-- | pure version of 'addCacheEntry' with current time explicitly specified as argument
addCacheEntryPure :: POSIXTime -- ^ current time
-> RemoteCacheEntry -- ^ a remote cache entry received from network
-> NodeCache -- ^ node cache to insert to
-> NodeCache -- ^ new node cache with the element inserted
-> RemoteCacheEntry -- ^ a remote cache entry received from network
-> NodeCache -- ^ node cache to insert to
-> NodeCache -- ^ new node cache with the element inserted
addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
let
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
timestamp' = if ts <= now then ts else now
newCache = Map.insertWith insertCombineFunction (nid ns) (NodeEntry False ns timestamp') cache
insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal =
newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache
insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal =
case oldVal of
ProxyEntry n _ -> ProxyEntry n (Just newVal)
NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp)
KeyEntry (CacheEntry oldValidationState _ oldTimestamp) -> KeyEntry (CacheEntry oldValidationState newNode (max oldTimestamp newTimestamp))
in
newCache
@ -183,10 +176,30 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
-> NodeCache -- ^cache to delete from
-> NodeCache -- ^cache without the specified element
deleteCacheEntry = Map.update modifier
deleteCacheEntry nid = RingMap . Map.update modifier nid . getRingMap
where
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier NodeEntry {} = Nothing
modifier KeyEntry {} = Nothing
-- | Insert a 'RemoteNodeState' into the node cache, marked as verified and
-- stamped with the current time.
-- If an entry already exists, it is replaced by the new verified one.
addNodeAsVerified :: RemoteNodeState
                  -> NodeCache
                  -> IO NodeCache
addNodeAsVerified node cache =
    (\now -> addNodeAsVerifiedPure now node cache) <$> getPOSIXTime
-- | Pure counterpart of 'addNodeAsVerified': the timestamp stored with the
-- verified entry is passed in explicitly instead of being read from the clock.
addNodeAsVerifiedPure :: POSIXTime
                      -> RemoteNodeState
                      -> NodeCache
                      -> NodeCache
addNodeAsVerifiedPure now node cache = addRMapEntry verifiedEntry cache
  where
    -- 'True' marks the entry as verified
    verifiedEntry = CacheEntry True node now
-- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp.
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be
@ -194,12 +207,529 @@ markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to
-> NodeID -- ^ which node to mark
-> NodeCache -- ^ current node cache
-> NodeCache -- ^ new NodeCache with the updated entry
markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . getRingMap
where
adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp
adjustFunc (KeyEntry (CacheEntry _ ns ts)) = KeyEntry (CacheEntry True ns $ fromMaybe ts timestamp)
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
adjustFunc entry = entry
-- | Uses the successor and predecessor lists of a node as an indicator for
-- whether it has properly joined the DHT: a node counts as joined as soon as
-- at least one of the two lists is non-empty.
isJoined :: LocalNodeState -> Bool
isJoined ns = not (null (successors ns) && null (predecessors ns))
-- | The size limit, in bytes, passed to 'serialiseMessage' when serialising
-- messages for sending.
sendMessageSize :: Num i => i
sendMessageSize = 1200
-- ====== message send and receive operations ======
-- | Encode the response to a request that just signals successful receipt.
-- The acknowledgement echoes the request's ID, part number and action and
-- carries no payload. Anything that is not a 'Request' yields an empty map.
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID msg = case msg of
    Request{} -> serialiseMessage sendMessageSize acknowledgement
    _         -> Map.empty
  where
    -- only evaluated for requests, as the field selectors are used on @msg@
    acknowledgement = Response
        { requestID = requestID msg
        , senderID = ownID
        , part = part msg
        , isFinalPart = False
        , action = action msg
        , payload = Nothing
        }
-- | Dispatch an incoming request to the dedicated handling and response
-- function, and enqueue every serialised response part to be sent back to the
-- source address. An empty set of parts is ignored.
handleIncomingRequest :: LocalNodeStateSTM                     -- ^ the handling node
                      -> TQueue (BS.ByteString, SockAddr)      -- ^ send queue
                      -> Set.Set FediChordMessage              -- ^ all parts of the request to handle
                      -> SockAddr                              -- ^ source address of the request
                      -> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
    putStrLn $ "handling incoming request: " <> show msgSet
    ns <- readTVarIO nsSTM
    now <- getPOSIXTime
    case headMay (Set.elems msgSet) of
        Nothing -> pure ()
        Just aPart -> do
            -- add the sender's node state to the cache
            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
            -- distinguish on whether and how to respond; the request type is
            -- determined from the first part only
            mResponses <- case action aPart of
                Ping      -> Just <$> respondPing nsSTM msgSet
                Join      -> Just <$> respondJoin nsSTM msgSet
                -- ToDo: figure out what happens if not joined
                QueryID   -> Just <$> respondQueryID nsSTM msgSet
                -- only when joined
                Leave     -> whenJoined ns (respondLeave nsSTM msgSet)
                Stabilise -> whenJoined ns (respondStabilise nsSTM msgSet)
            -- write all response parts, if any, to the send queue
            mapM_ (mapM_ enqueue) mResponses
  where
    enqueue resp = atomically $ writeTQueue sendQ (resp, sourceAddr)
    -- run the responder only if the node has joined, otherwise do not respond
    whenJoined nodeState respond
        | isJoined nodeState = Just <$> respond
        | otherwise = pure Nothing
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
-- TODO: test case: mixed message types of parts
-- ....... response sending .......
-- TODO: could all these respond* functions be in STM instead of IO?
-- | Execute a key ID lookup on the local cache and respond with the result.
--
-- A QueryID request cannot be split reasonably, so only a single payload of
-- the request parts is considered. If no part carries a payload, an empty map
-- is returned and nothing is sent.
--
-- While the node has not joined yet, it claims responsibility for every
-- queried key to make bootstrapping possible.
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do
    putStrLn "responding to a QueryID request"
    case senderPayload of
        -- return only an empty message serialisation if no payload was included in parts
        Nothing -> pure Map.empty
        Just senderPayload' -> do
            responseMsg <- atomically $ do
                nsSnap <- readTVar nsSTM
                cache <- readTVar $ nodeCacheSTM nsSnap
                let lookupResult =
                        if isJoined nsSnap
                            then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
                            -- if not joined yet, attract responsibility for
                            -- all keys to make bootstrapping possible
                            else FOUND (toRemoteNodeState nsSnap)
                pure (Response
                    { requestID = requestID aRequestPart
                    , senderID = getNid nsSnap
                    , part = responsePartNumber
                    , isFinalPart = False
                    , action = QueryID
                    , payload = Just (QueryIDResponsePayload { queryResult = lookupResult })
                    })
            pure $ serialiseMessage sendMessageSize responseMsg
  where
    aRequestPart = Set.elemAt 0 msgSet
    -- The right fold starts at the greatest element of the set and keeps the
    -- first payload it encounters, i.e. the payload of the right-most part
    -- that carries one.
    senderPayload = foldr' (\msg plAcc ->
        if isNothing plAcc && isJust (payload msg)
            then payload msg
            else plAcc
        ) Nothing msgSet
    -- a single-part request is answered starting with part number 1,
    -- a multipart request of n parts starting with part number n+1
    responsePartNumber
        | Set.size msgSet == 1 = 1
        | otherwise = fromIntegral $ Set.size msgSet + 1
-- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with a response.
--
-- The successor and predecessor lists sent by the leaving node are merged into
-- the own lists. Parts without a payload, or with a payload that is not a
-- 'LeaveRequestPayload', contribute nothing to the merged lists.
-- TODO: copy over key data from leaver and confirm
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do
    let -- combine payload of all parts
        (requestPreds, requestSuccs) = foldr' collectNeighbours ([], []) msgSet
        aRequestPart = Set.elemAt 0 msgSet
        leaverID = getNid . sender $ aRequestPart
        isNotLeaver = (/=) leaverID . getNid
    responseMsg <- atomically $ do
        nsSnap <- readTVar nsSTM
        -- remove leaving node from successors, predecessors and NodeCache
        writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry leaverID
        writeTVar nsSTM $
            -- add predecessors and successors of leaving node to own lists
            setPredecessors (filter isNotLeaver $ requestPreds <> predecessors nsSnap)
            . setSuccessors (filter isNotLeaver $ requestSuccs <> successors nsSnap) $ nsSnap
        -- TODO: handle handover of key data
        pure (Response
            { requestID = requestID aRequestPart
            , senderID = getNid nsSnap
            , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
            , isFinalPart = False
            , action = Leave
            , payload = Just LeaveResponsePayload
            })
    pure $ serialiseMessage sendMessageSize responseMsg
  where
    -- Match on the payload constructor instead of applying the partial record
    -- selectors: a part with a different payload type would otherwise leave a
    -- lazily deferred record selector error inside the stored node state.
    collectNeighbours msg acc@(predAcc, succAcc) = case payload msg of
        Just LeaveRequestPayload{ leavePredecessors = preds, leaveSuccessors = succs } ->
            (preds ++ predAcc, succs ++ succAcc)
        _ -> acc
-- | Answer a Stabilise request with the current successor and predecessor
-- lists of this node.
respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise nsSTM msgSet = do
    nsSnap <- readTVarIO nsSTM
    let requestPart = Set.elemAt 0 msgSet
        responsePart = case Set.size msgSet of
            1        -> 1
            numParts -> fromIntegral $ numParts + 1
        neighbourLists = StabiliseResponsePayload
            { stabiliseSuccessors = successors nsSnap
            , stabilisePredecessors = predecessors nsSnap
            }
    -- TODO: return service endpoint for copying over key data
    pure . serialiseMessage sendMessageSize $ Response
        { requestID = requestID requestPart
        , senderID = getNid nsSnap
        , part = responsePart
        , isFinalPart = False
        , action = Stabilise
        , payload = Just neighbourLists
        }
-- | Answer a Ping request with the node states of the active vservers.
-- Currently only the responding vserver itself is returned.
respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing nsSTM msgSet = do
    -- TODO: respond with all active VS when implementing k-choices
    nsSnap <- readTVarIO nsSTM
    let requestPart = Set.elemAt 0 msgSet
        responsePart = case Set.size msgSet of
            1        -> 1
            numParts -> fromIntegral $ numParts + 1
        activeVServers = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
    pure . serialiseMessage sendMessageSize $ Response
        { requestID = requestID requestPart
        , senderID = getNid nsSnap
        , part = responsePart
        , isFinalPart = False
        , action = Ping
        , payload = Just activeVServers
        }
-- | Answer a Join request. If the joining node's ID falls into the
-- responsibility of this node, the joiner is added to the own predecessors and
-- the resulting neighbour lists plus the node cache are returned; otherwise a
-- response without payload is sent.
--
-- Reading and modifying the node state happens in one STM transaction, the
-- remaining code is kept pure as far as possible.
respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
    joinReply <- atomically $ do
        nsSnap <- readTVar nsSTM
        cache <- readTVar $ nodeCacheSTM nsSnap
        let requestPart = Set.elemAt 0 msgSet
            joiningNode = sender requestPart
            replyPart = case Set.size msgSet of
                1        -> 1
                numParts -> fromIntegral $ numParts + 1
            -- common response skeleton for both outcomes
            replyFrom responder replyPayload = Response
                { requestID = requestID requestPart
                , senderID = getNid responder
                , part = replyPart
                , isFinalPart = False
                , action = Join
                , payload = replyPayload
                }
        -- check whether the joining node falls into our responsibility
        case queryLocalCache nsSnap cache 1 (getNid joiningNode) of
            -- responsible: adopt the joiner as predecessor and report the neighbours
            FOUND _ -> do
                let joinedNS = setPredecessors (joiningNode : predecessors nsSnap) nsSnap
                    neighbourInfo = JoinResponsePayload
                        { joinSuccessors = successors joinedNS
                        , joinPredecessors = predecessors joinedNS
                        , joinCache = toRemoteCache cache
                        }
                writeTVar nsSTM joinedNS
                pure $ replyFrom joinedNS (Just neighbourInfo)
            -- not responsible: respond with empty payload
            FORWARD _ -> pure $ replyFrom nsSnap Nothing
    pure $ serialiseMessage sendMessageSize joinReply
-- TODO: notify service layer to copy over data now handled by the new joined node
-- ....... request sending .......
-- | Send a Join request to the node currently responsible for the own ID and
-- store the received neighbours in the own node state.
-- Returns @Left@ with an error description if nobody answered, if no
-- neighbours were received, or if an 'IOException' occurred.
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
            -> LocalNodeStateSTM -- ^ joining NodeState
            -> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM =
    bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close joinOver
    `catch` (\e -> pure . Left $ displayException (e :: IOException))
  where
    joinOver sock = do
        -- extract own state for getting request information
        ownState <- readTVarIO ownStateSTM
        responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
        (cacheInserts, joinedState) <- atomically $ do
            stateSnap <- readTVar ownStateSTM
            let -- start from empty neighbour sets, i.e. reset predecessors and successors
                (inserts, predSet, succSet) = foldl' absorbResponse ([], Set.empty, Set.empty) responses
                -- sort, slice and set the accumulated successors and predecessors
                newState = setSuccessors (Set.elems succSet) . setPredecessors (Set.elems predSet) $ stateSnap
            writeTVar ownStateSTM newState
            pure (inserts, newState)
        -- execute the cache insertions
        mapM_ ($ joinedState) cacheInserts
        pure $ joinResult responses joinedState

    -- fold one response part into (cache insertion actions, predecessors, successors)
    absorbResponse acc@(inserts, predSet, succSet) msg = case payload msg of
        Nothing -> acc
        Just pl ->
            ( queueAddEntries (joinCache pl) : inserts
            , foldr' Set.insert predSet (joinPredecessors pl)
            , foldr' Set.insert succSet (joinSuccessors pl)
            )

    joinResult responses joinedState
        | Set.null responses =
            Left $ "join error: got no response from " <> show (getNid toJoinOn)
        | null (predecessors joinedState) && null (successors joinedState) =
            Left "join error: no predecessors or successors"
        -- successful join
        | otherwise = Right ownStateSTM
-- | Look up the node responsible for a key ID, starting from a snapshot of
-- the node's current cache.
--
-- Lookup procedure (see 'queryIdLookupLoop'):
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
-- TODO: deal with lookup failures
requestQueryID :: LocalNodeState -- ^ NodeState of the querying node
               -> NodeID -- ^ target key ID to look up
               -> IO RemoteNodeState -- ^ the node responsible for handling that key
requestQueryID ns targetID =
    -- TODO: make maxAttempts configurable
    readTVarIO (nodeCacheSTM ns) >>= \initialCache ->
        queryIdLookupLoop initialCache ns 50 targetID
-- | Like 'requestQueryID', but allows passing of a custom cache, e.g. for joining.
-- Each round queries the best known nodes, merges their answers into the
-- local cache copy and recurses with one attempt less.
-- When the attempts are used up, the node itself is returned as fallback
-- value against infinite recursion.
-- TODO: consider using an Either instead of a default value
queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID =
    -- FOUND can only be returned if targetID is owned by local node
    case queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID of
        FOUND thisNode -> pure thisNode
        FORWARD nodeSet -> do
            remoteAnswer <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
            now <- getPOSIXTime
            case remoteAnswer of
                -- a remote node claimed responsibility: done
                FOUND foundNode -> pure foundNode
                -- otherwise extend the local cache copy and try again
                FORWARD entrySet -> do
                    let extendedCache = foldr' (addCacheEntryPure now) cacheSnapshot entrySet
                    queryIdLookupLoop extendedCache ns (maxAttempts - 1) targetID
-- | Send a QueryID request for @targetID@ concurrently to all given nodes and
-- accumulate their answers.
--
-- All entries learned from the responses are queued for insertion into the
-- global node cache. The result is the first 'FOUND' answer encountered, or,
-- if no node claimed responsibility, a 'FORWARD' with the union of all
-- returned cache entries. Targets whose request thread failed are ignored.
sendQueryIdMessages :: (Integral i)
                    => NodeID -- ^ target key ID to look up
                    -> LocalNodeState -- ^ node state of the node doing the query
                    -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
                    -> [RemoteNodeState] -- ^ nodes to query
                    -> IO QueryResponse -- ^ accumulated response
sendQueryIdMessages targetID ns lParam targets = do
    -- create connected sockets to all query targets and use them for request handling
    -- ToDo: make attempts and timeout configurable
    queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
        -- pass the caller's @l@ parameter on instead of silently dropping it
        sendRequestTo 5000 3 (lookupMessage targetID ns lParam)
        )) targets
    -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
    -- ToDo: exception handling, maybe log them
    responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
    -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
    now <- getPOSIXTime
    foldM (accumulate now) (FORWARD Set.empty) responses
  where
    -- fold a single response part into the accumulated result
    accumulate now acc resp = do
        let responseResult = queryResult <$> payload resp
            -- collect cache entries from the response
            entrySet = case responseResult of
                Just (FOUND result1)     -> Set.singleton (RemoteCacheEntry result1 now)
                Just (FORWARD resultset) -> resultset
                _                        -> Set.empty
        -- forward entries to global cache
        queueAddEntries entrySet ns
        pure $ case (acc, responseResult) of
            -- once a FOUND has been encountered, keep it as the result
            (FOUND{}, _)               -> acc
            -- a FOUND response ends the lookup; previously it was only
            -- converted into a cache entry and never returned
            (_, Just found@FOUND{})    -> found
            (FORWARD accSet, _)        -> FORWARD $ entrySet `Set.union` accSet
-- | Build a QueryID request that only lacks its request ID, suitable for
-- being supplied to 'sendRequestTo'.
-- If no explicit @l@ parameter is given, the sender's 'lNumBestNodes' is used.
lookupMessage :: Integral i
              => NodeID -- ^ target ID
              -> LocalNodeState -- ^ sender node state
              -> Maybe i -- ^ optionally provide a different l parameter
              -> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam rID =
    Request rID (toRemoteNodeState ns) 1 True QueryID (Just queryPayload)
  where
    queryPayload = QueryIDRequestPayload
        { queryTargetID = targetID
        , queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam
        }
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
-- return parsed neighbour lists.
--
-- The neighbour lists of all response parts are concatenated. A neighbour that
-- answered is refreshed in the node cache. @Left@ is returned on an
-- 'IOException' or when the response contained no neighbours at all.
requestStabilise :: LocalNodeState -- ^ sending node
                 -> RemoteNodeState -- ^ neighbour node to send to
                 -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do
    responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
        Request {
            requestID = rid
          , sender = toRemoteNodeState ns
          , part = 1
          , isFinalPart = False
          , action = Stabilise
          , payload = Just StabiliseRequestPayload
          }
        )
      ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
    case responses of
        -- forward IO error messages
        Left ioErr -> pure $ Left ioErr
        Right respSet -> do
            -- fold all reply parts together
            let responsePreds = foldMap (maybe [] stabilisePredecessors . payload) respSet
                responseSuccs = foldMap (maybe [] stabiliseSuccessors . payload) respSet
            -- update successfully responded neighbour in cache.
            -- Responses only carry a 'senderID' and no full sender node state,
            -- so the contacted neighbour is re-inserted, provided the
            -- response really originates from that node ID.
            now <- getPOSIXTime
            case headMay (Set.elems respSet) of
                Just resp | senderID resp == getNid neighbour ->
                    queueAddEntries (Identity $ RemoteCacheEntry neighbour now) ns
                _ -> pure ()
            pure $ if null responsePreds && null responseSuccs
                then Left "no neighbours returned"
                else Right (responsePreds, responseSuccs)
-- | Ping a node and return the node states of all vservers it reports.
-- For every reported vserver the node ID is recomputed from the peer address
-- of the socket; only vservers whose ID matches are marked as verified in the
-- cache. @Left@ is returned on an 'IOException' or an empty answer.
requestPing :: LocalNodeState -- ^ sending node
            -> RemoteNodeState -- ^ node to be PINGed
            -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do
    pingResult <- bracket (mkSendSocket (getDomain target) (getDhtPort target)) close pingOver
        `catch` (\e -> pure . Left $ displayException (e :: IOException))
    case pingResult of
        -- forward IO error messages
        Left ioErr -> pure $ Left ioErr
        Right (peerAddr, respSet) -> do
            -- fold all reply parts together
            let responseVss = foldMap (maybe [] pingNodeStates . payload) respSet
                idMatchesPeer vs =
                    genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs) == getNid vs
            -- recompute ID for each received node and mark as verified in cache
            now <- getPOSIXTime
            forM_ (filter idMatchesPeer responseVss) $
                atomically . writeTQueue (cacheWriteQueue ns) . addNodeAsVerifiedPure now
            pure $ if null responseVss
                then Left "no active vServer IDs returned, ignoring node"
                else Right responseVss
  where
    pingMessage rid = Request
        { requestID = rid
        , sender = toRemoteNodeState ns
        , part = 1
        , isFinalPart = False
        , action = Ping
        , payload = Just PingRequestPayload
        }
    -- send the ping and remember the address the socket is connected to
    pingOver sock = do
        resp <- sendRequestTo 5000 3 pingMessage sock
        (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
        pure $ Right (peerAddr, resp)
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
--
-- All response parts received so far are returned, even if the final attempt timed out.
sendRequestTo :: Int -- ^ timeout in milliseconds
              -> Int -- ^ number of retries
              -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
              -> Socket -- ^ connected socket to use for sending
              -> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
    -- give the message a random request ID
    randomID <- randomRIO (0, 2^32-1)
    let
        msgComplete = msgIncomplete randomID
        requests = serialiseMessage sendMessageSize msgComplete
    putStrLn $ "sending request message " <> show msgComplete
    -- create a queue for passing received response messages back, even after a timeout
    responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
    -- start sendAndAck with timeout; 'timeout' expects microseconds
    attempts numAttempts . timeout (timeoutMillis * 1000) $ sendAndAck responseQ sock requests
    -- TODO: after a timeout, only re-send the request parts that are still unacked
    recvdParts <- atomically $ flushTBQueue responseQ
    pure $ Set.fromList recvdParts
  where
    sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
               -> Socket -- ^ the socket used for sending and receiving for this particular remote node
               -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
               -> IO ()
    sendAndAck responseQueue sendSock remainingSends = do
        sendMany sendSock $ Map.elems remainingSends
        -- if all requests have been acked/ responded to, return prematurely
        recvLoop responseQueue remainingSends Set.empty Nothing
    recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
             -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
             -> Set.Set Integer -- ^ already received response part numbers
             -> Maybe Integer -- ^ total number of response parts if already known
             -> IO ()
    recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
        -- 65535 is maximum length of UDP packets, as long as
        -- no IPv6 jumbograms are used
        response <- deserialiseMessage <$> recv sock 65535
        case response of
            Right msg@Response{} -> do
                atomically $ writeTBQueue responseQueue msg
                let
                    newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
                    newRemaining = Map.delete (part msg) remainingSends'
                    newReceivedParts = Set.insert (part msg) receivedPartNums
                -- finished once every request part is acked and all response parts
                -- have arrived; the check has to use the updated part set
                if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
                    then pure ()
                    else recvLoop responseQueue newRemaining newReceivedParts newTotalParts
            -- drop errors, invalid messages and anything that is not a response
            _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
-- | Queue every given 'RemoteCacheEntry' for insertion into the global
-- NodeCache, one write-queue item per entry.
queueAddEntries :: Foldable c => c RemoteCacheEntry
                -> LocalNodeState
                -> IO ()
queueAddEntries entries ns = do
    now <- getPOSIXTime
    mapM_ (atomically . writeTQueue (cacheWriteQueue ns) . addCacheEntryPure now) entries
-- | Queue the deletion of every given node ID from the global NodeCache.
queueDeleteEntries :: Foldable c
                   => c NodeID
                   -> LocalNodeState
                   -> IO ()
queueDeleteEntries ids ns =
    mapM_ (\nodeId -> atomically $ writeTQueue (cacheWriteQueue ns) (deleteCacheEntry nodeId)) ids
-- | Queue the deletion of a single node ID from the global NodeCache.
queueDeleteEntry :: NodeID
                 -> LocalNodeState
                 -> IO ()
queueDeleteEntry = queueDeleteEntries . Identity
-- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry
@ -210,3 +740,39 @@ attempts i action = do
case actionResult of
Nothing -> attempts (i-1) action
Just res -> pure $ Just res
-- ====== network socket operations ======
-- | resolve a specified host and return the 'AddrInfo' for it.
-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all
-- addresses;
-- if no port is specified an arbitrary free port is selected.
--
-- Only IPv6 datagram addresses are requested. If the lookup yields no
-- address at all, an 'IOError' is raised instead of crashing on an empty list.
resolve :: Maybe String -- ^ hostname or IP address to be resolved
        -> Maybe PortNumber -- ^ port number of either local bind or remote
        -> IO AddrInfo
resolve host port = do
    addrs <- getAddrInfo (Just hints) host (show <$> port)
    case addrs of
        addr:_ -> pure addr
        []     -> ioError . userError $ "resolve: no address found for " <> show host
  where
    hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram
                         , addrFlags = [AI_PASSIVE] }
-- | create an unconnected UDP Datagram 'Socket' bound to the specified address
--
-- The socket is IPv6-only. If configuring or binding it fails with an
-- 'IOException', the socket is closed again before the exception is re-thrown,
-- so no file descriptor is leaked.
mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket
mkServerSocket ip port = do
    sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
    sock <- socket AF_INET6 Datagram defaultProtocol
    (do
        setSocketOption sock IPv6Only 1
        bind sock sockAddr
        pure sock
        ) `catch` (\e -> close sock >> ioError (e :: IOException))
-- | create a UDP datagram socket, connected to a destination.
-- The socket gets an arbitrary free local port assigned.
--
-- If configuring or connecting the socket fails with an 'IOException'
-- (e.g. destination network unreachable), the socket is closed again before
-- the exception is re-thrown. This matters because callers use this function
-- as the acquire action of 'bracket', whose release action does not run when
-- the acquisition itself throws.
mkSendSocket :: String -- ^ destination hostname or IP
             -> PortNumber -- ^ destination port
             -> IO Socket -- ^ a socket with an arbitrary source port
mkSendSocket dest destPort = do
    destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
    sendSock <- socket AF_INET6 Datagram defaultProtocol
    (do
        setSocketOption sendSock IPv6Only 1
        connect sendSock destAddr
        pure sendSock
        ) `catch` (\e -> close sendSock >> ioError (e :: IOException))

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,604 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module Hash2Pub.FediChordTypes (
NodeID -- abstract, but newtype constructors cannot be hidden
, idBits
, getNodeID
, toNodeID
, NodeState (..)
, LocalNodeState (..)
, LocalNodeStateSTM
, RemoteNodeState (..)
, RealNode (..)
, RealNodeSTM
, setSuccessors
, setPredecessors
, NodeCache
, CacheEntry(..)
, RingEntry(..)
, RingMap(..)
, HasKeyID
, getKeyID
, rMapSize
, rMapLookup
, rMapLookupPred
, rMapLookupSucc
, addRMapEntry
, addRMapEntryWith
, addPredecessors
, addSuccessors
, takeRMapPredecessors
, takeRMapSuccessors
, deleteRMapEntry
, setRMapEntries
, rMapFromList
, rMapToList
, cacheGetNodeStateUnvalidated
, initCache
, cacheEntries
, cacheLookup
, cacheLookupSucc
, cacheLookupPred
, localCompare
, genNodeID
, genNodeIDBS
, genKeyID
, genKeyIDBS
, byteStringToUInteger
, ipAddrAsBS
, bsAsIpAddr
, FediChordConf(..)
) where
import Control.Exception
import Data.Foldable (foldr')
import Data.Function (on)
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket
-- for hashing and ID conversion
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Monad (forever)
import Crypto.Hash
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Hash2Pub.Utils
import Debug.Trace (trace)
-- define protocol constants
-- | static definition of ID length in bits; determines the upper bound of
-- the 'NodeID' value range (see the 'Bounded' instance of 'NodeID')
idBits :: Integer
idBits = 256
-- | NodeIDs are Integers wrapped in a newtype, to be able to redefine
-- their instance behaviour (modular arithmetic, bounds).
--
-- For being able to check value bounds, the constructor should not be used directly
-- and new values are created via 'toNodeID' (newtype constructors cannot be hidden).
newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum
-- | Smart data constructor for NodeID that checks, via 'assert', that the value
-- lies within @[minBound, maxBound]@ and fails at runtime for out-of-bounds values.
-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@.
--
-- NOTE(review): GHC drops 'assert' checks when assertions are disabled
-- (e.g. optimised builds), in which case out-of-bounds values pass
-- unchecked - confirm the build flags if the check is relied upon.
toNodeID :: Integer -> NodeID
toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i
-- | NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits',
-- i.e. they range from @0@ to @2^idBits - 1@.
instance Bounded NodeID where
    minBound = NodeID 0
    maxBound = NodeID (2^idBits - 1)
-- | Arithmetic on NodeIDs is modular: every result is reduced modulo
-- @maxBound + 1@, so values wrap around the ring.
instance Num NodeID where
    a + b = fromInteger $ getNodeID a + getNodeID b
    a * b = fromInteger $ getNodeID a * getNodeID b
    a - b = fromInteger $ getNodeID a - getNodeID b
    -- | safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around
    -- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@.
    fromInteger i = NodeID (i `mod` ringSize)
      where
        ringSize = getNodeID maxBound + 1
    signum (NodeID i) = NodeID (signum i)
    abs (NodeID i) = NodeID (abs i) -- ToDo: make sure that at creation time only IDs within the range are used
-- | NodeIDs are ordered like their underlying integers (plain, strictly
-- monotonic ordering). The ring structure is realised in the @NodeCache@
-- implementation, not here.
instance Ord NodeID where
    compare = compare `on` getNodeID
-- | Ring-aware comparison of 2 node IDs based on the modular distances in
-- both directions. Only relevant for determining a successor or predecessor
-- on caches with just 2 nodes.
localCompare :: NodeID -> NodeID -> Ordering
localCompare a b
    | a == b                      = EQ
    | distForwards > distBackwards = GT
    | otherwise                   = LT
  where
    -- modular distance when walking from a to b, resp. from b to a
    distForwards = getNodeID $ b - a
    distBackwards = getNodeID $ a - b
-- | Data for managing the virtual server nodes of this real node.
-- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping
data RealNode = RealNode
    { vservers :: [LocalNodeStateSTM]
    -- ^ references to all active vservers
    , nodeConfig :: FediChordConf
    -- ^ holds the initial configuration read at program start
    , bootstrapNodes :: [(String, PortNumber)]
    -- ^ nodes to be used as bootstrapping points, new ones learned during operation
    }
-- | for concurrent access, the 'RealNode' is wrapped in a TVar
type RealNodeSTM = TVar RealNode
-- | represents a node and all its important state, i.e. the data that is
-- known about remote nodes as well as about the local node
data RemoteNodeState = RemoteNodeState
    { nid :: NodeID
    -- ^ ID of the node on the ring
    , domain :: String
    -- ^ full public domain name the node is reachable under
    , ipAddr :: HostAddress6
    -- ^ the node's public IPv6 address
    , dhtPort :: PortNumber
    -- ^ port of the DHT itself
    , servicePort :: PortNumber
    -- ^ port of the service provided on top of the DHT
    , vServerID :: Integer
    -- ^ ID of this vserver
    }
    deriving (Show, Eq)
-- | node states are ordered solely by their node ID; all other fields are ignored
instance Ord RemoteNodeState where
    compare = compare `on` nid
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
data LocalNodeState = LocalNodeState
    { nodeState :: RemoteNodeState
    -- ^ represents common data present both in remote and local node representations
    , nodeCacheSTM :: TVar NodeCache
    -- ^ EpiChord node cache with expiry times for nodes
    , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
    -- ^ cache updates are not written directly to the 'nodeCache' but queued
    -- here as update functions (NOTE(review): the original comment was
    -- truncated; presumably a dedicated writer applies them - confirm)
    , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
    -- ^ successor nodes in ascending order by distance
    , predecessors :: [RemoteNodeState]
    -- ^ predecessor nodes in ascending order by distance
    , kNeighbours :: Int
    -- ^ desired length of predecessor and successor list
    , lNumBestNodes :: Int
    -- ^ number of best next hops to provide
    , pNumParallelQueries :: Int
    -- ^ number of parallel sent queries
    , jEntriesPerSlice :: Int
    -- ^ number of desired entries per cache slice
    , parentRealNode :: RealNodeSTM
    -- ^ the parent node managing this vserver instance
    }
    deriving (Show, Eq)
-- | for concurrent access, LocalNodeState is wrapped in a TVar
type LocalNodeStateSTM = TVar LocalNodeState
-- | class for various NodeState representations, providing
-- getters and setters for common values
class NodeState a where
    -- getters for common properties
    getNid :: a -> NodeID
    getDomain :: a -> String
    getIpAddr :: a -> HostAddress6
    getDhtPort :: a -> PortNumber
    getServicePort :: a -> PortNumber
    getVServerID :: a -> Integer
    -- setters for common properties
    setNid :: NodeID -> a -> a
    setDomain :: String -> a -> a
    setIpAddr :: HostAddress6 -> a -> a
    setDhtPort :: PortNumber -> a -> a
    setServicePort :: PortNumber -> a -> a
    setVServerID :: Integer -> a -> a
    -- | reduce any node state representation to the data shared with remote nodes
    toRemoteNodeState :: a -> RemoteNodeState
-- | for 'RemoteNodeState' the getters are the plain record fields and the
-- setters are plain record updates
instance NodeState RemoteNodeState where
    getNid = nid
    getDomain = domain
    getIpAddr = ipAddr
    getDhtPort = dhtPort
    getServicePort = servicePort
    getVServerID = vServerID
    setNid nid' ns = ns {nid = nid'}
    setDomain domain' ns = ns {domain = domain'}
    setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'}
    setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'}
    setServicePort servicePort' ns = ns {servicePort = servicePort'}
    setVServerID vServerID' ns = ns {vServerID = vServerID'}
    -- already is a 'RemoteNodeState'
    toRemoteNodeState = id
-- | Apply a modification to the 'RemoteNodeState' embedded in a
-- 'LocalNodeState', leaving all other fields untouched.
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
propagateNodeStateSet_ func ns = ns { nodeState = func (nodeState ns) }
-- | All common properties of a 'LocalNodeState' live in its embedded
-- 'RemoteNodeState'; getters and setters are delegated to it.
instance NodeState LocalNodeState where
    getNid ns = getNid (nodeState ns)
    getDomain ns = getDomain (nodeState ns)
    getIpAddr ns = getIpAddr (nodeState ns)
    getDhtPort ns = getDhtPort (nodeState ns)
    getServicePort ns = getServicePort (nodeState ns)
    getVServerID ns = getVServerID (nodeState ns)
    setNid = propagateNodeStateSet_ . setNid
    setDomain = propagateNodeStateSet_ . setDomain
    setIpAddr = propagateNodeStateSet_ . setIpAddr
    setDhtPort = propagateNodeStateSet_ . setDhtPort
    setServicePort = propagateNodeStateSet_ . setServicePort
    setVServerID = propagateNodeStateSet_ . setVServerID
    toRemoteNodeState = nodeState
-- | Show instances for the STM containers, needed to be able to print
-- NodeState for debug purposes. Only the type is shown, never the content.
instance Typeable a => Show (TVar a) where
    show = show . typeOf

instance Typeable a => Show (TQueue a) where
    show = show . typeOf
-- | Replace the predecessors of a 'LocalNodeState' with the k closest nodes
-- from the provided list; the node itself is never taken as its own predecessor.
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setPredecessors preds ns = ns { predecessors = closestPreds }
  where
    ownID = getNid ns
    candidates = filter (\node -> getNid node /= ownID) preds
    closestPreds = takeRMapPredecessors ownID (kNeighbours ns) (rMapFromList candidates)
-- | Replace the successors of a 'LocalNodeState' with the k closest nodes
-- from the provided list; the node itself is never taken as its own successor.
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
setSuccessors succs ns = ns { successors = closestSuccs }
  where
    ownID = getNid ns
    candidates = filter (\node -> getNid node /= ownID) succs
    closestSuccs = takeRMapSuccessors ownID (kNeighbours ns) (rMapFromList candidates)
-- | Merge the provided nodes into the current predecessors and keep the
-- closest k nodes of the combination; the node itself is filtered out.
addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addPredecessors preds ns = ns { predecessors = closestPreds }
  where
    ownID = getNid ns
    additions = filter (\node -> getNid node /= ownID) preds
    combined = addRMapEntries additions (rMapFromList $ predecessors ns)
    closestPreds = takeRMapPredecessors ownID (kNeighbours ns) combined
-- | Merge the provided nodes into the current successors and keep the
-- closest k nodes of the combination; the node itself is filtered out.
addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
addSuccessors succs ns = ns { successors = closestSuccs }
  where
    ownID = getNid ns
    additions = filter (\node -> getNid node /= ownID) succs
    combined = addRMapEntries additions (rMapFromList $ successors ns)
    closestSuccs = takeRMapSuccessors ownID (kNeighbours ns) combined
-- | Class for all types that can be identified via an EpiChord key.
-- Used for restricting the types a 'RingMap' can store
class (Eq a, Show a) => HasKeyID a where
    -- | the key the value is stored under in a 'RingMap'
    getKeyID :: a -> NodeID
-- | node states are keyed by their node ID
instance HasKeyID RemoteNodeState where
    getKeyID = getNid
-- | cache entries are keyed by the node ID of the contained node state
instance HasKeyID CacheEntry where
    getKeyID (CacheEntry _ ns _) = getNid ns
-- | a NodeID is its own key
instance HasKeyID NodeID where
    getKeyID = id
-- | the node cache is a 'RingMap' of 'CacheEntry' values
type NodeCache = RingMap CacheEntry
-- | generic data structure for holding elements with a key and modular lookup
newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
-- | two RingMaps are equal if their underlying maps are equal
instance (HasKeyID a) => Eq (RingMap a) where
    a == b = getRingMap a == getRingMap b
-- | Debug representation: the plain prefix @RingMap @ followed by the shown
-- underlying map. (The prefix was previously rendered via 'shows' and thus
-- appeared as a quoted string literal in the output.)
instance (HasKeyID a) => Show (RingMap a) where
    show rmap = "RingMap " <> show (getRingMap rmap)
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
--
-- A 'ProxyEntry' carries the key and direction where a lookup shall continue
-- and may additionally hold a regular entry stored at the same key.
data RingEntry a = KeyEntry a
    | ProxyEntry (NodeID, ProxyDirection) (Maybe (RingEntry a))
    deriving (Show, Eq)
-- | 'RingEntry' type for usage as a node cache: a node state together with a
-- flag and a timestamp.
-- NOTE(review): the Bool presumably marks the node as verified and the
-- 'POSIXTime' is presumably the time of the last update - confirm against the
-- cache handling code.
data CacheEntry = CacheEntry Bool RemoteNodeState POSIXTime
    deriving (Show, Eq)
-- | As a compromise, only 'KeyEntry' values are ordered, namely by their key ID.
-- Comparing a 'ProxyEntry' is a programming error and raises an exception.
instance (HasKeyID a, Eq a) => Ord (RingEntry a) where
    compare = compare `on` keyOf
      where
        keyOf (KeyEntry e) = getKeyID e
        keyOf ProxyEntry{} = error "proxy entries should never appear outside of the RingMap"
-- | direction in which a lookup continues when it hits a 'ProxyEntry'
data ProxyDirection = Backwards
    | Forwards
    deriving (Show, Eq)

-- | 'Backwards' maps to @-1@ and 'Forwards' to @1@, so that the numeric value
-- can be used as a step on the key space; any other number is rejected.
instance Enum ProxyDirection where
    toEnum n = case n of
        (-1) -> Backwards
        1    -> Forwards
        _    -> error "no such ProxyDirection"
    fromEnum direction = case direction of
        Backwards -> - 1
        Forwards  -> 1
-- | Get the stored value out of a 'RingEntry', if there is one: either the
-- value of a 'KeyEntry' or the value directly held by a 'ProxyEntry'.
extractRingEntry :: HasKeyID a => RingEntry a -> Maybe a
extractRingEntry ringEntry = case ringEntry of
    KeyEntry entry                       -> Just entry
    ProxyEntry _ (Just (KeyEntry entry)) -> Just entry
    _                                    -> Nothing
-- | All entries stored in the cache, in ascending key order; useful for a
-- full cache transfer. Proxy entries without a value are left out.
cacheEntries :: NodeCache -> [CacheEntry]
cacheEntries cache = mapMaybe extractRingEntry (Map.elems (getRingMap cache))
-- | An empty 'RingMap'. It already contains 2 value-less proxy entries that
-- link the modular name space together: the one at @maxBound@ points
-- forwards to @minBound@, the one at @minBound@ backwards to @maxBound@.
emptyRMap :: HasKeyID a => RingMap a
emptyRMap = RingMap $ Map.fromList
    [ (maxBound, ProxyEntry (minBound, Forwards) Nothing)
    , (minBound, ProxyEntry (maxBound, Backwards) Nothing)
    ]
-- | an empty node cache, containing only the 2 proxy entries of 'emptyRMap'
initCache :: NodeCache
initCache = emptyRMap
-- | Look up the value stored exactly at the given key, if any.
-- A proxy entry without a value counts as "nothing stored".
rMapLookup :: HasKeyID a
           => NodeID -- ^lookup key
           -> RingMap a -- ^lookup cache
           -> Maybe a
rMapLookup key rmap = Map.lookup key (getRingMap rmap) >>= extractRingEntry
-- | 'rMapLookup' specialised to the node cache
cacheLookup :: NodeID -- ^lookup key
            -> NodeCache -- ^lookup cache
            -> Maybe CacheEntry
cacheLookup key cache = rMapLookup key cache
-- | Number of values stored in a properly initialised 'RingMap': the size of
-- the underlying map minus those of the 2 proxy entries (at @minBound@ and
-- @maxBound@) that do not hold a value themselves.
rMapSize :: (HasKeyID a, Integral i)
         => RingMap a
         -> i
rMapSize rmap = fromIntegral $ Map.size (getRingMap rmap) - valuelessProxies
  where
    -- how many of the 2 edge keys hold no value
    valuelessProxies = length $ filter (\edge -> isNothing (rMapLookup edge rmap)) [minBound, maxBound]
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring.
--
-- The first lookup function is used for the initial lookup at the given key,
-- the second one for every repeated lookup after having been redirected by a
-- value-less proxy entry. The 'ProxyDirection' is the direction of the search.
-- Returns 'Nothing' if the map holds no values or the lookup finds nothing.
lookupWrapper :: HasKeyID a
              => (NodeID -> Map.Map NodeID (RingEntry a) -> Maybe (NodeID, RingEntry a))
              -> (NodeID -> Map.Map NodeID (RingEntry a) -> Maybe (NodeID, RingEntry a))
              -> ProxyDirection
              -> NodeID
              -> RingMap a
              -> Maybe a
lookupWrapper f fRepeat direction key rmap =
    case f key $ getRingMap rmap of
        -- the proxy entry found holds a value itself, return that
        Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry
        -- proxy entry holds another proxy entry, this should not happen
        Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing
        -- proxy entry without own entry is a pointer on where to continue
        -- if lookup direction is the same as pointer direction: follow pointer,
        -- otherwise step one key further in lookup direction (modular arithmetic)
        Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) ->
            let newKey = if pointerDirection == direction
                            then pointerID
                            else foundKey + (fromInteger . toInteger . fromEnum $ direction)
            -- only recurse if there is any value to be found, to prevent
            -- endlessly bouncing between the 2 proxy entries
            in if rMapNotEmpty rmap
                then lookupWrapper fRepeat fRepeat direction newKey rmap
                else Nothing
        -- normal entries are returned
        Just (_, KeyEntry entry) -> Just entry
        Nothing -> Nothing
  where
    -- whether the map contains at least one value
    rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
    rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries
        || isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node
        || isJust (rMapLookup maxBound rmap')
-- | find the successor node to a given key on a modular EpiChord ring.
-- Note: The EpiChord definition of "successor" includes the node at the key itself,
-- if existing.
-- This is why both the initial and the repeated lookup use the inclusive 'Map.lookupGE'.
rMapLookupSucc :: HasKeyID a
=> NodeID -- ^lookup key
-> RingMap a -- ^ring cache
-> Maybe a
rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
-- | 'rMapLookupSucc' specialised to the 'NodeCache'.
cacheLookupSucc :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe CacheEntry
cacheLookupSucc = rMapLookupSucc
-- | find the predecessor node to a given key on a modular EpiChord ring.
-- The initial lookup uses the strict 'Map.lookupLT', so an entry stored exactly at
-- the lookup key is not returned. Lookups repeated after a proxy redirection use
-- the inclusive 'Map.lookupLE'.
rMapLookupPred :: HasKeyID a
=> NodeID -- ^lookup key
-> RingMap a -- ^ring cache
-> Maybe a
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
-- | 'rMapLookupPred' specialised to the 'NodeCache'.
cacheLookupPred :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache
-> Maybe CacheEntry
cacheLookupPred = rMapLookupPred
-- | Insert an entry at the key given by its 'getKeyID', wrapped as a 'KeyEntry'.
-- If the key is already occupied, the combining function decides the result;
-- following 'Map.insertWith' it is called as @combineFunc newEntry oldEntry@.
addRMapEntryWith :: HasKeyID a
                 => (RingEntry a -> RingEntry a -> RingEntry a)
                 -> a
                 -> RingMap a
                 -> RingMap a
addRMapEntryWith combineFunc entry rmap =
    RingMap $ Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry) (getRingMap rmap)
-- | Insert an entry into the ring map. An existing plain entry at the same key
-- is overwritten; an existing proxy slot keeps its pointer and holds the new
-- entry instead of whatever it held before.
addRMapEntry :: HasKeyID a
             => a
             -> RingMap a
             -> RingMap a
addRMapEntry = addRMapEntryWith placeInSlot
  where
    -- first argument: the freshly wrapped entry, second: the occupant of the slot
    placeInSlot incoming (ProxyEntry pointer _) = ProxyEntry pointer (Just incoming)
    placeInSlot incoming (KeyEntry _)           = incoming
-- | Insert all entries of a 'Foldable' into the ring map using 'addRMapEntry'.
-- The strict right fold applies the insertions starting from the last element,
-- so for entries sharing a key the one appearing first in the container is
-- inserted last and therefore ends up in the map.
addRMapEntries :: (Foldable t, HasKeyID a)
=> t a
-> RingMap a
-> RingMap a
addRMapEntries entries rmap = foldr' addRMapEntry rmap entries
-- | Build a fresh ring map containing exactly the given entries, starting
-- from 'emptyRMap' (so the two proxy entries are always present).
setRMapEntries :: (Foldable t, HasKeyID a)
=> t a
-> RingMap a
setRMapEntries entries = addRMapEntries entries emptyRMap
-- | Remove the entry stored at the given key; absent keys leave the map
-- unchanged. A plain entry is dropped from the map altogether, whereas a proxy
-- slot stays in place with its pointer and is merely emptied, so the ring
-- structure is never destroyed.
deleteRMapEntry :: (HasKeyID a)
                => NodeID
                -> RingMap a
                -> RingMap a
deleteRMapEntry nid rmap = RingMap (Map.update vacate nid (getRingMap rmap))
  where
    vacate slot = case slot of
        ProxyEntry pointer _ -> Just (ProxyEntry pointer Nothing)
        KeyEntry _           -> Nothing
-- | All entries of the ring map as a list, in the key order of the inner map.
-- Slots from which 'extractRingEntry' yields nothing are skipped.
rMapToList :: (HasKeyID a) => RingMap a -> [a]
rMapToList rmap = mapMaybe extractRingEntry (Map.elems (getRingMap rmap))
-- | Build a ring map from a list of entries; 'setRMapEntries' specialised to lists.
rMapFromList :: (HasKeyID a) => [a] -> RingMap a
rMapFromList = setRMapEntries
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
-- *startAt* value and after that on the key of the previously returned entry.
-- Stops once i entries have been taken, once the first entry taken is
-- encountered again (meaning the ring has been traversed completely), or once
-- the getter function does not return an entry anymore.
-- Requesting zero or fewer entries yields the empty list.
-- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'.
takeRMapEntries_ :: (HasKeyID a, Integral i)
                 => (NodeID -> RingMap a -> Maybe a)   -- ^ getter, e.g. a successor or predecessor lookup
                 -> NodeID                              -- ^ key to start the traversal at
                 -> i                                   -- ^ maximum number of entries to take
                 -> RingMap a
                 -> [a]                                 -- ^ entries in the order they were visited
-- TODO: might be more efficient with dlists
takeRMapEntries_ getterFunc startAt num rmap
    | num <= 0 = []
    | otherwise = case getterFunc startAt rmap of
        Nothing -> []
        Just firstEntry ->
            reverse $ collect (getKeyID firstEntry) (getKeyID firstEntry) (num - 1) [firstEntry]
  where
    -- the accumulator is built in reverse visiting order
    collect firstKey previousKey remaining acc
        | remaining <= 0 = acc
        | otherwise = case getterFunc previousKey rmap of
            Just nextEntry
                -- reaching the first entry again means the ring has been fully traversed
                | getKeyID nextEntry /= firstKey ->
                    collect firstKey (getKeyID nextEntry) (remaining - 1) (nextEntry : acc)
            -- ring traversed or getter exhausted: stop instead of crashing
            _ -> acc
-- | Take up to i entries preceding the given key on the ring, by repeatedly
-- applying 'rMapLookupPred' via 'takeRMapEntries_'.
takeRMapPredecessors :: (HasKeyID a, Integral i)
=> NodeID
-> i
-> RingMap a
-> [a]
takeRMapPredecessors = takeRMapEntries_ rMapLookupPred
-- | Take up to i entries succeeding the given key on the ring, by repeatedly
-- applying 'rMapLookupSucc' via 'takeRMapEntries_'.
takeRMapSuccessors :: (HasKeyID a, Integral i)
=> NodeID
-> i
-> RingMap a
-> [a]
takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
-- clean up cache entries: once now - entry > maxAge
-- transfer difference now - entry to other node
-- | Project the node state out of a cache entry, ignoring the entry's other
-- two fields. No validation status is checked here.
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
cacheGetNodeStateUnvalidated cacheEntry = case cacheEntry of
    CacheEntry _ nodeState _ -> nodeState
-- | Serialise a 'HostAddress6' IP address to a strict ByteString by encoding
-- each of its four 32 bit words with 'NetworkBytes.bytestring32' and
-- concatenating the results in order (big-endian).
ipAddrAsBS :: HostAddress6 -> BS.ByteString
ipAddrAsBS (a, b, c, d) = foldMap NetworkBytes.bytestring32 [a, b, c, d]
-- | Parse a ByteString in big endian order into an IPv6 address 'HostAddress6'.
-- The input is cut into 4 byte chunks of which the first four are decoded as
-- 32 bit words; any further chunks are ignored.
-- Partial: if fewer than four chunks are available, forcing a missing
-- component of the result fails with a pattern match error.
bsAsIpAddr :: BS.ByteString -> HostAddress6
bsAsIpAddr bytes = (a, b, c, d)
  where
    decodedWords = NetworkBytes.word32 <$> chunkBytes 4 bytes
    a:b:c:d:_ = decodedWords
-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString.
-- The ID is the concatenation of: upper part of the IP hash, hash of the
-- domain, lower part of the IP hash. Both hash inputs have the vserver ID
-- appended as a single byte; the IP hash only covers the first 8 bytes of the
-- address.
--
-- NOTE(review): the digest is requested as @SHAKE128 128@, presumably 128 bits
-- (16 bytes) of output. 'BS.splitAt' counts bytes, so splitting at 64 would put
-- the whole IP hash into the upper part and leave the lower part empty. If a
-- split at 64 /bits/ was intended the offset should be 8. Confirm before
-- changing: it alters every generated NodeID.
genNodeIDBS :: HostAddress6     -- ^a node's IPv6 address
            -> String           -- ^a node's 1st and 2nd level domain name
            -> Word8            -- ^the used vserver ID
            -> BS.ByteString    -- ^the NodeID as a 256bit ByteString big-endian unsigned integer
genNodeIDBS ip nodeDomain vserver = BS.concat [ipHashUpper, domainHash, ipHashLower]
  where
    vserverByte = BS.singleton vserver -- attention: only works for vserver IDs up to 255
    ipHashInput = BS.take 8 (ipAddrAsBS ip) `BS.append` vserverByte
    domainHashInput = BSU.fromString nodeDomain `BS.append` vserverByte
    shake128 input = BS.pack (BA.unpack (hash input :: Digest (SHAKE128 128)))
    domainHash = shake128 domainHashInput
    (ipHashUpper, ipHashLower) = BS.splitAt 64 (shake128 ipHashInput)
-- | generates a 256 bit long @NodeID@ using SHAKE128: the ByteString produced
-- by 'genNodeIDBS', read as a big-endian unsigned integer.
genNodeID :: HostAddress6       -- ^a node's IPv6 address
          -> String             -- ^a node's 1st and 2nd level domain name
          -> Word8              -- ^the used vserver ID
          -> NodeID             -- ^the generated @NodeID@
genNodeID ip nodeDomain vs = NodeID (byteStringToUInteger idBytes)
  where
    idBytes = genNodeIDBS ip nodeDomain vs
-- | generates a 256 bit long key identifier, represented as ByteString, for
-- looking up its data on the DHT: the SHA3-256 digest of the key string after
-- converting it with 'BSU.fromString'.
genKeyIDBS :: String            -- ^the key string
           -> BS.ByteString     -- ^the key ID represented as a @ByteString@
genKeyIDBS key = BS.pack (BA.unpack keyDigest)
  where
    keyDigest :: Digest SHA3_256
    keyDigest = hash (BSU.fromString key)
-- | generates a 256 bit long key identifier for looking up its data on the DHT:
-- the ByteString produced by 'genKeyIDBS', read as a big-endian unsigned integer.
genKeyID :: String      -- ^the key string
         -> NodeID      -- ^the key ID
genKeyID key = NodeID (byteStringToUInteger (genKeyIDBS key))
-- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order.
-- The bytes are consumed front to back by a strict left fold: the value
-- accumulated so far is shifted up by one byte (multiplied by 256) before the
-- next byte is added. The empty ByteString is parsed as 0.
byteStringToUInteger :: BS.ByteString -> Integer
byteStringToUInteger = BS.foldl' appendByte 0
  where
    appendByte :: Integer -> Word8 -> Integer
    appendByte acc byte = acc * 256 + toInteger byte
-- TODO: the DHT backend can learn potential initial bootstrapping points through the
-- instances mentioned in the received AP-relay messages; these should be persisted
-- on disk so they can be used for all following bootstraps
-- | configuration values used for initialising the FediChord DHT
data FediChordConf = FediChordConf
{ confDomain :: String
-- ^ the domain/ hostname the node is reachable under
, confIP :: HostAddress6
-- ^ IP address of outgoing packets
, confDhtPort :: Int
-- ^ listening port for the FediChord DHT
, confBootstrapNodes :: [(String, PortNumber)]
-- ^ list of potential bootstrapping nodes, each given as a pair of
-- a 'String' (presumably host name or address -- TODO confirm) and a port
, confBootstrapSamplingInterval :: Int
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
}
deriving (Show, Eq)

View file

@ -0,0 +1,101 @@
module Hash2Pub.ProtocolTypes where
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX (POSIXTime)
import Hash2Pub.FediChordTypes
-- | result of a query: either a set of cache entries to forward the query to
-- ('FORWARD', possibly empty), or the state of a single node ('FOUND',
-- presumably the node responsible for the queried ID -- TODO confirm).
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
| FOUND RemoteNodeState
deriving (Show, Eq)
-- === protocol serialisation data types
-- | the kind of action a 'FediChordMessage' carries.
-- The constructors are listed in the same order as in the ASN.1 schema's
-- @Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}@;
-- as 'Enum' is derived, reordering them changes their 'fromEnum' values.
data Action = QueryID
| Join
| Leave
| Stabilise
| Ping
deriving (Show, Eq, Enum)
-- | a single protocol message part, either a request or the response to one.
-- Both constructors share the fields 'requestID', 'part', 'isFinalPart',
-- 'action' and 'payload'; 'sender' only exists on 'Request' and 'senderID'
-- only on 'Response', so these two accessors are partial.
data FediChordMessage = Request
{ requestID :: Integer
-- ^ identifier of this request
, sender :: RemoteNodeState
-- ^ full state of the requesting node
, part :: Integer
-- ^ part number of this message, starts at 1
, isFinalPart :: Bool
-- ^ flag indicating this part to be the last one of the request
, action :: Action
, payload :: Maybe ActionPayload
}
| Response
{ requestID :: Integer
-- ^ requestID of the request responding to
, senderID :: NodeID
, part :: Integer
-- ^ part number of this message
, isFinalPart :: Bool
-- ^ flag indicating this part to be the last one of the response
, action :: Action
, payload :: Maybe ActionPayload
}
deriving (Show, Eq)
-- | Message parts are ordered by 'requestID' first and by 'part' number second.
-- All other fields are ignored, so two parts with the same request ID and part
-- number compare as 'EQ' even when they differ according to 'Eq'.
-- Across constructors every 'Request' sorts before every 'Response'; this
-- keeps the ordering antisymmetric, as required for use in ordered containers.
instance Ord FediChordMessage where
    compare a b = case (a, b) of
        (Request{}, Response{}) -> LT
        (Response{}, Request{}) -> GT
        -- same constructor: lexicographic comparison of request ID and part number
        _ -> (requestID a, part a) `compare` (requestID b, part b)
-- | payload of a 'FediChordMessage'. There is one constructor per action and
-- message direction, mirroring the alternatives of the @actionPayload@ CHOICE
-- in the ASN.1 schema. The record fields are specific to their constructor,
-- so all field accessors of this type are partial.
data ActionPayload = QueryIDRequestPayload
{ queryTargetID :: NodeID
-- ^ the ID that is being queried for
, queryLBestNodes :: Integer
}
| JoinRequestPayload
| LeaveRequestPayload
{ leaveSuccessors :: [RemoteNodeState]
, leavePredecessors :: [RemoteNodeState]
}
| StabiliseRequestPayload
| PingRequestPayload
| QueryIDResponsePayload
{ queryResult :: QueryResponse
}
| JoinResponsePayload
{ joinSuccessors :: [RemoteNodeState]
, joinPredecessors :: [RemoteNodeState]
, joinCache :: [RemoteCacheEntry]
}
| LeaveResponsePayload
| StabiliseResponsePayload
{ stabiliseSuccessors :: [RemoteNodeState]
, stabilisePredecessors :: [RemoteNodeState]
}
| PingResponsePayload
{ pingNodeStates :: [RemoteNodeState]
}
deriving (Show, Eq)
-- | global limit of parts per message used when (de)serialising messages.
-- Used to limit the impact of DoS attempts with partial messages.
-- The value equals the upper bound of @Partnum@ in the ASN.1 schema.
maximumParts :: Num a => a
maximumParts = 150
-- | dedicated data type for cache entries sent to or received from the network,
-- as these have to be considered as unvalidated. Also helps with separation of trust.
-- Holds the node's state and a 'POSIXTime' (presumably the entry's timestamp -- TODO confirm).
data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
deriving (Show, Eq)
-- | Entries are ordered by the node ID of their node state only; the
-- 'POSIXTime' component does not take part in the comparison.
instance Ord RemoteCacheEntry where
    compare (RemoteCacheEntry leftNode _) (RemoteCacheEntry rightNode _) =
        compare (nid leftNode) (nid rightNode)
-- | Convert a local 'CacheEntry' to a 'RemoteCacheEntry' by keeping its node
-- state and time value and dropping its first field.
toRemoteCacheEntry :: CacheEntry -> RemoteCacheEntry
toRemoteCacheEntry cacheEntry = case cacheEntry of
    CacheEntry _ nodeState time -> RemoteCacheEntry nodeState time
-- | All entries of a 'NodeCache', each converted with 'toRemoteCacheEntry'.
-- Useful for cache transfers.
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
toRemoteCache = map toRemoteCacheEntry . cacheEntries
-- | The node state carried by a 'RemoteCacheEntry'.
remoteNode :: RemoteCacheEntry -> RemoteNodeState
remoteNode remoteEntry = case remoteEntry of
    RemoteCacheEntry nodeState _ -> nodeState

View file

@ -1,13 +1,13 @@
{-# LANGUAGE OverloadedStrings #-}
module FediChordSpec where
import Control.Concurrent.STM.TVar
import Control.Exception
import Data.ASN1.Parse (runParseASN1)
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust)
import qualified Data.Set as Set
import Data.ASN1.Parse (runParseASN1)
import qualified Data.ByteString as BS
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, isJust)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket
import Test.Hspec
@ -15,6 +15,7 @@ import Test.Hspec
import Hash2Pub.ASN1Coding
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
spec :: Spec
spec = do
@ -55,14 +56,13 @@ spec = do
it "can be initialised" $
print exampleNodeState
it "can be initialised partly and then modified later" $ do
let ns = NodeState {
let ns = RemoteNodeState {
nid = undefined
, domain = exampleNodeDomain
, ipAddr = exampleIp
, dhtPort = 2342
, apPort = Nothing
, servicePort = 513
, vServerID = undefined
, internals = Nothing
}
nsReady = ns {
nid = genNodeID (ipAddr ns) (domain ns) 3
@ -81,8 +81,8 @@ spec = do
newCache = addCacheEntryPure 10 (RemoteCacheEntry exampleNodeState 10) (addCacheEntryPure 10 (RemoteCacheEntry anotherNode 10) emptyCache)
exampleID = nid exampleNodeState
it "entries can be added to a node cache and looked up again" $ do
-- the cache includes 2 additional proxy elements right from the start
Map.size newCache - Map.size emptyCache `shouldBe` 2
rMapSize emptyCache `shouldBe` 0
rMapSize newCache `shouldBe` 2
-- normal entry lookup
nid . cacheGetNodeStateUnvalidated <$> cacheLookup anotherID newCache `shouldBe` Just anotherID
nid . cacheGetNodeStateUnvalidated <$> cacheLookup (anotherID+1) newCache `shouldBe` Nothing
@ -121,50 +121,72 @@ spec = do
let
emptyCache = initCache
nid1 = toNodeID 2^(23::Integer)+1
node1 = do
eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609
pure $ putPredecessors [nid4] $ eln {nid = nid1}
node1 = setPredecessors [node4] . setNid nid1 <$> exampleLocalNode
nid2 = toNodeID 2^(230::Integer)+12
node2 = exampleNodeState { nid = nid2}
nid3 = toNodeID 2^(25::Integer)+10
node3 = exampleNodeState { nid = nid3}
nid4 = toNodeID 2^(9::Integer)+100
node4 = exampleNodeState { nid = nid4}
cacheWith2Entries :: IO NodeCache
cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries)
it "works on an empty cache" $ do
nid5 = toNodeID 2^(25::Integer)+100
node5 = exampleNodeState { nid = nid5}
cacheWith2Entries :: NodeCache
cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries)
it "unjoined nodes should never return themselfs" $ do
exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty
(FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode)
remoteNode (head $ Set.elems fwSet) `shouldBe` node4
it "joined nodes do not fall back to the default" $
queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty
it "works on a cache with less entries than needed" $ do
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
(FORWARD nodeset) <- queryLocalCache <$> node1 <*> pure cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid5, nid2 ]
it "works on a cache with sufficient entries" $ do
(FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
(FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
(FORWARD nodeset1) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
(FORWARD nodeset2) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid5]
Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4]
it "recognises the node's own responsibility" $ do
FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
nid <$> node1 `shouldReturn` nid selfQueryRes
FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))
nid <$> node1 `shouldReturn` nid responsibilityResult
it "does not fail on nodes without neighbours (initial state)" $ do
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
FOUND selfQueryRes <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure nid1
getNid <$> node1 `shouldReturn` getNid selfQueryRes
FOUND responsibilityResult <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))
getNid <$> node1 `shouldReturn` getNid responsibilityResult
describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do
let
emptyCache = initCache
-- implicitly relies on kNeighbours to be <= 3
thisNid = toNodeID 1000
thisNode = setNid thisNid <$> exampleLocalNode
nid2 = toNodeID 1003
node2 = exampleNodeState { nid = nid2}
nid3 = toNodeID 1010
node3 = exampleNodeState { nid = nid3}
nid4 = toNodeID 1020
node4 = exampleNodeState { nid = nid4}
nid5 = toNodeID 1025
node5 = exampleNodeState { nid = nid5}
allRemoteNodes = [node2, node3, node4, node5]
it "lookups also work for slices larger than 1/2 key space" $ do
node <- setSuccessors allRemoteNodes . setPredecessors allRemoteNodes <$> thisNode
-- do lookup on empty cache but with successors for a key > 1/2 key space
-- succeeding the node
queryLocalCache node emptyCache 1 (nid5 + 10) `shouldBe` FOUND (toRemoteNodeState node)
describe "Messages can be encoded to and decoded from ASN.1" $ do
-- define test messages
let
someNodeIDs = fmap fromInteger [3..12]
someNodes = fmap (flip setNid exampleNodeState . fromInteger) [3..12]
qidReqPayload = QueryIDRequestPayload {
queryTargetID = nid exampleNodeState
, queryLBestNodes = 3
}
jReqPayload = JoinRequestPayload
lReqPayload = LeaveRequestPayload {
leaveSuccessors = someNodeIDs
, leavePredecessors = someNodeIDs
leaveSuccessors = someNodes
, leavePredecessors = someNodes
}
stabReqPayload = StabiliseRequestPayload
pingReqPayload = PingRequestPayload
@ -178,8 +200,8 @@ spec = do
]
}
jResPayload = JoinResponsePayload {
joinSuccessors = someNodeIDs
, joinPredecessors = someNodeIDs
joinSuccessors = someNodes
, joinPredecessors = someNodes
, joinCache = [
RemoteCacheEntry exampleNodeState (toEnum 23420001)
, RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0)
@ -187,7 +209,7 @@ spec = do
}
lResPayload = LeaveResponsePayload
stabResPayload = StabiliseResponsePayload {
stabiliseSuccessors = someNodeIDs
stabiliseSuccessors = someNodes
, stabilisePredecessors = []
}
pingResPayload = PingResponsePayload {
@ -199,16 +221,16 @@ spec = do
requestTemplate = Request {
requestID = 2342
, sender = exampleNodeState
, parts = 1
, part = 1
, isFinalPart = True
, action = undefined
, payload = undefined
}
responseTemplate = Response {
responseTo = 2342
requestID = 2342
, senderID = nid exampleNodeState
, parts = 1
, part = 1
, isFinalPart = True
, action = undefined
, payload = undefined
}
@ -216,6 +238,12 @@ spec = do
responseWith a pa = responseTemplate {action = a, payload = Just pa}
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
largeMessage = responseWith Join $ JoinResponsePayload {
joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150]
, joinPredecessors = flip setNid exampleNodeState . fromInteger <$> [5..11]
, joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]]
}
it "messages are encoded and decoded correctly from and to ASN1" $ do
encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
encodeDecodeAndCheck $ requestWith Join jReqPayload
@ -231,35 +259,54 @@ spec = do
it "messages are encoded and decoded to ASN.1 DER properly" $
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
it "messages too large for a single packet can (often) be split into multiple parts" $ do
let largeMessage = responseWith Join $ JoinResponsePayload {
joinSuccessors = fromInteger <$> [-20..150]
, joinPredecessors = fromInteger <$> [5..11]
, joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]]
}
-- TODO: once splitting works more efficient, test for exact number or payload, see #18
length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True
length (serialiseMessage 6000 largeMessage) `shouldBe` 1
length (serialiseMessage 60000 largeMessage) `shouldBe` 1
it "message part numbering starts at the submitted part number" $ do
isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True
let startAt5 = serialiseMessage 600 (largeMessage {part = 5})
Map.lookup 1 startAt5 `shouldBe` Nothing
part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5
describe "join cache lookup" $
it "A bootstrap cache initialised with just one node returns that one." $ do
let
bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194
bootstrapNode = setNid bootstrapNid exampleNodeState
bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache
ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165
ownNode <- setNid ownId <$> exampleLocalNode
let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId
remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode
-- some example data
exampleNodeState :: NodeState
exampleNodeState = NodeState {
exampleNodeState :: RemoteNodeState
exampleNodeState = RemoteNodeState {
nid = toNodeID 12
, domain = exampleNodeDomain
, ipAddr = exampleIp
, dhtPort = 2342
, apPort = Nothing
, servicePort = 513
, vServerID = 0
, internals = Nothing
}
exampleLocalNode :: IO NodeState
exampleLocalNode = nodeStateInit $ FediChordConf {
exampleLocalNode :: IO LocalNodeState
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode {
vservers = []
, nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf
})
exampleFediConf :: FediChordConf
exampleFediConf = FediChordConf {
confDomain = "example.social"
, confIP = exampleIp
, confDhtPort = 2342
}
exampleNodeDomain :: String
exampleNodeDomain = "example.social"
exampleVs :: (Integral i) => i