Part 15: kubeadm join — The Workers
"Joining a worker is one command. Joining N workers is N parallel commands plus a coordinator."
Why
Part 14 bootstrapped the control plane. This part adds workers to it. The mechanism is kubeadm join, which takes a control-plane endpoint, a bootstrap token, and a CA cert hash. The token + hash come from the kubeadm init output (parsed by the KubeadmInitSaga and stored in the saga context).
The thesis: KubeadmJoinSaga joins workers in parallel (with a degree-of-parallelism cap), compensates a failed join step by resetting every attempted worker, and handles the common failure modes (token expired, network unreachable, kubelet already running).
The shape
[Saga]
public sealed class KubeadmJoinSaga
{
private readonly KubeadmClient _kubeadm;
private readonly IVosBackend _vagrant;
private readonly KubectlClient _kubectl;
private readonly IHomeLabEventBus _events;
private readonly IClock _clock;
/// <summary>
/// Saga step 1: ensures the context holds a join token that is safe to use,
/// minting a new one on the control-plane node when it is missing or too old.
/// </summary>
/// <param name="ctx">Join context; its JoinToken, CaCertHash and TokenCreatedAt are overwritten when a new token is minted.</param>
/// <param name="ct">Cancellation token passed to the SSH call.</param>
/// <returns>Success when the existing token is kept or a new one was minted; the mapped SSH failure otherwise.</returns>
[SagaStep(Order = 1, Compensation = nameof(NothingToCompensate))]
public async Task<Result> EnsureTokenIsFresh(KubeadmJoinContext ctx, CancellationToken ct)
{
    // Bootstrap tokens expire after 24h; refreshing once the token is older than 20h
    // keeps a margin so the token does not lapse while workers are still joining.
    var tokenIsUsable =
        !(ctx.JoinToken is null)
        && !((_clock.UtcNow - ctx.TokenCreatedAt) > TimeSpan.FromHours(20));
    if (tokenIsUsable)
    {
        return Result.Success();
    }

    var mintResult = await _vagrant.SshCommandAsync(
        ctx.ControlPlaneNodeName,
        "sudo kubeadm token create --print-join-command",
        ct);
    if (mintResult.IsFailure)
    {
        return mintResult.Map();
    }

    // The printed join command carries both the token and the CA cert hash.
    var (freshToken, freshHash) = KubeadmOutputParser.ParseJoinCommand(mintResult.Value.StdOut);
    ctx.JoinToken = freshToken;
    ctx.CaCertHash = freshHash;
    ctx.TokenCreatedAt = _clock.UtcNow;
    return Result.Success();
}
/// <summary>
/// Saga step 2: joins every worker in <c>ctx.Workers</c>, running at most three joins concurrently.
/// </summary>
/// <param name="ctx">Join context supplying the workers and the join parameters.</param>
/// <param name="ct">Cancellation token observed while waiting for a slot and passed to each join.</param>
/// <returns>Success when every join succeeded; otherwise a failure reporting how many workers failed.</returns>
[SagaStep(Order = 2, Compensation = nameof(KubeadmResetWorkers))]
public async Task<Result> JoinAllWorkersInParallel(KubeadmJoinContext ctx, CancellationToken ct)
{
    const int maxConcurrentJoins = 3; // limit concurrent joins

    // SemaphoreSlim is IDisposable. Task.WhenAll completes only after every join task
    // has finished, so no task can still touch the semaphore when it is disposed.
    using (var gate = new SemaphoreSlim(initialCount: maxConcurrentJoins))
    {
        var joinTasks = ctx.Workers.Select(async worker =>
        {
            // Acquire outside the try block so a cancelled wait never releases a slot it does not hold.
            await gate.WaitAsync(ct);
            try
            {
                return await JoinOneWorker(worker, ctx, ct);
            }
            finally
            {
                gate.Release();
            }
        });

        var results = await Task.WhenAll(joinTasks);
        var failures = results.Where(r => r.IsFailure).ToList();
        if (failures.Count > 0)
        {
            return Result.Failure($"{failures.Count}/{results.Length} workers failed to join");
        }

        return Result.Success();
    }
}
/// <summary>
/// Runs <c>kubeadm join</c> on a single worker over SSH and publishes
/// started / completed / failed events around the call.
/// </summary>
/// <param name="worker">The worker to join; its name is both the SSH target and the node name.</param>
/// <param name="ctx">Join context supplying the endpoint, token and CA cert hash.</param>
/// <param name="ct">Cancellation token passed to event publication and the SSH call.</param>
/// <returns>Success when the SSH command succeeded; the mapped SSH failure otherwise.</returns>
private async Task<Result> JoinOneWorker(WorkerNodeSpec worker, KubeadmJoinContext ctx, CancellationToken ct)
{
    await _events.PublishAsync(new ClusterNodeJoinStarted(worker.Name, _clock.UtcNow), ct);

    // NOTE(review): these values are interpolated into a shell command unquoted —
    // confirm that worker names and context values are validated upstream.
    var joinCommand = string.Join(" ", new[]
    {
        $"sudo kubeadm join {ctx.ControlPlaneEndpoint}",
        $"--token {ctx.JoinToken}",
        $"--discovery-token-ca-cert-hash sha256:{ctx.CaCertHash}",
        $"--node-name {worker.Name}",
    });

    var outcome = await _vagrant.SshCommandAsync(worker.Name, joinCommand, ct);
    if (!outcome.IsFailure)
    {
        await _events.PublishAsync(new ClusterNodeJoinCompleted(worker.Name, _clock.UtcNow), ct);
        return Result.Success();
    }

    await _events.PublishAsync(new ClusterNodeJoinFailed(worker.Name, outcome.Errors, _clock.UtcNow), ct);
    return outcome.Map();
}
/// <summary>
/// Saga step 3: polls the node list until the number of Ready nodes equals
/// the worker count plus one, or gives up after 60 polls spaced 5 seconds apart.
/// </summary>
/// <param name="ctx">Join context supplying the kubeconfig path and the expected workers.</param>
/// <param name="ct">Cancellation token passed to the node query and the delay between polls.</param>
/// <returns>Success once the expected number of nodes is Ready; a failure after the last poll otherwise.</returns>
[SagaStep(Order = 3, Compensation = nameof(NothingToCompensate))]
public async Task<Result> WaitForAllNodesReady(KubeadmJoinContext ctx, CancellationToken ct)
{
    var pollsLeft = 60;
    while (pollsLeft-- > 0)
    {
        var listing = await _kubectl
            .WithKubeconfig(ctx.KubeconfigPath)
            .GetNodesAsync(output: "json", ct);

        // A failed query is not fatal; it simply counts as one unsuccessful poll.
        if (listing.IsSuccess)
        {
            var readyCount = NodeListParser.Parse(listing.Value).Count(n => n.Status == "Ready");

            // NOTE(review): "workers + 1" assumes exactly one control-plane node and no
            // nodes other than ctx.Workers — confirm this holds when the saga runs for
            // a single added node in an existing cluster or for an HA control plane.
            if (readyCount == ctx.Workers.Count + 1) // workers + control plane
            {
                return Result.Success();
            }
        }

        await Task.Delay(TimeSpan.FromSeconds(5), ct);
    }

    return Result.Failure("not all nodes became Ready within 5 minutes");
}
/// <summary>
/// Compensation for step 2: runs <c>kubeadm reset --force</c> on every worker in
/// <c>ctx.Workers</c>, all at once, regardless of whether that worker's join succeeded.
/// </summary>
/// <param name="ctx">Join context supplying the workers to reset.</param>
/// <param name="ct">Cancellation token passed to each SSH call.</param>
/// <returns>Success when every reset succeeded; otherwise a failure reporting how many resets failed.</returns>
public async Task<Result> KubeadmResetWorkers(KubeadmJoinContext ctx, CancellationToken ct)
{
    // Reset every worker, in parallel
    var resetTasks = ctx.Workers.Select(async worker =>
        await _vagrant.SshCommandAsync(worker.Name, "sudo kubeadm reset --force", ct));
    var outcomes = await Task.WhenAll(resetTasks);

    // Surface failed resets instead of discarding them: a worker that was not
    // wiped is left in a state the next join attempt would trip over.
    var failedCount = outcomes.Count(o => o.IsFailure);
    if (failedCount > 0)
    {
        return Result.Failure($"{failedCount}/{outcomes.Length} workers failed to reset");
    }

    return Result.Success();
}
/// <summary>
/// No-op compensation referenced by the steps that have nothing to undo; always succeeds.
/// </summary>
public Task<Result> NothingToCompensate(KubeadmJoinContext ctx, CancellationToken ct)
{
    return Task.FromResult(Result.Success());
}
}[Saga]
public sealed class KubeadmJoinSaga
{
private readonly KubeadmClient _kubeadm;
private readonly IVosBackend _vagrant;
private readonly KubectlClient _kubectl;
private readonly IHomeLabEventBus _events;
private readonly IClock _clock;
/// <summary>
/// Saga step 1: ensures the context holds a join token that is safe to use,
/// minting a new one on the control-plane node when it is missing or too old.
/// </summary>
/// <param name="ctx">Join context; its JoinToken, CaCertHash and TokenCreatedAt are overwritten when a new token is minted.</param>
/// <param name="ct">Cancellation token passed to the SSH call.</param>
/// <returns>Success when the existing token is kept or a new one was minted; the mapped SSH failure otherwise.</returns>
[SagaStep(Order = 1, Compensation = nameof(NothingToCompensate))]
public async Task<Result> EnsureTokenIsFresh(KubeadmJoinContext ctx, CancellationToken ct)
{
    // Bootstrap tokens expire after 24h; refreshing once the token is older than 20h
    // keeps a margin so the token does not lapse while workers are still joining.
    var tokenIsUsable =
        !(ctx.JoinToken is null)
        && !((_clock.UtcNow - ctx.TokenCreatedAt) > TimeSpan.FromHours(20));
    if (tokenIsUsable)
    {
        return Result.Success();
    }

    var mintResult = await _vagrant.SshCommandAsync(
        ctx.ControlPlaneNodeName,
        "sudo kubeadm token create --print-join-command",
        ct);
    if (mintResult.IsFailure)
    {
        return mintResult.Map();
    }

    // The printed join command carries both the token and the CA cert hash.
    var (freshToken, freshHash) = KubeadmOutputParser.ParseJoinCommand(mintResult.Value.StdOut);
    ctx.JoinToken = freshToken;
    ctx.CaCertHash = freshHash;
    ctx.TokenCreatedAt = _clock.UtcNow;
    return Result.Success();
}
/// <summary>
/// Saga step 2: joins every worker in <c>ctx.Workers</c>, running at most three joins concurrently.
/// </summary>
/// <param name="ctx">Join context supplying the workers and the join parameters.</param>
/// <param name="ct">Cancellation token observed while waiting for a slot and passed to each join.</param>
/// <returns>Success when every join succeeded; otherwise a failure reporting how many workers failed.</returns>
[SagaStep(Order = 2, Compensation = nameof(KubeadmResetWorkers))]
public async Task<Result> JoinAllWorkersInParallel(KubeadmJoinContext ctx, CancellationToken ct)
{
    const int maxConcurrentJoins = 3; // limit concurrent joins

    // SemaphoreSlim is IDisposable. Task.WhenAll completes only after every join task
    // has finished, so no task can still touch the semaphore when it is disposed.
    using (var gate = new SemaphoreSlim(initialCount: maxConcurrentJoins))
    {
        var joinTasks = ctx.Workers.Select(async worker =>
        {
            // Acquire outside the try block so a cancelled wait never releases a slot it does not hold.
            await gate.WaitAsync(ct);
            try
            {
                return await JoinOneWorker(worker, ctx, ct);
            }
            finally
            {
                gate.Release();
            }
        });

        var results = await Task.WhenAll(joinTasks);
        var failures = results.Where(r => r.IsFailure).ToList();
        if (failures.Count > 0)
        {
            return Result.Failure($"{failures.Count}/{results.Length} workers failed to join");
        }

        return Result.Success();
    }
}
/// <summary>
/// Runs <c>kubeadm join</c> on a single worker over SSH and publishes
/// started / completed / failed events around the call.
/// </summary>
/// <param name="worker">The worker to join; its name is both the SSH target and the node name.</param>
/// <param name="ctx">Join context supplying the endpoint, token and CA cert hash.</param>
/// <param name="ct">Cancellation token passed to event publication and the SSH call.</param>
/// <returns>Success when the SSH command succeeded; the mapped SSH failure otherwise.</returns>
private async Task<Result> JoinOneWorker(WorkerNodeSpec worker, KubeadmJoinContext ctx, CancellationToken ct)
{
    await _events.PublishAsync(new ClusterNodeJoinStarted(worker.Name, _clock.UtcNow), ct);

    // NOTE(review): these values are interpolated into a shell command unquoted —
    // confirm that worker names and context values are validated upstream.
    var joinCommand = string.Join(" ", new[]
    {
        $"sudo kubeadm join {ctx.ControlPlaneEndpoint}",
        $"--token {ctx.JoinToken}",
        $"--discovery-token-ca-cert-hash sha256:{ctx.CaCertHash}",
        $"--node-name {worker.Name}",
    });

    var outcome = await _vagrant.SshCommandAsync(worker.Name, joinCommand, ct);
    if (!outcome.IsFailure)
    {
        await _events.PublishAsync(new ClusterNodeJoinCompleted(worker.Name, _clock.UtcNow), ct);
        return Result.Success();
    }

    await _events.PublishAsync(new ClusterNodeJoinFailed(worker.Name, outcome.Errors, _clock.UtcNow), ct);
    return outcome.Map();
}
/// <summary>
/// Saga step 3: polls the node list until the number of Ready nodes equals
/// the worker count plus one, or gives up after 60 polls spaced 5 seconds apart.
/// </summary>
/// <param name="ctx">Join context supplying the kubeconfig path and the expected workers.</param>
/// <param name="ct">Cancellation token passed to the node query and the delay between polls.</param>
/// <returns>Success once the expected number of nodes is Ready; a failure after the last poll otherwise.</returns>
[SagaStep(Order = 3, Compensation = nameof(NothingToCompensate))]
public async Task<Result> WaitForAllNodesReady(KubeadmJoinContext ctx, CancellationToken ct)
{
    var pollsLeft = 60;
    while (pollsLeft-- > 0)
    {
        var listing = await _kubectl
            .WithKubeconfig(ctx.KubeconfigPath)
            .GetNodesAsync(output: "json", ct);

        // A failed query is not fatal; it simply counts as one unsuccessful poll.
        if (listing.IsSuccess)
        {
            var readyCount = NodeListParser.Parse(listing.Value).Count(n => n.Status == "Ready");

            // NOTE(review): "workers + 1" assumes exactly one control-plane node and no
            // nodes other than ctx.Workers — confirm this holds when the saga runs for
            // a single added node in an existing cluster or for an HA control plane.
            if (readyCount == ctx.Workers.Count + 1) // workers + control plane
            {
                return Result.Success();
            }
        }

        await Task.Delay(TimeSpan.FromSeconds(5), ct);
    }

    return Result.Failure("not all nodes became Ready within 5 minutes");
}
/// <summary>
/// Compensation for step 2: runs <c>kubeadm reset --force</c> on every worker in
/// <c>ctx.Workers</c>, all at once, regardless of whether that worker's join succeeded.
/// </summary>
/// <param name="ctx">Join context supplying the workers to reset.</param>
/// <param name="ct">Cancellation token passed to each SSH call.</param>
/// <returns>Success when every reset succeeded; otherwise a failure reporting how many resets failed.</returns>
public async Task<Result> KubeadmResetWorkers(KubeadmJoinContext ctx, CancellationToken ct)
{
    // Reset every worker, in parallel
    var resetTasks = ctx.Workers.Select(async worker =>
        await _vagrant.SshCommandAsync(worker.Name, "sudo kubeadm reset --force", ct));
    var outcomes = await Task.WhenAll(resetTasks);

    // Surface failed resets instead of discarding them: a worker that was not
    // wiped is left in a state the next join attempt would trip over.
    var failedCount = outcomes.Count(o => o.IsFailure);
    if (failedCount > 0)
    {
        return Result.Failure($"{failedCount}/{outcomes.Length} workers failed to reset");
    }

    return Result.Success();
}
/// <summary>
/// No-op compensation referenced by the steps that have nothing to undo; always succeeds.
/// </summary>
public Task<Result> NothingToCompensate(KubeadmJoinContext ctx, CancellationToken ct)
{
    return Task.FromResult(Result.Success());
}
}
Three steps. The first ensures the bootstrap token is fresh (mint a new one if the original is older than 20 hours). The second joins all workers in parallel with a concurrency cap of 3 (to avoid overwhelming the API server with simultaneous CSR requests). The third waits for every node to report Ready.
The compensation in step 2 (KubeadmResetWorkers) wipes every attempted worker so the next retry can succeed. We deliberately do not try to be clever and reset only the failed ones — partial resets create state that the next attempt has to detect, and the time savings are not worth the complexity.
Why parallel join with a cap
One worker takes about 30 seconds to join (download images, start kubelet, register, get scheduled). Three workers in parallel take about 35 seconds (the API server can handle three concurrent CSR requests without breaking a sweat). Eight workers in parallel take about 40 seconds and hit enough rate limits that you start seeing 429s. The cap of 3 is the sweet spot for HomeLab's use case (a typical multi-node topology has three workers; HA has three to five).
The cap is configurable:
k8s:
  kubeadm:
    join_parallelism: 3
k8s:
  kubeadm:
    join_parallelism: 3
For larger clusters (a future plugin doing 50-node bootstraps for some reason), the user bumps the cap. For smaller clusters, the cap does not matter because the worker count is below it.
The token rotation gotcha
kubeadm bootstrap tokens have a default TTL of 24 hours. If the user runs homelab k8s create to bootstrap the control plane, then waits a day before adding a worker via homelab k8s node add, the original token has expired and the join would fail. The saga's first step handles this by checking the token age and minting a new one if needed.
The new token is not persisted to disk. It is held in memory for the duration of the saga and discarded afterwards. Joining a worker tomorrow always mints a fresh token through the same code path. This is intentional: long-lived join tokens are a security risk (anyone with the token can join a node and get a kubelet credential), so we treat them as ephemeral.
The homelab k8s node add verb
[Injectable(ServiceLifetime.Singleton)]
[VerbGroup("k8s")]
public sealed class K8sNodeAddCommand : IHomeLabVerbCommand
{
private readonly IMediator _mediator;
private readonly IHomeLabConsole _console;
/// <summary>
/// Builds the <c>add</c> command: one positional node name plus <c>--memory</c> and
/// <c>--cpus</c> options, with a handler that sends a <c>K8sNodeAddRequest</c> through
/// the mediator, renders the result and sets the process exit code.
/// </summary>
/// <returns>The configured command.</returns>
public Command Build()
{
    var name = new Argument<string>("name", "the new node name");
    // Descriptions make the options self-explanatory in --help; memory is in MB (16384 = 16 GB).
    var memory = new Option<int>("--memory", () => 8192, "memory for the new node, in MB");
    var cpus = new Option<int>("--cpus", () => 2, "number of CPUs for the new node");

    var cmd = new Command("add", "Add a new worker node to the cluster");
    cmd.AddArgument(name);
    cmd.AddOption(memory);
    cmd.AddOption(cpus);

    // Handler parameters bind in the order of the symbols passed below: name, memory, cpus.
    cmd.SetHandler(async (string nodeName, int memoryMb, int cpuCount) =>
    {
        // The request takes (name, cpus, memory) — note the order differs from the binding order.
        // NOTE(review): CancellationToken.None means the request cannot be cancelled from here.
        var result = await _mediator.SendAsync(
            new K8sNodeAddRequest(nodeName, cpuCount, memoryMb), CancellationToken.None);
        _console.Render(result);
        Environment.ExitCode = result.IsSuccess ? 0 : 1;
    }, name, memory, cpus);

    return cmd;
}
}[Injectable(ServiceLifetime.Singleton)]
[VerbGroup("k8s")]
public sealed class K8sNodeAddCommand : IHomeLabVerbCommand
{
private readonly IMediator _mediator;
private readonly IHomeLabConsole _console;
/// <summary>
/// Builds the <c>add</c> command: one positional node name plus <c>--memory</c> and
/// <c>--cpus</c> options, with a handler that sends a <c>K8sNodeAddRequest</c> through
/// the mediator, renders the result and sets the process exit code.
/// </summary>
/// <returns>The configured command.</returns>
public Command Build()
{
    var name = new Argument<string>("name", "the new node name");
    // Descriptions make the options self-explanatory in --help; memory is in MB (16384 = 16 GB).
    var memory = new Option<int>("--memory", () => 8192, "memory for the new node, in MB");
    var cpus = new Option<int>("--cpus", () => 2, "number of CPUs for the new node");

    var cmd = new Command("add", "Add a new worker node to the cluster");
    cmd.AddArgument(name);
    cmd.AddOption(memory);
    cmd.AddOption(cpus);

    // Handler parameters bind in the order of the symbols passed below: name, memory, cpus.
    cmd.SetHandler(async (string nodeName, int memoryMb, int cpuCount) =>
    {
        // The request takes (name, cpus, memory) — note the order differs from the binding order.
        // NOTE(review): CancellationToken.None means the request cannot be cancelled from here.
        var result = await _mediator.SendAsync(
            new K8sNodeAddRequest(nodeName, cpuCount, memoryMb), CancellationToken.None);
        _console.Render(result);
        Environment.ExitCode = result.IsSuccess ? 0 : 1;
    }, name, memory, cpus);

    return cmd;
}
}
Standard thin shell. The handler in the lib provisions a new Vagrant VM, runs the K8sNodePackerContributor's scripts on it (or assumes the box already has them baked in), and then runs the join saga for that one node.
homelab k8s node add acme-w-4 --cpus 4 --memory 16384 adds a 4-cpu/16-GB worker to the existing acme cluster, takes about 90 seconds end-to-end, and prints the new node's status when it is Ready.
The test
[Fact]
public async Task join_saga_uses_existing_token_when_fresh()
{
    // Arrange: fixed clock, a backend that accepts any join, and a token only 2 hours old.
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-16T12:00:00Z"));
    var backend = new ScriptedVosBackend();
    backend.OnSshCommand(_ => true, cmd => cmd.Contains("kubeadm join"), exitCode: 0);
    var saga = new KubeadmJoinSaga(/* deps */, clock);

    var workers = new[] { new WorkerNodeSpec("acme-w-1") };
    var joinContext = new KubeadmJoinContext
    {
        ControlPlaneNodeName = "acme-cp-1",
        ControlPlaneEndpoint = "192.168.60.10:6443",
        JoinToken = "abc123.def456",
        CaCertHash = "cafebabe",
        TokenCreatedAt = clock.UtcNow.AddHours(-2), // 2 hours old, fresh
        Workers = workers
    };

    // Act
    var outcome = await saga.RunAsync(joinContext, default);

    // Assert: the saga succeeded without ever asking the control plane for a new token.
    outcome.IsSuccess.Should().BeTrue();
    backend.Calls.Should().NotContain(c => c.Command.Contains("token create"));
}
[Fact]
public async Task join_saga_mints_new_token_when_existing_is_stale()
{
    // Arrange: the control plane answers "token create" with a fresh join command,
    // and any node accepts a join aimed at the 192.168.x endpoint.
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-17T12:00:00Z"));
    var vagrant = new ScriptedVosBackend();
    vagrant.OnSshCommand("acme-cp-1",
        cmd => cmd.Contains("token create"),
        exitCode: 0,
        stdout: "kubeadm join 192.168.60.10:6443 --token new123.fresh456 --discovery-token-ca-cert-hash sha256:newhash");
    vagrant.OnSshCommand(_ => true, cmd => cmd.Contains("kubeadm join 192.168"), exitCode: 0);
    var saga = new KubeadmJoinSaga(/* deps */, clock);
    var ctx = new KubeadmJoinContext
    {
        ControlPlaneNodeName = "acme-cp-1",
        // Required: the saga builds the join command from this endpoint, and the
        // scripted join above only matches commands containing "kubeadm join 192.168".
        ControlPlaneEndpoint = "192.168.60.10:6443",
        JoinToken = "old123.expired",
        CaCertHash = "oldhash",
        TokenCreatedAt = clock.UtcNow.AddHours(-23), // stale
        Workers = new[] { new WorkerNodeSpec("acme-w-1") }
    };

    // Act
    var result = await saga.RunAsync(ctx, default);

    // Assert: a new token was minted and stored on the context.
    result.IsSuccess.Should().BeTrue();
    ctx.JoinToken.Should().Be("new123.fresh456");
    vagrant.Calls.Should().Contain(c => c.Command.Contains("token create"));
}
[Fact]
public async Task join_saga_compensates_failed_workers_via_kubeadm_reset()
{
    // Arrange: a fixed instant (not the wall clock) keeps the test deterministic,
    // matching the other saga tests. One of three joins fails; every reset succeeds.
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-16T12:00:00Z"));
    var vagrant = new ScriptedVosBackend();
    vagrant.OnSshCommand("acme-w-1", cmd => cmd.Contains("kubeadm join"), exitCode: 1, stderr: "error: x");
    vagrant.OnSshCommand("acme-w-2", cmd => cmd.Contains("kubeadm join"), exitCode: 0);
    vagrant.OnSshCommand("acme-w-3", cmd => cmd.Contains("kubeadm join"), exitCode: 0);
    vagrant.OnSshCommand(_ => true, cmd => cmd.Contains("kubeadm reset"), exitCode: 0);
    var saga = new KubeadmJoinSaga(/* deps */, clock);
    var ctx = new KubeadmJoinContext
    {
        Workers = new[] { new WorkerNodeSpec("acme-w-1"), new WorkerNodeSpec("acme-w-2"), new WorkerNodeSpec("acme-w-3") },
        JoinToken = "fresh", TokenCreatedAt = clock.UtcNow,
        ControlPlaneEndpoint = "192.168.60.10:6443"
    };

    // Act
    var result = await saga.RunAsync(ctx, default);

    // Assert
    result.IsFailure.Should().BeTrue();
    // All three workers were reset, including the two that succeeded
    vagrant.Calls.Where(c => c.Command.Contains("kubeadm reset")).Should().HaveCount(3);
}
[Fact]
public async Task join_saga_uses_existing_token_when_fresh()
{
    // Arrange: fixed clock, a backend that accepts any join, and a token only 2 hours old.
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-16T12:00:00Z"));
    var backend = new ScriptedVosBackend();
    backend.OnSshCommand(_ => true, cmd => cmd.Contains("kubeadm join"), exitCode: 0);
    var saga = new KubeadmJoinSaga(/* deps */, clock);

    var workers = new[] { new WorkerNodeSpec("acme-w-1") };
    var joinContext = new KubeadmJoinContext
    {
        ControlPlaneNodeName = "acme-cp-1",
        ControlPlaneEndpoint = "192.168.60.10:6443",
        JoinToken = "abc123.def456",
        CaCertHash = "cafebabe",
        TokenCreatedAt = clock.UtcNow.AddHours(-2), // 2 hours old, fresh
        Workers = workers
    };

    // Act
    var outcome = await saga.RunAsync(joinContext, default);

    // Assert: the saga succeeded without ever asking the control plane for a new token.
    outcome.IsSuccess.Should().BeTrue();
    backend.Calls.Should().NotContain(c => c.Command.Contains("token create"));
}
[Fact]
public async Task join_saga_mints_new_token_when_existing_is_stale()
{
    // Arrange: the control plane answers "token create" with a fresh join command,
    // and any node accepts a join aimed at the 192.168.x endpoint.
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-17T12:00:00Z"));
    var vagrant = new ScriptedVosBackend();
    vagrant.OnSshCommand("acme-cp-1",
        cmd => cmd.Contains("token create"),
        exitCode: 0,
        stdout: "kubeadm join 192.168.60.10:6443 --token new123.fresh456 --discovery-token-ca-cert-hash sha256:newhash");
    vagrant.OnSshCommand(_ => true, cmd => cmd.Contains("kubeadm join 192.168"), exitCode: 0);
    var saga = new KubeadmJoinSaga(/* deps */, clock);
    var ctx = new KubeadmJoinContext
    {
        ControlPlaneNodeName = "acme-cp-1",
        // Required: the saga builds the join command from this endpoint, and the
        // scripted join above only matches commands containing "kubeadm join 192.168".
        ControlPlaneEndpoint = "192.168.60.10:6443",
        JoinToken = "old123.expired",
        CaCertHash = "oldhash",
        TokenCreatedAt = clock.UtcNow.AddHours(-23), // stale
        Workers = new[] { new WorkerNodeSpec("acme-w-1") }
    };

    // Act
    var result = await saga.RunAsync(ctx, default);

    // Assert: a new token was minted and stored on the context.
    result.IsSuccess.Should().BeTrue();
    ctx.JoinToken.Should().Be("new123.fresh456");
    vagrant.Calls.Should().Contain(c => c.Command.Contains("token create"));
}
[Fact]
public async Task join_saga_compensates_failed_workers_via_kubeadm_reset()
{
    // Arrange: a fixed instant (not the wall clock) keeps the test deterministic,
    // matching the other saga tests. One of three joins fails; every reset succeeds.
    var clock = new FakeClock(DateTimeOffset.Parse("2026-04-16T12:00:00Z"));
    var vagrant = new ScriptedVosBackend();
    vagrant.OnSshCommand("acme-w-1", cmd => cmd.Contains("kubeadm join"), exitCode: 1, stderr: "error: x");
    vagrant.OnSshCommand("acme-w-2", cmd => cmd.Contains("kubeadm join"), exitCode: 0);
    vagrant.OnSshCommand("acme-w-3", cmd => cmd.Contains("kubeadm join"), exitCode: 0);
    vagrant.OnSshCommand(_ => true, cmd => cmd.Contains("kubeadm reset"), exitCode: 0);
    var saga = new KubeadmJoinSaga(/* deps */, clock);
    var ctx = new KubeadmJoinContext
    {
        Workers = new[] { new WorkerNodeSpec("acme-w-1"), new WorkerNodeSpec("acme-w-2"), new WorkerNodeSpec("acme-w-3") },
        JoinToken = "fresh", TokenCreatedAt = clock.UtcNow,
        ControlPlaneEndpoint = "192.168.60.10:6443"
    };

    // Act
    var result = await saga.RunAsync(ctx, default);

    // Assert
    result.IsFailure.Should().BeTrue();
    // All three workers were reset, including the two that succeeded
    vagrant.Calls.Where(c => c.Command.Contains("kubeadm reset")).Should().HaveCount(3);
}
What this gives you that a join shell loop doesn't
A bash loop that joins workers one at a time looks like for w in $WORKERS; do ssh $w sudo kubeadm join $TOKEN $HASH; done. It works for the happy path. It does not handle: token expiry between iterations, partial failures leaving some workers half-joined, parallel optimization, ready-state polling, event publication, retry-after-cleanup.
A typed KubeadmJoinSaga gives you, for the same surface area:
- Token freshness check before joining
- Parallel join with concurrency cap
- Per-worker event publication for observability
- Whole-saga compensation on failure (reset every attempted worker)
- Ready-state wait before declaring success
- Tests that exercise success, failure, and stale-token paths
The bargain pays back the first time you add three workers in parallel and one of them fails midway — the saga cleans up the survivors so you can retry the whole thing without manual kubeadm reset on the half-joined nodes.