-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sync-v2): sync-v2 implemented, sync-v1 still default #275
Conversation
d3fda51
to
aea60ec
Compare
aea60ec
to
124a00e
Compare
266c111
to
4cae7ef
Compare
124a00e
to
1e30cd3
Compare
67cd607
to
92c4036
Compare
1e30cd3
to
336cdbc
Compare
92c4036
to
7648a4b
Compare
1584396
to
6abbfed
Compare
7648a4b
to
768047f
Compare
6abbfed
to
02ffc0d
Compare
768047f
to
b1d6f5a
Compare
4aaec0f
to
e11ff9d
Compare
e11ff9d
to
4245bb4
Compare
if self._started: | ||
raise Exception('NodeSyncBlock is already running') | ||
self._started = True | ||
self._lc_run.start(5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would there be any penalty for us if we decrease this time from 5s to maybe 1s or 2s?
I have the impression we could be losing some seconds between the end of the current streaming and the start of the next one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might just move it to a variable and let us change it through syscall.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, I'll move it to sysctl.
tx_bytes = base64.b64decode(payload) | ||
tx = tx_or_block_from_bytes(tx_bytes) | ||
assert tx.hash is not None | ||
if not isinstance(tx, Transaction): | ||
self.log.warn('not a transaction', hash=tx.hash_hex) | ||
# Not a transaction. Punish peer? | ||
return | ||
|
||
self._tx_received += 1 | ||
if self._tx_received > self._tx_max_quantity + 1: | ||
self.log.warn('too many txs received') | ||
self.state = PeerState.ERROR | ||
return | ||
|
||
try: | ||
# this methods takes care of checking if the tx already exists, it will take care of doing at least | ||
# a basic validation | ||
# self.log.debug('add new tx', tx=tx.hash_hex) | ||
if self.partial_vertex_exists(tx.hash): | ||
# XXX: early terminate? | ||
self.log.debug('tx early terminate?', tx_id=tx.hash.hex()) | ||
else: | ||
self.log.debug('tx received', tx_id=tx.hash.hex()) | ||
self.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True) | ||
except HathorError: | ||
self.log.warn('invalid new tx', exc_info=True) | ||
# Invalid block?! | ||
# Invalid transaction?! | ||
# Maybe stop syncing and punish peer. | ||
self.state = PeerState.ERROR | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are stopping sync and/or punishing a peer for sending invalid txs, we shouldn't forget about handling invalid payloads on lines 961 and 962. The same for handle_blocks()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe refactor the following lines to a _parse_tx_bytes(tx_bytes: bytes) -> BaseTransaction
method.
tx_bytes = base64.b64decode(payload)
tx = tx_or_block_from_bytes(tx_bytes)
So, handlers can check vertex types only.
if tx is None: | ||
self.log.error('failed to get tx', tx_id=tx_id.hex()) | ||
self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} not found') | ||
raise | ||
if tx.hash != tx_id: | ||
self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} hash mismatch') | ||
raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are multiple places where we call send_error_and_close_connection()
but do not raise
. Considering consistency, is this expected? Should we raise in the other places too, or not raise here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes more sense to return instead.
raise | ||
return tx | ||
|
||
def get_data(self, tx_id: bytes, origin: str) -> Deferred: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should origin
be an enum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe. Not sure whether it has the peer id or its more related to the type of origin (sync-v2-mempool, sync-v2-blocks, sync-v2-mempool).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes sense to use an enum, I'll also refactor the methods a little bit.
a9dc05b
to
a7581ef
Compare
Co-authored-by: Marcelo Salhab Brogliato <[email protected]> Co-authored-by: Pedro Ferreira <[email protected]>
a7581ef
to
f63f3b8
Compare
These changes were separated from #236, it should now have mostly the sync-v2 base code itself. RFC
Acceptance Criteria
DepsIndex
and its both implementations (memory and rocksdb), allowing voided vertices and using the correct scope when retrieving vertices from the storage.SyncVersion.V2
to'v2'
(previously'v2-fake'
).SyncV2Factory
and use it onConnectionsManager._sync_factories
.NodeBlockSync
that sync blocks and its transactions.BlockchainStreaming
andTransactionsStreaming
to stream vertices as requested by peers. They are used byNodeBlockSync
.TransactionStorage.iter_mempool_tips_from_best_index()
.Current issues