Files
gists/tools/cloud/docker_pull.py
tke fd515742b5 Restructure repo layout and document conventions
Move legacy systemscripts into scripts/display and scripts/setup.
Rehome stray top-level tools into their domain folders.
Archive narrow experiments and outdated codegrab leftovers.
Remove empty legacy directories and stale root files.
Expand macOS metadata ignores and update the README with the refined repository structure.
2026-03-07 18:54:32 +01:00

273 lines
11 KiB
Python
Executable File

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import gzip
import json
import hashlib
import shutil
import requests
import tarfile
import re
import urllib3
# Suppress InsecureRequestWarning for self-signed certs or debugging
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# --- Argument Check ---
# Require exactly one argument; show usage for an explicit -h/--help request.
# FIX: the original used a substring test ('"-h" in sys.argv[1]') which
# wrongly triggered help for any image name containing "-h" (e.g.
# "my-host.io/app"); compare against the exact flag values instead.
if len(sys.argv) != 2 or sys.argv[1] in ('-h', '--help'):
    print('Usage:\n\tdocker_pull.py [registry/][repository/]image[:tag|@digest]\n')
    print('Examples:')
    print('\tdocker_pull.py ubuntu:latest')
    print('\tdocker_pull.py ghcr.io/oras-project/oras:v1.1.0')
    print('\tdocker_pull.py quay.io/activecm/passer')
    print('\tdocker_pull.py alpine@sha256:f271e74b17ced29b915d351685fd4644785c6d1559dd1f2d519b152337e72849')
    exit(1)
# --- Image Name Parsing ---
# Splits "[registry/][repository/]image[:tag|@digest]" into the module-level
# variables `registry`, `repository` and `tag` used by the rest of the script.
full_image_name = sys.argv[1]
print(f"[*] Parsing image: {full_image_name}")

# Default to Docker Hub unless the first path component looks like a host:
# it contains a dot ("ghcr.io") or a port separator ("localhost:5000").
registry = 'registry-1.docker.io'
repo_and_tag = full_image_name
parts = full_image_name.split('/')
if len(parts) > 1 and ('.' in parts[0] or ':' in parts[0]):
    registry = parts[0]
    repo_and_tag = '/'.join(parts[1:])

# Official Docker Hub images live under the implicit "library/" namespace.
if registry == 'registry-1.docker.io' and '/' not in repo_and_tag:
    repository_with_tag = f"library/{repo_and_tag}"
else:
    repository_with_tag = repo_and_tag

# Split the repository from the tag or digest.  "@" introduces a digest
# reference ("repo@sha256:..."); otherwise a ":" in the LAST path component
# is a tag separator (a ":" in an earlier component would be a registry port).
# FIX: use split('@', 1) so a stray extra "@" cannot raise ValueError, and
# drop the original's no-op "tag = tag" line.
if '@' in repository_with_tag:
    repository, tag = repository_with_tag.split('@', 1)  # tag holds the full digest
elif ':' in repository_with_tag.rsplit('/', 1)[-1]:
    repository, tag = repository_with_tag.rsplit(':', 1)
else:
    repository = repository_with_tag
    tag = 'latest'

print(f" - Registry: {registry}")
print(f" - Repository: {repository}")
print(f" - Tag/Digest: {tag}")
# --- Authentication ---
# Probe GET /v2/ on the registry.  A 401 response advertises the token
# endpoint via its WWW-Authenticate header (Bearer realm="..."/service="...");
# any other non-200 status is fatal.  Results are stored in the module
# globals auth_url / reg_service / is_auth_required, consumed by
# get_auth_head() below.
auth_url = 'https://auth.docker.io/token'   # default token realm (Docker Hub)
reg_service = 'registry.docker.io'          # default service name (Docker Hub)
is_auth_required = False
try:
    print(f"[*] Checking for auth requirements at https://{registry}/v2/")
    # verify=False: TLS verification is deliberately disabled (matching the
    # urllib3 warning suppression at the top of the file).
    resp = requests.get(f'https://{registry}/v2/', verify=False, timeout=15)
    if resp.status_code == 401:
        is_auth_required = True
        auth_header = resp.headers.get('WWW-Authenticate')
        if not auth_header:
            print("[-] Registry returned 401 Unauthorized, but did not provide a WWW-Authenticate header.")
            exit(1)
        print(f" - Authentication required. Parsing WWW-Authenticate header.")
        # Extract realm="..." and service="..." from e.g.
        #   Bearer realm="https://auth.docker.io/token",service="registry.docker.io"
        realm_match = re.search('realm="([^"]+)"', auth_header, re.IGNORECASE)
        if realm_match: auth_url = realm_match.group(1)
        service_match = re.search('service="([^"]+)"', auth_header, re.IGNORECASE)
        if service_match: reg_service = service_match.group(1)
        else: reg_service = registry  # fall back to the registry host itself
        print(f" - Auth URL: {auth_url}")
        print(f" - Service: {reg_service}")
    elif resp.status_code != 200:
        resp.raise_for_status()
    else:
        print(" - No authentication required.")
except requests.exceptions.RequestException as e:
    print(f"[-] Error connecting to registry {registry}: {e}")
    exit(1)
def get_auth_head(media_type):
    """Build request headers for the registry.

    Returns {'Accept': media_type} when the registry needs no auth, otherwise
    fetches a pull-scoped Bearer token and adds an Authorization header.
    Relies on module globals set by the auth probe above (is_auth_required,
    auth_url, reg_service, repository).  Exits the process on token failure.
    """
    if not is_auth_required:
        return {'Accept': media_type}
    try:
        auth_resp = requests.get(f'{auth_url}?service={reg_service}&scope=repository:{repository}:pull', verify=False)
        auth_resp.raise_for_status()
        access_token = auth_resp.json()['token']
        return {'Authorization': f'Bearer {access_token}', 'Accept': media_type}
    except requests.exceptions.RequestException as e:
        print(f"[-] Failed to get authentication token: {e}")
        # FIX: requests.Response is falsy for 4xx/5xx statuses, so the
        # original "and e.response" never printed the body of an *error*
        # response — exactly the interesting case.  Compare against None.
        if e.response is not None:
            print(f" Response: {e.response.text}")
        exit(1)
def progress_bar(digest_short, nb_traits):
    """Redraw an in-place 50-character download bar for one layer blob.

    nb_traits is the number of '=' ticks (0..50); a '>' head is drawn while
    the bar is not yet full.  Writes a carriage-return-prefixed line so
    successive calls overwrite each other on the same terminal row.
    """
    filled = "=" * nb_traits
    head = ">" if nb_traits < 50 else ""
    gap = " " * (50 - nb_traits - 1)
    sys.stdout.write(f'\r{digest_short}: Downloading [')
    sys.stdout.write(f'{filled}{head}{gap}]')
    sys.stdout.flush()
# --- Fetch Manifest ---
# Try each manifest media type in preference order until the registry
# returns HTTP 200; leaves the parsed body in `manifest_data` (and the last
# HTTP response in `resp`).
print(f"[*] Fetching manifest for {repository}:{tag}")
manifest_media_types = [
    'application/vnd.docker.distribution.manifest.v2+json',
    'application/vnd.oci.image.index.v1+json',
    'application/vnd.docker.distribution.manifest.list.v2+json',
    'application/vnd.docker.distribution.manifest.v1+prettyjws',
]
manifest_data, resp = None, None
# Digest pulls ("@sha256:...") address the manifest by its digest.
manifest_tag = tag if '@' not in tag else tag.split(':')[-1]
for media_type in manifest_media_types:
    print(f" - Trying to fetch with Accept header: {media_type}")
    try:
        auth_head = get_auth_head(media_type)
        resp = requests.get(f'https://{registry}/v2/{repository}/manifests/{manifest_tag}', headers=auth_head, verify=False)
        if resp.status_code == 200:
            manifest_data = resp.json()
            print(f" - Successfully fetched manifest with Content-Type: {resp.headers.get('Content-Type')}")
            break
        else:
            print(f" - Received HTTP {resp.status_code}. Trying next media type.")
    except requests.exceptions.RequestException as e:
        print(f"[-] An exception occurred while fetching manifest: {e}")
        continue
if not manifest_data:
    print(f"[-] Failed to fetch manifest for {repository}:{tag}.")
    # FIX: "if resp:" used Response truthiness, which is False for 4xx/5xx —
    # so the diagnostic was suppressed precisely when the last response was
    # an error.  Test for presence explicitly.
    if resp is not None: print(f" Last response (HTTP {resp.status_code}): {resp.text}")
    exit(1)
# --- V1 to V2 Manifest Conversion ---
# Schema v1 manifests carry the image config inline ("v1Compatibility") and
# list layer blobs under "fsLayers".  Synthesize a v2-shaped manifest so the
# rest of the script can treat both schemas uniformly.
is_v1_converted = False
v1_config_str = None
if manifest_data.get('schemaVersion') == 1:
    print('[*] Detected Schema v1 manifest. Converting to v2 format for processing.')
    is_v1_converted = True
    v1_config_str = manifest_data['history'][0]['v1Compatibility']
    config_digest = 'sha256:' + hashlib.sha256(v1_config_str.encode('utf-8')).hexdigest()
    v2_layers = [{'digest': layer['blobSum'], 'mediaType': 'application/vnd.docker.image.rootfs.diff.tar.gzip'} for layer in manifest_data['fsLayers']]
    manifest_data = {
        'schemaVersion': 2,
        'mediaType': 'application/vnd.docker.distribution.manifest.v2+json',
        'config': {'digest': config_digest, 'mediaType': 'application/vnd.docker.container.image.v1+json'},
        'layers': v2_layers
    }
if 'manifests' in manifest_data:
    print('[+] Manifest list found. Checking for a suitable architecture (defaulting to linux/amd64)...')
    # FIX: the original printed the message above but then only `pass`ed, so
    # a manifest list/index was never resolved and every multi-arch image
    # died below with "no 'layers' array".  Pick the linux/amd64 entry
    # (first entry as a fallback) and fetch the manifest it references.
    chosen = next(
        (m for m in manifest_data['manifests']
         if m.get('platform', {}).get('os') == 'linux'
         and m.get('platform', {}).get('architecture') == 'amd64'),
        manifest_data['manifests'][0]
    )
    sub_media_type = chosen.get('mediaType', 'application/vnd.docker.distribution.manifest.v2+json')
    auth_head = get_auth_head(sub_media_type)
    resp = requests.get(f'https://{registry}/v2/{repository}/manifests/{chosen["digest"]}', headers=auth_head, verify=False)
    resp.raise_for_status()
    manifest_data = resp.json()
layers = manifest_data.get('layers')
if not layers:
    print("[-] The final manifest does not contain a 'layers' array.")
    exit(1)
# --- Create Image Structure ---
# Build a scratch directory named after the image, replacing any stale one
# left over from an earlier run.
short_name = repository.split('/')[-1]
tag_for_dir = tag.replace(':', '_').replace('@', '_sha256_')
imgdir = f'tmp_{short_name}_{tag_for_dir}'
if os.path.exists(imgdir):
    shutil.rmtree(imgdir)
os.mkdir(imgdir)
print(f"[*] Creating image structure in: {imgdir}")

# Obtain the image config JSON: a converted v1 manifest already carried it
# inline; otherwise download the config blob from the registry.
config_digest = manifest_data['config']['digest']
print(f"[*] Downloading config {config_digest[7:19]}...")
if is_v1_converted and v1_config_str:
    config_content = v1_config_str.encode('utf-8')
    print(" - Using config from converted v1 manifest.")
else:
    blob_headers = get_auth_head('application/octet-stream')
    cfg_resp = requests.get(f'https://{registry}/v2/{repository}/blobs/{config_digest}', headers=blob_headers, verify=False)
    cfg_resp.raise_for_status()
    config_content = cfg_resp.content
# --- Download Layers & Calculate Diff IDs ---
# parentid chains the synthesized layer ids together (docker-save layout);
# diff_ids collects the sha256 of each *uncompressed* layer tar for the
# config's rootfs section.
parentid = ''
diff_ids = []
# *** FIX: Use the already parsed variables to create a clean RepoTag ***
# NOTE(review): for digest pulls `tag` is "sha256:...", so this RepoTag entry
# is not a valid tag reference — confirm docker load tolerates it.
repo_tag_entry = f"{repository}:{tag}"
content = [{'Config': f'{config_digest[7:]}.json', 'RepoTags': [repo_tag_entry], 'Layers': []}]
for i, layer in enumerate(layers):
    ublob = layer['digest']
    digest_short = ublob[7:19]
    # Deterministic synthetic layer id derived from the parent chain + blob digest.
    fake_layerid = hashlib.sha256(f'{parentid}\n{ublob}\n'.encode('utf-8')).hexdigest()
    layerdir = f'{imgdir}/{fake_layerid}'
    os.mkdir(layerdir)
    with open(f'{layerdir}/VERSION', 'w') as f: f.write('1.0')
    sys.stdout.write(f"{digest_short}: Downloading...")
    sys.stdout.flush()
    auth_head = get_auth_head('application/octet-stream')
    bresp = requests.get(f'https://{registry}/v2/{repository}/blobs/{ublob}', headers=auth_head, stream=True, verify=False)
    bresp.raise_for_status()
    total_size = int(bresp.headers.get('Content-Length', 0))
    chunk_size, downloaded = 8192, 0
    layer_gzip_path = f"{layerdir}/layer_gzip.tar"
    # Stream the (usually gzip-compressed) blob to disk; draw the progress
    # bar only when the registry reported a Content-Length.
    with open(layer_gzip_path, "wb") as f:
        for chunk in bresp.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
                downloaded += len(chunk)
                if total_size > 0: progress_bar(digest_short, int((downloaded / total_size) * 50))
    sys.stdout.write(f"\r{digest_short}: Extracting...{' '*50}")
    sys.stdout.flush()
    layer_tar_path = f"{layerdir}/layer.tar"
    # Decompress into the final layer.tar; if the blob was not actually
    # gzip-compressed, fall back to copying it verbatim.
    try:
        with gzip.open(layer_gzip_path, 'rb') as f_in, open(layer_tar_path, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    except (gzip.BadGzipFile, EOFError):
        shutil.copyfile(layer_gzip_path, layer_tar_path)
    os.remove(layer_gzip_path)
    # Hash the uncompressed tar in 64 KiB chunks to produce the diff_id.
    sha256 = hashlib.sha256()
    with open(layer_tar_path, 'rb') as f_tar:
        while True:
            data = f_tar.read(65536)
            if not data: break
            sha256.update(data)
    diff_ids.append('sha256:' + sha256.hexdigest())
    size_mb = os.path.getsize(layer_tar_path) / (1024 * 1024)
    print(f"\r{digest_short}: Pull complete [{size_mb:.2f} MB]")
    content[0]['Layers'].append(f'{fake_layerid}/layer.tar')
    # Minimal legacy per-layer json entry (docker-save format).
    json_obj = {'id': fake_layerid}
    if parentid: json_obj['parent'] = parentid
    json_obj['created'] = "1970-01-01T00:00:00Z"
    with open(f'{layerdir}/json', 'w') as f: f.write(json.dumps(json_obj))
    parentid = fake_layerid
# --- Augment Config and Finalize ---
# Splice the computed diff_ids (and placeholder history entries) into the
# image config, write the docker-load metadata files, then pack everything
# into a single tarball and remove the scratch directory.
print("[*] Augmenting config with RootFS and history...")
config_json = json.loads(config_content)
config_json['rootfs'] = {'type': 'layers', 'diff_ids': diff_ids}
config_json['history'] = [
    {'created': '1970-01-01T00:00:00Z', 'created_by': '/bin/sh'}
    for _ in layers
]

config_path = f'{imgdir}/{config_digest[7:]}.json'
with open(config_path, 'wb') as cfg_file:
    cfg_file.write(json.dumps(config_json).encode('utf-8'))

with open(f'{imgdir}/manifest.json', 'w') as manifest_file:
    json.dump(content, manifest_file)

# *** FIX: Use the correctly parsed 'repository' and 'tag' variables directly ***
with open(f'{imgdir}/repositories', 'w') as repo_file:
    json.dump({repository: {tag: fake_layerid}}, repo_file)

# --- Create Final Tarball and Cleanup ---
docker_tar = repository.replace('/', '_') + f"_{tag_for_dir}.tar"
print(f"[*] Creating final archive: {docker_tar}")
with tarfile.open(docker_tar, "w") as tar:
    tar.add(imgdir, arcname=os.path.sep)
shutil.rmtree(imgdir)
print(f'\n[+] Success! Docker image pulled to: {docker_tar}')
print(f" Load it using: docker load -i {docker_tar}")