feat: Add standalone Docker image pull and save tool
Add dockerpullsave.py - a Python utility that downloads Docker images
directly from registries and saves them as tar archives without requiring
the Docker daemon to be running.
Features:
- Supports multiple registries (Docker Hub, GHCR, ECR, private registries)
- Handles authentication via bearer tokens and registry discovery
- Multi-architecture manifest list support (defaults to linux/amd64)
- Robust digest-based image selection for consistent pulls
- Progress bar for large layer downloads with Docker-style formatting
- Proper Docker image structure generation for 'docker load' compatibility
Usage examples:
./dockerpullsave.py ubuntu:latest
./dockerpullsave.py ghcr.io/oras-project/oras:v1.1.0
./dockerpullsave.py alpine@sha256:f271e74b17ced...
Output: Creates a {repository}_{tag}.tar file (slashes in the repository name become underscores), ready for 'docker load -i'
This tool is particularly useful for:
- Air-gapped environments where Docker daemon cannot access registries
- Batch image downloading and offline distribution
- Registry migration and backup scenarios
- Security scanning workflows requiring image inspection
This commit is contained in:
@@ -0,0 +1,388 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import gzip
|
||||
import json
|
||||
import hashlib
|
||||
import shutil
|
||||
import requests
|
||||
import tarfile
|
||||
import re
|
||||
import urllib3
|
||||
|
||||
# Silence urllib3's InsecureRequestWarning: the requests issued by this script
# pass verify=False so that registries with self-signed certificates work.
# NOTE(review): this also hides the warning for endpoints that should be
# verified; TLS verification is effectively off for the whole run.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# --- Argument Check ---
# Exactly one positional argument (the image reference) is expected.
if len(sys.argv) != 2:
    # Report the name the script was actually started under rather than a
    # hard-coded one that does not match the file on disk.
    script_name = os.path.basename(sys.argv[0]) or 'dockerpullsave.py'
    print(f'Usage:\n\t{script_name} [registry/][repository/]image[:tag|@digest]\n', file=sys.stderr)
    print('Examples:', file=sys.stderr)
    print(f'\t{script_name} ubuntu:latest', file=sys.stderr)
    print(f'\t{script_name} ghcr.io/oras-project/oras:v1.1.0', file=sys.stderr)
    print(f'\t{script_name} alpine@sha256:f271e74b17ced29b915d351685fd4644785c6d1559dd1f2d519b152337e72849', file=sys.stderr)
    # sys.exit() is always available; the bare exit() helper is injected by
    # the `site` module and is missing under `python -S` or in frozen builds.
    sys.exit(1)
|
||||
|
||||
# --- Image Name Parsing ---
# Splits the command-line reference into `registry`, `repository` and `tag`.
# When the reference uses the @digest form, `tag` holds the digest.
full_image_name = sys.argv[1]
print(f"[*] Parsing image: {full_image_name}")

# Default registry (Docker Hub's v2 API endpoint)
registry = 'registry-1.docker.io'
repo_and_tag = full_image_name

# The first path component names a registry only when it looks like a host:
# it contains a dot (domain), a colon (port), or is exactly "localhost".
parts = full_image_name.split('/')
if len(parts) > 1 and ('.' in parts[0] or ':' in parts[0] or parts[0] == 'localhost'):
    registry = parts[0]
    repo_and_tag = '/'.join(parts[1:])

# "docker.io" and "index.docker.io" are aliases users commonly type for
# Docker Hub; normalise them so the Hub endpoint and the 'library/' rule apply.
if registry in ('docker.io', 'index.docker.io'):
    registry = 'registry-1.docker.io'

# For official Docker Hub images, prepend 'library/' if no org is specified
if registry == 'registry-1.docker.io' and '/' not in repo_and_tag:
    repository_with_tag = f"library/{repo_and_tag}"
else:
    repository_with_tag = repo_and_tag

# Split repository from tag or digest
if '@' in repository_with_tag:
    # partition() splits on the first '@' only, so unpacking cannot fail.
    repository, _, tag = repository_with_tag.partition('@')
    # A "name:tag@digest" reference is pinned by the digest; drop the tag so
    # it does not end up inside the repository path of the API URLs.
    if ':' in repository.rsplit('/', 1)[-1]:
        repository = repository.rsplit(':', 1)[0]
    # Keep the digest algorithm the user supplied; assume sha256 only when
    # the value carries no "<algorithm>:" prefix at all.
    if ':' not in tag:
        tag = f"sha256:{tag}"
elif ':' in repository_with_tag.rsplit('/', 1)[-1]:  # tag only counts in the last component
    repository, tag = repository_with_tag.rsplit(':', 1)
else:
    repository = repository_with_tag
    tag = 'latest'

print(f" - Registry: {registry}")
print(f" - Repository: {repository}")
print(f" - Tag/Digest: {tag}")
|
||||
|
||||
# --- Authentication discovery ---
# Defaults target Docker Hub; they are overridden below from the registry's
# WWW-Authenticate challenge when the /v2/ ping answers 401.
auth_url = 'https://auth.docker.io/token'
reg_service = 'registry.docker.io'
is_auth_required = False

# Ping the v2 endpoint to check for auth requirements
try:
    print(f"[*] Checking for auth requirements at https://{registry}/v2/")
    resp = requests.get(f'https://{registry}/v2/', verify=False, timeout=15)

    if resp.status_code == 401:
        # A 401 means a token is needed; the challenge header says where to get it.
        is_auth_required = True
        auth_header = resp.headers.get('WWW-Authenticate')
        if not auth_header:
            print("[-] Registry returned 401 Unauthorized, but did not provide a WWW-Authenticate header.")
            print(resp.text)
            sys.exit(1)

        print(f" - Authentication required. Parsing WWW-Authenticate header.")

        # Only the Bearer token flow is implemented. Fail early with a clear
        # message instead of treating e.g. a Basic realm as a token URL.
        if not re.search(r'\bbearer\b', auth_header, re.IGNORECASE):
            print(f"[-] Unsupported authentication scheme in WWW-Authenticate header: {auth_header}")
            sys.exit(1)

        # Pull realm (token endpoint) and service out of the challenge.
        realm_match = re.search(r'realm="([^"]+)"', auth_header, re.IGNORECASE)
        if realm_match:
            auth_url = realm_match.group(1)

        service_match = re.search(r'service="([^"]+)"', auth_header, re.IGNORECASE)
        if service_match:
            reg_service = service_match.group(1)
        else:
            # Fallback to using the registry name as the service if not specified
            reg_service = registry

        print(f" - Auth URL: {auth_url}")
        print(f" - Service: {reg_service}")

    elif resp.status_code != 200:
        # Any other error status is surfaced through the except clause below.
        resp.raise_for_status()

    else:
        print(" - No authentication required.")

except requests.exceptions.RequestException as e:
    # Covers connection errors, timeouts and errors raised by raise_for_status().
    # SystemExit from the sys.exit() calls above is not caught here.
    print(f"[-] Error connecting to registry {registry}: {e}")
    sys.exit(1)
|
||||
|
||||
def get_auth_head(media_type):
    """Build the request headers for a registry call.

    Args:
        media_type: Value to send in the ``Accept`` header.

    Returns:
        A dict containing ``Accept`` and, when the module-level
        ``is_auth_required`` flag is set, an ``Authorization: Bearer ...``
        header carrying a freshly requested pull token for ``repository``.

    Exits the process with status 1 when the token cannot be obtained.
    """
    if not is_auth_required:
        # No authentication needed, just return the Accept header
        return {'Accept': media_type}

    print(f"[*] Requesting token from {auth_url} for scope repository:{repository}:pull")
    try:
        # Let requests build the query string: values are URL-encoded and a
        # realm that already carries a query string is extended correctly.
        auth_resp = requests.get(
            auth_url,
            params={
                'service': reg_service,
                'scope': f'repository:{repository}:pull',
            },
            verify=False,
            timeout=15,
        )
        auth_resp.raise_for_status()
        token_payload = auth_resp.json()
    except requests.exceptions.RequestException as e:
        print(f"[-] Failed to get authentication token: {e}")
        # A Response is falsy for 4xx/5xx statuses, so a plain truth test
        # would hide exactly the error bodies we want to show.
        if e.response is not None:
            print(f" Response: {e.response.text}")
        sys.exit(1)
    except ValueError as e:
        # Older requests versions raise a plain ValueError for invalid JSON.
        print(f"[-] Failed to get authentication token: invalid JSON in response ({e})")
        sys.exit(1)

    # Token servers may return the token as "token", "access_token", or both.
    access_token = None
    if isinstance(token_payload, dict):
        access_token = token_payload.get('token') or token_payload.get('access_token')
    if not access_token:
        print("[-] Failed to get authentication token: response contained no token.")
        sys.exit(1)

    return {
        'Authorization': f'Bearer {access_token}',
        'Accept': media_type,
    }
|
||||
|
||||
|
||||
# --- Docker Style Progress Bar ---
|
||||
def progress_bar(digest_short, nb_traits):
    """Redraw the single-line download bar for one layer.

    Args:
        digest_short: Label printed in front of the bar.
        nb_traits: Number of '=' characters to draw; a '>' head is appended
            while this is below 50. The bar is padded to 50 columns.
    """
    bar = '=' * nb_traits
    if nb_traits < 50:
        bar = bar + '>'

    out = sys.stdout
    # The leading '\r' returns to column 0 so the bar overwrites itself.
    out.write(f'\r{digest_short}: Downloading [')
    out.write(bar.ljust(50) + ']')
    out.flush()
|
||||
|
||||
# --- Fetch Manifest ---
print(f"[*] Fetching manifest for {repository}:{tag}")

# Manifest media types to try, one Accept value per request. Multi-arch
# formats come first; single-image manifests follow in both their Docker and
# OCI flavours so that OCI-only single-arch images can be fetched as well.
manifest_media_types = [
    'application/vnd.oci.image.index.v1+json',
    'application/vnd.docker.distribution.manifest.list.v2+json',
    'application/vnd.docker.distribution.manifest.v2+json',
    'application/vnd.oci.image.manifest.v1+json',
]

manifest_data = None
resp = None
manifest_tag = tag
if '@' in full_image_name:
    # For digest references, request exactly the digest the user typed.
    manifest_tag = full_image_name.split('@')[-1]

manifest_url = f'https://{registry}/v2/{repository}/manifests/{manifest_tag}'

for media_type in manifest_media_types:
    print(f" - Trying to fetch with Accept header: {media_type}")
    try:
        auth_head = get_auth_head(media_type)
        resp = requests.get(manifest_url, headers=auth_head, verify=False, timeout=30)

        if resp.status_code == 200:
            manifest_data = resp.json()
            print(f" - Successfully fetched manifest with Content-Type: {resp.headers.get('Content-Type')}")
            break  # Success, exit the loop
        else:
            print(f" - Received HTTP {resp.status_code}. Trying next media type.")

    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers invalid JSON bodies on older requests versions.
        print(f"[-] An exception occurred while fetching manifest: {e}")
        continue

if not manifest_data:
    print(f"[-] Failed to fetch manifest for {repository}:{tag} after trying all available media types.")
    # A Response object is falsy for 4xx/5xx statuses, so compare against
    # None; otherwise the failing response would never be shown.
    if resp is not None:
        print(f" Last response (HTTP {resp.status_code}): {resp.text}")
    sys.exit(1)
|
||||
|
||||
# A manifest list and an OCI index both carry a 'manifests' key. If that is
# what came back, pick the linux/amd64 entry and fetch its image manifest.
if 'manifests' in manifest_data:
    print('[+] Manifests list/index found. Checking for a suitable architecture (defaulting to linux/amd64)...')
    selected_digest = None
    for manifest in manifest_data['manifests']:
        platform = manifest.get("platform", {})
        arch = platform.get("architecture")
        os_type = platform.get("os")

        if arch == 'amd64' and os_type == 'linux':
            selected_digest = manifest.get("digest")
            if selected_digest:
                print(f" - Found linux/amd64. Using digest: {selected_digest}")
                break

    if selected_digest:
        # A suitable digest was found, now fetch the actual image manifest using that digest
        print(f"[*] Re-fetching manifest for the selected architecture using digest...")

        image_manifest_media_types = [
            'application/vnd.docker.distribution.manifest.v2+json',
            'application/vnd.oci.image.manifest.v1+json'
        ]

        new_manifest_data = None
        for media_type in image_manifest_media_types:
            try:
                auth_head = get_auth_head(media_type)
                resp = requests.get(
                    f'https://{registry}/v2/{repository}/manifests/{selected_digest}',
                    headers=auth_head,
                    verify=False,
                    timeout=30,
                )
                if resp.status_code == 200:
                    new_manifest_data = resp.json()
                    print(f" - Successfully fetched image manifest for linux/amd64.")
                    break
                # Report rejected attempts so a final failure can be diagnosed.
                print(f" - Received HTTP {resp.status_code} for Accept: {media_type}. Trying next media type.")
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers invalid JSON bodies on older requests versions.
                print(f"[-] An exception occurred while re-fetching manifest: {e}")
                continue

        if new_manifest_data:
            manifest_data = new_manifest_data
        else:
            print(f"[-] Failed to fetch manifest for digest {selected_digest}.")
            sys.exit(1)

    else:
        # No linux/amd64 entry: list what is available and stop.
        print('[!] Could not find a default (linux/amd64) architecture. Please pull a specific one using the @digest format:')
        for manifest in manifest_data['manifests']:
            platform_info = ', '.join([f'{k}: {v}' for k, v in manifest.get("platform", {}).items()])
            digest = manifest.get("digest", "N/A")
            print(f' - {platform_info}, digest: {digest}')
        sys.exit(1)


# If we are here, it should be a single image manifest
layers = manifest_data.get('layers')
if not layers:
    # Dump the manifest so the user can see what the registry returned.
    print("[-] The fetched manifest does not contain a 'layers' array.")
    print(json.dumps(manifest_data, indent=2))
    sys.exit(1)
|
||||
|
||||
# --- Create Image Structure ---
# Scratch directory that will hold the files later packed into the final tar.
img_name_for_dir = repository.split('/')[-1]
tag_for_dir = tag.replace(':', '_').replace('@', '_sha256_')
imgdir = f'tmp_{img_name_for_dir}_{tag_for_dir}'

# Start from a clean directory; leftovers from an earlier run are discarded.
if os.path.exists(imgdir):
    shutil.rmtree(imgdir)
os.mkdir(imgdir)
print(f"[*] Creating image structure in: {imgdir}")

# Download config file
config_digest = manifest_data['config']['digest']
print(f"[*] Downloading config {config_digest[7:19]}...")
auth_head = get_auth_head('application/octet-stream')  # Re-auth in case token expired
try:
    confresp = requests.get(
        f'https://{registry}/v2/{repository}/blobs/{config_digest}',
        headers=auth_head,
        verify=False,
        timeout=60,
    )
    confresp.raise_for_status()
except requests.exceptions.RequestException as e:
    # Report the failure like the rest of the script does instead of
    # aborting with a raw traceback.
    print(f"[-] Failed to download image config {config_digest}: {e}")
    sys.exit(1)

# The blob is addressed by its digest, so check the payload against it.
# TLS verification is disabled above, which makes this the only integrity
# check the download gets. Unknown digest algorithms are skipped.
config_algo, _, config_hex = config_digest.partition(':')
if config_algo in hashlib.algorithms_available:
    actual_config_hex = hashlib.new(config_algo, confresp.content).hexdigest()
    if actual_config_hex != config_hex:
        print(f"[-] Config digest mismatch: expected {config_digest}, got {config_algo}:{actual_config_hex}")
        sys.exit(1)

# The file name is the digest without its "sha256:" prefix.
with open(f'{imgdir}/{config_digest[7:]}.json', 'wb') as f:
    f.write(confresp.content)
|
||||
|
||||
# Prepare manifest.json content
# `repo_tag_entry` is the reference as typed by the user, with ":<tag>"
# appended when neither a tag nor a digest was given.
repo_tag_entry = sys.argv[1]
if '@' not in repo_tag_entry and ':' not in repo_tag_entry.split('/')[-1]:
    repo_tag_entry += f":{tag}"

# RepoTags entries must be "name:tag" references. A digest reference
# ("name@sha256:...") is not a tag, so it is never written there: keep the
# "name:tag" part when the user supplied one, otherwise leave the list empty
# and let the image load untagged.
if '@' in repo_tag_entry:
    name_part = repo_tag_entry.split('@', 1)[0]
    repo_tags = [name_part] if ':' in name_part.split('/')[-1] else []
else:
    repo_tags = [repo_tag_entry]

content = [{
    'Config': f'{config_digest[7:]}.json',
    'RepoTags': repo_tags,
    'Layers': []  # filled in while the layers are downloaded
}]
|
||||
|
||||
# --- Download and Build Layers ---
# For every layer in the manifest: create <imgdir>/<fake id>/, download the
# blob, verify it, store it uncompressed as layer.tar, and write the legacy
# per-layer `VERSION` and `json` files.
parentid = ''
for i, layer in enumerate(layers):
    ublob = layer['digest']
    digest_short = ublob[7:19]

    # Create a fake layer ID. Docker's actual ID generation is more complex,
    # but this is sufficient for 'docker load' to work.
    layer_json_content = f'{parentid}\n{ublob}\n'
    fake_layerid = hashlib.sha256(layer_json_content.encode('utf-8')).hexdigest()
    layerdir = f'{imgdir}/{fake_layerid}'
    os.mkdir(layerdir)

    # VERSION file
    with open(f'{layerdir}/VERSION', 'w') as f:
        f.write('1.0')

    # Download layer tarball with progress
    sys.stdout.write(f"{digest_short}: Downloading...")
    sys.stdout.flush()

    # Refresh token before each download to avoid expiration on large layers
    auth_head = get_auth_head('application/octet-stream')

    # Hash the blob while it streams so it can be checked against its digest;
    # TLS verification is off, so this is the only integrity check. Unknown
    # digest algorithms are skipped.
    blob_algo, _, blob_hex = ublob.partition(':')
    hasher = hashlib.new(blob_algo) if blob_algo in hashlib.algorithms_available else None

    layer_gzip_path = f"{layerdir}/layer_gzip.tar"
    chunk_size = 8192
    downloaded = 0
    bresp = None
    try:
        # Attempt to download from the primary blob URL
        bresp = requests.get(
            f'https://{registry}/v2/{repository}/blobs/{ublob}',
            headers=auth_head, stream=True, verify=False, timeout=60,
        )

        # Layers may list alternative download locations under 'urls'.
        if bresp.status_code != 200 and layer.get('urls'):
            print(f"\r{digest_short}: Following redirect...", " "*50)
            bresp.close()
            # These URLs can point at a different host, so the registry's
            # bearer token is deliberately NOT forwarded to them.
            bresp = requests.get(
                layer['urls'][0],
                headers={'Accept': auth_head['Accept']},
                stream=True, verify=False, timeout=60,
            )

        if bresp.status_code != 200:
            print(f'\rERROR: Cannot download layer {digest_short} [HTTP {bresp.status_code}]')
            print(bresp.text)
            sys.exit(1)

        # Stream download with progress bar
        total_size = int(bresp.headers.get('Content-Length', 0))
        with open(layer_gzip_path, "wb") as f:
            for chunk in bresp.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    if hasher is not None:
                        hasher.update(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        # Clamp to the bar width in case more bytes arrive
                        # than Content-Length announced.
                        progress_traits = min(50, int((downloaded / total_size) * 50))
                        progress_bar(digest_short, progress_traits)
    except requests.exceptions.RequestException as e:
        print(f'\rERROR: Download of layer {digest_short} failed: {e}')
        sys.exit(1)
    finally:
        # Streamed responses hold their connection until closed.
        if bresp is not None:
            bresp.close()

    if hasher is not None and hasher.hexdigest() != blob_hex:
        print(f'\rERROR: Digest mismatch for layer {digest_short}: expected {ublob}, got {blob_algo}:{hasher.hexdigest()}')
        sys.exit(1)

    # Store the layer uncompressed as layer.tar
    sys.stdout.write(f"\r{digest_short}: Extracting...{' '*50}")
    sys.stdout.flush()
    layer_tar_path = f"{layerdir}/layer.tar"
    # Sniff the compression from the magic bytes rather than assuming gzip.
    with open(layer_gzip_path, 'rb') as probe:
        magic = probe.read(4)
    if magic[:2] == b'\x1f\x8b':
        # gzip-compressed layer
        with gzip.open(layer_gzip_path, 'rb') as f_in, open(layer_tar_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.remove(layer_gzip_path)
    elif magic == b'\x28\xb5\x2f\xfd':
        # zstd frame magic; the standard library modules used here cannot decode it.
        print(f'\rERROR: Layer {digest_short} is zstd-compressed, which this tool cannot decompress.')
        sys.exit(1)
    else:
        # Not compressed (or empty): use the blob as the layer tar as-is.
        os.replace(layer_gzip_path, layer_tar_path)

    size_mb = os.path.getsize(layer_tar_path) / (1024 * 1024)
    print(f"\r{digest_short}: Pull complete [{size_mb:.2f} MB]")
    content[0]['Layers'].append(f'{fake_layerid}/layer.tar')

    # Create layer json file
    json_obj = {}
    json_obj['id'] = fake_layerid
    if parentid:
        json_obj['parent'] = parentid

    # The last layer's JSON is derived from the main config file
    if i == len(layers) - 1:
        config_json = json.loads(confresp.content)
        # Copy relevant fields from main config to layer config
        for key in ['created', 'author', 'architecture', 'os', 'config', 'container_config']:
            if key in config_json:
                json_obj[key] = config_json[key]
    else:
        # Intermediate layers have minimal JSON
        json_obj['created'] = "1970-01-01T00:00:00Z"

    with open(f'{layerdir}/json', 'w') as f:
        f.write(json.dumps(json_obj))

    # The next layer is chained to this one.
    parentid = fake_layerid
|
||||
|
||||
# --- Finalize Image Files ---
# manifest.json: serialise the entry assembled while downloading the layers.
with open(f'{imgdir}/manifest.json', 'w') as manifest_file:
    json.dump(content, manifest_file)

# repositories file: maps repository -> {tag: id of the last layer written}.
# The tag is whatever follows the last '@' in the reference, else whatever
# follows its last ':', else the parsed tag.
if '@' in repo_tag_entry:
    final_tag = repo_tag_entry.rpartition('@')[2]
elif ':' in repo_tag_entry:
    final_tag = repo_tag_entry.rpartition(':')[2]
else:
    final_tag = tag

repo_content = {repository: {final_tag: fake_layerid}}
with open(f'{imgdir}/repositories', 'w') as repositories_file:
    json.dump(repo_content, repositories_file)
|
||||
|
||||
# --- Create Final Tarball and Cleanup ---
# The archive is named after the repository (slashes become underscores)
# plus the sanitised tag/digest.
docker_tar = repository.replace('/', '_') + f"_{tag_for_dir}.tar"
print(f"[*] Creating final archive: {docker_tar}")
sys.stdout.flush()
try:
    with tarfile.open(docker_tar, "w") as tar:
        # Add each top-level entry under its own name so the archive root
        # holds manifest.json, the config and the layer directories directly,
        # with no extra entry for the scratch directory itself.
        for entry in sorted(os.listdir(imgdir)):
            tar.add(os.path.join(imgdir, entry), arcname=entry)
finally:
    # Remove the scratch directory even when archiving fails, so a failed
    # run does not leave the downloaded layers behind.
    shutil.rmtree(imgdir, ignore_errors=True)

print(f'\n[+] Success! Docker image pulled to: {docker_tar}')
print(f" Load it using: docker load -i {docker_tar}")
|
||||
Reference in New Issue
Block a user