try: # List all object paths in bucket that begin with my-prefixname. uploads = minioClient.list_incomplete_uploads('mybucket', prefix='my-prefixname', recursive=True) for obj in uploads: print(obj.bucket_name, obj.object_name, obj.upload_id, obj.size) except MaxRetryError as err: print("connect minio server fail, %s" % err) raise except ResponseError as err: print("minio server response fail, %s" % err) raise except InvalidBucketError as err: print(err) except Exception as err: print(err)
# Get current policy of all object paths in bucket that begin with my-prefixname. policy = minioClient.get_bucket_policy('mybucket', 'my-prefixname') print(policy)
# Set policy Policy.READ_ONLY to all object paths in bucket that begin with my-prefixname. minioClient.set_bucket_policy('mybucket', 'my-prefixname', Policy.READ_ONLY)
获取存储桶上的通知配置
get_bucket_notification(bucket_name) -> dict
# Get the notifications configuration for a bucket. notification = minioClient.get_bucket_notification('mybucket') # If no notification is present on the bucket: # notification == {}
# Put a file with default content-type. events = minioClient.listen_bucket_notification('my-bucket', 'my-prefix/', '.my-suffix', ['s3:ObjectCreated:*', 's3:ObjectRemoved:*', 's3:ObjectAccessed:*']) for event in events: print event
# Get a full object. try: data = minioClient.get_object('mybucket', 'myobject') withopen('my-testfile', 'wb') as file_data: for d in data.stream(32*1024): file_data.write(d) except ResponseError as err: print(err)
# Offset the download by 2 bytes and retrieve a total of 4 bytes. try: data = minioClient.get_partial_object('mybucket', 'myobject', 2, 4) withopen('my-testfile', 'wb') as file_data: for d in data: file_data.write(d) except ResponseError as err: print(err)
# Get a full object and prints the original object stat information. try: print(minioClient.fget_object('mybucket', 'myobject', '/tmp/myobject')) except ResponseError as err: print(err)
import time from datetime import datetime from minio import CopyConditions
copy_conditions = CopyConditions() # Set modified condition, copy object modified since 2014 April. t = (2014, 4, 0, 0, 0, 0, 0, 0, 0) mod_since = datetime.utcfromtimestamp(time.mktime(t)) copy_conditions.set_modified_since(mod_since)
# Set unmodified condition, copy object unmodified since 2014 April. copy_conditions.set_unmodified_since(mod_since)
# Set matching ETag condition, copy object which matches the following ETag. copy_conditions.set_match_etag("31624deb84149d2f8ef9c385918b653a")
# Set matching ETag except condition, copy object which does not match the following ETag. copy_conditions.set_match_etag_except("31624deb84149d2f8ef9c385918b653a")
# Set metadata metadata = {"test-key": "test-data"}
import os # Put a file with default content-type, upon success prints the etag identifier computed by server. try: withopen('my-testfile', 'rb') as file_data: file_stat = os.stat('my-testfile') print(minioClient.put_object('mybucket', 'myobject', file_data, file_stat.st_size)) except ResponseError as err: print(err)
# Put a file with 'application/csv'. try: withopen('my-testfile.csv', 'rb') as file_data: file_stat = os.stat('my-testfile.csv') minioClient.put_object('mybucket', 'myobject.csv', file_data, file_stat.st_size, content_type='application/csv') except ResponseError as err: print(err)
# Put an object 'myobject' with contents from '/tmp/otherobject', upon success prints the etag identifier computed by server. try: print(minioClient.fput_object('mybucket', 'myobject', '/tmp/otherobject')) except ResponseError as err: print(err)
# Put on object 'myobject.csv' with contents from # '/tmp/otherobject.csv' as 'application/csv'. try: print(minioClient.fput_object('mybucket', 'myobject.csv', '/tmp/otherobject.csv', content_type='application/csv')) except ResponseError as err: print(err)
获取对象的元数据
stat_object(bucket_name, object_name)
# Fetch stats on your object. try: print(minioClient.stat_object('mybucket', 'myobject')) except ResponseError as err: print(err)
删除一个对象
remove_object(bucket_name, object_name)
# Remove an object. try: minioClient.remove_object('mybucket', 'myobject') except ResponseError as err: print(err)
删除存储桶中的多个对象
remove_objects(bucket_name, objects_iter)
# Remove multiple objects in a single library call. try: objects_to_delete = ['myobject-1', 'myobject-2', 'myobject-3'] # force evaluation of the remove_objects() call by iterating over # the returned value. for del_err in minioClient.remove_objects('mybucket', objects_to_delete): print("Deletion Error: {}".format(del_err)) except ResponseError as err: print(err)
# presigned get object URL for object name, expires in 2 days. try: print(minioClient.presigned_get_object('mybucket', 'myobject', expires=timedelta(days=2))) # Response error is still possible since internally presigned does get bucket location. except ResponseError as err: print(err)
# presigned Put object URL for an object name, expires in 3 days. try: print(minioClient.presigned_put_object('mybucket', 'myobject', expires=timedelta(days=3))) # Response error is still possible since internally presigned does get # bucket location. except ResponseError as err: print(err)