Add a standalone script that fetches image manifests/layers from common registries and assembles a docker-loadable tarball for offline transfer workflows.
273 lines
11 KiB
Python
Executable File
273 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import gzip
|
|
import json
|
|
import hashlib
|
|
import shutil
|
|
import requests
|
|
import tarfile
|
|
import re
|
|
import urllib3
|
|
|
|
# Suppress InsecureRequestWarning for self-signed certs or debugging
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
# --- Argument Check ---
# Only an exact "-h"/"--help" (or a wrong argument count) triggers usage.
# A substring test like `"-h" in sys.argv[1]` would wrongly fire on image
# names such as "my-http-server".
if len(sys.argv) != 2 or sys.argv[1] in ('-h', '--help'):
    print('Usage:\n\tdocker_pull.py [registry/][repository/]image[:tag|@digest]\n')
    print('Examples:')
    print('\tdocker_pull.py ubuntu:latest')
    print('\tdocker_pull.py ghcr.io/oras-project/oras:v1.1.0')
    print('\tdocker_pull.py quay.io/activecm/passer')
    print('\tdocker_pull.py alpine@sha256:f271e74b17ced29b915d351685fd4644785c6d1559dd1f2d519b152337e72849')
    sys.exit(1)
|
|
|
|
# --- Image Name Parsing ---
full_image_name = sys.argv[1]
print(f"[*] Parsing image: {full_image_name}")

# Default registry: Docker Hub's v2 endpoint.
registry = 'registry-1.docker.io'
repo_and_tag = full_image_name

# The first path component is a registry host only if it looks like one:
# contains a dot (domain), a colon (port), or is exactly "localhost".
# This mirrors Docker's own reference-parsing heuristic.
parts = full_image_name.split('/')
if len(parts) > 1 and ('.' in parts[0] or ':' in parts[0] or parts[0] == 'localhost'):
    registry = parts[0]
    repo_and_tag = '/'.join(parts[1:])

# Official Docker Hub images live under the implicit "library/" namespace.
if registry == 'registry-1.docker.io' and '/' not in repo_and_tag:
    repository_with_tag = f"library/{repo_and_tag}"
else:
    repository_with_tag = repo_and_tag

# Split repository from tag or digest.  For "@digest" references the full
# digest string (e.g. "sha256:abc...") is carried in `tag`.
if '@' in repository_with_tag:
    repository, tag = repository_with_tag.split('@', 1)
elif ':' in repository_with_tag.rsplit('/', 1)[-1]:
    # Only treat ':' after the last '/' as a tag separator, so a
    # "host:port/..." prefix never masquerades as a tag.
    repository, tag = repository_with_tag.rsplit(':', 1)
else:
    repository = repository_with_tag
    tag = 'latest'

print(f" - Registry: {registry}")
print(f" - Repository: {repository}")
print(f" - Tag/Digest: {tag}")
|
|
|
|
|
|
# --- Authentication ---
# Defaults target Docker Hub; both are overridden from the WWW-Authenticate
# challenge when the registry demands token auth.
auth_url = 'https://auth.docker.io/token'
reg_service = 'registry.docker.io'
is_auth_required = False

try:
    print(f"[*] Checking for auth requirements at https://{registry}/v2/")
    resp = requests.get(f'https://{registry}/v2/', verify=False, timeout=15)
    if resp.status_code == 401:
        is_auth_required = True
        auth_header = resp.headers.get('WWW-Authenticate')
        if not auth_header:
            print("[-] Registry returned 401 Unauthorized, but did not provide a WWW-Authenticate header.")
            exit(1)
        print(" - Authentication required. Parsing WWW-Authenticate header.")
        # Pull realm/service out of the challenge; keep the Docker Hub realm
        # default if absent, but fall back to the registry host for service.
        realm = re.search('realm="([^"]+)"', auth_header, re.IGNORECASE)
        service = re.search('service="([^"]+)"', auth_header, re.IGNORECASE)
        auth_url = realm.group(1) if realm else auth_url
        reg_service = service.group(1) if service else registry
        print(f" - Auth URL: {auth_url}")
        print(f" - Service: {reg_service}")
    elif resp.status_code == 200:
        print(" - No authentication required.")
    else:
        # Any other status is fatal at this stage.
        resp.raise_for_status()
except requests.exceptions.RequestException as e:
    print(f"[-] Error connecting to registry {registry}: {e}")
    exit(1)
|
|
|
|
def get_auth_head(media_type):
    """Return HTTP headers for a registry API request.

    When the registry requires token auth, fetch a fresh pull-scope bearer
    token (tokens are short-lived, so one is requested per call) and return
    it together with the requested Accept media type; otherwise return just
    the Accept header.  Exits the process on token-endpoint failure.
    """
    if not is_auth_required:
        return {'Accept': media_type}
    try:
        auth_resp = requests.get(
            f'{auth_url}?service={reg_service}&scope=repository:{repository}:pull',
            verify=False, timeout=15)
        auth_resp.raise_for_status()
        access_token = auth_resp.json()['token']
        return {'Authorization': f'Bearer {access_token}', 'Accept': media_type}
    except requests.exceptions.RequestException as e:
        print(f"[-] Failed to get authentication token: {e}")
        # NOTE: bool(Response) is False for 4xx/5xx statuses, so testing
        # `e.response` truthiness would skip this diagnostic exactly when an
        # error response exists -- compare against None instead.
        if e.response is not None:
            print(f" Response: {e.response.text}")
        exit(1)
|
|
|
|
def progress_bar(digest_short, nb_traits):
    """Redraw an in-place 50-column download bar for the given layer digest."""
    filled = '=' * nb_traits
    arrow = '>' if nb_traits < 50 else ''
    padding = ' ' * (50 - nb_traits - 1)
    sys.stdout.write(f'\r{digest_short}: Downloading [')
    sys.stdout.write(f'{filled}{arrow}{padding}]')
    sys.stdout.flush()
|
|
|
|
# --- Fetch Manifest ---
# Try progressively older/looser media types until the registry answers 200.
print(f"[*] Fetching manifest for {repository}:{tag}")
manifest_media_types = [
    'application/vnd.docker.distribution.manifest.v2+json',
    'application/vnd.oci.image.index.v1+json',
    'application/vnd.docker.distribution.manifest.list.v2+json',
    'application/vnd.docker.distribution.manifest.v1+prettyjws',
]
manifest_data, resp = None, None
# A digest reference ("sha256:...") is passed to the manifests endpoint
# verbatim, exactly like a tag, so no rewriting is needed.  (The previous
# `tag.split(':')[-1]` branch was dead -- '@' is stripped during parsing --
# and would have produced an invalid bare-hex reference had it ever fired.)
manifest_tag = tag

for media_type in manifest_media_types:
    print(f" - Trying to fetch with Accept header: {media_type}")
    try:
        auth_head = get_auth_head(media_type)
        resp = requests.get(f'https://{registry}/v2/{repository}/manifests/{manifest_tag}', headers=auth_head, verify=False)
        if resp.status_code == 200:
            manifest_data = resp.json()
            print(f" - Successfully fetched manifest with Content-Type: {resp.headers.get('Content-Type')}")
            break
        else:
            print(f" - Received HTTP {resp.status_code}. Trying next media type.")
    except requests.exceptions.RequestException as e:
        print(f"[-] An exception occurred while fetching manifest: {e}")
        continue

if not manifest_data:
    print(f"[-] Failed to fetch manifest for {repository}:{tag}.")
    # bool(Response) is False for error statuses, so a bare `if resp:` would
    # suppress this diagnostic exactly when it matters; compare with None.
    if resp is not None:
        print(f" Last response (HTTP {resp.status_code}): {resp.text}")
    exit(1)
|
|
|
|
# --- Resolve Manifest Lists / OCI Indexes ---
# A multi-arch image returns an index of per-platform manifests rather than
# layers.  Pick the linux/amd64 entry (falling back to the first listed one)
# and fetch the actual image manifest by digest.  Previously this branch was
# a stub, so multi-arch pulls crashed on the missing 'layers' key below.
if 'manifests' in manifest_data:
    print('[+] Manifest list found. Checking for a suitable architecture (defaulting to linux/amd64)...')
    chosen = None
    for entry in manifest_data['manifests']:
        platform = entry.get('platform', {})
        if platform.get('os') == 'linux' and platform.get('architecture') == 'amd64':
            chosen = entry
            break
    if chosen is None:
        chosen = manifest_data['manifests'][0]
        print(' - No linux/amd64 entry found; using the first listed manifest.')
    # Accept both Docker and OCI image-manifest media types for the sub-fetch.
    sub_accept = ('application/vnd.docker.distribution.manifest.v2+json, '
                  'application/vnd.oci.image.manifest.v1+json')
    auth_head = get_auth_head(sub_accept)
    resp = requests.get(f'https://{registry}/v2/{repository}/manifests/{chosen["digest"]}', headers=auth_head, verify=False)
    resp.raise_for_status()
    manifest_data = resp.json()

# --- V1 to V2 Manifest Conversion ---
is_v1_converted = False
v1_config_str = None
if manifest_data.get('schemaVersion') == 1:
    print('[*] Detected Schema v1 manifest. Converting to v2 format for processing.')
    is_v1_converted = True
    # In schema v1 the image config is the first history entry's
    # v1Compatibility blob; its sha256 becomes the synthetic config digest.
    v1_config_str = manifest_data['history'][0]['v1Compatibility']
    config_digest = 'sha256:' + hashlib.sha256(v1_config_str.encode('utf-8')).hexdigest()

    v2_layers = [{'digest': layer['blobSum'], 'mediaType': 'application/vnd.docker.image.rootfs.diff.tar.gzip'} for layer in manifest_data['fsLayers']]

    manifest_data = {
        'schemaVersion': 2,
        'mediaType': 'application/vnd.docker.distribution.manifest.v2+json',
        'config': {'digest': config_digest, 'mediaType': 'application/vnd.docker.container.image.v1+json'},
        'layers': v2_layers
    }

layers = manifest_data.get('layers')
if not layers:
    print("[-] The final manifest does not contain a 'layers' array.")
    exit(1)
|
|
|
|
# --- Create Image Structure ---
# Scratch directory named tmp_<image>_<sanitized tag>; ':' and '@' are
# replaced so digest references are filesystem-safe.
base_name = repository.split('/')[-1]
tag_for_dir = tag.replace(':', '_').replace('@', '_sha256_')
imgdir = '_'.join(('tmp', base_name, tag_for_dir))
if os.path.exists(imgdir):
    shutil.rmtree(imgdir)
os.mkdir(imgdir)
print(f"[*] Creating image structure in: {imgdir}")

# Obtain the image config JSON.
config_digest = manifest_data['config']['digest']
print(f"[*] Downloading config {config_digest[7:19]}...")
if is_v1_converted and v1_config_str:
    # The config was embedded in the (converted) v1 manifest itself.
    config_content = v1_config_str.encode('utf-8')
    print(" - Using config from converted v1 manifest.")
else:
    # Otherwise fetch the config blob from the registry.
    auth_head = get_auth_head('application/octet-stream')
    cfg_resp = requests.get(f'https://{registry}/v2/{repository}/blobs/{config_digest}', headers=auth_head, verify=False)
    cfg_resp.raise_for_status()
    config_content = cfg_resp.content
|
|
|
|
# --- Download Layers & Calculate Diff IDs ---
# Each layer directory is named by a synthetic, docker-save-style id:
# sha256(parent_id + "\n" + blob_digest + "\n"), chaining layers in order so
# the ids are deterministic for a given manifest.
parentid = ''
diff_ids = []

# manifest.json skeleton for `docker load`: config filename, a single
# RepoTags entry built from the parsed repository/tag, and layer paths
# appended as they are downloaded.
repo_tag_entry = f"{repository}:{tag}"
content = [{'Config': f'{config_digest[7:]}.json', 'RepoTags': [repo_tag_entry], 'Layers': []}]

for i, layer in enumerate(layers):
    ublob = layer['digest']        # registry blob digest, e.g. "sha256:<64 hex>"
    digest_short = ublob[7:19]     # short id used in progress output
    # Synthetic chained layer id (see note above).
    fake_layerid = hashlib.sha256(f'{parentid}\n{ublob}\n'.encode('utf-8')).hexdigest()
    layerdir = f'{imgdir}/{fake_layerid}'
    os.mkdir(layerdir)
    with open(f'{layerdir}/VERSION', 'w') as f: f.write('1.0')

    sys.stdout.write(f"{digest_short}: Downloading...")
    sys.stdout.flush()
    # Stream the (usually gzip-compressed) blob to disk with a progress bar.
    auth_head = get_auth_head('application/octet-stream')
    bresp = requests.get(f'https://{registry}/v2/{repository}/blobs/{ublob}', headers=auth_head, stream=True, verify=False)
    bresp.raise_for_status()
    total_size = int(bresp.headers.get('Content-Length', 0))
    chunk_size, downloaded = 8192, 0
    layer_gzip_path = f"{layerdir}/layer_gzip.tar"
    with open(layer_gzip_path, "wb") as f:
        for chunk in bresp.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                downloaded += len(chunk)
                # Scale progress to the 50-column bar; skipped when the
                # registry sends no Content-Length.
                if total_size > 0: progress_bar(digest_short, int((downloaded / total_size) * 50))

    sys.stdout.write(f"\r{digest_short}: Extracting...{' '*50}")
    sys.stdout.flush()
    layer_tar_path = f"{layerdir}/layer.tar"
    try:
        # Decompress to the plain tar that `docker load` expects.
        with gzip.open(layer_gzip_path, 'rb') as f_in, open(layer_tar_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    except (gzip.BadGzipFile, EOFError):
        # Blob was not actually gzipped (e.g. an uncompressed layer):
        # use the downloaded bytes as the layer tar unchanged.
        shutil.copyfile(layer_gzip_path, layer_tar_path)
    os.remove(layer_gzip_path)

    # diff_id = sha256 of the UNCOMPRESSED tar, required later for the
    # config's 'rootfs' section; hash in 64 KiB chunks to bound memory.
    sha256 = hashlib.sha256()
    with open(layer_tar_path, 'rb') as f_tar:
        while True:
            data = f_tar.read(65536)
            if not data: break
            sha256.update(data)
    diff_ids.append('sha256:' + sha256.hexdigest())

    size_mb = os.path.getsize(layer_tar_path) / (1024 * 1024)
    print(f"\r{digest_short}: Pull complete [{size_mb:.2f} MB]")
    content[0]['Layers'].append(f'{fake_layerid}/layer.tar')

    # Minimal legacy per-layer json (docker-save v1 layout); the fixed epoch
    # timestamp keeps output deterministic.
    json_obj = {'id': fake_layerid}
    if parentid: json_obj['parent'] = parentid
    json_obj['created'] = "1970-01-01T00:00:00Z"
    with open(f'{layerdir}/json', 'w') as f: f.write(json.dumps(json_obj))
    parentid = fake_layerid
|
|
|
|
# --- Augment Config and Finalize ---
print("[*] Augmenting config with RootFS and history...")
config_json = json.loads(config_content)
# docker load validates these diff_ids against the uncompressed layer tars.
config_json['rootfs'] = {'type': 'layers', 'diff_ids': diff_ids}
history_entry = {'created': '1970-01-01T00:00:00Z', 'created_by': '/bin/sh'}
config_json['history'] = [dict(history_entry) for _ in layers]

config_path = imgdir + '/' + config_digest[7:] + '.json'
with open(config_path, 'wb') as f:
    f.write(json.dumps(config_json).encode('utf-8'))

with open(imgdir + '/manifest.json', 'w') as f:
    json.dump(content, f)

# Legacy 'repositories' file: repo -> tag -> topmost synthetic layer id.
repo_content = {repository: {tag: fake_layerid}}
with open(imgdir + '/repositories', 'w') as f:
    json.dump(repo_content, f)

# --- Create Final Tarball and Cleanup ---
docker_tar = f"{repository.replace('/', '_')}_{tag_for_dir}.tar"
print(f"[*] Creating final archive: {docker_tar}")
with tarfile.open(docker_tar, "w") as tar:
    tar.add(imgdir, arcname=os.path.sep)
shutil.rmtree(imgdir)
print(f'\n[+] Success! Docker image pulled to: {docker_tar}')
print(f" Load it using: docker load -i {docker_tar}")