Compare commits

..

No commits in common. "16769d1395637c616661f6def7dbda34d41feb2d" and "99a2b0ba09f3fb3e88f9abd020cde3a03a6501a4" have entirely different histories.

6 changed files with 144 additions and 147 deletions

View file

@ -44,7 +44,7 @@ NodeState ::= SEQUENCE {
domain Domain, domain Domain,
ipAddr OCTET STRING (SIZE(16)), ipAddr OCTET STRING (SIZE(16)),
dhtPort INTEGER, dhtPort INTEGER,
servicePort INTEGER, apPort INTEGER,
vServerID INTEGER (0..255) vServerID INTEGER (0..255)
} }

View file

@ -186,15 +186,15 @@ encodePayload payload'@PingResponsePayload{} =
: concatMap encodeNodeState (pingNodeStates payload') : concatMap encodeNodeState (pingNodeStates payload')
<> [End Sequence] <> [End Sequence]
encodeNodeState :: NodeState a => a -> [ASN1] encodeNodeState :: NodeState -> [ASN1]
encodeNodeState ns = [ encodeNodeState ns = [
Start Sequence Start Sequence
, IntVal (getNodeID . getNid $ ns) , IntVal (getNodeID . nid $ ns)
, ASN1String . asn1CharacterString Visible $ getDomain ns , ASN1String . asn1CharacterString Visible $ domain ns
, OctetString (ipAddrAsBS $ getIpAddr ns) , OctetString (ipAddrAsBS $ ipAddr ns)
, IntVal (toInteger . getDhtPort $ ns) , IntVal (toInteger . dhtPort $ ns)
, IntVal (toInteger . getServicePort $ ns) , IntVal (maybe 0 toInteger $ apPort ns)
, IntVal (getVServerID ns) , IntVal (vServerID ns)
, End Sequence , End Sequence
] ]
@ -328,21 +328,22 @@ parseNull = do
Null -> pure () Null -> pure ()
x -> throwParseError $ "Expected Null but got " <> show x x -> throwParseError $ "Expected Null but got " <> show x
parseNodeState :: ParseASN1 RemoteNodeState parseNodeState :: ParseASN1 NodeState
parseNodeState = onNextContainer Sequence $ do parseNodeState = onNextContainer Sequence $ do
nid' <- fromInteger <$> parseInteger nid' <- fromInteger <$> parseInteger
domain' <- parseString domain' <- parseString
ip' <- bsAsIpAddr <$> parseOctets ip' <- bsAsIpAddr <$> parseOctets
dhtPort' <- fromInteger <$> parseInteger dhtPort' <- fromInteger <$> parseInteger
servicePort' <- fromInteger <$> parseInteger apPort' <- fromInteger <$> parseInteger
vServer' <- parseInteger vServer' <- parseInteger
pure RemoteNodeState { pure NodeState {
nid = nid' nid = nid'
, domain = domain' , domain = domain'
, dhtPort = dhtPort' , dhtPort = dhtPort'
, servicePort = servicePort' , apPort = if apPort' == 0 then Nothing else Just apPort'
, vServerID = vServer' , vServerID = vServer'
, ipAddr = ip' , ipAddr = ip'
, internals = Nothing
} }

View file

@ -37,15 +37,18 @@ import System.Random
import System.Timeout import System.Timeout
import Hash2Pub.ASN1Coding import Hash2Pub.ASN1Coding
import Hash2Pub.FediChord (CacheEntry (..), import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
LocalNodeState (..), NodeCache,
NodeID, NodeState (..), NodeID, NodeState (..),
RemoteNodeState (..),
cacheGetNodeStateUnvalidated, cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred, cacheLookup, cacheLookupPred,
cacheLookupSucc, localCompare, cacheLookupSucc,
mkSendSocket, mkServerSocket, getCacheWriteQueue,
setPredecessors, setSuccessors) getLNumBestNodes,
getNodeCacheRef,
getPredecessors, getSuccessors,
localCompare, mkSendSocket,
mkServerSocket,
putPredecessors, putSuccessors)
import Hash2Pub.ProtocolTypes import Hash2Pub.ProtocolTypes
import Debug.Trace (trace) import Debug.Trace (trace)
@ -54,22 +57,22 @@ import Debug.Trace (trace)
-- TODO: evaluate more fine-grained argument passing to allow granular locking -- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node -- as target ID falls between own ID and first predecessor, it is handled by this node
| (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (headMay preds) = FOUND . toRemoteNodeState $ ownState | (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
where where
ownID = getNid ownState preds = fromMaybe [] $ getPredecessors ownState
preds = predecessors ownState ownID = nid ownState
closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache
closestPredecessors :: Set.Set RemoteCacheEntry closestPredecessors :: Set.Set RemoteCacheEntry
closestPredecessors = closestPredecessor (lBestNodes-1) $ getNid ownState closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
closestPredecessor 0 _ = Set.empty closestPredecessor 0 _ = Set.empty
closestPredecessor remainingLookups lastID closestPredecessor remainingLookups lastID
@ -132,19 +135,19 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
requestQueryID :: LocalNodeState -> NodeID -> IO RemoteNodeState requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes -- 1. do a local lookup for the l closest nodes
-- 2. create l sockets -- 2. create l sockets
-- 3. send a message async concurrently to all l nodes -- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache -- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do requestQueryID ns targetID = do
firstCacheSnapshot <- readIORef . nodeCacheRef $ ns firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns
lookupLoop firstCacheSnapshot lookupLoop firstCacheSnapshot
where where
lookupLoop :: NodeCache -> IO RemoteNodeState lookupLoop :: NodeCache -> IO NodeState
lookupLoop cacheSnapshot = do lookupLoop cacheSnapshot = do
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node -- FOUND can only be returned if targetID is owned by local node
case localResult of case localResult of
FOUND thisNode -> pure thisNode FOUND thisNode -> pure thisNode
@ -164,7 +167,7 @@ requestQueryID ns targetID = do
Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
_ -> [] _ -> []
-- forward entries to global cache -- forward entries to global cache
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
-- insert entries into local cache copy -- insert entries into local cache copy
pure $ foldl' ( pure $ foldl' (
\oldLCache insertFunc -> insertFunc oldLCache \oldLCache insertFunc -> insertFunc oldLCache
@ -179,8 +182,9 @@ requestQueryID ns targetID = do
-- if no FOUND, recursively call lookup again -- if no FOUND, recursively call lookup again
maybe (lookupLoop newLCache) pure foundResp maybe (lookupLoop newLCache) pure foundResp
lookupMessage targetID rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID) -- todo: random request ID
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns }
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.

View file

@ -17,10 +17,15 @@ module Hash2Pub.FediChord (
, getNodeID , getNodeID
, toNodeID , toNodeID
, NodeState (..) , NodeState (..)
, LocalNodeState (..) , InternalNodeState (..)
, RemoteNodeState (..) , getNodeCacheRef
, setSuccessors , putNodeCache
, setPredecessors , getSuccessors
, putSuccessors
, getPredecessors
, putPredecessors
, getLNumBestNodes
, getCacheWriteQueue
, NodeCache , NodeCache
, CacheEntry(..) , CacheEntry(..)
, cacheGetNodeStateUnvalidated , cacheGetNodeStateUnvalidated
@ -120,7 +125,7 @@ a `localCompare` b
-- | represents a node and all its important state -- | represents a node and all its important state
data RemoteNodeState = RemoteNodeState data NodeState = NodeState
{ nid :: NodeID { nid :: NodeID
, domain :: String , domain :: String
-- ^ full public domain name the node is reachable under -- ^ full public domain name the node is reachable under
@ -128,18 +133,18 @@ data RemoteNodeState = RemoteNodeState
-- the node's public IPv6 address -- the node's public IPv6 address
, dhtPort :: PortNumber , dhtPort :: PortNumber
-- ^ port of the DHT itself -- ^ port of the DHT itself
, servicePort :: PortNumber , apPort :: Maybe PortNumber
-- ^ port of the service provided on top of the DHT -- ^ port of the ActivityPub relay and storage service
, vServerID :: Integer , vServerID :: Integer
-- ^ ID of this vserver -- ^ ID of this vserver
, internals :: Maybe InternalNodeState
-- ^ data not present in the representation of remote nodes
} }
deriving (Show, Eq) deriving (Show, Eq)
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes -- | encapsulates all data and parameters that are not present for remote nodes
data LocalNodeState = LocalNodeState data InternalNodeState = InternalNodeState
{ nodeState :: RemoteNodeState { nodeCache :: IORef NodeCache
-- ^ represents common data present both in remote and local node representations
, nodeCacheRef :: IORef NodeCache
-- ^ EpiChord node cache with expiry times for nodes -- ^ EpiChord node cache with expiry times for nodes
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache) , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and -- ^ cache updates are not written directly to the 'nodeCache' but queued and
@ -158,63 +163,6 @@ data LocalNodeState = LocalNodeState
} }
deriving (Show, Eq) deriving (Show, Eq)
-- | class for various NodeState representations, providing
-- getters and setters for common values
class NodeState a where
-- getters for common properties
getNid :: a -> NodeID
getDomain :: a -> String
getIpAddr :: a -> HostAddress6
getDhtPort :: a -> PortNumber
getServicePort :: a -> PortNumber
getVServerID :: a -> Integer
-- setters for common properties
setNid :: NodeID -> a -> a
setDomain :: String -> a -> a
setIpAddr :: HostAddress6 -> a -> a
setDhtPort :: PortNumber -> a -> a
setServicePort :: PortNumber -> a -> a
setVServerID :: Integer -> a -> a
toRemoteNodeState :: a -> RemoteNodeState
instance NodeState RemoteNodeState where
getNid = nid
getDomain = domain
getIpAddr = ipAddr
getDhtPort = dhtPort
getServicePort = servicePort
getVServerID = vServerID
setNid nid' ns = ns {nid = nid'}
setDomain domain' ns = ns {domain = domain'}
setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'}
setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'}
setServicePort servicePort' ns = ns {servicePort = servicePort'}
setVServerID vServerID' ns = ns {vServerID = vServerID'}
toRemoteNodeState = id
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
propagateNodeStateSet_ func ns = let
newNs = func $ nodeState ns
in
ns {nodeState = newNs}
instance NodeState LocalNodeState where
getNid = getNid . nodeState
getDomain = getDomain . nodeState
getIpAddr = getIpAddr . nodeState
getDhtPort = getDhtPort . nodeState
getServicePort = getServicePort . nodeState
getVServerID = getVServerID . nodeState
setNid nid' = propagateNodeStateSet_ $ setNid nid'
setDomain domain' = propagateNodeStateSet_ $ setDomain domain'
setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr'
setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort'
setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort'
setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID'
toRemoteNodeState = nodeState
-- | defining Show instances to be able to print NodeState for debug purposes -- | defining Show instances to be able to print NodeState for debug purposes
instance Typeable a => Show (IORef a) where instance Typeable a => Show (IORef a) where
show x = show (typeOf x) show x = show (typeOf x)
@ -222,19 +170,55 @@ instance Typeable a => Show (IORef a) where
instance Typeable a => Show (TQueue a) where instance Typeable a => Show (TQueue a) where
show x = show (typeOf x) show x = show (typeOf x)
-- | convenience function that updates the successors of a 'LocalNodeState' -- | extract a value from the internals of a 'NodeState'
setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a
setSuccessors succ' ns = ns {successors = succ'} getInternals_ func ns = func <$> internals ns
-- | convenience function that updates the predecessors of a 'LocalNodeState' -- could be done better with lenses
setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState -- | convenience function that updates an internal value of a NodeState
setPredecessors pred' ns = ns {predecessors = pred'} putInternals_ :: (InternalNodeState -> InternalNodeState) -> NodeState -> NodeState
putInternals_ func ns = let
newInternals = func <$> internals ns
in
ns {internals = newInternals }
-- | convenience function for extracting the 'NodeCache' from a 'NodeState'
getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache)
getNodeCacheRef = getInternals_ nodeCache
-- | convenience function for updating the 'NodeCache' on 'NodeState' s that have
-- internals.
-- NodeStates without a cache (without internals) are returned unchanged
putNodeCache :: IORef NodeCache -> NodeState -> NodeState
putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc})
getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache))
getCacheWriteQueue = getInternals_ cacheWriteQueue
-- | convenience function for extracting the @successors@ from a 'NodeState'
getSuccessors :: NodeState -> Maybe [NodeID]
getSuccessors = getInternals_ successors
-- | convenience function that updates the successors of a NodeState
putSuccessors :: [NodeID] -> NodeState -> NodeState
putSuccessors succ' = putInternals_ (\i -> i {successors = succ'})
-- | convenience function for extracting the @predecessors@ from a 'NodeState'
getPredecessors :: NodeState -> Maybe [NodeID]
getPredecessors = getInternals_ predecessors
-- | convenience function that updates the predecessors of a NodeState
putPredecessors :: [NodeID] -> NodeState -> NodeState
putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'})
-- | convenience function for extracting the @lNumBestNodes@ from a 'NodeState'
getLNumBestNodes :: NodeState -> Maybe Int
getLNumBestNodes = getInternals_ lNumBestNodes
type NodeCache = Map.Map NodeID CacheEntry type NodeCache = Map.Map NodeID CacheEntry
-- | An entry of the 'nodeCache' can hold 2 different kinds of data. -- |an entry of the 'nodeCache' can hold 2 different kinds of data
-- Type variable @a@ should be of type class 'NodeState', but I do not want to use GADTs here. data CacheEntry = NodeEntry Bool NodeState POSIXTime
data CacheEntry = NodeEntry Bool RemoteNodeState POSIXTime
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
deriving (Show, Eq) deriving (Show, Eq)
@ -244,7 +228,7 @@ instance Ord CacheEntry where
a `compare` b = compare (extractID a) (extractID b) a `compare` b = compare (extractID a) (extractID b)
where where
extractID (NodeEntry _ eState _) = getNid eState extractID (NodeEntry _ eState _) = nid eState
extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
data ProxyDirection = Backwards data ProxyDirection = Backwards
@ -324,7 +308,7 @@ cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
-- transfer difference now - entry to other node -- transfer difference now - entry to other node
-- | return the @NodeState@ data from a cache entry without checking its validation status -- | return the @NodeState@ data from a cache entry without checking its validation status
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState
cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState
cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry
cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug" cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug"
@ -441,30 +425,30 @@ data FediChordConf = FediChordConf
-- | initialise data structures, compute own IDs and bind to listening socket -- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO -- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeState) fediChordInit :: FediChordConf -> IO (Socket, NodeState)
fediChordInit conf = do fediChordInit conf = do
initialState <- nodeStateInit conf initialState <- nodeStateInit conf
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState)
pure (serverSock, initialState) pure (serverSock, initialState)
-- | initialises the 'NodeState' for this local node. -- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests. -- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: FediChordConf -> IO LocalNodeState nodeStateInit :: FediChordConf -> IO NodeState
nodeStateInit conf = do nodeStateInit conf = do
cacheRef <- newIORef initCache cacheRef <- newIORef initCache
q <- atomically newTQueue q <- atomically newTQueue
let let
containedState = RemoteNodeState { initialState = NodeState {
domain = confDomain conf domain = confDomain conf
, ipAddr = confIP conf , ipAddr = confIP conf
, nid = genNodeID (confIP conf) (confDomain conf) 0 , nid = genNodeID (confIP conf) (confDomain conf) 0
, dhtPort = toEnum $ confDhtPort conf , dhtPort = toEnum $ confDhtPort conf
, servicePort = 0 , apPort = Nothing
, vServerID = 0 , vServerID = 0
, internals = Just internalsInit
} }
initialState = LocalNodeState { internalsInit = InternalNodeState {
nodeState = containedState nodeCache = cacheRef
, nodeCacheRef = cacheRef
, cacheWriteQueue = q , cacheWriteQueue = q
, successors = [] , successors = []
, predecessors = [] , predecessors = []
@ -475,7 +459,7 @@ nodeStateInit conf = do
} }
pure initialState pure initialState
--fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' --fediChordJoin :: NodeState -- ^ the local 'NodeState'
-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node -- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-- -> Socket -- ^ socket used for sending and receiving the join message -- -> Socket -- ^ socket used for sending and receiving the join message
-- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful -- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful
@ -488,15 +472,19 @@ nodeStateInit conf = do
-- | cache updater thread that waits for incoming NodeCache update instructions on -- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
cacheWriter :: LocalNodeState -> IO () cacheWriter :: NodeState -> IO ()
cacheWriter ns = do cacheWriter ns = do
let writeQueue' = cacheWriteQueue ns let writeQueue' = getCacheWriteQueue ns
forever $ do case writeQueue' of
f <- atomically $ readTQueue writeQueue' Nothing -> pure ()
Just writeQueue -> forever $ do
f <- atomically $ readTQueue writeQueue
let let
refModifier :: NodeCache -> (NodeCache, ()) refModifier :: NodeCache -> (NodeCache, ())
refModifier nc = (f nc, ()) refModifier nc = (f nc, ())
atomicModifyIORef' (nodeCacheRef ns) refModifier maybe (pure ()) (
\ref -> atomicModifyIORef' ref refModifier
) $ getNodeCacheRef ns
-- ====== network socket operations ====== -- ====== network socket operations ======

View file

@ -6,7 +6,7 @@ import Data.Time.Clock.POSIX (POSIXTime)
import Hash2Pub.FediChord import Hash2Pub.FediChord
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
| FOUND RemoteNodeState | FOUND NodeState
deriving (Show, Eq) deriving (Show, Eq)
-- === protocol serialisation data types -- === protocol serialisation data types
@ -20,7 +20,7 @@ data Action = QueryID
data FediChordMessage = Request data FediChordMessage = Request
{ requestID :: Integer { requestID :: Integer
, sender :: RemoteNodeState , sender :: NodeState
, parts :: Integer , parts :: Integer
, part :: Integer , part :: Integer
-- ^ part starts at 1 -- ^ part starts at 1
@ -62,7 +62,7 @@ data ActionPayload = QueryIDRequestPayload
, stabilisePredecessors :: [NodeID] , stabilisePredecessors :: [NodeID]
} }
| PingResponsePayload | PingResponsePayload
{ pingNodeStates :: [RemoteNodeState] { pingNodeStates :: [NodeState]
} }
deriving (Show, Eq) deriving (Show, Eq)
@ -73,7 +73,7 @@ maximumParts = 150
-- | dedicated data type for cache entries sent to or received from the network, -- | dedicated data type for cache entries sent to or received from the network,
-- as these have to be considered as unvalidated. Also helps with separation of trust. -- as these have to be considered as unvalidated. Also helps with separation of trust.
data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime
deriving (Show, Eq) deriving (Show, Eq)
instance Ord RemoteCacheEntry where instance Ord RemoteCacheEntry where
@ -85,5 +85,5 @@ toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry
toRemoteCacheEntry _ = Nothing toRemoteCacheEntry _ = Nothing
-- | extract the 'NodeState' from a 'RemoteCacheEntry' -- | extract the 'NodeState' from a 'RemoteCacheEntry'
remoteNode :: RemoteCacheEntry -> RemoteNodeState remoteNode :: RemoteCacheEntry -> NodeState
remoteNode (RemoteCacheEntry ns _) = ns remoteNode (RemoteCacheEntry ns _) = ns

View file

@ -55,13 +55,14 @@ spec = do
it "can be initialised" $ it "can be initialised" $
print exampleNodeState print exampleNodeState
it "can be initialised partly and then modified later" $ do it "can be initialised partly and then modified later" $ do
let ns = RemoteNodeState { let ns = NodeState {
nid = undefined nid = undefined
, domain = exampleNodeDomain , domain = exampleNodeDomain
, ipAddr = exampleIp , ipAddr = exampleIp
, dhtPort = 2342 , dhtPort = 2342
, servicePort = 513 , apPort = Nothing
, vServerID = undefined , vServerID = undefined
, internals = Nothing
} }
nsReady = ns { nsReady = ns {
nid = genNodeID (ipAddr ns) (domain ns) 3 nid = genNodeID (ipAddr ns) (domain ns) 3
@ -120,7 +121,9 @@ spec = do
let let
emptyCache = initCache emptyCache = initCache
nid1 = toNodeID 2^(23::Integer)+1 nid1 = toNodeID 2^(23::Integer)+1
node1 = setPredecessors [nid4] . setNid nid1 <$> exampleLocalNode node1 = do
eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609
pure $ putPredecessors [nid4] $ eln {nid = nid1}
nid2 = toNodeID 2^(230::Integer)+12 nid2 = toNodeID 2^(230::Integer)+12
node2 = exampleNodeState { nid = nid2} node2 = exampleNodeState { nid = nid2}
nid3 = toNodeID 2^(25::Integer)+10 nid3 = toNodeID 2^(25::Integer)+10
@ -128,7 +131,7 @@ spec = do
nid4 = toNodeID 2^(9::Integer)+100 nid4 = toNodeID 2^(9::Integer)+100
node4 = exampleNodeState { nid = nid4} node4 = exampleNodeState { nid = nid4}
cacheWith2Entries :: IO NodeCache cacheWith2Entries :: IO NodeCache
cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> (toRemoteNodeState <$> node1) <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries) cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries)
it "works on an empty cache" $ do it "works on an empty cache" $ do
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
@ -143,9 +146,9 @@ spec = do
Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4] Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4]
it "recognises the node's own responsibility" $ do it "recognises the node's own responsibility" $ do
FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1 FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
getNid <$> node1 `shouldReturn` getNid selfQueryRes nid <$> node1 `shouldReturn` nid selfQueryRes
FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer)) FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))
getNid <$> node1 `shouldReturn` getNid responsibilityResult nid <$> node1 `shouldReturn` nid responsibilityResult
it "does not fail on nodes without neighbours (initial state)" $ do it "does not fail on nodes without neighbours (initial state)" $ do
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11) (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
Set.map (nid . remoteNode ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3] Set.map (nid . remoteNode ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
@ -240,17 +243,18 @@ spec = do
-- some example data -- some example data
exampleNodeState :: RemoteNodeState exampleNodeState :: NodeState
exampleNodeState = RemoteNodeState { exampleNodeState = NodeState {
nid = toNodeID 12 nid = toNodeID 12
, domain = exampleNodeDomain , domain = exampleNodeDomain
, ipAddr = exampleIp , ipAddr = exampleIp
, dhtPort = 2342 , dhtPort = 2342
, servicePort = 513 , apPort = Nothing
, vServerID = 0 , vServerID = 0
, internals = Nothing
} }
exampleLocalNode :: IO LocalNodeState exampleLocalNode :: IO NodeState
exampleLocalNode = nodeStateInit $ FediChordConf { exampleLocalNode = nodeStateInit $ FediChordConf {
confDomain = "example.social" confDomain = "example.social"
, confIP = exampleIp , confIP = exampleIp