change all function definitions to new NodeState types

- adjust implementation

contributes to #20
This commit is contained in:
Trolli Schmittlauch 2020-05-21 23:38:50 +02:00
parent fe673dc255
commit e8091b0a29
4 changed files with 70 additions and 105 deletions

View file

@ -186,15 +186,15 @@ encodePayload payload'@PingResponsePayload{} =
: concatMap encodeNodeState (pingNodeStates payload')
<> [End Sequence]
encodeNodeState :: NodeState -> [ASN1]
encodeNodeState :: NodeState a => a -> [ASN1]
encodeNodeState ns = [
Start Sequence
, IntVal (getNodeID . nid $ ns)
, ASN1String . asn1CharacterString Visible $ domain ns
, OctetString (ipAddrAsBS $ ipAddr ns)
, IntVal (toInteger . dhtPort $ ns)
, IntVal (maybe 0 toInteger $ apPort ns)
, IntVal (vServerID ns)
, IntVal (getNodeID . getNid $ ns)
, ASN1String . asn1CharacterString Visible $ getDomain ns
, OctetString (ipAddrAsBS $ getIpAddr ns)
, IntVal (toInteger . getDhtPort $ ns)
, IntVal (toInteger . getServicePort $ ns)
, IntVal (getVServerID ns)
, End Sequence
]
@ -328,22 +328,21 @@ parseNull = do
Null -> pure ()
x -> throwParseError $ "Expected Null but got " <> show x
parseNodeState :: ParseASN1 NodeState
parseNodeState :: ParseASN1 RemoteNodeState
parseNodeState = onNextContainer Sequence $ do
nid' <- fromInteger <$> parseInteger
domain' <- parseString
ip' <- bsAsIpAddr <$> parseOctets
dhtPort' <- fromInteger <$> parseInteger
apPort' <- fromInteger <$> parseInteger
servicePort' <- fromInteger <$> parseInteger
vServer' <- parseInteger
pure NodeState {
pure RemoteNodeState {
nid = nid'
, domain = domain'
, dhtPort = dhtPort'
, apPort = if apPort' == 0 then Nothing else Just apPort'
, servicePort = servicePort'
, vServerID = vServer'
, ipAddr = ip'
, internals = Nothing
}

View file

@ -37,18 +37,15 @@ import System.Random
import System.Timeout
import Hash2Pub.ASN1Coding
import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
import Hash2Pub.FediChord (CacheEntry (..),
LocalNodeState (..), NodeCache,
NodeID, NodeState (..),
RemoteNodeState (..),
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc,
getCacheWriteQueue,
getLNumBestNodes,
getNodeCacheRef,
getPredecessors, getSuccessors,
localCompare, mkSendSocket,
mkServerSocket,
putPredecessors, putSuccessors)
cacheLookupSucc, localCompare,
mkSendSocket, mkServerSocket,
setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes
import Debug.Trace (trace)
@ -57,22 +54,22 @@ import Debug.Trace (trace)
-- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node
| (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
| (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (headMay preds) = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
where
preds = fromMaybe [] $ getPredecessors ownState
ownID = nid ownState
ownID = getNid ownState
preds = predecessors ownState
closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache
closestPredecessors :: Set.Set RemoteCacheEntry
closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState
closestPredecessors = closestPredecessor (lBestNodes-1) $ getNid ownState
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
closestPredecessor 0 _ = Set.empty
closestPredecessor remainingLookups lastID
@ -135,19 +132,19 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
-- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState
requestQueryID :: LocalNodeState -> NodeID -> IO RemoteNodeState
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do
firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns
firstCacheSnapshot <- readIORef . nodeCacheRef $ ns
lookupLoop firstCacheSnapshot
where
lookupLoop :: NodeCache -> IO NodeState
lookupLoop :: NodeCache -> IO RemoteNodeState
lookupLoop cacheSnapshot = do
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> pure thisNode
@ -167,7 +164,7 @@ requestQueryID ns targetID = do
Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
_ -> []
-- forward entries to global cache
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry
-- insert entries into local cache copy
pure $ foldl' (
\oldLCache insertFunc -> insertFunc oldLCache
@ -182,9 +179,8 @@ requestQueryID ns targetID = do
-- if no FOUND, recursively call lookup again
maybe (lookupLoop newLCache) pure foundResp
-- todo: random request ID
lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns }
lookupMessage targetID rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID)
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.

View file

@ -17,15 +17,10 @@ module Hash2Pub.FediChord (
, getNodeID
, toNodeID
, NodeState (..)
, InternalNodeState (..)
, getNodeCacheRef
, putNodeCache
, getSuccessors
, putSuccessors
, getPredecessors
, putPredecessors
, getLNumBestNodes
, getCacheWriteQueue
, LocalNodeState (..)
, RemoteNodeState (..)
, setSuccessors
, setPredecessors
, NodeCache
, CacheEntry(..)
, cacheGetNodeStateUnvalidated
@ -144,7 +139,7 @@ data RemoteNodeState = RemoteNodeState
data LocalNodeState = LocalNodeState
{ nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations
, nodeCache :: IORef NodeCache
, nodeCacheRef :: IORef NodeCache
-- ^ EpiChord node cache with expiry times for nodes
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
@ -180,6 +175,7 @@ class NodeState a where
setDhtPort :: PortNumber -> a -> a
setServicePort :: PortNumber -> a -> a
setVServerID :: Integer -> a -> a
toRemoteNodeState :: a -> RemoteNodeState
instance NodeState RemoteNodeState where
getNid = nid
@ -194,6 +190,7 @@ instance NodeState RemoteNodeState where
setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'}
setServicePort servicePort' ns = ns {servicePort = servicePort'}
setVServerID vServerID' ns = ns {vServerID = vServerID'}
toRemoteNodeState = id
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
@ -216,6 +213,7 @@ instance NodeState LocalNodeState where
setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort'
setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort'
setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID'
toRemoteNodeState = nodeState
-- | defining Show instances to be able to print NodeState for debug purposes
instance Typeable a => Show (IORef a) where
@ -224,43 +222,19 @@ instance Typeable a => Show (IORef a) where
instance Typeable a => Show (TQueue a) where
show x = show (typeOf x)
-- | convenience function for extracting the 'NodeCache' from a 'NodeState'
getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache)
getNodeCacheRef = getInternals_ nodeCache
-- | convenience function that updates the successors of a 'LocalNodeState'
setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState
setSuccessors succ' ns = ns {successors = succ'}
-- | convenience function for updating the 'NodeCache' on 'NodeState' s that have
-- internals.
-- NodeStates without a cache (without internals) are returned unchanged
putNodeCache :: IORef NodeCache -> NodeState -> NodeState
putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc})
getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache))
getCacheWriteQueue = getInternals_ cacheWriteQueue
-- | convenience function for extracting the @successors@ from a 'NodeState'
getSuccessors :: NodeState -> Maybe [NodeID]
getSuccessors = getInternals_ successors
-- | convenience function that updates the successors of a NodeState
putSuccessors :: [NodeID] -> NodeState -> NodeState
putSuccessors succ' = putInternals_ (\i -> i {successors = succ'})
-- | convenience function for extracting the @predecessors@ from a 'NodeState'
getPredecessors :: NodeState -> Maybe [NodeID]
getPredecessors = getInternals_ predecessors
-- | convenience function that updates the predecessors of a NodeState
putPredecessors :: [NodeID] -> NodeState -> NodeState
putPredecessors pred' = putInternals_ (\i -> i {predecessors = pred'})
-- | convenience function for extracting the @lNumBestNodes@ from a 'NodeState'
getLNumBestNodes :: NodeState -> Maybe Int
getLNumBestNodes = getInternals_ lNumBestNodes
-- | convenience function that updates the predecessors of a 'LocalNodeState'
setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState
setPredecessors pred' ns = ns {predecessors = pred'}
type NodeCache = Map.Map NodeID CacheEntry
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
data CacheEntry = NodeEntry Bool NodeState POSIXTime
-- | An entry of the 'nodeCache' can hold 2 different kinds of data.
-- Type variable @a@ should be of type class 'NodeState', but I do not want to use GADTs here.
data CacheEntry = NodeEntry Bool RemoteNodeState POSIXTime
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
deriving (Show, Eq)
@ -270,7 +244,7 @@ instance Ord CacheEntry where
a `compare` b = compare (extractID a) (extractID b)
where
extractID (NodeEntry _ eState _) = nid eState
extractID (NodeEntry _ eState _) = getNid eState
extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
data ProxyDirection = Backwards
@ -350,7 +324,7 @@ cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
-- transfer difference now - entry to other node
-- | return the @NodeState@ data from a cache entry without checking its validation status
cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState
cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry
cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug"
@ -467,30 +441,30 @@ data FediChordConf = FediChordConf
-- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, NodeState)
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeState)
fediChordInit conf = do
initialState <- nodeStateInit conf
serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState)
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
pure (serverSock, initialState)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: FediChordConf -> IO NodeState
nodeStateInit :: FediChordConf -> IO LocalNodeState
nodeStateInit conf = do
cacheRef <- newIORef initCache
q <- atomically newTQueue
let
initialState = NodeState {
containedState = RemoteNodeState {
domain = confDomain conf
, ipAddr = confIP conf
, nid = genNodeID (confIP conf) (confDomain conf) 0
, dhtPort = toEnum $ confDhtPort conf
, apPort = Nothing
, servicePort = 0
, vServerID = 0
, internals = Just internalsInit
}
internalsInit = InternalNodeState {
nodeCache = cacheRef
initialState = LocalNodeState {
nodeState = containedState
, nodeCacheRef = cacheRef
, cacheWriteQueue = q
, successors = []
, predecessors = []
@ -501,7 +475,7 @@ nodeStateInit conf = do
}
pure initialState
--fediChordJoin :: NodeState -- ^ the local 'NodeState'
--fediChordJoin :: LocalNodeState -- ^ the local 'NodeState'
-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-- -> Socket -- ^ socket used for sending and receiving the join message
-- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful
@ -514,19 +488,15 @@ nodeStateInit conf = do
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
cacheWriter :: NodeState -> IO ()
cacheWriter :: LocalNodeState -> IO ()
cacheWriter ns = do
let writeQueue' = getCacheWriteQueue ns
case writeQueue' of
Nothing -> pure ()
Just writeQueue -> forever $ do
f <- atomically $ readTQueue writeQueue
let writeQueue' = cacheWriteQueue ns
forever $ do
f <- atomically $ readTQueue writeQueue'
let
refModifier :: NodeCache -> (NodeCache, ())
refModifier nc = (f nc, ())
maybe (pure ()) (
\ref -> atomicModifyIORef' ref refModifier
) $ getNodeCacheRef ns
atomicModifyIORef' (nodeCacheRef ns) refModifier
-- ====== network socket operations ======

View file

@ -6,7 +6,7 @@ import Data.Time.Clock.POSIX (POSIXTime)
import Hash2Pub.FediChord
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
| FOUND NodeState
| FOUND RemoteNodeState
deriving (Show, Eq)
-- === protocol serialisation data types
@ -20,7 +20,7 @@ data Action = QueryID
data FediChordMessage = Request
{ requestID :: Integer
, sender :: NodeState
, sender :: RemoteNodeState
, parts :: Integer
, part :: Integer
-- ^ part starts at 1
@ -62,7 +62,7 @@ data ActionPayload = QueryIDRequestPayload
, stabilisePredecessors :: [NodeID]
}
| PingResponsePayload
{ pingNodeStates :: [NodeState]
{ pingNodeStates :: [RemoteNodeState]
}
deriving (Show, Eq)
@ -73,7 +73,7 @@ maximumParts = 150
-- | dedicated data type for cache entries sent to or received from the network,
-- as these have to be considered as unvalidated. Also helps with separation of trust.
data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime
data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
deriving (Show, Eq)
instance Ord RemoteCacheEntry where
@ -85,5 +85,5 @@ toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry
toRemoteCacheEntry _ = Nothing
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
remoteNode :: RemoteCacheEntry -> NodeState
remoteNode :: RemoteCacheEntry -> RemoteNodeState
remoteNode (RemoteCacheEntry ns _) = ns