forked from schmittlauch/Hash2Pub
Compare commits
3 commits
master
...
dhtNetwork
Author | SHA1 | Date | |
---|---|---|---|
Trolli Schmittlauch | 4e62bb08f8 | ||
Trolli Schmittlauch | 8b01ad2f37 | ||
Trolli Schmittlauch | b8be20b86e |
0
Hash2Pub/.gitignore → .gitignore
vendored
0
Hash2Pub/.gitignore → .gitignore
vendored
|
@ -46,7 +46,7 @@ category: Network
|
||||||
extra-source-files: CHANGELOG.md
|
extra-source-files: CHANGELOG.md
|
||||||
|
|
||||||
common deps
|
common deps
|
||||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
|
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
|
||||||
ghc-options: -Wall
|
ghc-options: -Wall
|
||||||
|
|
||||||
|
|
17
Readme.md
Normal file
17
Readme.md
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
# Hash2Pub
|
||||||
|
|
||||||
|
***This is heavily WIP and does not provide any useful functionality yet***.
|
||||||
|
I aim for always having the master branch at a state where it builds and tests pass.
|
||||||
|
|
||||||
|
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
|
||||||
|
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
|
||||||
|
|
||||||
|
This is the practical implementation of the concept presented in the paper [Decentralised Hashtag Search and Subscription for Federated Social Networks](https://git.orlives.de/schmittlauch/paper_hashtag_federation). A 30 minutes [introduction talk](https://conf.tube/videos/watch/340eb706-28c0-4a43-9364-700297ca96cb) is available as well.
|
||||||
|
|
||||||
|
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
|
||||||
|
|
||||||
|
## Building
|
||||||
|
|
||||||
|
The project and its developent environment are built with [Nix](https://nixos.org/nix/).
|
||||||
|
|
||||||
|
The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
|
|
@ -23,6 +23,8 @@ import qualified Data.Map as Map
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Network.Socket hiding (send, sendTo, recv, recvFrom)
|
import Network.Socket hiding (send, sendTo, recv, recvFrom)
|
||||||
import Network.Socket.ByteString
|
import Network.Socket.ByteString
|
||||||
|
import System.Timeout
|
||||||
|
import Control.Monad.State.Strict
|
||||||
|
|
||||||
import Hash2Pub.FediChord
|
import Hash2Pub.FediChord
|
||||||
( NodeID
|
( NodeID
|
||||||
|
@ -210,6 +212,52 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
||||||
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
||||||
adjustFunc entry = entry
|
adjustFunc entry = entry
|
||||||
|
|
||||||
|
-- ====== message send and receive operations ======
|
||||||
|
|
||||||
|
requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||||
|
-- 1. do a local lookup for the l closest nodes
|
||||||
|
-- 2. create l sockets
|
||||||
|
-- 3. send a message async concurrently to all l nodes
|
||||||
|
-- 4. collect the results, insert them into cache
|
||||||
|
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||||
|
requestQueryID ns targetID = do
|
||||||
|
cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||||
|
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||||
|
-- FOUND can only be returned if targetID is owned by local node
|
||||||
|
case localResult of
|
||||||
|
FOUND thisNode -> return thisNode
|
||||||
|
FORWARD nodeSet ->
|
||||||
|
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||||
|
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||||
|
responses = mapM
|
||||||
|
|
||||||
|
sendRequestTo :: Int -- ^ timeout in seconds
|
||||||
|
-> Int -- ^ number of retries
|
||||||
|
-> FediChordMessage -- ^ the message to be sent
|
||||||
|
-> Socket -- ^ connected socket to use for sending
|
||||||
|
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||||
|
sendRequestTo timeout attempts msg sock = do
|
||||||
|
let requests = serialiseMessage 1200 msg
|
||||||
|
-- ToDo: make attempts and timeout configurable
|
||||||
|
attempts 3 . timeout 5000 $ do
|
||||||
|
where
|
||||||
|
-- state reingeben: state = noch nicht geackte messages, result = responses
|
||||||
|
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
|
||||||
|
sendAndAck sock = do
|
||||||
|
remainingSends <- get
|
||||||
|
sendMany sock $ Map.elems remainingSends
|
||||||
|
-- timeout pro receive socket, danach catMaybes
|
||||||
|
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
|
||||||
|
replicateM
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
-- idea: send all parts at once
|
||||||
|
-- Set/ Map with unacked parts
|
||||||
|
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
|
||||||
|
-- how to manage individual retries? nested "attempts"
|
||||||
|
|
||||||
-- | retry an IO action at most *i* times until it delivers a result
|
-- | retry an IO action at most *i* times until it delivers a result
|
||||||
attempts :: Int -- ^ number of retries *i*
|
attempts :: Int -- ^ number of retries *i*
|
||||||
-> IO (Maybe a) -- ^ action to retry
|
-> IO (Maybe a) -- ^ action to retry
|
Loading…
Reference in a new issue