aboutsummaryrefslogtreecommitdiff
path: root/tests/functional_tests/k_anonymity.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/functional_tests/k_anonymity.py')
-rwxr-xr-xtests/functional_tests/k_anonymity.py314
1 files changed, 314 insertions, 0 deletions
diff --git a/tests/functional_tests/k_anonymity.py b/tests/functional_tests/k_anonymity.py
new file mode 100755
index 000000000..ffa670b4c
--- /dev/null
+++ b/tests/functional_tests/k_anonymity.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python3
+
+# Copyright (c) 2023, The Monero Project
+#
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification, are
+# permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this list of
+# conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice, this list
+# of conditions and the following disclaimer in the documentation and/or other
+# materials provided with the distribution.
+#
+# 3. Neither the name of the copyright holder nor the names of its contributors may be
+# used to endorse or promote products derived from this software without specific
+# prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+from __future__ import print_function
+import math
+import random
+
+"""
+Test the k-anonymity daemon RPC features:
+* txid fetching by prefix
+"""
+
+from framework.daemon import Daemon
+from framework.wallet import Wallet
+
+seeds = [
+ 'velvet lymph giddy number token physics poetry unquoted nibs useful sabotage limits benches lifestyle eden nitrogen anvil fewest avoid batch vials washing fences goat unquoted',
+ 'peeled mixture ionic radar utopia puddle buying illness nuns gadget river spout cavernous bounced paradise drunk looking cottage jump tequila melting went winter adjust spout',
+ 'tadpoles shrugged ritual exquisite deepest rest people musical farming otherwise shelter fabrics altitude seventh request tidy ivory diet vapidly syllabus logic espionage oozed opened people',
+ 'ocio charla pomelo humilde maduro geranio bruto moño admitir mil difícil diva lucir cuatro odisea riego bebida mueble cáncer puchero carbón poeta flor fruta fruta'
+]
+
+pub_addrs = [
+ '42ey1afDFnn4886T7196doS9GPMzexD9gXpsZJDwVjeRVdFCSoHnv7KPbBeGpzJBzHRCAs9UxqeoyFQMYbqSWYTfJJQAWDm',
+ '44Kbx4sJ7JDRDV5aAhLJzQCjDz2ViLRduE3ijDZu3osWKBjMGkV1XPk4pfDUMqt1Aiezvephdqm6YD19GKFD9ZcXVUTp6BW',
+ '45uQD4jzWwPazqr9QJx8CmFPN7a9RaEE8T4kULg6r8GzfcrcgKXshfYf8cezLWwmENHC9pDN2fGAUFmmdFxjeZSs3n671rz',
+ '48hKTTTMfuiW2gDkmsibERHCjTCpqyCCh57WcU4KBeqDSAw7dG7Ad1h7v8iJF4q59aDqBATg315MuZqVmkF89E3cLPrBWsi'
+]
+
+CRYPTONOTE_MINED_MONEY_UNLOCK_WINDOW = 60
+CRYPTONOTE_DEFAULT_TX_SPENDABLE_AGE = 10
+RESTRICTED_SPENT_KEY_IMAGES_COUNT = 5000
+
+def make_hash32_loose_template(txid, nbits):
+ txid_bytes = list(bytes.fromhex(txid))
+ for i in reversed(range(32)):
+ mask_nbits = min(8, nbits)
+ mask = 256 - (1 << (8 - mask_nbits))
+ nbits -= mask_nbits
+ txid_bytes[i] &= mask
+ return bytes(txid_bytes).hex()
+
+def txid_list_is_sorted_in_template_order(txids):
+ reversed_txid_bytes = [bytes(reversed(bytes.fromhex(txid))) for txid in txids]
+ return sorted(reversed_txid_bytes) == reversed_txid_bytes
+
+def txid_matches_template(txid, template, nbits):
+ txid_bytes = bytes.fromhex(txid)
+ template_bytes = bytes.fromhex(template)
+ for i in reversed(range(32)):
+ mask_nbits = min(8, nbits)
+ mask = 256 - (1 << (8 - mask_nbits))
+ nbits -= mask_nbits
+ if 0 != ((txid_bytes[i] ^ template_bytes[i]) & mask):
+ return False
+ return True
+
+class KAnonymityTest:
+ def run_test(self):
+ self.reset()
+ self.create_wallets()
+
+ # If each of the N wallets is making N-1 transfers the first round, each N wallets needs
+ # N-1 unlocked coinbase outputs
+ N = len(seeds)
+ self.mine_and_refresh(2 * N * (N - 1))
+ self.mine_and_refresh(CRYPTONOTE_MINED_MONEY_UNLOCK_WINDOW)
+
+ # Generate a bunch of transactions
+ NUM_ROUNDS = 10
+ intermediate_mining_period = int(math.ceil(CRYPTONOTE_DEFAULT_TX_SPENDABLE_AGE / N)) * N
+ for i in range(NUM_ROUNDS):
+ self.transfer_around()
+ self.mine_and_refresh(intermediate_mining_period)
+ print("Wallets created {} transactions in {} rounds".format(len(self.wallet_txids), NUM_ROUNDS))
+
+ self.test_all_chain_txids() # Also gathers miner_txids
+
+ self.test_get_txids_loose_chain_suite()
+
+ self.test_get_txids_loose_pool_suite()
+
+ self.test_bad_txid_templates()
+
+ def reset(self):
+ print('Resetting blockchain')
+ daemon = Daemon()
+ res = daemon.get_height()
+ daemon.pop_blocks(res.height - 1)
+ daemon.flush_txpool()
+ self.wallet_txids = set()
+ self.total_blocks_mined = 0
+ self.miner_txids = set()
+ self.pool_txids = set()
+
+ def create_wallets(self):
+ print('Creating wallets')
+ assert len(seeds) == len(pub_addrs)
+ self.wallet = [None] * len(seeds)
+ for i in range(len(seeds)):
+ self.wallet[i] = Wallet(idx = i)
+ # close the wallet if any, will throw if none is loaded
+ try: self.wallet[i].close_wallet()
+ except: pass
+ res = self.wallet[i].restore_deterministic_wallet(seed = seeds[i])
+
+ def mine_and_refresh(self, num_blocks):
+ print("Mining {} blocks".format(num_blocks))
+ daemon = Daemon()
+
+ res = daemon.get_info()
+ old_height = res.height
+
+ assert num_blocks % len(self.wallet) == 0
+ assert len(self.wallet) == len(pub_addrs)
+
+ for i in range(len(self.wallet)):
+ daemon.generateblocks(pub_addrs[i], num_blocks // len(self.wallet))
+
+ res = daemon.get_info()
+ new_height = res.height
+ assert new_height == old_height + num_blocks, "height {} -> {}".format(old_height, new_height)
+
+ for i in range(len(self.wallet)):
+ self.wallet[i].refresh()
+ res = self.wallet[i].get_height()
+ assert res.height == new_height, "{} vs {}".format(res.height, new_height)
+
+ self.wallet_txids.update(self.pool_txids)
+ self.pool_txids.clear()
+ self.total_blocks_mined += num_blocks
+
+ def transfer_around(self):
+ N = len(self.wallet)
+ assert N == len(pub_addrs)
+
+ print("Creating transfers b/t wallets")
+
+ num_successful_transfers = 0
+ fee_margin = 0.05 # 5%
+ for sender in range(N):
+ receivers = list((r for r in range(N) if r != sender))
+ random.shuffle(receivers)
+ assert len(receivers) == N - 1
+ for j, receiver in enumerate(receivers):
+ unlocked_balance = self.wallet[sender].get_balance().unlocked_balance
+ if 0 == unlocked_balance:
+ assert j != 0 # we want all wallets to start out with at least some funds
+ break
+ imperfect_starting_balance = unlocked_balance * (N - 1) / (N - 1 - j) * (1 - fee_margin)
+ transfer_amount = int(imperfect_starting_balance / (N - 1))
+ assert transfer_amount < unlocked_balance
+ dst = {'address': pub_addrs[receiver], 'amount': transfer_amount}
+ res = self.wallet[sender].transfer([dst], get_tx_metadata = True)
+ tx_hex = res.tx_metadata
+ self.pool_txids.add(res.tx_hash)
+ res = self.wallet[sender].relay_tx(tx_hex)
+ self.wallet[sender].refresh()
+ num_successful_transfers += 1
+
+ print("Transferred {} times".format(num_successful_transfers))
+
+ def test_all_chain_txids(self):
+ daemon = Daemon()
+
+ print("Grabbing all txids from the daemon and testing against known txids")
+
+ # If assert stmt below fails, this test case needs to be rewritten to chunk the requests;
+ # there are simply too many txids on-chain to gather at once
+ expected_total_num_txids = len(self.wallet_txids) + self.total_blocks_mined + 1 # +1 for genesis coinbase tx
+ assert expected_total_num_txids <= RESTRICTED_SPENT_KEY_IMAGES_COUNT
+
+ res = daemon.get_txids_loose('0' * 64, 0)
+ all_txids = res.txids
+ assert 'c88ce9783b4f11190d7b9c17a69c1c52200f9faaee8e98dd07e6811175177139' in all_txids # genesis coinbase tx
+ assert len(all_txids) == expected_total_num_txids, "{} {}".format(len(all_txids), expected_total_num_txids)
+
+ assert txid_list_is_sorted_in_template_order(all_txids)
+
+ for txid in self.wallet_txids:
+ assert txid in all_txids
+
+ self.miner_txids = set(all_txids) - self.wallet_txids
+
+ def test_get_txids_loose_success(self, txid, num_matching_bits):
+ daemon = Daemon()
+
+ txid_template = make_hash32_loose_template(txid, num_matching_bits)
+
+ res = daemon.get_txids_loose(txid_template, num_matching_bits)
+ assert 'txids' in res
+ txids = res.txids
+
+ first_pool_index = 0
+ while first_pool_index < len(txids):
+ if txids[first_pool_index] in self.pool_txids:
+ break
+ else:
+ first_pool_index += 1
+
+ chain_txids = txids[:first_pool_index]
+ pool_txids = txids[first_pool_index:]
+
+ assert txid_list_is_sorted_in_template_order(chain_txids)
+ assert txid_list_is_sorted_in_template_order(pool_txids)
+
+ # Assert we know where txids came from
+ for txid in chain_txids:
+ assert (txid in self.wallet_txids) or (txid in self.miner_txids)
+ for txid in pool_txids:
+ assert txid in self.pool_txids
+
+ # Assert that all known txids were matched as they should've been
+ for txid in self.wallet_txids:
+ assert txid_matches_template(txid, txid_template, num_matching_bits) == (txid in chain_txids)
+ for txid in self.miner_txids:
+ assert txid_matches_template(txid, txid_template, num_matching_bits) == (txid in chain_txids)
+ for txid in self.pool_txids:
+ assert txid_matches_template(txid, txid_template, num_matching_bits) == (txid in pool_txids)
+
+ def test_get_txids_loose_chain_suite(self):
+ daemon = Daemon()
+
+ print("Testing grabbing on-chain txids loosely with all different bit sizes")
+
+ # Assert pool empty
+ assert len(self.pool_txids) == 0
+ res = daemon.get_transaction_pool_hashes()
+ assert not 'tx_hashes' in res or len(res.tx_hashes) == 0
+
+ assert len(self.wallet_txids)
+
+ current_chain_txids = list(self.wallet_txids.union(self.miner_txids))
+ for nbits in range(0, 256):
+ random_txid = random.choice(current_chain_txids)
+ self.test_get_txids_loose_success(random_txid, nbits)
+
+ def test_get_txids_loose_pool_suite(self):
+ daemon = Daemon()
+
+ print("Testing grabbing pool txids loosely with all different bit sizes")
+
+ # Create transactions to pool
+ self.transfer_around()
+
+ # Assert pool not empty
+ assert len(self.pool_txids) != 0
+ res = daemon.get_transaction_pool_hashes()
+ assert 'tx_hashes' in res and set(res.tx_hashes) == self.pool_txids
+
+ current_pool_txids = list(self.pool_txids)
+ for nbits in range(0, 256):
+ random_txid = random.choice(current_pool_txids)
+ self.test_get_txids_loose_success(random_txid, nbits)
+
+ def test_bad_txid_templates(self):
+ daemon = Daemon()
+
+ print("Making sure the daemon catches bad txid templates")
+
+ test_cases = [
+ ['q', 256],
+ ['a', 128],
+ ['69' * 32, 257],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 0],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 1],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 2],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 4],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 8],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 16],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 32],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 64],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 128],
+ ['0abcdef1234567890abcdef1234567890abcdef1234567890abcdef123456789', 193],
+ ['0000000000000000000000000000000000000000000000000000000000000080', 0],
+ ['0000000000000000000000000000000000000000000000000000000000000007', 5],
+ ['00000000000000000000000000000000000000000000000000000000000000f7', 5],
+ ]
+
+ for txid_template, num_matching_bits in test_cases:
+ ok = False
+ try: res = daemon.get_txids_loose(txid_template, num_matching_bits)
+ except: ok = True
+ assert ok, 'bad template didnt error: {} {}'.format(txid_template, num_matching_bits)
+
+if __name__ == '__main__':
+ KAnonymityTest().run_test()