Documentation Index
Fetch the complete documentation index at: https://mintlify.com/HKUDS/nanobot/llms.txt
Use this file to discover all available pages before exploring further.
The Cron service enables precise scheduling of agent tasks with support for one-time, recurring, and cron expression-based schedules.
Overview
Unlike the heartbeat system, which checks for tasks periodically, the cron service lets you schedule tasks with exact timing control. Jobs are persisted to disk and survive restarts.
Key features:
- Three schedule types: `at` (one-time), `every` (recurring interval), `cron` (cron expressions)
- Persistent job storage in JSON format
- Automatic job state management (next run time, last status, error tracking)
- Optional result delivery to chat channels
- Timezone support for cron expressions
- Hot-reload when jobs file is modified externally
Schedule Types
1. One-Time (at)
Execute a task at a specific timestamp:
from datetime import datetime, timedelta
from pathlib import Path

from nanobot.cron import CronService, CronSchedule

# pathlib does not expand "~" on its own; expanduser() resolves the home directory.
cron = CronService(store_path=Path("~/.nanobot/workspace/cron/jobs.json").expanduser())

# Schedule task for tomorrow at 9 AM (local time), expressed in epoch milliseconds
tomorrow_9am = (datetime.now() + timedelta(days=1)).replace(
    hour=9, minute=0, second=0, microsecond=0
)
tomorrow_9am_ms = int(tomorrow_9am.timestamp() * 1000)

cron.add_job(
    name="Morning standup reminder",
    schedule=CronSchedule(kind="at", at_ms=tomorrow_9am_ms),
    message="Prepare standup notes for today's meeting",
    deliver=True,
    channel="telegram",
    delete_after_run=True,  # Auto-delete after execution
)
2. Recurring Interval (every)
Repeat a task at fixed intervals:
# Check email every 15 minutes
cron.add_job(
name="Email check",
schedule=CronSchedule(kind="every", every_ms=15 * 60 * 1000),
message="Check for urgent emails and summarize",
deliver=True,
)
3. Cron Expression (cron)
Use standard cron expressions for complex schedules:
# Daily report at 6 PM Pacific Time
cron.add_job(
name="Daily report",
schedule=CronSchedule(
kind="cron",
expr="0 18 * * *", # 6 PM daily
tz="America/Los_Angeles"
),
message="Generate daily summary report",
deliver=True,
channel="slack",
to="#reports"
)
Cron expression format:
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
Data Structures
CronSchedule
From nanobot/cron/types.py:8-18:
@dataclass
class CronSchedule:
    """Describes when a cron job should fire.

    ``kind`` selects the schedule type; only the fields belonging to that
    type are expected to be set, the others stay ``None``.
    """

    kind: Literal["at", "every", "cron"]
    at_ms: int | None = None  # "at": target timestamp in ms
    every_ms: int | None = None  # "every": interval length in ms
    expr: str | None = None  # "cron": cron expression, e.g. "0 9 * * *"
    tz: str | None = None  # timezone applied to cron expressions
CronPayload
From nanobot/cron/types.py:21-29:
@dataclass
class CronPayload:
    """Describes the action performed when a job fires."""

    kind: Literal["system_event", "agent_turn"] = "agent_turn"
    message: str = ""
    deliver: bool = False  # whether the response is delivered to a channel
    channel: str | None = None  # e.g. "whatsapp"
    to: str | None = None  # e.g. phone number
CronJob
From nanobot/cron/types.py:41-52:
from dataclasses import dataclass, field


@dataclass
class CronJob:
    """A scheduled job.

    ``id`` and ``name`` are required. Every later field has a default,
    because a dataclass rejects a field without a default that follows a
    defaulted one (``enabled``); declaring ``schedule``/``payload``/``state``
    without defaults raises ``TypeError`` when the class is created.
    """

    id: str
    name: str
    enabled: bool = True
    # Object-valued defaults are built per instance through default_factory
    # so that jobs never share one schedule/payload/state object.
    # The default schedule is only a placeholder ("every" with no interval
    # never yields a next run); callers are expected to pass a real one.
    schedule: CronSchedule = field(default_factory=lambda: CronSchedule(kind="every"))
    payload: CronPayload = field(default_factory=lambda: CronPayload())
    # NOTE(review): assumes CronJobState() can be built without arguments — confirm.
    state: CronJobState = field(default_factory=lambda: CronJobState())
    created_at_ms: int = 0
    updated_at_ms: int = 0
    delete_after_run: bool = False
Core Implementation
Next Run Computation
From nanobot/cron/service.py:20-46:
def _compute_next_run(schedule: CronSchedule, now_ms: int) -> int | None:
"""Compute next run time in ms."""
if schedule.kind == "at":
return schedule.at_ms if schedule.at_ms and schedule.at_ms > now_ms else None
if schedule.kind == "every":
if not schedule.every_ms or schedule.every_ms <= 0:
return None
# Next interval from now
return now_ms + schedule.every_ms
if schedule.kind == "cron" and schedule.expr:
try:
from zoneinfo import ZoneInfo
from croniter import croniter
base_time = now_ms / 1000
tz = ZoneInfo(schedule.tz) if schedule.tz else datetime.now().astimezone().tzinfo
base_dt = datetime.fromtimestamp(base_time, tz=tz)
cron = croniter(schedule.expr, base_dt)
next_dt = cron.get_next(datetime)
return int(next_dt.timestamp() * 1000)
except Exception:
return None
return None
Job Execution
From nanobot/cron/service.py:245-276:
async def _execute_job(self, job: CronJob) -> None:
    """Run one job via the ``on_job`` callback and record the outcome.

    The job's state receives the status, error text and start time of this
    run. A one-shot ("at") job is then either removed from the store or
    disabled; any other job gets its next run time recomputed.
    """
    started_at_ms = _now_ms()
    logger.info("Cron: executing job '{}' ({})", job.name, job.id)

    try:
        if self.on_job:
            await self.on_job(job)
        job.state.last_status = "ok"
        job.state.last_error = None
        logger.info("Cron: job '{}' completed", job.name)
    except Exception as exc:
        # A failing job is recorded and logged; it is not re-raised.
        job.state.last_status = "error"
        job.state.last_error = str(exc)
        logger.error("Cron: job '{}' failed: {}", job.name, exc)

    job.state.last_run_at_ms = started_at_ms
    job.updated_at_ms = _now_ms()

    if job.schedule.kind != "at":
        # Recurring job: schedule the following run relative to now.
        job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms())
        return

    # One-shot job: it never runs again, so drop it or switch it off.
    if job.delete_after_run:
        self._store.jobs = [kept for kept in self._store.jobs if kept.id != job.id]
    else:
        job.enabled = False
        job.state.next_run_at_ms = None
Usage
Initialization
from pathlib import Path
from nanobot.cron import CronService
cron = CronService(
store_path=Path("~/.nanobot/workspace/cron/jobs.json"),
on_job=execute_job_callback,
)
await cron.start()
Adding Jobs
# Simple recurring reminder
job = cron.add_job(
name="Stand up reminder",
schedule=CronSchedule(kind="every", every_ms=60 * 60 * 1000), # Every hour
message="Time for a stretch break!",
deliver=True,
channel="telegram",
)
print(f"Job created with ID: {job.id}")
Managing Jobs
# List all jobs
jobs = cron.list_jobs(include_disabled=True)
for job in jobs:
print(f"{job.name}: next run at {job.state.next_run_at_ms}")
# Disable a job
cron.enable_job(job_id="abc123", enabled=False)
# Remove a job
cron.remove_job(job_id="abc123")
# Manually run a job
await cron.run_job(job_id="abc123", force=True)
# Get service status
status = cron.status()
print(f"Jobs: {status['jobs']}, Next wake: {status['next_wake_at_ms']}")
Hot-Reload
The service automatically detects when jobs.json is modified externally and reloads it (nanobot/cron/service.py:78-86):
def _load_store(self) -> CronStore:
"""Load jobs from disk. Reloads automatically if file was modified externally."""
if self._store and self.store_path.exists():
mtime = self.store_path.stat().st_mtime
if mtime != self._last_mtime:
logger.info("Cron: jobs.json modified externally, reloading")
self._store = None
# ... load logic ...
Use Cases
1. Daily Standup Preparation
# Every weekday at 8:45 AM
cron.add_job(
name="Standup prep",
schedule=CronSchedule(
kind="cron",
expr="45 8 * * 1-5", # Mon-Fri at 8:45 AM
tz="America/New_York"
),
message="Review yesterday's commits, open PRs, and draft standup notes",
deliver=True,
)
2. Server Health Check
# Every 5 minutes
cron.add_job(
name="Health check",
schedule=CronSchedule(kind="every", every_ms=5 * 60 * 1000),
message="Check server status, disk space, and API response times",
deliver=False, # Response is not delivered to any channel
)
3. Weekly Report Generation
# Every Friday at 5 PM
cron.add_job(
name="Weekly report",
schedule=CronSchedule(
kind="cron",
expr="0 17 * * 5", # Friday at 5 PM
tz="UTC"
),
message="Generate weekly summary: completed tasks, PRs merged, issues closed",
deliver=True,
channel="slack",
to="#engineering"
)
4. Meeting Reminder with Lead Time
import time
# One-time reminder 15 minutes before meeting
meeting_time = int((time.time() + 3600) * 1000) # 1 hour from now
reminder_time = meeting_time - (15 * 60 * 1000) # 15 min before
cron.add_job(
name="Client meeting reminder",
schedule=CronSchedule(kind="at", at_ms=reminder_time),
message="Client meeting in 15 minutes - review proposal deck",
deliver=True,
channel="telegram",
delete_after_run=True,
)
5. Backup Task
# Every day at 2 AM
cron.add_job(
name="Database backup",
schedule=CronSchedule(
kind="cron",
expr="0 2 * * *",
tz="America/Chicago"
),
message="Run database backup script and verify completion",
deliver=True, # Deliver the agent's response to a chat channel
)
Jobs are stored in JSON format at ~/.nanobot/workspace/cron/jobs.json:
{
"version": 1,
"jobs": [
{
"id": "a1b2c3d4",
"name": "Daily report",
"enabled": true,
"schedule": {
"kind": "cron",
"atMs": null,
"everyMs": null,
"expr": "0 18 * * *",
"tz": "America/Los_Angeles"
},
"payload": {
"kind": "agent_turn",
"message": "Generate daily summary",
"deliver": true,
"channel": "telegram",
"to": null
},
"state": {
"nextRunAtMs": 1709827200000,
"lastRunAtMs": 1709740800000,
"lastStatus": "ok",
"lastError": null
},
"createdAtMs": 1709654400000,
"updatedAtMs": 1709740800000,
"deleteAfterRun": false
}
]
}
Best Practices
- Use meaningful names: Job names appear in logs and help debugging
- Set appropriate timezones: Always specify `tz` for cron expressions to avoid ambiguity
- Monitor job status: Check `last_status` and `last_error` to catch failures
- Use `delete_after_run` for one-time tasks: Automatically clean up completed one-shot jobs
- Test with manual runs: Use `run_job(force=True)` to test before relying on the schedule
- Handle errors gracefully: Job execution errors are logged but don’t stop the service
- Consider delivery settings: Set `deliver=True` only when you want channel notifications
Validation
Schedule validation prevents non-runnable jobs (nanobot/cron/service.py:49-61):
def _validate_schedule_for_add(schedule: CronSchedule) -> None:
"""Validate schedule fields that would otherwise create non-runnable jobs."""
if schedule.tz and schedule.kind != "cron":
raise ValueError("tz can only be used with cron schedules")
if schedule.kind == "cron" and schedule.tz:
try:
from zoneinfo import ZoneInfo
ZoneInfo(schedule.tz)
except Exception:
raise ValueError(f"unknown timezone '{schedule.tz}'") from None
Stopping the Service
Stopping the service does not discard any jobs: they are persisted to disk and resume when the service restarts.