re-organise protocol definition and create own type for remote cache entries
This commit is contained in:
parent
91bb72cb57
commit
89cc51af52
|
@ -18,68 +18,7 @@ import Safe
|
|||
|
||||
import Hash2Pub.FediChord
|
||||
import Hash2Pub.Utils
|
||||
import Hash2Pub.DHTProtocol (QueryResponse (..))
|
||||
|
||||
import Debug.Trace (trace)
|
||||
|
||||
data Action =
|
||||
QueryID
|
||||
| Join
|
||||
| Leave
|
||||
| Stabilise
|
||||
| Ping
|
||||
deriving (Show, Eq, Enum)
|
||||
|
||||
-- ToDo: probably move this to DHTProtocol as it is high-level
|
||||
|
||||
data FediChordMessage =
|
||||
Request {
|
||||
requestID :: Integer
|
||||
, sender :: NodeState
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
-- ^ part starts at 0
|
||||
, action :: Action
|
||||
, payload :: ActionPayload
|
||||
}
|
||||
| Response {
|
||||
responseTo :: Integer
|
||||
, senderID :: NodeID
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
, action :: Action
|
||||
, payload :: ActionPayload
|
||||
} deriving (Show, Eq)
|
||||
|
||||
data ActionPayload =
|
||||
QueryIDRequestPayload {
|
||||
queryTargetID :: NodeID
|
||||
, queryLBestNodes :: Integer
|
||||
}
|
||||
| JoinRequestPayload
|
||||
| LeaveRequestPayload {
|
||||
leaveSuccessors :: [NodeID]
|
||||
, leavePredecessors :: [NodeID]
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| QueryIDResponsePayload {
|
||||
queryResult :: QueryResponse
|
||||
}
|
||||
| JoinResponsePayload {
|
||||
joinSuccessors :: [NodeID]
|
||||
, joinPredecessors :: [NodeID]
|
||||
, joinCache :: [CacheEntry]
|
||||
}
|
||||
| LeaveResponsePayload
|
||||
| StabiliseResponsePayload {
|
||||
stabiliseSuccessors :: [NodeID]
|
||||
, stabilisePredecessors :: [NodeID]
|
||||
}
|
||||
| PingResponsePayload {
|
||||
pingNodeStates :: [NodeState]
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
import Hash2Pub.DHTProtocol
|
||||
|
||||
-- | Try splitting a payload into multiple parts to be able to reduce size of
|
||||
-- individual messages.
|
||||
|
@ -245,8 +184,8 @@ encodeNodeState ns = [
|
|||
, End Sequence
|
||||
]
|
||||
|
||||
encodeCacheEntry :: CacheEntry -> [ASN1]
|
||||
encodeCacheEntry (NodeEntry _ ns timestamp) =
|
||||
encodeCacheEntry :: RemoteCacheEntry -> [ASN1]
|
||||
encodeCacheEntry (RemoteCacheEntry ns timestamp) =
|
||||
Start Sequence
|
||||
: encodeNodeState ns
|
||||
-- ToDo: possibly optimise this by using dlists
|
||||
|
@ -392,13 +331,13 @@ parseNodeState = onNextContainer Sequence $ do
|
|||
}
|
||||
|
||||
|
||||
parseCacheEntry :: ParseASN1 CacheEntry
|
||||
parseCacheEntry :: ParseASN1 RemoteCacheEntry
|
||||
parseCacheEntry = onNextContainer Sequence $ do
|
||||
node <- parseNodeState
|
||||
timestamp <- toEnum . fromIntegral <$> parseInteger
|
||||
return $ NodeEntry False node timestamp
|
||||
return $ RemoteCacheEntry node timestamp
|
||||
|
||||
parseNodeCache :: ParseASN1 [CacheEntry]
|
||||
parseNodeCache :: ParseASN1 [RemoteCacheEntry]
|
||||
parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
|
||||
|
||||
parseJoinRequest :: ParseASN1 ActionPayload
|
||||
|
|
|
@ -3,11 +3,20 @@
|
|||
module Hash2Pub.DHTProtocol
|
||||
( QueryResponse (..)
|
||||
, incomingQuery
|
||||
, addCacheEntry
|
||||
, deleteCacheEntry
|
||||
, RemoteCacheEntry(..)
|
||||
, toRemoteCacheEntry
|
||||
, Action(..)
|
||||
, ActionPayload(..)
|
||||
, FediChordMessage(..)
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Maybe (maybe, fromMaybe)
|
||||
import qualified Data.Set as Set
|
||||
import qualified Data.Map as Map
|
||||
import Data.Time.Clock.POSIX
|
||||
|
||||
import Hash2Pub.FediChord
|
||||
( NodeID
|
||||
|
@ -20,9 +29,7 @@ import Hash2Pub.FediChord
|
|||
, putPredecessors
|
||||
, cacheGetNodeStateUnvalidated
|
||||
, NodeCache
|
||||
, CacheEntry
|
||||
, addCacheEntry
|
||||
, deleteCacheEntry
|
||||
, CacheEntry(..)
|
||||
, cacheLookup
|
||||
, cacheLookupSucc
|
||||
, cacheLookupPred
|
||||
|
@ -31,7 +38,9 @@ import Hash2Pub.FediChord
|
|||
|
||||
import Debug.Trace (trace)
|
||||
|
||||
data QueryResponse = FORWARD (Set.Set CacheEntry) -- ^return closest nodes from local cache.
|
||||
-- === queries ===
|
||||
|
||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache.
|
||||
-- whole cache entry is returned for making
|
||||
-- the entry time stamp available to the
|
||||
-- protocol serialiser
|
||||
|
@ -51,20 +60,120 @@ incomingQuery ownState nCache lBestNodes targetID
|
|||
preds = fromMaybe [] $ getPredecessors ownState
|
||||
ownID = nid ownState
|
||||
|
||||
closestSuccessor :: Set.Set CacheEntry
|
||||
closestSuccessor = maybe Set.empty Set.singleton $ cacheLookupSucc targetID nCache
|
||||
closestSuccessor :: Set.Set RemoteCacheEntry
|
||||
closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache
|
||||
|
||||
closestPredecessors :: Set.Set CacheEntry
|
||||
closestPredecessors :: Set.Set RemoteCacheEntry
|
||||
closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState
|
||||
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set CacheEntry
|
||||
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
|
||||
closestPredecessor 0 _ = Set.empty
|
||||
closestPredecessor remainingLookups lastID
|
||||
| remainingLookups < 0 = Set.empty
|
||||
| otherwise =
|
||||
let result = cacheLookupPred lastID nCache
|
||||
in
|
||||
case result of
|
||||
case toRemoteCacheEntry =<< result of
|
||||
Nothing -> Set.empty
|
||||
Just nPred -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid . cacheGetNodeStateUnvalidated $ nPred)
|
||||
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
|
||||
|
||||
-- === protocol serialisation data types
|
||||
|
||||
data Action =
|
||||
QueryID
|
||||
| Join
|
||||
| Leave
|
||||
| Stabilise
|
||||
| Ping
|
||||
deriving (Show, Eq, Enum)
|
||||
|
||||
data FediChordMessage =
|
||||
Request {
|
||||
requestID :: Integer
|
||||
, sender :: NodeState
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
-- ^ part starts at 0
|
||||
, action :: Action
|
||||
, payload :: ActionPayload
|
||||
}
|
||||
| Response {
|
||||
responseTo :: Integer
|
||||
, senderID :: NodeID
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
, action :: Action
|
||||
, payload :: ActionPayload
|
||||
} deriving (Show, Eq)
|
||||
|
||||
data ActionPayload =
|
||||
QueryIDRequestPayload {
|
||||
queryTargetID :: NodeID
|
||||
, queryLBestNodes :: Integer
|
||||
}
|
||||
| JoinRequestPayload
|
||||
| LeaveRequestPayload {
|
||||
leaveSuccessors :: [NodeID]
|
||||
, leavePredecessors :: [NodeID]
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| QueryIDResponsePayload {
|
||||
queryResult :: QueryResponse
|
||||
}
|
||||
| JoinResponsePayload {
|
||||
joinSuccessors :: [NodeID]
|
||||
, joinPredecessors :: [NodeID]
|
||||
, joinCache :: [RemoteCacheEntry]
|
||||
}
|
||||
| LeaveResponsePayload
|
||||
| StabiliseResponsePayload {
|
||||
stabiliseSuccessors :: [NodeID]
|
||||
, stabilisePredecessors :: [NodeID]
|
||||
}
|
||||
| PingResponsePayload {
|
||||
pingNodeStates :: [NodeState]
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | dedicated data type for cache entries sent to or received from the network,
|
||||
-- as these have to be considered as unvalidated. Also helps with separation of trust.
|
||||
data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime
|
||||
deriving (Show, Eq)
|
||||
|
||||
instance Ord RemoteCacheEntry where
|
||||
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
||||
|
||||
toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry
|
||||
toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
|
||||
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
|
||||
toRemoteCacheEntry _ = Nothing
|
||||
|
||||
-- cache operations
|
||||
|
||||
-- | update or insert a 'RemoteCacheEntry' into the cache,
|
||||
-- converting it to a local 'CacheEntry'
|
||||
addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from network
|
||||
-> NodeCache -- ^ node cache to insert to
|
||||
-> IO NodeCache -- ^ new node cache with the element inserted
|
||||
addCacheEntry (RemoteCacheEntry ns ts) cache = do
|
||||
now <- getPOSIXTime
|
||||
let
|
||||
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
|
||||
timestamp' = if ts <= now then ts else now
|
||||
newCache = Map.insertWith insertCombineFunction (nid ns) (NodeEntry False ns timestamp') cache
|
||||
insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal =
|
||||
case oldVal of
|
||||
ProxyEntry n _ -> ProxyEntry n (Just newVal)
|
||||
NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp)
|
||||
return newCache
|
||||
|
||||
-- | delete the node with given ID from cache
|
||||
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
|
||||
-> NodeCache -- ^cache to delete from
|
||||
-> NodeCache -- ^cache without the specified element
|
||||
deleteCacheEntry = Map.update modifier
|
||||
where
|
||||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
||||
modifier NodeEntry {} = Nothing
|
||||
|
||||
|
||||
|
|
|
@ -25,8 +25,6 @@ module Hash2Pub.FediChord (
|
|||
, CacheEntry(..)
|
||||
, cacheGetNodeStateUnvalidated
|
||||
, initCache
|
||||
, addCacheEntry
|
||||
, deleteCacheEntry
|
||||
, cacheLookup
|
||||
, cacheLookupSucc
|
||||
, cacheLookupPred
|
||||
|
@ -234,32 +232,6 @@ initCache = Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (mi
|
|||
where
|
||||
proxyEntry (from,to) = (from, ProxyEntry to Nothing)
|
||||
|
||||
-- | insert or update a new @NodeState@ node into the cache
|
||||
addCacheEntry :: NodeState -- ^ the node to insert
|
||||
-> Integer -- ^ initial age penalty in seconds
|
||||
-> NodeCache -- ^ node cache to insert to
|
||||
-> IO NodeCache -- ^ new node cache with the element inserted
|
||||
addCacheEntry node timestamp cache = do
|
||||
now <- getPOSIXTime
|
||||
let
|
||||
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
|
||||
timestamp' = fromInteger timestamp
|
||||
newCache = Map.insertWith insertCombineFunction (nid node) (NodeEntry False node timestamp') cache
|
||||
insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal =
|
||||
case oldVal of
|
||||
ProxyEntry n _ -> ProxyEntry n (Just newVal)
|
||||
NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp)
|
||||
return newCache
|
||||
|
||||
-- | delete the node with given ID from cache
|
||||
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
|
||||
-> NodeCache -- ^cache to delete from
|
||||
-> NodeCache -- ^cache without the specified element
|
||||
deleteCacheEntry = Map.update modifier
|
||||
where
|
||||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
||||
modifier NodeEntry {} = Nothing
|
||||
|
||||
-- | Maybe returns the cache entry stored at given key
|
||||
cacheLookup :: NodeID -- ^lookup key
|
||||
-> NodeCache -- ^lookup cache
|
||||
|
|
Loading…
Reference in a new issue