commit
						16769d1395
					
				
					 6 changed files with 147 additions and 144 deletions
				
			
		| 
						 | 
				
			
			@ -44,7 +44,7 @@ NodeState ::= SEQUENCE {
 | 
			
		|||
	domain			Domain,
 | 
			
		||||
	ipAddr			OCTET STRING (SIZE(16)),
 | 
			
		||||
	dhtPort			INTEGER,
 | 
			
		||||
	apPort			INTEGER,
 | 
			
		||||
	servicePort		INTEGER,
 | 
			
		||||
	vServerID		INTEGER (0..255)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -186,15 +186,15 @@ encodePayload payload'@PingResponsePayload{} =
 | 
			
		|||
  : concatMap encodeNodeState (pingNodeStates payload')
 | 
			
		||||
  <> [End Sequence]
 | 
			
		||||
 | 
			
		||||
encodeNodeState :: NodeState -> [ASN1]
 | 
			
		||||
encodeNodeState :: NodeState a => a -> [ASN1]
 | 
			
		||||
encodeNodeState ns = [
 | 
			
		||||
    Start Sequence
 | 
			
		||||
  , IntVal (getNodeID . nid $ ns)
 | 
			
		||||
  , ASN1String . asn1CharacterString Visible $ domain ns
 | 
			
		||||
  , OctetString (ipAddrAsBS $ ipAddr ns)
 | 
			
		||||
  , IntVal (toInteger . dhtPort $ ns)
 | 
			
		||||
  , IntVal (maybe 0 toInteger $ apPort ns)
 | 
			
		||||
  , IntVal (vServerID ns)
 | 
			
		||||
  , IntVal (getNodeID . getNid $ ns)
 | 
			
		||||
  , ASN1String . asn1CharacterString Visible $ getDomain ns
 | 
			
		||||
  , OctetString (ipAddrAsBS $ getIpAddr ns)
 | 
			
		||||
  , IntVal (toInteger . getDhtPort $ ns)
 | 
			
		||||
  , IntVal (toInteger . getServicePort $ ns)
 | 
			
		||||
  , IntVal (getVServerID ns)
 | 
			
		||||
  , End Sequence
 | 
			
		||||
                     ]
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -328,22 +328,21 @@ parseNull = do
 | 
			
		|||
        Null -> pure ()
 | 
			
		||||
        x    -> throwParseError $ "Expected Null but got " <> show x
 | 
			
		||||
 | 
			
		||||
parseNodeState :: ParseASN1 NodeState
 | 
			
		||||
parseNodeState :: ParseASN1 RemoteNodeState
 | 
			
		||||
parseNodeState = onNextContainer Sequence $ do
 | 
			
		||||
    nid' <- fromInteger <$> parseInteger
 | 
			
		||||
    domain' <- parseString
 | 
			
		||||
    ip' <- bsAsIpAddr <$> parseOctets
 | 
			
		||||
    dhtPort' <- fromInteger <$> parseInteger
 | 
			
		||||
    apPort' <- fromInteger <$> parseInteger
 | 
			
		||||
    servicePort' <- fromInteger <$> parseInteger
 | 
			
		||||
    vServer' <- parseInteger
 | 
			
		||||
    pure NodeState {
 | 
			
		||||
    pure RemoteNodeState {
 | 
			
		||||
        nid = nid'
 | 
			
		||||
      , domain = domain'
 | 
			
		||||
      , dhtPort = dhtPort'
 | 
			
		||||
      , apPort = if apPort' == 0 then Nothing else Just apPort'
 | 
			
		||||
      , servicePort = servicePort'
 | 
			
		||||
      , vServerID = vServer'
 | 
			
		||||
      , ipAddr = ip'
 | 
			
		||||
      , internals = Nothing
 | 
			
		||||
                     }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,18 +37,15 @@ import           System.Random
 | 
			
		|||
import           System.Timeout
 | 
			
		||||
 | 
			
		||||
import           Hash2Pub.ASN1Coding
 | 
			
		||||
import           Hash2Pub.FediChord             (CacheEntry (..), NodeCache,
 | 
			
		||||
import           Hash2Pub.FediChord             (CacheEntry (..),
 | 
			
		||||
                                                 LocalNodeState (..), NodeCache,
 | 
			
		||||
                                                 NodeID, NodeState (..),
 | 
			
		||||
                                                 RemoteNodeState (..),
 | 
			
		||||
                                                 cacheGetNodeStateUnvalidated,
 | 
			
		||||
                                                 cacheLookup, cacheLookupPred,
 | 
			
		||||
                                                 cacheLookupSucc,
 | 
			
		||||
                                                 getCacheWriteQueue,
 | 
			
		||||
                                                 getLNumBestNodes,
 | 
			
		||||
                                                 getNodeCacheRef,
 | 
			
		||||
                                                 getPredecessors, getSuccessors,
 | 
			
		||||
                                                 localCompare, mkSendSocket,
 | 
			
		||||
                                                 mkServerSocket,
 | 
			
		||||
                                                 putPredecessors, putSuccessors)
 | 
			
		||||
                                                 cacheLookupSucc, localCompare,
 | 
			
		||||
                                                 mkSendSocket, mkServerSocket,
 | 
			
		||||
                                                 setPredecessors, setSuccessors)
 | 
			
		||||
import           Hash2Pub.ProtocolTypes
 | 
			
		||||
 | 
			
		||||
import           Debug.Trace                    (trace)
 | 
			
		||||
| 
						 | 
				
			
			@ -57,22 +54,22 @@ import           Debug.Trace                    (trace)
 | 
			
		|||
 | 
			
		||||
-- TODO: evaluate more fine-grained argument passing to allow granular locking
 | 
			
		||||
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
 | 
			
		||||
queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
 | 
			
		||||
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
 | 
			
		||||
queryLocalCache ownState nCache lBestNodes targetID
 | 
			
		||||
    -- as target ID falls between own ID and first predecessor, it is handled by this node
 | 
			
		||||
      | (targetID `localCompare`  ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
 | 
			
		||||
      | (targetID `localCompare`  ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p  == GT) (headMay preds) = FOUND . toRemoteNodeState $ ownState
 | 
			
		||||
    -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
 | 
			
		||||
    -- the closest succeeding node (like with the p initiated parallel queries
 | 
			
		||||
      | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
 | 
			
		||||
  where
 | 
			
		||||
    preds = fromMaybe [] $ getPredecessors ownState
 | 
			
		||||
    ownID = nid ownState
 | 
			
		||||
    ownID = getNid ownState
 | 
			
		||||
    preds = predecessors ownState
 | 
			
		||||
 | 
			
		||||
    closestSuccessor :: Set.Set RemoteCacheEntry
 | 
			
		||||
    closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache
 | 
			
		||||
 | 
			
		||||
    closestPredecessors :: Set.Set RemoteCacheEntry
 | 
			
		||||
    closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState
 | 
			
		||||
    closestPredecessors = closestPredecessor (lBestNodes-1) $ getNid ownState
 | 
			
		||||
    closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
 | 
			
		||||
    closestPredecessor 0 _ = Set.empty
 | 
			
		||||
    closestPredecessor remainingLookups lastID
 | 
			
		||||
| 
						 | 
				
			
			@ -135,19 +132,19 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
 | 
			
		|||
 | 
			
		||||
-- ====== message send and receive operations ======
 | 
			
		||||
 | 
			
		||||
requestQueryID :: NodeState -> NodeID -> IO NodeState
 | 
			
		||||
requestQueryID :: LocalNodeState -> NodeID -> IO RemoteNodeState
 | 
			
		||||
-- 1. do a local lookup for the l closest nodes
 | 
			
		||||
-- 2. create l sockets
 | 
			
		||||
-- 3. send a message async concurrently to all l nodes
 | 
			
		||||
-- 4. collect the results, insert them into cache
 | 
			
		||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 | 
			
		||||
requestQueryID ns targetID = do
 | 
			
		||||
    firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns
 | 
			
		||||
    firstCacheSnapshot <- readIORef . nodeCacheRef $ ns
 | 
			
		||||
    lookupLoop firstCacheSnapshot
 | 
			
		||||
  where
 | 
			
		||||
    lookupLoop :: NodeCache -> IO NodeState
 | 
			
		||||
    lookupLoop :: NodeCache -> IO RemoteNodeState
 | 
			
		||||
    lookupLoop cacheSnapshot = do
 | 
			
		||||
        let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
 | 
			
		||||
        let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
 | 
			
		||||
        -- FOUND can only be returned if targetID is owned by local node
 | 
			
		||||
        case localResult of
 | 
			
		||||
          FOUND thisNode -> pure thisNode
 | 
			
		||||
| 
						 | 
				
			
			@ -167,7 +164,7 @@ requestQueryID ns targetID = do
 | 
			
		|||
                                 Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
 | 
			
		||||
                                 _ -> []
 | 
			
		||||
                -- forward entries to global cache
 | 
			
		||||
                forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
 | 
			
		||||
                forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry
 | 
			
		||||
                -- insert entries into local cache copy
 | 
			
		||||
                pure $ foldl' (
 | 
			
		||||
                    \oldLCache insertFunc -> insertFunc oldLCache
 | 
			
		||||
| 
						 | 
				
			
			@ -182,9 +179,8 @@ requestQueryID ns targetID = do
 | 
			
		|||
              -- if no FOUND, recursively call lookup again
 | 
			
		||||
              maybe (lookupLoop newLCache) pure foundResp
 | 
			
		||||
 | 
			
		||||
    -- todo: random request ID
 | 
			
		||||
    lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
 | 
			
		||||
    pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns }
 | 
			
		||||
    lookupMessage targetID rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID)
 | 
			
		||||
    pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
 | 
			
		||||
 | 
			
		||||
-- | Generic function for sending a request over a connected socket and collecting the response.
 | 
			
		||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,15 +17,10 @@ module Hash2Pub.FediChord (
 | 
			
		|||
  , getNodeID
 | 
			
		||||
  , toNodeID
 | 
			
		||||
  , NodeState (..)
 | 
			
		||||
  , InternalNodeState (..)
 | 
			
		||||
  , getNodeCacheRef
 | 
			
		||||
  , putNodeCache
 | 
			
		||||
  , getSuccessors
 | 
			
		||||
  , putSuccessors
 | 
			
		||||
  , getPredecessors
 | 
			
		||||
  , putPredecessors
 | 
			
		||||
  , getLNumBestNodes
 | 
			
		||||
  , getCacheWriteQueue
 | 
			
		||||
  , LocalNodeState (..)
 | 
			
		||||
  , RemoteNodeState (..)
 | 
			
		||||
  , setSuccessors
 | 
			
		||||
  , setPredecessors
 | 
			
		||||
  , NodeCache
 | 
			
		||||
  , CacheEntry(..)
 | 
			
		||||
  , cacheGetNodeStateUnvalidated
 | 
			
		||||
| 
						 | 
				
			
			@ -125,26 +120,26 @@ a `localCompare` b
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
-- | represents a node and all its important state
 | 
			
		||||
data NodeState = NodeState
 | 
			
		||||
    { nid       :: NodeID
 | 
			
		||||
    , domain    :: String
 | 
			
		||||
data RemoteNodeState = RemoteNodeState
 | 
			
		||||
    { nid         :: NodeID
 | 
			
		||||
    , domain      :: String
 | 
			
		||||
    -- ^ full public domain name the node is reachable under
 | 
			
		||||
    , ipAddr    :: HostAddress6
 | 
			
		||||
    , ipAddr      :: HostAddress6
 | 
			
		||||
    -- the node's public IPv6 address
 | 
			
		||||
    , dhtPort   :: PortNumber
 | 
			
		||||
    , dhtPort     :: PortNumber
 | 
			
		||||
    -- ^ port of the DHT itself
 | 
			
		||||
    , apPort    :: Maybe PortNumber
 | 
			
		||||
    -- ^ port of the ActivityPub relay and storage service
 | 
			
		||||
    , vServerID :: Integer
 | 
			
		||||
    , servicePort :: PortNumber
 | 
			
		||||
    -- ^ port of the service provided on top of the DHT
 | 
			
		||||
    , vServerID   :: Integer
 | 
			
		||||
    -- ^ ID of this vserver
 | 
			
		||||
    , internals :: Maybe InternalNodeState
 | 
			
		||||
    -- ^ data not present in the representation of remote nodes
 | 
			
		||||
    }
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
-- | encapsulates all data and parameters that are not present for remote nodes
 | 
			
		||||
data InternalNodeState = InternalNodeState
 | 
			
		||||
    { nodeCache           :: IORef NodeCache
 | 
			
		||||
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
 | 
			
		||||
data LocalNodeState = LocalNodeState
 | 
			
		||||
    { nodeState           :: RemoteNodeState
 | 
			
		||||
    -- ^ represents common data present both in remote and local node representations
 | 
			
		||||
    , nodeCacheRef        :: IORef NodeCache
 | 
			
		||||
    -- ^ EpiChord node cache with expiry times for nodes
 | 
			
		||||
    , cacheWriteQueue     :: TQueue (NodeCache -> NodeCache)
 | 
			
		||||
    -- ^ cache updates are not written directly to the  'nodeCache' but queued and
 | 
			
		||||
| 
						 | 
				
			
			@ -163,6 +158,63 @@ data InternalNodeState = InternalNodeState
 | 
			
		|||
    }
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
-- | class for various NodeState representations, providing
 | 
			
		||||
-- getters and setters for common values
 | 
			
		||||
class NodeState a where
 | 
			
		||||
    -- getters for common properties
 | 
			
		||||
    getNid :: a -> NodeID
 | 
			
		||||
    getDomain :: a -> String
 | 
			
		||||
    getIpAddr :: a -> HostAddress6
 | 
			
		||||
    getDhtPort :: a -> PortNumber
 | 
			
		||||
    getServicePort :: a -> PortNumber
 | 
			
		||||
    getVServerID :: a -> Integer
 | 
			
		||||
    -- setters for common properties
 | 
			
		||||
    setNid :: NodeID -> a -> a
 | 
			
		||||
    setDomain :: String -> a -> a
 | 
			
		||||
    setIpAddr :: HostAddress6 -> a -> a
 | 
			
		||||
    setDhtPort :: PortNumber -> a -> a
 | 
			
		||||
    setServicePort :: PortNumber -> a -> a
 | 
			
		||||
    setVServerID :: Integer -> a -> a
 | 
			
		||||
    toRemoteNodeState :: a -> RemoteNodeState
 | 
			
		||||
 | 
			
		||||
instance NodeState RemoteNodeState where
 | 
			
		||||
    getNid = nid
 | 
			
		||||
    getDomain = domain
 | 
			
		||||
    getIpAddr = ipAddr
 | 
			
		||||
    getDhtPort = dhtPort
 | 
			
		||||
    getServicePort = servicePort
 | 
			
		||||
    getVServerID = vServerID
 | 
			
		||||
    setNid nid' ns = ns {nid = nid'}
 | 
			
		||||
    setDomain domain' ns = ns {domain = domain'}
 | 
			
		||||
    setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'}
 | 
			
		||||
    setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'}
 | 
			
		||||
    setServicePort servicePort' ns = ns {servicePort = servicePort'}
 | 
			
		||||
    setVServerID vServerID' ns = ns {vServerID = vServerID'}
 | 
			
		||||
    toRemoteNodeState = id
 | 
			
		||||
 | 
			
		||||
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
 | 
			
		||||
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
 | 
			
		||||
propagateNodeStateSet_ func ns = let
 | 
			
		||||
        newNs = func $ nodeState ns
 | 
			
		||||
    in
 | 
			
		||||
        ns {nodeState = newNs}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
instance NodeState LocalNodeState where
 | 
			
		||||
    getNid = getNid . nodeState
 | 
			
		||||
    getDomain = getDomain . nodeState
 | 
			
		||||
    getIpAddr = getIpAddr . nodeState
 | 
			
		||||
    getDhtPort = getDhtPort . nodeState
 | 
			
		||||
    getServicePort = getServicePort . nodeState
 | 
			
		||||
    getVServerID = getVServerID . nodeState
 | 
			
		||||
    setNid nid' = propagateNodeStateSet_ $ setNid nid'
 | 
			
		||||
    setDomain domain' = propagateNodeStateSet_ $ setDomain domain'
 | 
			
		||||
    setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr'
 | 
			
		||||
    setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort'
 | 
			
		||||
    setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort'
 | 
			
		||||
    setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID'
 | 
			
		||||
    toRemoteNodeState = nodeState
 | 
			
		||||
 | 
			
		||||
-- | defining Show instances to be able to print NodeState for debug purposes
 | 
			
		||||
instance Typeable a => Show (IORef a) where
 | 
			
		||||
    show x = show (typeOf x)
 | 
			
		||||
| 
						 | 
				
			
			@ -170,55 +222,19 @@ instance Typeable a => Show (IORef a) where
 | 
			
		|||
instance Typeable a => Show (TQueue a) where
 | 
			
		||||
    show x = show (typeOf x)
 | 
			
		||||
 | 
			
		||||
-- | extract a value from the internals of a 'NodeState'
 | 
			
		||||
getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a
 | 
			
		||||
getInternals_ func ns = func <$> internals ns
 | 
			
		||||
-- | convenience function that updates the successors of a 'LocalNodeState'
 | 
			
		||||
setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState
 | 
			
		||||
setSuccessors succ' ns = ns {successors = succ'}
 | 
			
		||||
 | 
			
		||||
-- could be done better with lenses
 | 
			
		||||
-- | convenience function that updates an internal value of a NodeState
 | 
			
		||||
putInternals_ :: (InternalNodeState -> InternalNodeState) -> NodeState -> NodeState
 | 
			
		||||
putInternals_ func ns = let
 | 
			
		||||
        newInternals = func <$> internals ns
 | 
			
		||||
    in
 | 
			
		||||
        ns {internals = newInternals }
 | 
			
		||||
 | 
			
		||||
-- | convenience function for extracting the 'NodeCache' from a 'NodeState'
 | 
			
		||||
getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache)
 | 
			
		||||
getNodeCacheRef = getInternals_ nodeCache
 | 
			
		||||
 | 
			
		||||
-- | convenience function for updating the 'NodeCache' on 'NodeState' s that have
 | 
			
		||||
-- internals.
 | 
			
		||||
-- NodeStates without a cache (without internals) are returned unchanged
 | 
			
		||||
putNodeCache :: IORef NodeCache -> NodeState -> NodeState
 | 
			
		||||
putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc})
 | 
			
		||||
 | 
			
		||||
getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache))
 | 
			
		||||
getCacheWriteQueue = getInternals_ cacheWriteQueue
 | 
			
		||||
 | 
			
		||||
-- | convenience function for extracting the @successors@ from a 'NodeState'
 | 
			
		||||
getSuccessors :: NodeState -> Maybe [NodeID]
 | 
			
		||||
getSuccessors = getInternals_ successors
 | 
			
		||||
 | 
			
		||||
-- | convenience function that updates the successors of a NodeState
 | 
			
		||||
putSuccessors :: [NodeID] -> NodeState -> NodeState
 | 
			
		||||
putSuccessors succ' = putInternals_ (\i -> i {successors = succ'})
 | 
			
		||||
 | 
			
		||||
-- | convenience function for extracting the @predecessors@ from a 'NodeState'
 | 
			
		||||
getPredecessors :: NodeState -> Maybe [NodeID]
 | 
			
		||||
getPredecessors = getInternals_ predecessors
 | 
			
		||||
 | 
			
		||||
-- | convenience function that updates the predecessors of a NodeState
 | 
			
		||||
putPredecessors :: [NodeID] -> NodeState -> NodeState
 | 
			
		||||
putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'})
 | 
			
		||||
 | 
			
		||||
-- | convenience function for extracting the @lNumBestNodes@ from a 'NodeState'
 | 
			
		||||
getLNumBestNodes :: NodeState -> Maybe Int
 | 
			
		||||
getLNumBestNodes = getInternals_ lNumBestNodes
 | 
			
		||||
-- | convenience function that updates the predecessors of a 'LocalNodeState'
 | 
			
		||||
setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState
 | 
			
		||||
setPredecessors pred' ns = ns {predecessors = pred'}
 | 
			
		||||
 | 
			
		||||
type NodeCache = Map.Map NodeID CacheEntry
 | 
			
		||||
 | 
			
		||||
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
 | 
			
		||||
data CacheEntry = NodeEntry Bool NodeState POSIXTime
 | 
			
		||||
-- | An entry of the 'nodeCache' can hold 2 different kinds of data.
 | 
			
		||||
-- Type variable @a@ should be of type class 'NodeState', but I do not want to use GADTs here.
 | 
			
		||||
data CacheEntry = NodeEntry Bool RemoteNodeState POSIXTime
 | 
			
		||||
    | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -228,7 +244,7 @@ instance Ord CacheEntry where
 | 
			
		|||
 | 
			
		||||
    a `compare` b = compare (extractID a) (extractID b)
 | 
			
		||||
        where
 | 
			
		||||
            extractID (NodeEntry _ eState _) = nid eState
 | 
			
		||||
            extractID (NodeEntry _ eState _) = getNid eState
 | 
			
		||||
            extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
 | 
			
		||||
 | 
			
		||||
data ProxyDirection = Backwards
 | 
			
		||||
| 
						 | 
				
			
			@ -308,7 +324,7 @@ cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
 | 
			
		|||
-- transfer difference now - entry to other node
 | 
			
		||||
 | 
			
		||||
-- | return the @NodeState@ data from a cache entry without checking its validation status
 | 
			
		||||
cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState
 | 
			
		||||
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
 | 
			
		||||
cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState
 | 
			
		||||
cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry
 | 
			
		||||
cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug"
 | 
			
		||||
| 
						 | 
				
			
			@ -425,30 +441,30 @@ data FediChordConf = FediChordConf
 | 
			
		|||
 | 
			
		||||
-- | initialise data structures, compute own IDs and bind to listening socket
 | 
			
		||||
-- ToDo: load persisted state, thus this function already operates in IO
 | 
			
		||||
fediChordInit :: FediChordConf -> IO (Socket, NodeState)
 | 
			
		||||
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeState)
 | 
			
		||||
fediChordInit conf = do
 | 
			
		||||
    initialState <- nodeStateInit conf
 | 
			
		||||
    serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState)
 | 
			
		||||
    serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
 | 
			
		||||
    pure (serverSock, initialState)
 | 
			
		||||
 | 
			
		||||
-- | initialises the 'NodeState' for this local node.
 | 
			
		||||
-- Separated from 'fediChordInit' to be usable in tests.
 | 
			
		||||
nodeStateInit :: FediChordConf -> IO NodeState
 | 
			
		||||
nodeStateInit :: FediChordConf -> IO LocalNodeState
 | 
			
		||||
nodeStateInit conf = do
 | 
			
		||||
    cacheRef <- newIORef initCache
 | 
			
		||||
    q <- atomically newTQueue
 | 
			
		||||
    let
 | 
			
		||||
        initialState = NodeState {
 | 
			
		||||
        containedState = RemoteNodeState {
 | 
			
		||||
            domain = confDomain conf
 | 
			
		||||
          , ipAddr = confIP conf
 | 
			
		||||
          , nid = genNodeID (confIP conf) (confDomain conf) 0
 | 
			
		||||
          , dhtPort = toEnum $ confDhtPort conf
 | 
			
		||||
          , apPort = Nothing
 | 
			
		||||
          , servicePort = 0
 | 
			
		||||
          , vServerID = 0
 | 
			
		||||
          , internals = Just internalsInit
 | 
			
		||||
                                            }
 | 
			
		||||
        internalsInit = InternalNodeState {
 | 
			
		||||
           nodeCache = cacheRef
 | 
			
		||||
        initialState = LocalNodeState {
 | 
			
		||||
            nodeState = containedState
 | 
			
		||||
          , nodeCacheRef = cacheRef
 | 
			
		||||
          , cacheWriteQueue = q
 | 
			
		||||
          , successors = []
 | 
			
		||||
          , predecessors = []
 | 
			
		||||
| 
						 | 
				
			
			@ -459,7 +475,7 @@ nodeStateInit conf = do
 | 
			
		|||
                                           }
 | 
			
		||||
    pure initialState
 | 
			
		||||
 | 
			
		||||
--fediChordJoin :: NodeState              -- ^ the local 'NodeState'
 | 
			
		||||
--fediChordJoin :: LocalNodeState              -- ^ the local 'NodeState'
 | 
			
		||||
--              -> (String, PortNumber)   -- ^ domain and port of a bootstrapping node
 | 
			
		||||
--              -> Socket                 -- ^ socket used for sending and receiving the join message
 | 
			
		||||
--              -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful
 | 
			
		||||
| 
						 | 
				
			
			@ -472,19 +488,15 @@ nodeStateInit conf = do
 | 
			
		|||
 | 
			
		||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
 | 
			
		||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
 | 
			
		||||
cacheWriter :: NodeState -> IO ()
 | 
			
		||||
cacheWriter :: LocalNodeState -> IO ()
 | 
			
		||||
cacheWriter ns = do
 | 
			
		||||
    let writeQueue' = getCacheWriteQueue ns
 | 
			
		||||
    case writeQueue' of
 | 
			
		||||
      Nothing -> pure ()
 | 
			
		||||
      Just writeQueue -> forever $ do
 | 
			
		||||
        f <- atomically $ readTQueue writeQueue
 | 
			
		||||
        let
 | 
			
		||||
            refModifier :: NodeCache -> (NodeCache, ())
 | 
			
		||||
            refModifier nc = (f nc, ())
 | 
			
		||||
        maybe (pure ()) (
 | 
			
		||||
            \ref -> atomicModifyIORef' ref refModifier
 | 
			
		||||
                 ) $ getNodeCacheRef ns
 | 
			
		||||
    let writeQueue' = cacheWriteQueue ns
 | 
			
		||||
    forever $ do
 | 
			
		||||
      f <- atomically $ readTQueue writeQueue'
 | 
			
		||||
      let
 | 
			
		||||
          refModifier :: NodeCache -> (NodeCache, ())
 | 
			
		||||
          refModifier nc = (f nc, ())
 | 
			
		||||
      atomicModifyIORef' (nodeCacheRef ns) refModifier
 | 
			
		||||
 | 
			
		||||
-- ====== network socket operations ======
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,7 +6,7 @@ import           Data.Time.Clock.POSIX (POSIXTime)
 | 
			
		|||
import           Hash2Pub.FediChord
 | 
			
		||||
 | 
			
		||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
 | 
			
		||||
    | FOUND NodeState
 | 
			
		||||
    | FOUND RemoteNodeState
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
-- === protocol serialisation data types
 | 
			
		||||
| 
						 | 
				
			
			@ -20,7 +20,7 @@ data Action = QueryID
 | 
			
		|||
 | 
			
		||||
data FediChordMessage = Request
 | 
			
		||||
    { requestID :: Integer
 | 
			
		||||
    , sender    :: NodeState
 | 
			
		||||
    , sender    :: RemoteNodeState
 | 
			
		||||
    , parts     :: Integer
 | 
			
		||||
    , part      :: Integer
 | 
			
		||||
    -- ^ part starts at 1
 | 
			
		||||
| 
						 | 
				
			
			@ -62,7 +62,7 @@ data ActionPayload = QueryIDRequestPayload
 | 
			
		|||
    , stabilisePredecessors :: [NodeID]
 | 
			
		||||
    }
 | 
			
		||||
    | PingResponsePayload
 | 
			
		||||
    { pingNodeStates :: [NodeState]
 | 
			
		||||
    { pingNodeStates :: [RemoteNodeState]
 | 
			
		||||
    }
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -73,7 +73,7 @@ maximumParts = 150
 | 
			
		|||
 | 
			
		||||
-- | dedicated data type for cache entries sent to or received from the network,
 | 
			
		||||
-- as these have to be considered as unvalidated. Also helps with separation of trust.
 | 
			
		||||
data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime
 | 
			
		||||
data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
instance Ord RemoteCacheEntry where
 | 
			
		||||
| 
						 | 
				
			
			@ -85,5 +85,5 @@ toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry
 | 
			
		|||
toRemoteCacheEntry _ = Nothing
 | 
			
		||||
 | 
			
		||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
 | 
			
		||||
remoteNode :: RemoteCacheEntry -> NodeState
 | 
			
		||||
remoteNode :: RemoteCacheEntry -> RemoteNodeState
 | 
			
		||||
remoteNode (RemoteCacheEntry ns _) = ns
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -55,14 +55,13 @@ spec = do
 | 
			
		|||
        it "can be initialised" $
 | 
			
		||||
            print exampleNodeState
 | 
			
		||||
        it "can be initialised partly and then modified later" $ do
 | 
			
		||||
            let ns = NodeState {
 | 
			
		||||
            let ns = RemoteNodeState {
 | 
			
		||||
                nid = undefined
 | 
			
		||||
              , domain = exampleNodeDomain
 | 
			
		||||
              , ipAddr = exampleIp
 | 
			
		||||
              , dhtPort = 2342
 | 
			
		||||
              , apPort = Nothing
 | 
			
		||||
              , servicePort = 513
 | 
			
		||||
              , vServerID = undefined
 | 
			
		||||
              , internals = Nothing
 | 
			
		||||
                               }
 | 
			
		||||
                nsReady = ns {
 | 
			
		||||
                  nid = genNodeID (ipAddr ns) (domain ns) 3
 | 
			
		||||
| 
						 | 
				
			
			@ -121,9 +120,7 @@ spec = do
 | 
			
		|||
        let
 | 
			
		||||
            emptyCache = initCache
 | 
			
		||||
            nid1 = toNodeID 2^(23::Integer)+1
 | 
			
		||||
            node1 = do
 | 
			
		||||
                eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609
 | 
			
		||||
                pure $ putPredecessors [nid4] $ eln {nid = nid1}
 | 
			
		||||
            node1 = setPredecessors [nid4] . setNid nid1 <$> exampleLocalNode
 | 
			
		||||
            nid2 = toNodeID 2^(230::Integer)+12
 | 
			
		||||
            node2 = exampleNodeState { nid = nid2}
 | 
			
		||||
            nid3 = toNodeID 2^(25::Integer)+10
 | 
			
		||||
| 
						 | 
				
			
			@ -131,7 +128,7 @@ spec = do
 | 
			
		|||
            nid4 = toNodeID 2^(9::Integer)+100
 | 
			
		||||
            node4 = exampleNodeState { nid = nid4}
 | 
			
		||||
            cacheWith2Entries :: IO NodeCache
 | 
			
		||||
            cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
 | 
			
		||||
            cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> (toRemoteNodeState <$> node1) <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
 | 
			
		||||
            cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries)
 | 
			
		||||
        it "works on an empty cache" $ do
 | 
			
		||||
            queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
 | 
			
		||||
| 
						 | 
				
			
			@ -146,9 +143,9 @@ spec = do
 | 
			
		|||
            Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4]
 | 
			
		||||
        it "recognises the node's own responsibility" $ do
 | 
			
		||||
            FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
 | 
			
		||||
            nid <$> node1 `shouldReturn` nid selfQueryRes
 | 
			
		||||
            getNid <$> node1 `shouldReturn` getNid selfQueryRes
 | 
			
		||||
            FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))
 | 
			
		||||
            nid <$> node1 `shouldReturn` nid responsibilityResult
 | 
			
		||||
            getNid <$> node1 `shouldReturn` getNid responsibilityResult
 | 
			
		||||
        it "does not fail on nodes without neighbours (initial state)" $ do
 | 
			
		||||
            (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
 | 
			
		||||
            Set.map (nid . remoteNode ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
 | 
			
		||||
| 
						 | 
				
			
			@ -243,18 +240,17 @@ spec = do
 | 
			
		|||
 | 
			
		||||
-- some example data
 | 
			
		||||
 | 
			
		||||
exampleNodeState :: NodeState
 | 
			
		||||
exampleNodeState = NodeState {
 | 
			
		||||
exampleNodeState :: RemoteNodeState
 | 
			
		||||
exampleNodeState = RemoteNodeState {
 | 
			
		||||
    nid = toNodeID 12
 | 
			
		||||
  , domain = exampleNodeDomain
 | 
			
		||||
  , ipAddr = exampleIp
 | 
			
		||||
  , dhtPort = 2342
 | 
			
		||||
  , apPort = Nothing
 | 
			
		||||
  , servicePort = 513
 | 
			
		||||
  , vServerID = 0
 | 
			
		||||
  , internals = Nothing
 | 
			
		||||
                   }
 | 
			
		||||
 | 
			
		||||
exampleLocalNode :: IO NodeState
 | 
			
		||||
exampleLocalNode :: IO LocalNodeState
 | 
			
		||||
exampleLocalNode = nodeStateInit $ FediChordConf {
 | 
			
		||||
    confDomain = "example.social"
 | 
			
		||||
  , confIP = exampleIp
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue