Documentation Index
Fetch the complete documentation index at: https://mintlify.com/HKUDS/nanobot/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Adding a new channel to nanobot allows users to interact with the agent through additional chat platforms. Channels are the interface between nanobot and external messaging services.
Channel Architecture
All channels implement the BaseChannel abstract class, which provides a consistent interface for:
- Starting the channel (connecting to the platform)
- Receiving messages from the platform
- Sending messages to the platform
- Stopping the channel (cleanup and disconnect)
The BaseChannel Interface
from nanobot.channels.base import BaseChannel
from nanobot.bus.queue import MessageBus
from nanobot.bus.events import OutboundMessage
class BaseChannel(ABC):
name: str = "base" # Override with your channel name
def __init__(self, config: Any, bus: MessageBus):
self.config = config
self.bus = bus
self._running = False
@abstractmethod
async def start(self) -> None:
"""Connect to the platform and start listening."""
@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""Send a message to the platform."""
@abstractmethod
async def stop(self) -> None:
"""Clean up and disconnect."""
Step-by-Step Guide
Step 1: Create the Channel File
Create a new file in nanobot/channels/, e.g., mychannel.py.
Step 2: Define the Config Schema
Edit nanobot/config/schema.py and add a configuration class:
class MyChannelConfig(Base):
"""MyChannel configuration."""
enabled: bool = False
api_key: str = "" # Or token, credentials, etc.
api_secret: str = ""
allow_from: list[str] = Field(default_factory=list)
# Add other platform-specific settings
Then add it to ChannelsConfig:
class ChannelsConfig(Base):
telegram: TelegramConfig = TelegramConfig()
discord: DiscordConfig = DiscordConfig()
mychannel: MyChannelConfig = MyChannelConfig() # Add this
# ...
Step 3: Implement the Channel Class
from loguru import logger
from nanobot.channels.base import BaseChannel
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.config.schema import MyChannelConfig
class MyChannel(BaseChannel):
"""MyChannel integration."""
name = "mychannel"
def __init__(self, config: MyChannelConfig, bus: MessageBus):
super().__init__(config, bus)
self.config: MyChannelConfig = config
# Initialize platform client/connection
self.client = None
async def start(self) -> None:
"""Connect to MyChannel platform."""
if not self.config.api_key:
logger.error("MyChannel API key not configured")
return
self._running = True
logger.info("Starting MyChannel...")
# 1. Initialize the platform client
self.client = MyChannelClient(
api_key=self.config.api_key,
api_secret=self.config.api_secret
)
# 2. Connect to the platform
await self.client.connect()
# 3. Register event handlers
self.client.on_message(self._on_message)
# 4. Start listening (blocking or non-blocking)
await self.client.listen()
logger.info("MyChannel connected")
async def stop(self) -> None:
"""Disconnect from MyChannel."""
self._running = False
if self.client:
await self.client.disconnect()
logger.info("MyChannel disconnected")
async def send(self, msg: OutboundMessage) -> None:
"""Send a message via MyChannel."""
if not self.client:
logger.warning("MyChannel client not initialized")
return
try:
# Send text message
if msg.content:
await self.client.send_message(
chat_id=msg.chat_id,
text=msg.content
)
# Send media files
for media_path in (msg.media or []):
await self.client.send_file(
chat_id=msg.chat_id,
file_path=media_path
)
except Exception as e:
logger.error("Failed to send message: {}", e)
async def _on_message(self, event) -> None:
"""Handle incoming messages from the platform."""
# Extract message data from platform event
sender_id = str(event.user_id)
chat_id = str(event.chat_id)
content = event.text or ""
# Check access control
if not self.is_allowed(sender_id):
logger.warning("Access denied for {}", sender_id)
return
# Forward to the message bus
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=content,
media=[],
metadata={
"user_id": event.user_id,
"username": event.username,
}
)
Step 4: Register the Channel
Edit nanobot/channels/manager.py to register your channel:
from nanobot.channels.mychannel import MyChannel
class ChannelManager:
async def start_all(self, config: ChannelsConfig) -> None:
# ... existing channels ...
if config.mychannel.enabled:
channel = MyChannel(config.mychannel, self.bus)
self.channels.append(channel)
self.tasks.append(asyncio.create_task(channel.start()))
Step 5: Test Your Channel
- Configure in
~/.nanobot/config.json:
{
"channels": {
"mychannel": {
"enabled": true,
"apiKey": "your-api-key",
"apiSecret": "your-api-secret",
"allowFrom": ["your-user-id"]
}
}
}
- Run the gateway:
-
Send a test message from the platform.
-
Verify nanobot responds.
Real-World Example: Telegram Channel
Let’s examine the Telegram channel implementation as a reference.
Key Features
- Long polling (no webhook needed)
- Media handling (photos, voice, documents)
- Typing indicators
- Command handling (
/start, /new, /help)
- Media groups (multiple images in one message)
- Voice transcription (via Groq Whisper)
Code Structure
class TelegramChannel(BaseChannel):
name = "telegram"
def __init__(self, config: TelegramConfig, bus: MessageBus, groq_api_key: str = ""):
super().__init__(config, bus)
self.config: TelegramConfig = config
self.groq_api_key = groq_api_key
self._app: Application | None = None
self._typing_tasks: dict[str, asyncio.Task] = {}
async def start(self) -> None:
"""Start the Telegram bot with long polling."""
# Build application
self._app = Application.builder().token(self.config.token).build()
# Register handlers
self._app.add_handler(CommandHandler("start", self._on_start))
self._app.add_handler(CommandHandler("new", self._forward_command))
self._app.add_handler(MessageHandler(
filters.TEXT | filters.PHOTO | filters.VOICE,
self._on_message
))
# Start polling
await self._app.initialize()
await self._app.start()
await self._app.updater.start_polling()
# Keep running
while self._running:
await asyncio.sleep(1)
async def send(self, msg: OutboundMessage) -> None:
"""Send message via Telegram."""
chat_id = int(msg.chat_id)
# Stop typing indicator
if not msg.metadata.get("_progress", False):
self._stop_typing(msg.chat_id)
# Send media files
for media_path in (msg.media or []):
with open(media_path, 'rb') as f:
await self._app.bot.send_photo(chat_id=chat_id, photo=f)
# Send text content
if msg.content:
html = _markdown_to_telegram_html(msg.content)
await self._app.bot.send_message(
chat_id=chat_id,
text=html,
parse_mode="HTML"
)
async def _on_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming messages."""
message = update.message
user = update.effective_user
sender_id = str(user.id)
chat_id = str(message.chat_id)
content = message.text or message.caption or ""
# Handle media
media_paths = []
if message.photo:
file = await self._app.bot.get_file(message.photo[-1].file_id)
file_path = await file.download_to_drive()
media_paths.append(str(file_path))
# Start typing indicator
self._start_typing(chat_id)
# Forward to bus
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=content,
media=media_paths,
metadata={"message_id": message.message_id}
)
Typing Indicator Implementation
def _start_typing(self, chat_id: str) -> None:
"""Start sending 'typing...' indicator."""
self._stop_typing(chat_id)
self._typing_tasks[chat_id] = asyncio.create_task(self._typing_loop(chat_id))
def _stop_typing(self, chat_id: str) -> None:
"""Stop the typing indicator."""
task = self._typing_tasks.pop(chat_id, None)
if task and not task.done():
task.cancel()
async def _typing_loop(self, chat_id: str) -> None:
"""Repeatedly send 'typing' action."""
try:
while self._app:
await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
await asyncio.sleep(4)
except asyncio.CancelledError:
pass
Common Patterns
1. Access Control
Use the built-in is_allowed() method:
if not self.is_allowed(sender_id):
logger.warning("Access denied for {}", sender_id)
return
Download media to ~/.nanobot/media/:
from pathlib import Path
media_dir = Path.home() / ".nanobot" / "media"
media_dir.mkdir(parents=True, exist_ok=True)
file_path = media_dir / f"{file_id}.jpg"
await download_file(url, file_path)
media_paths.append(str(file_path))
3. Voice Transcription
Use Groq for voice-to-text:
from nanobot.providers.transcription import GroqTranscriptionProvider
transcriber = GroqTranscriptionProvider(api_key=self.groq_api_key)
transcription = await transcriber.transcribe(file_path)
if transcription:
content = f"[transcription: {transcription}]"
4. Long Polling vs WebSocket
Long Polling (Telegram):
await self._app.updater.start_polling()
while self._running:
await asyncio.sleep(1)
WebSocket (Discord):
await self.client.connect(ws_url)
while self._running:
event = await self.client.receive()
await self._handle_event(event)
Convert markdown to platform-specific format:
def _markdown_to_platform_format(text: str) -> str:
# Bold
text = re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
# Italic
text = re.sub(r'_(.+?)_', r'<i>\1</i>', text)
# Code blocks
text = re.sub(r'```([\w]*)\n?([\s\S]*?)```', r'<pre>\2</pre>', text)
return text
6. Group/Thread Support
For platforms with threads (Discord, Slack):
session_key = f"{chat_id}:{thread_id}" # Unique per thread
await self._handle_message(
sender_id=sender_id,
chat_id=chat_id,
content=content,
session_key=session_key # Isolate thread conversations
)
7. Mention Detection
For group chats, respond only when mentioned:
if chat_type == "group":
if self.config.group_policy == "mention":
if not self._is_mentioned(message):
return # Ignore non-mentions
await self._handle_message(...)
8. Error Handling
Log errors and continue:
try:
await self.client.send_message(chat_id, content)
except RateLimitError:
logger.warning("Rate limited, waiting...")
await asyncio.sleep(5)
await self.client.send_message(chat_id, content)
except Exception as e:
logger.error("Send failed: {}", e)
# Don't crash the channel
Telegram
- Uses long polling (no public IP needed)
- Supports media groups (multiple images)
- Has command menu (BotFather registration)
- Requires HTML escaping for special characters
Discord
- Uses WebSocket gateway
- Requires intents (Message Content intent)
- Has typing indicators and reactions
- Supports threads (session isolation)
WhatsApp
- Requires bridge server (Node.js)
- Uses QR code login
- Media sent as base64 or URLs
- No group support (DMs only)
Slack
- Uses Socket Mode (no public URL)
- Requires app-level token
- Supports threads and reactions
- Uses mrkdwn formatting (not markdown)
Email
- Uses IMAP (receive) and SMTP (send)
- Long poll interval (30s default)
- Requires consent flag (mailbox access)
- Supports attachments and HTML emails
Feishu/DingTalk/QQ
- Use WebSocket long connection
- No public IP required
- Require app registration on platform
- Use open_id / staff_id for users
Testing Checklist
Basic Functionality
Advanced Features
Error Handling
Configuration
Debugging Tips
Enable Debug Logging
from loguru import logger
logger.debug("Received message: {}", event)
logger.debug("Sending to chat_id: {}", chat_id)
Inspect Message Bus Events
logger.info("Publishing inbound: channel={}, sender={}, content={}",
msg.channel, msg.sender_id, msg.content[:50])
Test with --logs Flag
Shows detailed runtime logs.
Use Simple Test Message
await self._handle_message(
sender_id="test_user",
chat_id="test_chat",
content="Hello from test",
)
Connection Pooling
Reuse HTTP connections:
self.session = aiohttp.ClientSession()
# Use self.session for all requests
await self.session.post(url, json=data)
Rate Limiting
Implement backoff:
import asyncio
max_retries = 3
for attempt in range(max_retries):
try:
await self.client.send(message)
break
except RateLimitError:
await asyncio.sleep(2 ** attempt)
Message Batching
Batch multiple media files:
if len(msg.media) > 1:
await self.client.send_media_group(
chat_id=chat_id,
media=msg.media
)
else:
for media in msg.media:
await self.client.send_media(chat_id, media)
Security Best Practices
if not chat_id or not chat_id.isdigit():
logger.error("Invalid chat_id: {}", chat_id)
return
Sanitize Content
def _escape_html(text: str) -> str:
return text.replace("&", "&").replace("<", "<").replace(">", ">")
Protect API Credentials
# Never log API keys
logger.debug("Connecting with token: ***")
# Store in config, not code
api_key = self.config.api_key
Implement Access Control
if not self.is_allowed(sender_id):
# Don't reveal channel exists
return # Silent ignore
Contributing Your Channel
When submitting a PR:
- Implement all BaseChannel methods
- Add config schema to
schema.py
- Register in ChannelManager
- Add README section with setup instructions
- Test thoroughly (see checklist above)
- Include example config in PR description
- Document special requirements (API keys, permissions, etc.)
See the Contributing Guide for more details.