Conversation
Walkthrough
This pull request introduces a comprehensive restructuring of the node architecture. The P2P layer transitions from a pubsub model to an asyncio TCP-based network with server lifecycle management. The main application layer is refactored to separate concerns into mining, networking, and CLI interaction, with new entry points and handler abstractions. Transaction timestamp handling is also updated to support multiple input formats.
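The walkthrough's note about multi-format timestamps suggests parsing along the lines of the sketch below, a minimal illustration that normalizes epoch numbers, numeric strings, and ISO-8601 strings (the function name and accepted formats are assumptions, not the PR's actual code):

```python
from datetime import datetime, timezone

def parse_timestamp(value):
    """Normalize a transaction timestamp to a UTC epoch float.

    Hypothetical helper; the PR's real parsing logic may differ.
    """
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        try:
            return float(value)  # numeric string, e.g. "1700000000.5"
        except ValueError:
            # ISO-8601 string, e.g. "2024-01-01T00:00:00+00:00"
            return datetime.fromisoformat(value).astimezone(timezone.utc).timestamp()
    raise TypeError(f"Unsupported timestamp type: {type(value)!r}")
```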
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant CLI as CLI Handler
    participant Node as run_node
    participant Chain
    participant Mempool
    participant Network
    participant Peer
    User->>Node: Start with --port/--connect/--fund args
    Node->>Node: Initialize wallet (sk, pk)
    Node->>Chain: Create blockchain
    Node->>Mempool: Create mempool
    Node->>Network: Create P2PNetwork
    Node->>Network: start(port)
    activate Network
    Network->>Network: Listen for incoming connections
    deactivate Network
    opt connect_to peer
        Node->>Network: connect_to_peer(host, port)
        Network->>Peer: Establish TCP connection
        Peer->>Network: Connection established
        Node->>Network: on_peer_connected callback
        Network->>Peer: Send state sync
    end
    opt fund wallet
        Node->>Chain: Add initial funds to wallet address
    end
    Node->>CLI: Run cli_loop
    activate CLI
    loop User commands
        User->>CLI: Enter command (balance/send/mine/peers/connect)
        alt balance
            CLI->>Chain: Query balance for pk
        else send
            CLI->>Mempool: Add transaction
            CLI->>Network: broadcast_transaction
        else mine
            CLI->>Chain: mine_and_process_block(chain, mempool, miner_pk)
            Chain->>Mempool: Extract pending txs
            Chain->>Chain: Create and mine block
            CLI->>Network: broadcast_block
        else peers/connect
            CLI->>Network: Connect to peer or show peer_count
        end
    end
    deactivate CLI
```
```mermaid
sequenceDiagram
    participant Remote as Remote Peer
    participant Handler as Network Handler
    participant Chain
    participant Mempool
    participant Network as P2PNetwork
    Remote->>Network: Send message (sync/tx/block)
    Network->>Network: _listen_to_peer receives line
    Network->>Handler: make_network_handler processes msg
    alt Message type: sync
        Handler->>Chain: Validate and integrate remote state
    else Message type: tx
        Handler->>Mempool: Add transaction to mempool
        Mempool->>Mempool: Track for mining
    else Message type: block
        Handler->>Chain: Validate and integrate block
        Handler->>Mempool: Remove mined transactions
    end
    Handler->>Network: Update local state
    Network->>Network: _broadcast_raw to other peers (if needed)
```
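From the diagram, the network handler is a dispatcher over newline-delimited JSON messages. A minimal sketch of that shape, using the message types shown above (the body stubs and signatures are assumptions, not the PR's actual handler):

```python
import json

def make_network_handler(chain, mempool, network):
    """Return a coroutine that dispatches decoded peer messages.

    Sketch only; the real handler's signatures in the PR may differ.
    """
    async def handle(raw_line: bytes):
        try:
            msg = json.loads(raw_line.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            return  # ignore malformed lines
        msg_type = msg.get("type")
        payload = msg.get("data", {})
        if msg_type == "sync":
            pass  # validate and integrate remote state
        elif msg_type == "tx":
            pass  # add the transaction to the mempool
        elif msg_type == "block":
            pass  # validate the block, then drop its txs from the mempool
    return handle
```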
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 1 failed (inconclusive)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
main.py (1)
50-71: ⚠️ Potential issue | 🟠 Major
Lost transactions on block rejection: restore failed transactions to the mempool.
The `get_transactions_for_block()` call clears the mempool immediately (lines 52-53 in minichain/mempool.py). If `chain.add_block()` returns False due to validation failure, those transactions are permanently discarded. Restore rejected transactions to the mempool, or store them separately for retry.
Note: `credit_mining_reward()` is not called inside `add_block()`, so double-reward accounting is not a concern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@main.py` around lines 50 - 71, mine_and_process_block currently empties the mempool via get_transactions_for_block() then discards those txs if chain.add_block(mined_block) returns False; capture the pending_txs and, on block rejection, restore them to the mempool (e.g., call mempool.restore_transactions(pending_txs) or mempool.add_transactions(pending_txs) depending on the mempool API) before returning None, ensuring you do not call chain.state.credit_mining_reward(miner_pk) on failure; update mine_and_process_block to reinsert failed transactions back into mempool when chain.add_block(mined_block) is False.
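A sketch of the fix described above, using the helper names from the review prompt (`create_and_mine_block` is a placeholder for the existing mining step; adjust all names to the real chain/mempool API):

```python
def mine_and_process_block(chain, mempool, miner_pk):
    pending_txs = mempool.get_transactions_for_block()  # empties the mempool
    mined_block = create_and_mine_block(chain, pending_txs, miner_pk)  # placeholder
    if not chain.add_block(mined_block):
        # Rejected block: put the transactions back instead of losing them,
        # and skip the mining reward.
        mempool.restore_transactions(pending_txs)
        return None
    chain.state.credit_mining_reward(miner_pk)
    return mined_block
```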
```python
if msg_type == "sync":
    # Merge remote state into local state (for accounts we don't have yet)
    remote_accounts = payload.get("accounts", {})
    for addr, acc in remote_accounts.items():
        if addr not in chain.state.accounts:
            chain.state.accounts[addr] = acc
            logger.info("🔄 Synced account %s... (balance=%d)", addr[:12], acc.get("balance", 0))
    logger.info("🔄 State sync complete — %d accounts", len(chain.state.accounts))
```
State sync trusts unverified balances from peers.
Any peer can mint balances by sending sync. Consider gating sync to a trusted mode, localhost-only peers, or only when the chain is empty.
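A rough sketch of that gating; `TRUSTED_PEERS` and the helper below are illustrative, not existing code:

```python
TRUSTED_PEERS = {"127.0.0.1", "::1"}  # illustrative allowlist

def should_accept_sync(peer_host: str, chain) -> bool:
    """Accept a sync only from an allowlisted peer or onto a fresh chain."""
    if peer_host in TRUSTED_PEERS:
        return True
    return not chain.state.accounts  # empty state: nothing to overwrite

# In the sync branch of the handler:
# if msg_type == "sync" and not should_accept_sync(peer_host, chain):
#     logger.warning("Ignoring state sync from untrusted peer %s", peer_host)
#     return
```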
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@main.py` around lines 85 - 93, The sync handler currently merges unverified
balances from payload.get("accounts") into chain.state.accounts whenever
msg_type == "sync"; restrict this by validating the peer or chain state before
merging: check the connection origin (e.g., remote_addr or a provided sender id)
and only accept sync if the peer is in a TRUSTED_PEERS allowlist (or
connection.is_localhost) OR if chain.state.accounts is empty (fresh chain),
otherwise ignore the payload and log a warning; update the sync block around
msg_type == "sync" to perform these checks before looping over
payload.get("accounts") and ensure you still log accepted syncs and rejected
attempts.
```python
elif cmd == "connect":
    if len(parts) < 2:
        print("  Usage: connect <host>:<port>")
        continue
    try:
        host, port_str = parts[1].rsplit(":", 1)
        port = int(port_str)
    except ValueError:
        print("  Invalid format. Use host:port")
        continue
    await network.connect_to_peer(host, port)
```
Surface connect failures to the user.
connect_to_peer() returns a boolean but the CLI ignores it.
💬 Suggested change
```diff
-            await network.connect_to_peer(host, port)
+            success = await network.connect_to_peer(host, port)
+            if success:
+                print(f"  ✅ Connected to {host}:{port}")
+            else:
+                print(f"  ❌ Failed to connect to {host}:{port}")
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
elif cmd == "connect":
    if len(parts) < 2:
        print("  Usage: connect <host>:<port>")
        continue
    try:
        host, port_str = parts[1].rsplit(":", 1)
        port = int(port_str)
    except ValueError:
        print("  Invalid format. Use host:port")
        continue
    success = await network.connect_to_peer(host, port)
    if success:
        print(f"  ✅ Connected to {host}:{port}")
    else:
        print(f"  ❌ Failed to connect to {host}:{port}")
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@main.py` around lines 213 - 224, The CLI currently calls
network.connect_to_peer(host, port) and ignores its boolean return; update the
"connect" command handler (the elif cmd == "connect" block) to capture the
returned value (e.g., success = await network.connect_to_peer(host, port)) and
print a clear user-facing message on failure or success (for example "Connected
to host:port" on True and "Failed to connect to host:port" on False), preserving
the existing input validation around parts[1] and the host/port parsing.
```python
async def on_peer_connected(writer):
    import json as _json
    sync_msg = _json.dumps({
        "type": "sync",
        "data": {"accounts": chain.state.accounts}
    }) + "\n"
    writer.write(sync_msg.encode())
    await writer.drain()
    logger.info("🔄 Sent state sync to new peer")

# ...

        block = Block(
            index=block_data.get("index"),
            previous_hash=block_data.get("previous_hash"),
            transactions=transactions,
            timestamp=block_data.get("timestamp"),
            difficulty=block_data.get("difficulty")
        )

# ...

network._on_peer_connected = on_peer_connected
```
🧹 Nitpick | 🔵 Trivial
Avoid setting private _on_peer_connected directly.
Prefer a public setter to keep the P2P API encapsulated.
🧩 Suggested change
```diff
-    network._on_peer_connected = on_peer_connected
+    network.set_on_peer_connected(on_peer_connected)
```
This requires adding `set_on_peer_connected()` on `P2PNetwork`.
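One way that setter could look on `P2PNetwork`, a minimal sketch assuming the callback keeps its current storage in `self._on_peer_connected`:

```python
from typing import Awaitable, Callable, Optional

class P2PNetwork:
    def __init__(self) -> None:
        self._on_peer_connected: Optional[Callable[..., Awaitable[None]]] = None
        # ... existing initialization ...

    def set_on_peer_connected(self, callback: Callable[..., Awaitable[None]]) -> None:
        """Register an async callback invoked with the new peer's StreamWriter."""
        self._on_peer_connected = callback
```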
🧰 Tools
🪛 Ruff (0.15.2)
[warning] 264-264: Missing return type annotation for private function on_peer_connected
Add return type annotation: None
(ANN202)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@main.py` around lines 264 - 275, The code assigns a private callback directly
(network._on_peer_connected = on_peer_connected); add a public setter on
P2PNetwork named set_on_peer_connected(callback) that stores the callback
internally and is used when peers connect, then replace the direct assignment
with network.set_on_peer_connected(on_peer_connected). Ensure the setter accepts
an async callback with the same signature as on_peer_connected (e.g., async def
on_peer_connected(writer)), that the P2PNetwork uses the stored callback when
handling new connections, and keep use of chain.state.accounts, writer, and
logger unchanged.
```python
async def start(self, port: int = 9000):
    """Start listening for incoming peer connections on the given port."""
    self._port = port
    self._server = await asyncio.start_server(
        self._handle_incoming, "0.0.0.0", port
    )
    logger.info("Network: Listening on 0.0.0.0:%d", port)
```
Default bind to 0.0.0.0 is too permissive.
For a demo, binding all interfaces by default risks unintended exposure. Prefer localhost by default and make external exposure opt‑in. Update callers if you add a host parameter.
🔒 Suggested change
```diff
-    async def start(self, port: int = 9000):
+    async def start(self, port: int = 9000, host: str = "127.0.0.1"):
         """Start listening for incoming peer connections on the given port."""
         self._port = port
         self._server = await asyncio.start_server(
-            self._handle_incoming, "0.0.0.0", port
+            self._handle_incoming, host, port
         )
-        logger.info("Network: Listening on 0.0.0.0:%d", port)
+        logger.info("Network: Listening on %s:%d", host, port)
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
async def start(self, port: int = 9000, host: str = "127.0.0.1"):
    """Start listening for incoming peer connections on the given port."""
    self._port = port
    self._server = await asyncio.start_server(
        self._handle_incoming, host, port
    )
    logger.info("Network: Listening on %s:%d", host, port)
```
🧰 Tools
🪛 Ruff (0.15.2)
[error] 48-48: Possible binding to all interfaces
(S104)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@minichain/p2p.py` around lines 44 - 50, The start method currently binds to
"0.0.0.0" which is too permissive; change the signature of async def start(self,
port: int = 9000) to accept an optional host parameter (e.g. host: str =
"127.0.0.1"), use that host when calling
asyncio.start_server(self._handle_incoming, host, port) so self._server binds to
localhost by default, and update any callers of P2P.start to pass a host when
external exposure is desired; keep references to _handle_incoming and
self._server unchanged.
```diff
     async def stop(self):
-        """Clean up network resources cleanly upon shutdown."""
+        """Gracefully shut down the server and disconnect all peers."""
         logger.info("Network: Shutting down")
-        if self.pubsub:
-            try:
-                shutdown_meth = None
-                for method_name in ('close', 'stop', 'aclose', 'shutdown'):
-                    if hasattr(self.pubsub, method_name):
-                        shutdown_meth = getattr(self.pubsub, method_name)
-                        break
-
-                if shutdown_meth:
-                    import asyncio
-                    res = shutdown_meth()
-                    if asyncio.iscoroutine(res):
-                        await res
-            except Exception as e:
-                logger.error("Network: Error shutting down pubsub: %s", e)
-            finally:
-                self.pubsub = None
-
-    async def _broadcast_message(self, topic, msg_type, payload):
-        msg = json.dumps({"type": msg_type, "data": payload})
-        if self.pubsub:
+        for task in self._listen_tasks:
+            task.cancel()
+        for _, writer in self._peers:
+            try:
+                writer.close()
+                await writer.wait_closed()
+            except Exception:
+                pass
+        self._peers.clear()
+        if self._server:
+            self._server.close()
+            await self._server.wait_closed()
+            self._server = None
```
Await cancelled listener tasks during shutdown.
Canceling without awaiting can leave pending tasks and skip cleanup.
🧹 Suggested change
```diff
         for task in self._listen_tasks:
             task.cancel()
+        if self._listen_tasks:
+            await asyncio.gather(*self._listen_tasks, return_exceptions=True)
+        self._listen_tasks.clear()
```
🧰 Tools
🪛 Ruff (0.15.2)
[error] 61-62: try-except-pass detected, consider logging the exception
(S110)
[warning] 61-61: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@minichain/p2p.py` around lines 52 - 67, In stop(), after cancelling each task
in self._listen_tasks (the list of listener tasks managed by the server), await
them to ensure they finish and run their cleanup; e.g. call
asyncio.gather(*self._listen_tasks, return_exceptions=True) or await each task
and handle asyncio.CancelledError so cleanup isn't skipped, then clear
self._listen_tasks; update the stop method (references: stop,
self._listen_tasks) to cancel, await, and handle exceptions before closing peers
and the server.
```python
async def connect_to_peer(self, host: str, port: int) -> bool:
    """Actively connect to another MiniChain node."""
    try:
        reader, writer = await asyncio.open_connection(host, port)
        self._peers.append((reader, writer))
        task = asyncio.create_task(self._listen_to_peer(reader, writer, f"{host}:{port}"))
        self._listen_tasks.append(task)
        logger.info("Network: Connected to peer %s:%d", host, port)
        return True
    except Exception as e:
        logger.error("Network: Failed to connect to %s:%d — %s", host, port, e)
        return False
```
Outbound connections never trigger _on_peer_connected.
State sync only happens for inbound peers, so connect / --connect won’t send our state. Trigger the callback for outbound connections too.
🔁 Suggested change
```diff
             task = asyncio.create_task(self._listen_to_peer(reader, writer, f"{host}:{port}"))
             self._listen_tasks.append(task)
             logger.info("Network: Connected to peer %s:%d", host, port)
+            if self._on_peer_connected:
+                try:
+                    await self._on_peer_connected(writer)
+                except Exception:
+                    logger.exception("Network: Error during peer sync")
             return True
```
🧰 Tools
🪛 Ruff (0.15.2)
[warning] 81-81: Consider moving this statement to an else block
(TRY300)
[warning] 82-82: Do not catch blind exception: Exception
(BLE001)
[warning] 83-83: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@minichain/p2p.py` around lines 73 - 84, connect_to_peer currently adds
outbound peers but never invokes the same handshake/callback used for inbound
peers, so outbound connections don't trigger state sync; after successfully
opening connection and appending (reader, writer) to self._peers, call the same
callback used for inbound connections (self._on_peer_connected) with the peer
key/metadata you use elsewhere (e.g., f"{host}:{port}" or the writer/reader
tuple) before creating the _listen_to_peer task, and ensure any required args
match _on_peer_connected's signature; this keeps behavior symmetric with inbound
peers and enables state sync for outbound connections (also verify _listen_tasks
still appends the listen task afterwards).
```python
async def _broadcast_raw(self, payload: dict):
    """Send a JSON message to every connected peer."""
    line = (json.dumps(payload) + "\n").encode()
    disconnected = []
    for reader, writer in self._peers:
        try:
            writer.write(line)
            await writer.drain()
        except Exception:
            disconnected.append((reader, writer))
    for pair in disconnected:
        if pair in self._peers:
            self._peers.remove(pair)
```
Close writers on broadcast failures to avoid leaked connections.
Removing peers without closing can leave open writers and lingering tasks.
🔌 Suggested change
```diff
         for pair in disconnected:
             if pair in self._peers:
                 self._peers.remove(pair)
+            try:
+                pair[1].close()
+                await pair[1].wait_closed()
+            except Exception:
+                pass
```
🧰 Tools
🪛 Ruff (0.15.2)
[warning] 137-137: Missing return type annotation for private function _broadcast_raw
Add return type annotation: None
(ANN202)
[warning] 145-145: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@minichain/p2p.py` around lines 137 - 149, The _broadcast_raw method currently
removes failed peer tuples from self._peers without closing their StreamWriter,
leaking file descriptors/tasks; update _broadcast_raw to, for each (reader,
writer) in the disconnected list, call writer.close() and await
writer.wait_closed() (with try/except around the close/wait to swallow errors),
then remove the pair from self._peers if present; ensure this uses the existing
reader/writer tuple identifiers so you update the same pairs you collected on
exception.
Addressed Issues:
Hey @Zahnentferner
This implements the testnet demo. The chain works with the CLI commands listed above (balance, send, mine, peers, connect); see the example session below.
We tested across two machines: two nodes on my local machine and one on @aniket866's machine.
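For reference, a two-node session along the lines we tested would look roughly like this (flags taken from the node's argument list; the exact entry point and values are assumptions):

```bash
# Machine A: start a node on port 9000 and fund its wallet
python main.py --port 9000 --fund 100

# Machine B: start a second node and connect to A
python main.py --port 9001 --connect <machine-A-ip>:9000

# At either CLI prompt:
balance
send <address> <amount>
mine
peers
connect <host>:<port>
```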
Screenshots/Recordings:
minichiandemo1.mp4
minichiandemo2.mp4
Additional Notes:
Checklist
AI Usage Disclosure
Check one of the checkboxes below:
I have used the following AI models and tools:
We encourage contributors to use AI tools responsibly when creating Pull Requests. While AI can be a valuable aid, it is essential to ensure that your contributions meet the task requirements, build successfully, include relevant tests, and pass all linters. Submissions that do not meet these standards may be closed without warning to maintain the quality and integrity of the project. Please take the time to understand the changes you are proposing and their impact.
Summary by CodeRabbit
New Features
- Asyncio TCP-based peer-to-peer networking with server lifecycle management, new node entry points, and an interactive CLI (balance, send, mine, peers, connect).
Refactor
- Main application layer split into mining, networking, and CLI handler abstractions.
Chores
- Transaction timestamp handling updated to accept multiple input formats.