how to script ADB workflows across many cloud phones

May 06, 2026

if you operate a fleet of cloud phones (10, 50, 100+), interactive ADB stops scaling at around 5 devices. past that point, the only sustainable pattern is scripting. ADB is a CLI, and CLIs script well from Bash and Python. paired with cloudf.one’s REST API for device locking and lifecycle, you can run install / configure / verify workflows across the entire fleet with a single command. this guide shows the patterns: parallel ADB across many devices, screenshot collection, app install rollouts, and CI-friendly orchestration.

cloud phones change the orchestration calculus because every device is reachable as a network endpoint, not a USB-attached handset. that means scripting is OS-agnostic: a Python script on macOS, Linux, or Windows reaches the same fleet endpoints over TCP.

the basic ADB-over-network primitive

each cloud phone has an ADB endpoint like adb-sg.cloudf.one:5555. the standard ADB CLI accepts this as a serial.

adb connect adb-sg.cloudf.one:5555
adb -s adb-sg.cloudf.one:5555 shell echo "hello"
adb -s adb-sg.cloudf.one:5555 install app.apk
adb -s adb-sg.cloudf.one:5555 shell am start -n com.example/.MainActivity

every script in this guide builds on these primitives.

for context on cloud phone basics, see how to set up ADB on cloudf.one and cloud phones for QA testing teams.

step 1: list your fleet via the API

cloudf.one exposes a REST API for device listing. authenticate with a token, query the fleet:

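import os

import requests

# read the API token from the environment so it never lands in source control
token = os.environ["CLOUDFONE_TOKEN"]
resp = requests.get(
    "https://api.cloudf.one/v1/devices",
    headers={"Authorization": f"Bearer {token}"},
    timeout=30,  # never wait forever on the API
)
resp.raise_for_status()  # fail loudly on an HTTP error instead of a KeyError below
devices = resp.json()["devices"]
for d in devices:
    print(d["id"], d["adb_endpoint"], d["status"])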

filter by tag (e.g. tag=ci-pool) or status (status=available).
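the filters above can be sketched as a small URL builder. this assumes tag and status are passed as query-string parameters on the same /v1/devices endpoint, which the text implies but does not spell out; devices_url is a hypothetical helper name.

```python
from urllib.parse import urlencode

API_URL = "https://api.cloudf.one/v1/devices"  # endpoint from step 1


def devices_url(tag=None, status=None):
    """Build the device-list URL with optional filters.

    Assumption: filters are plain query-string parameters named
    `tag` and `status`, as hinted by "tag=ci-pool" / "status=available".
    """
    filters = {"tag": tag, "status": status}
    params = {k: v for k, v in filters.items() if v is not None}
    return f"{API_URL}?{urlencode(params)}" if params else API_URL


print(devices_url(tag="ci-pool", status="available"))
```

pass the resulting URL to the same requests.get call as in step 1, keeping the Authorization header unchanged.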

step 2: connect to many devices at once

connect ADB to every device in the fleet:

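import subprocess

for d in devices:
    # a timeout keeps one unreachable endpoint from stalling the loop
    subprocess.run(["adb", "connect", d["adb_endpoint"]], check=True, timeout=15)

# list every device the local ADB server knows about, with its state
result = subprocess.run(["adb", "devices"], capture_output=True, text=True, timeout=15)
print(result.stdout)

this prints every device the ADB server knows about, along with its state. only entries marked “device” are ready for commands; “offline” and “unauthorized” entries are listed but not usable.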
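some adb releases exit with status 0 even when a connection attempt fails, so the exit code alone is not always a reliable signal. a minimal sketch that inspects the text adb connect prints instead; the matched phrases are the ones commonly seen, and the exact wording may vary between adb versions. connect_succeeded is a hypothetical helper name.

```python
def connect_succeeded(output):
    """Return True if `adb connect` output reports a usable connection.

    Looks for the "connected to" / "already connected to" phrases on any
    line; anything else (for example "failed to connect") counts as failure.
    """
    for line in output.splitlines():
        text = line.strip().lower()
        if text.startswith("connected to") or text.startswith("already connected to"):
            return True
    return False


print(connect_succeeded("connected to adb-sg.cloudf.one:5555"))
print(connect_succeeded("failed to connect to 'adb-sg.cloudf.one:5555': Connection refused"))
```

to use it, run adb connect with capture_output=True, text=True and pass stdout plus stderr to the helper.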

step 3: run the same command on every device

a parallelized ADB shell command across the fleet:

import subprocess
import concurrent.futures

def run_on_device(adb_endpoint, cmd):
    try:
        result = subprocess.run(
            ["adb", "-s", adb_endpoint, "shell"] + cmd,
            capture_output=True, text=True, timeout=30,
        )
    except subprocess.TimeoutExpired:
        # report a hung device as a failure instead of crashing the whole fan-out
        return adb_endpoint, "timed out", -1
    return adb_endpoint, result.stdout.strip(), result.returncode

cmd = ["getprop", "ro.product.model"]
endpoints = [d["adb_endpoint"] for d in devices]

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as ex:
    futures = [ex.submit(run_on_device, ep, cmd) for ep in endpoints]
    for f in concurrent.futures.as_completed(futures):
        endpoint, output, rc = f.result()
        # surface the return code so failed devices stand out in the log
        status = "ok" if rc == 0 else f"rc={rc}"
        print(f"{endpoint} [{status}]: {output}")

this fans out the same command to every device in parallel, with up to 20 concurrent ADB sessions.
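with dozens of devices, per-line output gets hard to scan. a small sketch of a summary step, assuming the (endpoint, output, returncode) tuples returned by run_on_device above; the endpoints in the sample data are made up for illustration.

```python
from collections import Counter


def summarize(results):
    """Group fan-out results: count distinct outputs, list failed endpoints.

    `results` is an iterable of (endpoint, output, returncode) tuples.
    """
    results = list(results)  # allow generators; we iterate twice
    outputs = Counter(out for _, out, rc in results if rc == 0)
    failed = sorted(ep for ep, _, rc in results if rc != 0)
    return outputs, failed


# hypothetical sample data, shaped like run_on_device() return values
sample = [
    ("phone-a:5555", "Pixel 7", 0),
    ("phone-b:5555", "Pixel 7", 0),
    ("phone-c:5555", "timed out", -1),
]
outputs, failed = summarize(sample)
print(dict(outputs), failed)
```

collect f.result() into a list inside the as_completed loop and pass that list to summarize once the executor finishes.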

step 4: install an APK across the fleet

rolling an APK to many devices:

def install_apk(adb_endpoint, apk_path):
    try:
        result = subprocess.run(
            ["adb", "-s", adb_endpoint, "install", "-r", apk_path],
            capture_output=True, text=True, timeout=120,
        )
    except subprocess.TimeoutExpired:
        # a hung install counts as a failure, not a crash of the whole rollout
        return adb_endpoint, False, "timed out"
    # keep stderr alongside stdout so a failed install can be diagnosed later
    return adb_endpoint, "Success" in result.stdout, result.stdout + result.stderr

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as ex:
    futures = [ex.submit(install_apk, ep, "app-debug.apk") for ep in endpoints]
    for f in concurrent.futures.as_completed(futures):
        endpoint, ok, output = f.result()
        status = "OK" if ok else "FAIL"
        print(f"{status} {endpoint}")

useful for rolling new builds to a CI pool, refreshing test apps, or seeding a multi-account workflow.
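a bare FAIL line does not say why an install was rejected. when the package manager refuses an APK, adb typically prints a line of the form Failure [INSTALL_FAILED_...]. a minimal sketch that extracts that code from the captured output; install_failure_reason is a hypothetical helper, and depending on the adb version the failure text may arrive on stderr, so pass stdout and stderr together.

```python
import re

# matches the bracketed code in lines such as
# "Failure [INSTALL_FAILED_VERSION_DOWNGRADE]"
_FAILURE = re.compile(r"Failure \[([A-Z0-9_]+)")


def install_failure_reason(output):
    """Return the INSTALL_FAILED_* style code from adb install output, or None."""
    match = _FAILURE.search(output)
    return match.group(1) if match else None


print(install_failure_reason(
    "adb: failed to install app.apk: Failure [INSTALL_FAILED_VERSION_DOWNGRADE]"
))
```

grouping failed endpoints by this code quickly separates fleet-wide problems (a bad build) from per-device ones (low storage).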

step 5: collect screenshots from the fleet

debugging “why is account 47 different from account 46” usually starts with a screenshot.

import os
os.makedirs("screenshots", exist_ok=True)

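def screenshot(adb_endpoint):
    # flatten host:port into a filesystem-safe file name
    name = adb_endpoint.replace(":", "_").replace(".", "_")
    # capture on the device, then copy the file back; both calls get a timeout
    subprocess.run(
        ["adb", "-s", adb_endpoint, "shell", "screencap", "-p", "/sdcard/sc.png"],
        check=True, timeout=30,
    )
    subprocess.run(
        ["adb", "-s", adb_endpoint, "pull", "/sdcard/sc.png", f"screenshots/{name}.png"],
        check=True, timeout=60,
    )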

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as ex:
    list(ex.map(screenshot, endpoints))

print(f"saved {len(endpoints)} screenshots")

now you can open screenshots/ and visually scan the fleet state.
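an alternative sketch that skips the temporary file on the device: adb exec-out streams the raw PNG bytes from screencap straight to the host. save_screenshot and its run parameter are hypothetical names; run exists only so the function can be exercised without a real device.

```python
import subprocess
from pathlib import Path

PNG_MAGIC = b"\x89PNG\r\n\x1a\n"  # first 8 bytes of every PNG file


def save_screenshot(adb_endpoint, out_dir="screenshots", run=subprocess.run):
    """Capture a screenshot via `adb exec-out screencap -p` and save it locally."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    result = run(
        ["adb", "-s", adb_endpoint, "exec-out", "screencap", "-p"],
        capture_output=True, timeout=60, check=True,
    )
    # guard against error text being written out as if it were an image
    if not result.stdout.startswith(PNG_MAGIC):
        raise ValueError(f"{adb_endpoint}: screencap did not return PNG data")
    name = adb_endpoint.replace(":", "_").replace(".", "_")
    path = out / f"{name}.png"
    path.write_bytes(result.stdout)
    return path
```

it drops into the same ThreadPoolExecutor pattern: list(ex.map(save_screenshot, endpoints)).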

step 6: orchestrate longer workflows

real workflows are not single commands. here is an install / launch / wait / screenshot / uninstall flow:

import os
import time

os.makedirs("sc", exist_ok=True)  # make sure the local output directory exists

def workflow(adb_endpoint):
    e = adb_endpoint
    adb = ["adb", "-s", e]
    # every call gets a timeout so one hung device cannot block its worker forever
    subprocess.run(adb + ["install", "-r", "app.apk"], check=True, timeout=120)
    subprocess.run(adb + ["shell", "am", "start",
                          "-n", "com.example/.MainActivity"], check=True, timeout=30)
    time.sleep(5)  # crude wait for the first screen to render
    subprocess.run(adb + ["shell", "screencap", "-p", "/sdcard/sc.png"],
                   check=True, timeout=30)
    name = e.replace(":", "_")
    subprocess.run(adb + ["pull", "/sdcard/sc.png", f"sc/{name}.png"],
                   check=True, timeout=60)
    subprocess.run(adb + ["uninstall", "com.example"], check=True, timeout=60)
    return e

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as ex:
    list(ex.map(workflow, endpoints))

this is a usable smoke test across the whole fleet.
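the fixed five-second sleep in the workflow is a guess: too short on a slow device, wasted time on a fast one. a sketch of a polling alternative, assuming the device shell provides pidof (common on recent Android builds, but worth verifying on your images). poll_until and app_running are hypothetical helper names.

```python
import subprocess
import time


def poll_until(check, timeout=30.0, interval=1.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns truthy or `timeout` seconds pass."""
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


def app_running(adb_endpoint, package):
    """True if `pidof <package>` prints at least one process id."""
    result = subprocess.run(
        ["adb", "-s", adb_endpoint, "shell", "pidof", package],
        capture_output=True, text=True, timeout=15,
    )
    return result.stdout.strip() != ""


# usage inside workflow(), replacing time.sleep(5):
#   if not poll_until(lambda: app_running(e, "com.example"), timeout=30):
#       raise RuntimeError(f"{e}: app did not start")
```

a running process is only a proxy for “the first screen is drawn”, so a short extra delay before the screenshot may still be needed.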

step 7: handle failures and retries

ADB-over-network sometimes drops. wrap commands with retry:

import time

def adb_with_retry(adb_endpoint, args, retries=3):
    for attempt in range(retries):
        try:
            result = subprocess.run(
                ["adb", "-s", adb_endpoint] + args,
                capture_output=True, text=True, timeout=60, check=True,
            )
            return result.stdout
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            if attempt == retries - 1:
                raise
            # reconnect
            subprocess.run(["adb", "connect", adb_endpoint])
            time.sleep(2)

with 100 devices in a fleet, a transient failure rate of 1 to 2 percent per command is normal. retrying after a reconnect handles most of these failures.
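a quick back-of-the-envelope check of why three attempts are usually enough. it takes the 2 percent figure from above and assumes failures are independent, which is optimistic: real network drops tend to cluster, so treat the result as a lower bound on failures. the five-command workflow length is an illustrative assumption.

```python
def failure_after_retries(p, attempts):
    """Probability that every one of `attempts` independent tries fails."""
    return p ** attempts


def workflow_failure(p_cmd, commands):
    """Probability that at least one of `commands` sequential commands fails."""
    return 1 - (1 - p_cmd) ** commands


p = 0.02  # per-command transient failure rate quoted in the text
raw = workflow_failure(p, commands=5)
retried = workflow_failure(failure_after_retries(p, 3), commands=5)

print(f"no retry:   {raw:.2%} of devices fail a 5-command workflow")
print(f"3 attempts: {retried:.4%} of devices fail a 5-command workflow")
```

without retries, roughly one device in ten fails a five-command workflow on any given run; with three attempts per command, the same figure drops to a few in a hundred thousand under the independence assumption.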

step 8: lock / unlock for CI

for CI, lock devices before use and unlock after. cloudf.one’s API exposes lock / unlock primitives:

def lock_device(tag, duration_min=30):
    resp = requests.post(
        "https://api.cloudf.one/v1/devices/lock",
        headers={"Authorization": f"Bearer {token}"},
        json={"tag": tag, "duration_minutes": duration_min},
        timeout=30,
    )
    resp.raise_for_status()  # no free device or a bad token should stop the job here
    body = resp.json()  # parse the response once
    return body["device_id"], body["adb_endpoint"]

def unlock_device(device_id):
    resp = requests.post(
        f"https://api.cloudf.one/v1/devices/{device_id}/unlock",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    resp.raise_for_status()

# CI workflow
device_id, endpoint = lock_device("ci-pool", 30)
try:
    subprocess.run(["adb", "connect", endpoint], check=True)
    # run tests; --device is a custom option that your conftest.py must define
    subprocess.run(["pytest", "-v", "--device", endpoint], check=True)
finally:
    # always release the device, even when the tests fail
    unlock_device(device_id)

the CI/CD integration guide covers the GitHub Actions wrapper.
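the try / finally pattern from the CI workflow can be packaged as a context manager so that no test script forgets the unlock. a minimal sketch; locked_device is a hypothetical name, and the lock and unlock callables are passed in so the helper stays independent of the HTTP client.

```python
from contextlib import contextmanager


@contextmanager
def locked_device(lock, unlock, tag, duration_min=30):
    """Lock a device for the duration of a `with` block, then release it.

    `lock(tag, duration_min)` must return (device_id, adb_endpoint);
    `unlock(device_id)` releases the device.
    """
    device_id, endpoint = lock(tag, duration_min)
    try:
        yield endpoint
    finally:
        # runs on success, on test failure, and on Ctrl-C
        unlock(device_id)


# usage with the functions defined in this step:
#   with locked_device(lock_device, unlock_device, "ci-pool") as endpoint:
#       subprocess.run(["adb", "connect", endpoint], check=True)
```

the lock duration still acts as a backstop if the CI runner is killed outright and the finally block never runs.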

step 9: scheduled fleet maintenance

a daily script that reboots phones, clears app caches, and reports health:

import logging
logging.basicConfig(level=logging.INFO)

def maintain(adb_endpoint):
    e = adb_endpoint
    try:
        # clear cached package data
        subprocess.run(["adb", "-s", e, "shell", "pm", "trim-caches", "1G"],
                       timeout=30, check=False)
        # check uptime
        out = subprocess.check_output(
            ["adb", "-s", e, "shell", "uptime"], text=True, timeout=30)
        # reboot if uptime > 7 days (relies on the "up N days" wording of uptime)
        if "days" in out and int(out.split("days")[0].split()[-1]) > 7:
            subprocess.run(["adb", "-s", e, "reboot"], timeout=10, check=True)
            logging.info(f"rebooted {e}")
        else:
            logging.info(f"healthy {e}: {out.strip()}")
    except Exception as ex:
        logging.error(f"failed {e}: {ex}")

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as ex:
    list(ex.map(maintain, endpoints))

run this from cron at 03:00 daily, and the fleet maintains itself.
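parsing the human-readable uptime string is brittle, since its wording differs between builds. a sketch of a sturdier check that reads /proc/uptime, whose first field is the uptime in seconds. this assumes the file is readable from the adb shell on your devices; uptime_days and needs_reboot are hypothetical helper names.

```python
def uptime_days(proc_uptime):
    """Convert the text of /proc/uptime ("<uptime_s> <idle_s>") to days."""
    seconds = float(proc_uptime.split()[0])
    return seconds / 86400  # seconds per day


def needs_reboot(proc_uptime, max_days=7):
    """True when the device has been up longer than `max_days`."""
    return uptime_days(proc_uptime) > max_days


# the text would come from: adb -s <endpoint> shell cat /proc/uptime
print(needs_reboot("864000.00 123456.78"))
```

in maintain(), swap the uptime call for cat /proc/uptime and replace the string-splitting condition with needs_reboot(out).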

common pitfalls

three issues catch teams new to ADB scripting at scale.

first, ADB server contention. a single ADB server process on the host multiplexes the connections to every device, so running 50 parallel commands can leave some of them stalled or timing out behind it. cap concurrency at 20 to 30.

second, network flakiness. cloud phone ADB endpoints drop occasionally. always have a reconnect-on-failure wrapper.

third, command timeouts. always set a timeout on every subprocess call. without a timeout, a hung ADB call blocks the whole script.
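the first and third pitfalls can be enforced in one place rather than remembered at every call site. a minimal sketch of a wrapper that caps concurrent ADB calls with a semaphore and always applies a timeout; AdbRunner is a hypothetical class, and the run parameter exists only so it can be tested without a device.

```python
import subprocess
import threading


class AdbRunner:
    """Run adb commands with a global concurrency cap and a default timeout."""

    def __init__(self, max_parallel=20, default_timeout=30, run=subprocess.run):
        self._slots = threading.BoundedSemaphore(max_parallel)
        self._timeout = default_timeout
        self._run = run

    def __call__(self, adb_endpoint, args, timeout=None):
        # block here if max_parallel commands are already in flight
        with self._slots:
            return self._run(
                ["adb", "-s", adb_endpoint] + list(args),
                capture_output=True, text=True,
                timeout=timeout if timeout is not None else self._timeout,
            )


adb = AdbRunner(max_parallel=20)
# usage: adb("adb-sg.cloudf.one:5555", ["shell", "getprop", "ro.product.model"])
```

because the cap lives in the runner, several thread pools (installs, screenshots, maintenance) can share one instance without exceeding the limit between them.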

if you need more on the device side, our scrcpy integration post covers screen-mirroring on top of ADB scripting.

try fleet ADB scripting on real Singapore cloud phones

register for a free trial for one cloud phone. write a 20-line Python script that connects, takes a screenshot, pulls it back. once confirmed, scale to a paid plan with multiple devices and adapt the parallel patterns above.

frequently asked questions

what’s the maximum parallel ADB session count?

as a rule of thumb, a single ADB server handles 16 devices well and 32 with some lag; beyond 64, commands start queuing. for fleets over 50 devices, run multiple ADB server processes on different ports (adb -P <port>, or the ANDROID_ADB_SERVER_PORT environment variable).
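one way to split a fleet across several ADB servers is to shard the endpoints round-robin and select the server per call through the ANDROID_ADB_SERVER_PORT environment variable, which the adb client reads. a sketch under those assumptions; the helper names and the choice of ports are illustrative.

```python
import os

DEFAULT_ADB_PORT = 5037  # port the adb server listens on by default


def shard_endpoints(endpoints, ports):
    """Assign endpoints to ADB server ports round-robin."""
    shards = {port: [] for port in ports}
    for i, endpoint in enumerate(endpoints):
        shards[ports[i % len(ports)]].append(endpoint)
    return shards


def adb_env(port):
    """Environment for subprocess.run that targets the ADB server on `port`."""
    env = dict(os.environ)
    env["ANDROID_ADB_SERVER_PORT"] = str(port)
    return env


shards = shard_endpoints(["a:5555", "b:5555", "c:5555"], [DEFAULT_ADB_PORT, 5038])
print(shards)
# usage: subprocess.run(["adb", "connect", ep], env=adb_env(port), timeout=15)
```

each device must be connected and addressed through the same server, so keep the endpoint-to-port mapping stable for the lifetime of the script.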

can I script ADB from inside a Docker container?

yes. install the ADB client in your image (the package is android-tools-adb or adb on Debian and Ubuntu, android-tools on Alpine) and pass the cloudf.one token as an env var. the container reaches cloud phones over the public ADB endpoints.

how do I detect dead devices in the fleet?

adb devices lists every known device with its state. check that state before each command: “device” means healthy, while “offline” or “unauthorized” means the device needs attention.
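the state check can be sketched as a small parser over the adb devices output, which lists one serial and state per line after a header. parse_adb_devices is a hypothetical helper, and the second endpoint in the sample is made up for illustration.

```python
def parse_adb_devices(output):
    """Map each serial in `adb devices` output to its state string."""
    states = {}
    for line in output.splitlines():
        line = line.strip()
        # skip blanks, the header, and "* daemon ..." startup notices
        if not line or line.startswith("List of devices") or line.startswith("*"):
            continue
        parts = line.split()
        if len(parts) >= 2:
            states[parts[0]] = parts[1]
    return states


sample = (
    "List of devices attached\n"
    "adb-sg.cloudf.one:5555\tdevice\n"
    "adb-sg.cloudf.one:5556\toffline\n"
)
states = parse_adb_devices(sample)
dead = [serial for serial, state in states.items() if state != "device"]
print(dead)
```

endpoints that never connected do not appear in the output at all, so also compare the parsed serials against the list returned by the API.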

does ADB scripting work on Windows?

yes. ADB CLI is cross-platform. PowerShell and Python work the same on Windows for fleet automation.

how do I rate-limit installs across the fleet?

throttle by setting max_workers in your ThreadPoolExecutor. running install rollouts at 5 to 10 in parallel keeps network and device load reasonable. consult the Android Developer ADB docs for the canonical command reference.