#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Pull a container image straight from a registry and save it as a tarball.

The script talks to the registry's ``/v2/`` HTTP API directly, so no Docker
daemon is needed on the machine doing the download.  The archive it writes
contains ``manifest.json``, ``repositories`` and one directory per layer
(``VERSION``, ``json``, ``layer.tar``) and is meant for ``docker load -i``.

Usage:
    dockerpullsave.py [registry/][repository/]image[:tag|@digest]

When the reference resolves to a manifest list / OCI index, the linux/amd64
entry is selected.
"""

import os
import sys
import gzip
import json
import hashlib
import shutil
import tarfile
import re

try:
    import requests
    import urllib3
except ImportError:  # reported by main() with a readable message
    requests = None
    urllib3 = None

DEFAULT_REGISTRY = 'registry-1.docker.io'
# Names users commonly type for Docker Hub; the API itself lives on
# DEFAULT_REGISTRY.
DOCKER_HUB_ALIASES = ('docker.io', 'index.docker.io')
DEFAULT_AUTH_URL = 'https://auth.docker.io/token'
DEFAULT_AUTH_SERVICE = 'registry.docker.io'

INDEX_MEDIA_TYPES = (
    'application/vnd.oci.image.index.v1+json',
    'application/vnd.docker.distribution.manifest.list.v2+json',
)
IMAGE_MANIFEST_MEDIA_TYPES = (
    'application/vnd.docker.distribution.manifest.v2+json',
    'application/vnd.oci.image.manifest.v1+json',
)

# (connect, read) timeouts in seconds so a stalled registry cannot hang the
# script forever.
REQUEST_TIMEOUT = (15, 60)
CHUNK_SIZE = 64 * 1024

# NOTE(review): TLS certificate verification is disabled, as in the original
# script, to allow self-signed registries.  This exposes tokens and image
# content to man-in-the-middle attacks; set to True unless you need it off.
VERIFY_TLS = False

GZIP_MAGIC = b'\x1f\x8b'
ZSTD_MAGIC = b'\x28\xb5\x2f\xfd'

# Keys copied from the image config into the top layer's legacy ``json`` file.
CONFIG_KEYS_FOR_TOP_LAYER = (
    'created', 'author', 'architecture', 'os', 'config', 'container_config',
)


class PullError(Exception):
    """Fatal, user-facing error; ``main`` prints it and exits with status 1."""


def parse_image_reference(image):
    """Split an image reference into its components.

    Args:
        image: ``[registry/][repository/]image[:tag][@digest]``.

    Returns:
        ``(registry, repository, tag, digest)``.  ``tag`` is ``'latest'``
        when neither a tag nor a digest is given and ``None`` when only a
        digest is given; ``digest`` is ``None`` when absent and otherwise
        kept verbatim (``algorithm:hex``).

    Raises:
        ValueError: if the repository part is empty or the digest is not of
            the form ``algorithm:hex``.
    """
    registry = DEFAULT_REGISTRY
    remainder = image

    # The first path component is a registry host only if it looks like one
    # (contains a dot or a port, or is literally "localhost").
    first, slash, rest = image.partition('/')
    if slash and ('.' in first or ':' in first or first == 'localhost'):
        registry = first
        remainder = rest
    if registry in DOCKER_HUB_ALIASES:
        registry = DEFAULT_REGISTRY

    digest = None
    if '@' in remainder:
        remainder, digest = remainder.split('@', 1)
        if ':' not in digest:
            raise ValueError(f"Invalid digest '{digest}' (expected algorithm:hex)")

    # A ':' only denotes a tag when it appears in the last path component.
    tag = None
    if ':' in remainder.rsplit('/', 1)[-1]:
        remainder, tag = remainder.rsplit(':', 1)

    if not remainder or remainder.endswith('/'):
        raise ValueError(f"Invalid image name '{image}'")

    # Official Docker Hub images live under the implicit "library/" namespace.
    if registry == DEFAULT_REGISTRY and '/' not in remainder:
        remainder = f'library/{remainder}'

    if tag is None and digest is None:
        tag = 'latest'
    return registry, remainder, tag, digest


def parse_www_authenticate(header, registry):
    """Extract the token endpoint and service from a WWW-Authenticate header.

    Returns:
        ``(auth_url, service)``.  A missing ``realm`` falls back to the
        Docker Hub token endpoint and a missing ``service`` falls back to
        ``registry``.
    """
    realm = re.search(r'realm="([^"]+)"', header, re.IGNORECASE)
    service = re.search(r'service="([^"]+)"', header, re.IGNORECASE)
    auth_url = realm.group(1) if realm else DEFAULT_AUTH_URL
    return auth_url, (service.group(1) if service else registry)


def select_platform_digest(index, os_name='linux', architecture='amd64'):
    """Return the digest of the index entry matching the platform, or None."""
    for entry in index.get('manifests') or []:
        platform = entry.get('platform') or {}
        if platform.get('architecture') != architecture:
            continue
        if platform.get('os') != os_name:
            continue
        digest = entry.get('digest')
        if digest:
            return digest
    return None


def local_repo_tag(image, tag):
    """Return the ``name:tag`` string to record in ``RepoTags``, or None.

    ``RepoTags`` entries have to be tagged references, so a reference given
    only by digest yields ``None`` and the image is saved untagged.
    """
    if tag is None:
        return None
    name = image.split('@', 1)[0]
    if ':' not in name.rsplit('/', 1)[-1]:
        name = f'{name}:{tag}'
    return name


def progress_bar(digest_short, nb_traits):
    """Redraw a 50-column download progress bar on the current line."""
    sys.stdout.write(f'\r{digest_short}: Downloading [')
    progress = '=' * nb_traits
    if nb_traits < 50:
        progress += '>'
    sys.stdout.write(f'{progress:<50}]')
    sys.stdout.flush()


def _digest_hex(digest):
    """Return the hex part of an ``algorithm:hex`` digest string."""
    return digest.split(':', 1)[-1]


def _new_hasher(digest):
    """Return a hashlib object for the digest's algorithm, or None if unknown."""
    try:
        return hashlib.new(digest.partition(':')[0])
    except (ValueError, TypeError):
        return None


def _check_digest(hasher, digest, what):
    """Raise PullError when the hashed content does not match ``digest``."""
    if hasher is not None and hasher.hexdigest() != _digest_hex(digest):
        raise PullError(
            f"Digest mismatch for {what}: expected {digest}, "
            f"got {hasher.name}:{hasher.hexdigest()}"
        )


class RegistryClient:
    """Minimal registry ``/v2/`` client bound to a single repository."""

    def __init__(self, registry, repository):
        self.registry = registry
        self.repository = repository
        self.auth_url = DEFAULT_AUTH_URL
        self.service = DEFAULT_AUTH_SERVICE
        self.auth_required = False

    def _get(self, url, **kwargs):
        """``requests.get`` with the script-wide TLS and timeout settings."""
        kwargs.setdefault('verify', VERIFY_TLS)
        kwargs.setdefault('timeout', REQUEST_TIMEOUT)
        return requests.get(url, **kwargs)

    def probe_auth(self):
        """Ping ``/v2/`` and record whether (and where) a token is needed.

        Raises:
            PullError: on connection failure, on an HTTP error other than
                401, or on a 401 without a WWW-Authenticate header.
        """
        url = f'https://{self.registry}/v2/'
        print(f"[*] Checking for auth requirements at {url}")
        try:
            resp = self._get(url)
            if resp.status_code != 401:
                resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise PullError(f"Error connecting to registry {self.registry}: {e}") from e

        if resp.status_code != 401:
            print("    - No authentication required.")
            return

        header = resp.headers.get('WWW-Authenticate')
        if not header:
            raise PullError(
                "Registry returned 401 Unauthorized, but did not provide a "
                f"WWW-Authenticate header.\n{resp.text}"
            )
        print("    - Authentication required. Parsing WWW-Authenticate header.")
        self.auth_required = True
        self.auth_url, self.service = parse_www_authenticate(header, self.registry)
        print(f"    - Auth URL: {self.auth_url}")
        print(f"    - Service: {self.service}")

    def get_auth_head(self, media_type):
        """Return request headers for ``media_type``, with a fresh token if needed.

        A new token is requested on every call so that long downloads are
        not hit by token expiry.

        Raises:
            PullError: if the token cannot be obtained.
        """
        headers = {'Accept': media_type}
        if not self.auth_required:
            return headers

        scope = f'repository:{self.repository}:pull'
        print(f"[*] Requesting token from {self.auth_url} for scope {scope}")
        try:
            # ``params`` URL-encodes the values and copes with a realm that
            # already carries a query string.
            resp = self._get(self.auth_url, params={'service': self.service, 'scope': scope})
            resp.raise_for_status()
            payload = resp.json()
        except requests.exceptions.RequestException as e:
            detail = f"Failed to get authentication token: {e}"
            # A Response is falsy for 4xx/5xx, so compare against None.
            response = getattr(e, 'response', None)
            if response is not None:
                detail += f"\n    Response: {response.text}"
            raise PullError(detail) from e
        except ValueError as e:
            raise PullError(f"Token endpoint returned invalid JSON: {e}") from e

        token = None
        if isinstance(payload, dict):
            # Token servers may answer with "token", "access_token" or both.
            token = payload.get('token') or payload.get('access_token')
        if not token:
            raise PullError("Token endpoint response did not contain a token.")
        headers['Authorization'] = f'Bearer {token}'
        return headers

    def get_manifest(self, reference, media_types):
        """GET the manifest for a tag or digest, offering all ``media_types``."""
        headers = self.get_auth_head(', '.join(media_types))
        url = f'https://{self.registry}/v2/{self.repository}/manifests/{reference}'
        return self._get(url, headers=headers)

    def open_blob(self, digest, urls=(), label=None):
        """Start a streaming GET for a blob and return the response.

        Args:
            digest: blob digest to request from the registry.
            urls: alternative locations listed in the layer descriptor,
                tried when the registry itself does not return 200.
            label: short name for progress output; nothing is printed when
                it is None.
        """
        headers = self.get_auth_head('application/octet-stream')
        if label:
            sys.stdout.write(f"{label}: Downloading...")
            sys.stdout.flush()
        url = f'https://{self.registry}/v2/{self.repository}/blobs/{digest}'
        resp = self._get(url, headers=headers, stream=True)
        if resp.status_code != 200 and urls:
            resp.close()
            if label:
                print(f"\r{label}: Following redirect...", " " * 50)
            # The registry token must not be sent to an external host.
            resp = self._get(urls[0], headers={'Accept': 'application/octet-stream'}, stream=True)
        return resp


def _manifest_json(resp, what):
    """Decode a manifest response body, raising PullError on bad status/JSON."""
    if resp.status_code != 200:
        raise PullError(
            f"Failed to fetch manifest for {what}.\n"
            f"    Last response (HTTP {resp.status_code}): {resp.text}"
        )
    try:
        return resp.json()
    except ValueError as e:
        raise PullError(f"Manifest for {what} is not valid JSON: {e}") from e


def fetch_image_manifest(client, reference):
    """Return the single-image manifest for ``reference``.

    If the registry answers with a manifest list / OCI index, the
    linux/amd64 entry is fetched instead.

    Raises:
        PullError: if a manifest cannot be fetched or no linux/amd64 entry
            exists in the index.
    """
    what = f"{client.repository}:{reference}"
    print(f"[*] Fetching manifest for {what}")
    resp = client.get_manifest(reference, INDEX_MEDIA_TYPES + IMAGE_MANIFEST_MEDIA_TYPES)
    manifest = _manifest_json(resp, what)
    print(f"    - Successfully fetched manifest with Content-Type: {resp.headers.get('Content-Type')}")

    # Manifest lists and OCI indexes both carry a 'manifests' key.
    if 'manifests' not in manifest:
        return manifest

    print('[+] Manifests list/index found. Checking for a suitable architecture (defaulting to linux/amd64)...')
    digest = select_platform_digest(manifest)
    if digest is None:
        lines = ['Could not find a default (linux/amd64) architecture. '
                 'Please pull a specific one using the @digest format:']
        for entry in manifest.get('manifests') or []:
            platform = entry.get('platform') or {}
            platform_info = ', '.join(f'{k}: {v}' for k, v in platform.items())
            entry_digest = entry.get('digest', 'N/A')
            lines.append(f'   - {platform_info}, digest: {entry_digest}')
        raise PullError('\n'.join(lines))

    print(f"    - Found linux/amd64. Using digest: {digest}")
    print("[*] Re-fetching manifest for the selected architecture using digest...")
    resp = client.get_manifest(digest, IMAGE_MANIFEST_MEDIA_TYPES)
    image_manifest = _manifest_json(resp, f"digest {digest}")
    print("    - Successfully fetched image manifest for linux/amd64.")
    return image_manifest


def download_config(client, digest, imgdir):
    """Download the image config blob into ``imgdir``.

    Returns:
        ``(file_name, parsed_config)`` where ``file_name`` is relative to
        ``imgdir``.
    """
    print(f"[*] Downloading config {_digest_hex(digest)[:12]}...")
    resp = client.open_blob(digest)
    if resp.status_code != 200:
        raise PullError(f"Cannot download config {digest} [HTTP {resp.status_code}]\n{resp.text}")
    data = resp.content

    hasher = _new_hasher(digest)
    if hasher is not None:
        hasher.update(data)
    _check_digest(hasher, digest, 'image config')

    try:
        parsed = json.loads(data)
    except ValueError as e:
        raise PullError(f"Image config is not valid JSON: {e}") from e

    file_name = f'{_digest_hex(digest)}.json'
    with open(os.path.join(imgdir, file_name), 'wb') as f:
        f.write(data)
    return file_name, parsed


def unpack_layer(blob_path, tar_path):
    """Turn a downloaded layer blob into an uncompressed ``layer.tar``.

    The compression is detected from the file's magic bytes: gzip blobs are
    decompressed, zstd blobs are rejected, anything else is assumed to be
    an uncompressed tar and moved into place.  ``blob_path`` is consumed.
    """
    with open(blob_path, 'rb') as f:
        magic = f.read(4)

    if magic.startswith(GZIP_MAGIC):
        try:
            with gzip.open(blob_path, 'rb') as src, open(tar_path, 'wb') as dst:
                shutil.copyfileobj(src, dst)
        except (OSError, EOFError) as e:
            raise PullError(f"Cannot decompress layer: {e}") from e
        os.remove(blob_path)
    elif magic == ZSTD_MAGIC:
        raise PullError("Layer is zstd-compressed, which this script cannot decompress.")
    else:
        os.replace(blob_path, tar_path)


def download_layer(client, layer, layerdir):
    """Download one layer into ``layerdir/layer.tar`` with progress output."""
    digest = layer['digest']
    short = _digest_hex(digest)[:12]

    resp = client.open_blob(digest, urls=layer.get('urls') or (), label=short)
    if resp.status_code != 200:
        raise PullError(f'Cannot download layer {short} [HTTP {resp.status_code}]\n{resp.text}')

    total = int(resp.headers.get('Content-Length', 0))
    hasher = _new_hasher(digest)
    blob_path = os.path.join(layerdir, 'layer_gzip.tar')
    received = 0
    shown = -1
    with open(blob_path, 'wb') as f:
        for chunk in resp.iter_content(chunk_size=CHUNK_SIZE):
            if not chunk:
                continue
            f.write(chunk)
            if hasher is not None:
                hasher.update(chunk)
            received += len(chunk)
            if total > 0:
                ticks = min(50, received * 50 // total)
                if ticks != shown:  # redraw only when the bar changes
                    progress_bar(short, ticks)
                    shown = ticks
    _check_digest(hasher, digest, f'layer {short}')

    sys.stdout.write(f"\r{short}: Extracting...{' ' * 50}")
    sys.stdout.flush()
    tar_path = os.path.join(layerdir, 'layer.tar')
    unpack_layer(blob_path, tar_path)

    size_mb = os.path.getsize(tar_path) / (1024 * 1024)
    print(f"\r{short}: Pull complete [{size_mb:.2f} MB]")


def write_layer_metadata(layerdir, layer_id, parent_id, config_json):
    """Write the ``VERSION`` and ``json`` files for one layer directory.

    ``config_json`` is the parsed image config for the top layer and None
    for every other layer, which only gets a placeholder creation date.
    """
    with open(os.path.join(layerdir, 'VERSION'), 'w') as f:
        f.write('1.0')

    meta = {'id': layer_id}
    if parent_id:
        meta['parent'] = parent_id
    if config_json is None:
        meta['created'] = "1970-01-01T00:00:00Z"
    else:
        for key in CONFIG_KEYS_FOR_TOP_LAYER:
            if key in config_json:
                meta[key] = config_json[key]

    with open(os.path.join(layerdir, 'json'), 'w') as f:
        json.dump(meta, f)


def build_image_dir(client, manifest, imgdir, repo_tag):
    """Populate ``imgdir`` with config, layers, manifest.json and repositories."""
    layers = manifest['layers']
    config_digest = (manifest.get('config') or {}).get('digest')
    if not config_digest:
        raise PullError("The fetched manifest does not reference an image config.")
    config_name, config_json = download_config(client, config_digest, imgdir)

    layer_entries = []
    parent_id = ''
    layer_id = ''
    for position, layer in enumerate(layers):
        # Not Docker's real layer ID, just a stable unique directory name
        # derived from the parent ID and the layer digest.
        seed = f"{parent_id}\n{layer['digest']}\n"
        layer_id = hashlib.sha256(seed.encode('utf-8')).hexdigest()
        layerdir = os.path.join(imgdir, layer_id)
        os.mkdir(layerdir)

        download_layer(client, layer, layerdir)
        is_top = position == len(layers) - 1
        write_layer_metadata(layerdir, layer_id, parent_id, config_json if is_top else None)

        layer_entries.append(f'{layer_id}/layer.tar')
        parent_id = layer_id

    image_entry = {
        'Config': config_name,
        'RepoTags': [repo_tag] if repo_tag else [],
        'Layers': layer_entries,
    }
    with open(os.path.join(imgdir, 'manifest.json'), 'w') as f:
        json.dump([image_entry], f)

    repositories = {}
    if repo_tag:
        repo_name, _, repo_tag_name = repo_tag.rpartition(':')
        repositories[repo_name] = {repo_tag_name: layer_id}
    with open(os.path.join(imgdir, 'repositories'), 'w') as f:
        json.dump(repositories, f)


def pull_image(image):
    """Download ``image`` and write ``<repository>_<reference>.tar`` in the cwd.

    Raises:
        PullError: for any failure that should be reported to the user.
    """
    print(f"[*] Parsing image: {image}")
    try:
        registry, repository, tag, digest = parse_image_reference(image)
    except ValueError as e:
        raise PullError(str(e)) from e
    reference = digest or tag
    print(f"    - Registry: {registry}")
    print(f"    - Repository: {repository}")
    print(f"    - Tag/Digest: {reference}")

    client = RegistryClient(registry, repository)
    client.probe_auth()
    manifest = fetch_image_manifest(client, reference)
    if not manifest.get('layers'):
        raise PullError(
            "The fetched manifest does not contain a 'layers' array.\n"
            + json.dumps(manifest, indent=2)
        )

    reference_for_name = reference.replace(':', '_')
    image_name = repository.split('/')[-1]
    imgdir = f'tmp_{image_name}_{reference_for_name}'
    docker_tar = repository.replace('/', '_') + f"_{reference_for_name}.tar"
    repo_tag = local_repo_tag(image, tag)

    if os.path.exists(imgdir):
        shutil.rmtree(imgdir)
    os.mkdir(imgdir)
    print(f"[*] Creating image structure in: {imgdir}")
    try:
        build_image_dir(client, manifest, imgdir, repo_tag)

        print(f"[*] Creating final archive: {docker_tar}")
        sys.stdout.flush()
        with tarfile.open(docker_tar, "w") as tar:
            # Add the directory's entries individually so member names are
            # relative and no entry for the directory itself is written.
            for entry in sorted(os.listdir(imgdir)):
                tar.add(os.path.join(imgdir, entry), arcname=entry)
    finally:
        # Remove the scratch directory on success and on failure alike.
        shutil.rmtree(imgdir, ignore_errors=True)

    print(f'\n[+] Success! Docker image pulled to: {docker_tar}')
    print(f"    Load it using: docker load -i {docker_tar}")
    if not repo_tag:
        print("    The image was pulled by digest and will load untagged; "
              "tag it with 'docker tag <image id> <name>'.")


def main(argv=None):
    """Command-line entry point; returns the process exit status."""
    argv = sys.argv if argv is None else argv
    prog = os.path.basename(argv[0]) if argv and argv[0] else 'dockerpullsave.py'

    if len(argv) != 2:
        print(f'Usage:\n\t{prog} [registry/][repository/]image[:tag|@digest]\n')
        print('Examples:')
        print(f'\t{prog} ubuntu:latest')
        print(f'\t{prog} ghcr.io/oras-project/oras:v1.1.0')
        print(f'\t{prog} alpine@sha256:f271e74b17ced29b915d351685fd4644785c6d1559dd1f2d519b152337e72849')
        return 1

    if requests is None:
        print("[-] This script needs the 'requests' package (pip install requests).")
        return 1
    if not VERIFY_TLS:
        # Avoid one InsecureRequestWarning per request while verification is off.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    try:
        pull_image(argv[1])
    except PullError as e:
        print(f"\n[-] {e}")
        return 1
    except requests.exceptions.RequestException as e:
        print(f"\n[-] Network error: {e}")
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())