Hash2Pub/src/Hash2Pub/FediChord.hs
Trolli Schmittlauch e3bfa26ddb join request + large FediChord refactoring
- implement sending of initial join request sending, response parsing
  and cache population (untested but compiles)
- refactor basic types and their functions into Hash2Pub.FediChordTypes
  to prevent import loops, leaving Hash2Pub.FediChord to contain the
  high level actions called from Main
2020-05-25 22:03:24 +02:00

160 lines
6 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE OverloadedStrings #-}
{- |
Module : FediChord
Description : An opinionated implementation of the EpiChord DHT by Leong et al.
Copyright : (c) schmittlauch, 2019-2020
License : AGPL-3
Stability : experimental
Modernised EpiChord + k-choices load balancing
-}
module Hash2Pub.FediChord (
NodeID -- abstract, but newtype constructors cannot be hidden
, getNodeID
, toNodeID
, NodeState (..)
, LocalNodeState (..)
, RemoteNodeState (..)
, setSuccessors
, setPredecessors
, NodeCache
, CacheEntry(..)
, cacheGetNodeStateUnvalidated
, initCache
, cacheLookup
, cacheLookupSucc
, cacheLookupPred
, localCompare
, genNodeID
, genNodeIDBS
, genKeyID
, genKeyIDBS
, byteStringToUInteger
, ipAddrAsBS
, bsAsIpAddr
, FediChordConf(..)
, fediChordInit
, nodeStateInit
, mkServerSocket
, mkSendSocket
, resolve
, cacheWriter
) where
import Control.Exception
import Data.Foldable (foldr')
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket
-- for hashing and ID conversion
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Monad (forever)
import Crypto.Hash
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import Data.IORef
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChordTypes
import Hash2Pub.Utils
import Debug.Trace (trace)
-- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeState)
fediChordInit conf = do
initialState <- nodeStateInit conf
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
pure (serverSock, initialState)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: FediChordConf -> IO LocalNodeState
nodeStateInit conf = do
cacheRef <- newIORef initCache
q <- atomically newTQueue
let
containedState = RemoteNodeState {
domain = confDomain conf
, ipAddr = confIP conf
, nid = genNodeID (confIP conf) (confDomain conf) 0
, dhtPort = toEnum $ confDhtPort conf
, servicePort = 0
, vServerID = 0
}
initialState = LocalNodeState {
nodeState = containedState
, nodeCacheRef = cacheRef
, cacheWriteQueue = q
, successors = []
, predecessors = []
, kNeighbours = 3
, lNumBestNodes = 3
, pNumParallelQueries = 2
, jEntriesPerSlice = 2
}
pure initialState
fediChordJoin :: LocalNodeState -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordJoin ns (joinHost, joinPort) = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
sock <- mkSendSocket joinHost joinPort
-- 1. get routed to placement of own ID until FOUND:
-- Initialise an empty cache only with the responses from a bootstrapping node
bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
if bootstrapResponse == Set.empty
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
else do
now <- getPOSIXTime
-- create new cache with all returned node responses
let bootstrapCache =
-- traverse response parts
foldr' (\resp cacheAcc -> case queryResult <$> payload resp of
Nothing -> cacheAcc
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
)
initCache bootstrapResponse
-- get routed to the currently responsible node, based on the response
-- from the bootstrapping node
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns
-- do actual join
joinResult <- requestJoin currentlyResponsible ns
case joinResult of
Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible
Just joinedNS -> pure . Right $ joinedNS
-- 2. then send a join to the currently responsible node
-- after successful join, finally add own node to the cache
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
cacheWriter :: LocalNodeState -> IO ()
cacheWriter ns = do
let writeQueue' = cacheWriteQueue ns
forever $ do
f <- atomically $ readTQueue writeQueue'
let
refModifier :: NodeCache -> (NodeCache, ())
refModifier nc = (f nc, ())
atomicModifyIORef' (nodeCacheRef ns) refModifier