Delete All Items Dynamodb Using Python
Solution 1:
While I agree that dropping the table and recreating it is much more efficient, there may be cases — such as when many GSIs or trigger events are associated with a table — where you don't want to have to re-associate those. The script below iterates over the scan to handle large tables (each scan call returns up to 1 MB of data) and uses the batch writer to delete all items in the table.
import boto3

dynamo = boto3.resource('dynamodb')


def truncateTable(tableName):
    """Delete every item from *tableName* without dropping the table.

    Scans the table page by page (keys only, to minimize data transfer)
    and removes each item through a batch writer, preserving the table's
    GSIs, triggers, and other configuration.
    """
    table = dynamo.Table(tableName)
    # Get the table's key attribute names (partition key, and sort key if any).
    tableKeyNames = [key.get("AttributeName") for key in table.key_schema]
    # Only retrieve the keys for each item in the table (minimize data transfer).
    # Keys are referenced via expression attribute names ('#name') in case a
    # key attribute collides with a DynamoDB reserved word.
    projectionExpression = ", ".join('#' + key for key in tableKeyNames)
    expressionAttrNames = {'#' + key: key for key in tableKeyNames}
    counter = 0
    page = table.scan(ProjectionExpression=projectionExpression,
                      ExpressionAttributeNames=expressionAttrNames)
    with table.batch_writer() as batch:
        while page["Count"] > 0:
            counter += page["Count"]
            # Delete items in batches.
            for itemKeys in page["Items"]:
                batch.delete_item(Key=itemKeys)
            # Fetch the next page, if there is one.
            if 'LastEvaluatedKey' in page:
                page = table.scan(
                    ProjectionExpression=projectionExpression,
                    ExpressionAttributeNames=expressionAttrNames,
                    ExclusiveStartKey=page['LastEvaluatedKey'])
            else:
                break
    print(f"Deleted {counter}")


truncateTable("YOUR_TABLE_NAME")
Solution 2:
I found a solution! I just build the key from my table id (uId) and search id (compId), and it worked :)
# Scan the table and delete each item by its full primary key
# (partition key 'uId' plus sort key 'compId').
# NOTE(review): a single scan() returns at most 1 MB of data, so this only
# clears tables that fit in one page — paginate with LastEvaluatedKey for
# larger tables (see the other solutions on this page).
scan = table.scan()
with table.batch_writer() as batch:
    for each in scan['Items']:
        batch.delete_item(
            Key={
                'uId': each['uId'],
                'compId': each['compId']
            }
        )
Solution 3:
Here's an answer that takes into account the fact that you might not get all records back in the first call if you're trying to truncate a big table (or a smaller table with big items). It presumes you're only using a hash key (called `id`), so you'd have to add a bit to your `ProjectionExpression` and `delete_item` call if you also have a sort key on your table.
There's some extra in there you could trim out that just prints a counter to stdout to keep us humans happy.
import boto3

# Fill these in: the table name and the name of its hash-key attribute.
TABLE = ...
ID = ...

table = boto3.resource('dynamodb').Table(TABLE)

# Scan the table in pages (projecting only the key attribute) and delete
# every item through a batch writer. `scan` is None before the first call;
# afterwards the loop continues as long as DynamoDB reports another page
# via 'LastEvaluatedKey'.
scan = None
with table.batch_writer() as batch:
    count = 0
    while scan is None or 'LastEvaluatedKey' in scan:
        if scan is not None and 'LastEvaluatedKey' in scan:
            # Continue from where the previous page left off.
            scan = table.scan(
                ProjectionExpression=ID,
                ExclusiveStartKey=scan['LastEvaluatedKey'],
            )
        else:
            scan = table.scan(ProjectionExpression=ID)
        for item in scan['Items']:
            # Progress indicator for us humans; prints every 5000 deletions.
            if count % 5000 == 0:
                print(count)
            batch.delete_item(Key={ID: item[ID]})
            count = count + 1
Solution 4:
Use BatchWriteItem
. The documentation states
The BatchWriteItem operation puts or deletes multiple items in one or more tables. A single call to BatchWriteItem can write up to 16 MB of data, which can comprise as many as 25 put or delete requests. Individual items to be written can be as large as 400 KB.
I'm assuming the Boto3 API exposes this as well, though possibly under a different name.
Solution 5:
The same approach using batch_writer()
, but multithreaded
import boto3
import threading
import time
from queue import LifoQueue, Empty
class DDBTableCleaner(object):
    """Delete every item from a DynamoDB table using multiple worker threads.

    A producer (``queue_replenish``) scans the table page by page (keys only)
    and pushes each item's key onto a queue; ``threads_limit`` worker threads
    pop keys off the queue and delete them, each through its own
    ``batch_writer``.
    """

    def __init__(self, table_name, threads_limit=32):
        self._queue = LifoQueue()
        self._threads = dict()
        self._cnt = 0
        # `+=` on an int is not atomic across threads; guard it with a lock.
        self._cnt_lock = threading.Lock()
        self._done = False
        self._threads_limit = threads_limit
        dynamodb_client = boto3.resource('dynamodb')
        self.table = dynamodb_client.Table(table_name)

    def run(self):
        """Start the workers, fill the queue, then wait for it to drain."""
        for i in range(self._threads_limit):
            thread_name = f'worker_thread_{i}'
            self._threads[thread_name] = threading.Thread(
                target=self.worker_thread,
                name=thread_name,
            )
            self._threads[thread_name].start()
        self.queue_replenish()
        # Poll until the workers have consumed every queued key.
        while self._queue.qsize() > 0:
            print(f'items processed: ({self._cnt})')
            time.sleep(1)
        self._done = True
        for thread in self._threads.values():
            if thread.is_alive():
                thread.join()
        print(f'items processed: ({self._cnt})')

    def queue_replenish(self):
        """Scan the table (projecting only key attributes) and enqueue each key."""
        table_key_names = [key.get('AttributeName') for key in self.table.key_schema]
        # Use expression attribute names ('#name') in case a key attribute
        # collides with a DynamoDB reserved word.
        projection_expression = ', '.join('#' + key for key in table_key_names)
        expression_attr_names = {'#' + key: key for key in table_key_names}
        page = self.table.scan(
            ProjectionExpression=projection_expression,
            ExpressionAttributeNames=expression_attr_names
        )
        while page['Count'] > 0:
            for item in page['Items']:
                self._queue.put(item)
            # Follow the pagination cursor until the scan is exhausted.
            if 'LastEvaluatedKey' in page:
                page = self.table.scan(
                    ProjectionExpression=projection_expression,
                    ExpressionAttributeNames=expression_attr_names,
                    ExclusiveStartKey=page['LastEvaluatedKey']
                )
            else:
                break

    def worker_thread(self):
        """Pop keys off the queue and delete them until ``_done`` is set."""
        thr_name = threading.current_thread().name
        print(f'[{thr_name}] thread started')
        with self.table.batch_writer() as batch:
            while not self._done:
                try:
                    item = self._queue.get_nowait()
                except Empty:
                    # Queue momentarily empty; back off before retrying.
                    time.sleep(1)
                else:
                    try:
                        batch.delete_item(Key=item)
                        with self._cnt_lock:
                            self._cnt += 1
                    except Exception as e:
                        # Best-effort cleanup: report the failure and keep going.
                        print(e)
        print(f'[{thr_name}] thread completed')
if __name__ == '__main__':
    # Replace '...' with the name of the table to truncate.
    table_name = '...'
    DDBTableCleaner(table_name, threads_limit=10).run()
Post a Comment for "Delete All Items Dynamodb Using Python"