run stylish
This commit is contained in:
parent
8d18f952cd
commit
c31baa3635
|
@ -3,7 +3,7 @@
|
|||
module Hash2Pub.ASN1Coding where
|
||||
|
||||
import Control.Exception (displayException)
|
||||
import Data.ASN1.BinaryEncoding -- asn1-encoding package
|
||||
import Data.ASN1.BinaryEncoding
|
||||
import Data.ASN1.Encoding
|
||||
import Data.ASN1.Error ()
|
||||
import Data.ASN1.Parse
|
||||
|
@ -17,8 +17,8 @@ import Data.Time.Clock.POSIX ()
|
|||
import Safe
|
||||
|
||||
import Hash2Pub.FediChord
|
||||
import Hash2Pub.Utils
|
||||
import Hash2Pub.ProtocolTypes
|
||||
import Hash2Pub.Utils
|
||||
|
||||
import Debug.Trace
|
||||
|
||||
|
|
|
@ -15,29 +15,31 @@ module Hash2Pub.DHTProtocol
|
|||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import Control.Concurrent.STM.TBQueue
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.Map as Map
|
||||
import Data.Maybe (fromMaybe, maybe)
|
||||
import qualified Data.Set as Set
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TBQueue
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.Map as Map
|
||||
import Data.Maybe (fromMaybe, maybe)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Network.Socket hiding (recv, recvFrom, send, sendTo)
|
||||
import Network.Socket hiding (recv, recvFrom, send,
|
||||
sendTo)
|
||||
import Network.Socket.ByteString
|
||||
import System.Timeout
|
||||
|
||||
import Hash2Pub.ASN1Coding
|
||||
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
|
||||
NodeState (..),
|
||||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc, getPredecessors,
|
||||
getSuccessors, localCompare,
|
||||
putPredecessors, putSuccessors)
|
||||
import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
|
||||
NodeID, NodeState (..),
|
||||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc,
|
||||
getPredecessors, getSuccessors,
|
||||
localCompare, putPredecessors,
|
||||
putSuccessors)
|
||||
import Hash2Pub.ProtocolTypes
|
||||
|
||||
import Debug.Trace (trace)
|
||||
import Debug.Trace (trace)
|
||||
|
||||
-- === queries ===
|
||||
|
||||
|
@ -136,7 +138,7 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
|||
-- FORWARD nodeSet ->
|
||||
-- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- responses = mapM
|
||||
-- responses = mapM
|
||||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
|
|
|
@ -122,58 +122,43 @@ a `localCompare` b
|
|||
|
||||
|
||||
-- | represents a node and all its important state
|
||||
data NodeState = NodeState {
|
||||
nid :: NodeID
|
||||
, domain :: String
|
||||
-- ^ full public domain name the node is reachable under
|
||||
, ipAddr :: HostAddress6
|
||||
-- the node's public IPv6 address
|
||||
, dhtPort :: PortNumber
|
||||
-- ^ port of the DHT itself
|
||||
, apPort :: Maybe PortNumber
|
||||
-- ^ port of the ActivityPub relay and storage service
|
||||
-- might have to be queried first
|
||||
, vServerID :: Integer
|
||||
-- ^ ID of this vserver
|
||||
|
||||
-- ==== internal state ====
|
||||
, internals :: Maybe InternalNodeState
|
||||
-- ^ data not present in the representation of remote nodes
|
||||
-- is put into its own type.
|
||||
-- This is usually @Nothing@ for all remote nodes.
|
||||
} deriving (Show, Eq)
|
||||
data NodeState = NodeState
|
||||
{ nid :: NodeID
|
||||
, domain :: String
|
||||
-- ^ full public domain name the node is reachable under
|
||||
, ipAddr :: HostAddress6
|
||||
-- the node's public IPv6 address
|
||||
, dhtPort :: PortNumber
|
||||
-- ^ port of the DHT itself
|
||||
, apPort :: Maybe PortNumber
|
||||
-- ^ port of the ActivityPub relay and storage service
|
||||
, vServerID :: Integer
|
||||
-- ^ ID of this vserver
|
||||
, internals :: Maybe InternalNodeState
|
||||
-- ^ data not present in the representation of remote nodes
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | encapsulates all data and parameters that are not present for remote nodes
|
||||
data InternalNodeState = InternalNodeState {
|
||||
nodeCache :: IORef NodeCache
|
||||
-- ^ EpiChord node cache with expiry times for nodes
|
||||
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@.
|
||||
-- encapsulated into an IORef for allowing concurrent reads without locking
|
||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||
-- only processed by a single writer thread to prevent lost updates.
|
||||
-- All nodeCache modifying functions have to be partially applied enough before
|
||||
-- being put into the queue.
|
||||
--
|
||||
, successors :: [NodeID] -- could be a set instead as these are ordered as well
|
||||
-- ^ successor nodes in ascending order by distance
|
||||
, predecessors :: [NodeID]
|
||||
-- ^ predecessor nodes in ascending order by distance
|
||||
----- protocol parameters -----
|
||||
-- TODO: evaluate moving these somewhere else
|
||||
, kNeighbours :: Int
|
||||
-- ^ desired length of predecessor and successor list
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
, lNumBestNodes :: Int
|
||||
-- ^ number of best next hops to provide
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
, pNumParallelQueries :: Int
|
||||
-- ^ number of parallel sent queries
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
, jEntriesPerSlice :: Int
|
||||
-- ^ number of desired entries per cache slice
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
} deriving (Show, Eq)
|
||||
data InternalNodeState = InternalNodeState
|
||||
{ nodeCache :: IORef NodeCache
|
||||
-- ^ EpiChord node cache with expiry times for nodes
|
||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||
, successors :: [NodeID] -- could be a set instead as these are ordered as well
|
||||
-- ^ successor nodes in ascending order by distance
|
||||
, predecessors :: [NodeID]
|
||||
-- ^ predecessor nodes in ascending order by distance
|
||||
, kNeighbours :: Int
|
||||
-- ^ desired length of predecessor and successor list
|
||||
, lNumBestNodes :: Int
|
||||
-- ^ number of best next hops to provide
|
||||
, pNumParallelQueries :: Int
|
||||
-- ^ number of parallel sent queries
|
||||
, jEntriesPerSlice :: Int
|
||||
-- ^ number of desired entries per cache slice
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | defining Show instances to be able to print NodeState for debug purposes
|
||||
instance Typeable a => Show (IORef a) where
|
||||
|
@ -230,12 +215,8 @@ getLNumBestNodes = getInternals_ lNumBestNodes
|
|||
type NodeCache = Map.Map NodeID CacheEntry
|
||||
|
||||
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
|
||||
data CacheEntry =
|
||||
-- | an entry representing its validation status, the node state and its timestamp
|
||||
NodeEntry Bool NodeState POSIXTime
|
||||
-- | a proxy field for closing the ring structure, indicating the lookup shall be
|
||||
-- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@
|
||||
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
|
||||
data CacheEntry = NodeEntry Bool NodeState POSIXTime
|
||||
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | as a compromise, only NodeEntry components are ordered by their NodeID
|
||||
|
@ -247,7 +228,9 @@ instance Ord CacheEntry where
|
|||
extractID (NodeEntry _ eState _) = nid eState
|
||||
extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
|
||||
|
||||
data ProxyDirection = Backwards | Forwards deriving (Show, Eq)
|
||||
data ProxyDirection = Backwards
|
||||
| Forwards
|
||||
deriving (Show, Eq)
|
||||
|
||||
instance Enum ProxyDirection where
|
||||
toEnum (-1) = Backwards
|
||||
|
@ -430,11 +413,12 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
|||
-- persist them on disk so they can be used for all following bootstraps
|
||||
|
||||
-- | configuration values used for initialising the FediChord DHT
|
||||
data FediChordConf = FediChordConf {
|
||||
confDomain :: String
|
||||
, confIP :: HostAddress6
|
||||
, confDhtPort :: Int
|
||||
} deriving (Show, Eq)
|
||||
data FediChordConf = FediChordConf
|
||||
{ confDomain :: String
|
||||
, confIP :: HostAddress6
|
||||
, confDhtPort :: Int
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | initialise data structures, compute own IDs and bind to listening socket
|
||||
-- ToDo: load persisted state, thus this function already operates in IO
|
||||
|
|
|
@ -1,74 +1,69 @@
|
|||
module Hash2Pub.ProtocolTypes where
|
||||
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX (POSIXTime)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX (POSIXTime)
|
||||
|
||||
import Hash2Pub.FediChord
|
||||
import Hash2Pub.FediChord
|
||||
|
||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache.
|
||||
-- whole cache entry is returned for making
|
||||
-- the entry time stamp available to the
|
||||
-- protocol serialiser
|
||||
| FOUND NodeState -- ^node is the responsible node for queried ID
|
||||
deriving (Show, Eq)
|
||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
|
||||
| FOUND NodeState
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- === protocol serialisation data types
|
||||
|
||||
data Action =
|
||||
QueryID
|
||||
| Join
|
||||
| Leave
|
||||
| Stabilise
|
||||
| Ping
|
||||
deriving (Show, Eq, Enum)
|
||||
data Action = QueryID
|
||||
| Join
|
||||
| Leave
|
||||
| Stabilise
|
||||
| Ping
|
||||
deriving (Show, Eq, Enum)
|
||||
|
||||
data FediChordMessage =
|
||||
Request {
|
||||
requestID :: Integer
|
||||
, sender :: NodeState
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
-- ^ part starts at 0
|
||||
, action :: Action
|
||||
, payload :: Maybe ActionPayload
|
||||
}
|
||||
| Response {
|
||||
responseTo :: Integer
|
||||
, senderID :: NodeID
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
, action :: Action
|
||||
, payload :: Maybe ActionPayload
|
||||
} deriving (Show, Eq)
|
||||
data FediChordMessage = Request
|
||||
{ requestID :: Integer
|
||||
, sender :: NodeState
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
-- ^ part starts at 0
|
||||
, action :: Action
|
||||
, payload :: Maybe ActionPayload
|
||||
}
|
||||
| Response
|
||||
{ responseTo :: Integer
|
||||
, senderID :: NodeID
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
, action :: Action
|
||||
, payload :: Maybe ActionPayload
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
data ActionPayload =
|
||||
QueryIDRequestPayload {
|
||||
queryTargetID :: NodeID
|
||||
, queryLBestNodes :: Integer
|
||||
}
|
||||
| JoinRequestPayload
|
||||
| LeaveRequestPayload {
|
||||
leaveSuccessors :: [NodeID]
|
||||
, leavePredecessors :: [NodeID]
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| QueryIDResponsePayload {
|
||||
queryResult :: QueryResponse
|
||||
}
|
||||
| JoinResponsePayload {
|
||||
joinSuccessors :: [NodeID]
|
||||
, joinPredecessors :: [NodeID]
|
||||
, joinCache :: [RemoteCacheEntry]
|
||||
}
|
||||
| LeaveResponsePayload
|
||||
| StabiliseResponsePayload {
|
||||
stabiliseSuccessors :: [NodeID]
|
||||
, stabilisePredecessors :: [NodeID]
|
||||
}
|
||||
| PingResponsePayload {
|
||||
pingNodeStates :: [NodeState]
|
||||
}
|
||||
data ActionPayload = QueryIDRequestPayload
|
||||
{ queryTargetID :: NodeID
|
||||
, queryLBestNodes :: Integer
|
||||
}
|
||||
| JoinRequestPayload
|
||||
| LeaveRequestPayload
|
||||
{ leaveSuccessors :: [NodeID]
|
||||
, leavePredecessors :: [NodeID]
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| QueryIDResponsePayload
|
||||
{ queryResult :: QueryResponse
|
||||
}
|
||||
| JoinResponsePayload
|
||||
{ joinSuccessors :: [NodeID]
|
||||
, joinPredecessors :: [NodeID]
|
||||
, joinCache :: [RemoteCacheEntry]
|
||||
}
|
||||
| LeaveResponsePayload
|
||||
| StabiliseResponsePayload
|
||||
{ stabiliseSuccessors :: [NodeID]
|
||||
, stabilisePredecessors :: [NodeID]
|
||||
}
|
||||
| PingResponsePayload
|
||||
{ pingNodeStates :: [NodeState]
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | global limit of parts per message used when (de)serialising messages.
|
||||
|
|
Loading…
Reference in a new issue