Testcontainers for Infrastructure Testing
What Testcontainers Solves
Integration tests have a dependency problem. Your code talks to PostgreSQL, Kafka, Redis, and Elasticsearch -- but where do those services come from in your test environment? The old answers were painful: shared test databases that accumulate stale data, Docker Compose files that fall out of sync, or mocking everything and hoping the mocks match reality.
Testcontainers solves this by spinning up real instances of infrastructure dependencies in Docker, managed by your test code. Each test gets fresh, isolated instances that are destroyed when the test finishes. No shared state, no cleanup scripts, no "it works on my machine."
Core Concepts
| Concept | Description |
|---|---|
| Container per test suite | A Docker container launched by your test code, shared across tests in a module |
| Automatic cleanup | Containers are destroyed when the test suite finishes (or when the JVM/process exits) |
| Port mapping | Containers expose random ports, avoiding conflicts with other tests or local services |
| Wait strategies | Testcontainers waits for the container to be healthy before running tests |
| Network isolation | Each test suite gets its own container network |
Kafka with Testcontainers
Kafka is one of the most common use cases for Testcontainers. Running a real Kafka broker in your test suite eliminates the fragility of Kafka mocks.
# tests/integration/test_kafka_consumer.py
import pytest
from testcontainers.kafka import KafkaContainer
from confluent_kafka import Producer, Consumer
import json
import time
@pytest.fixture(scope="module")
def kafka():
    """Spin up one Kafka broker container shared by every test in this module."""
    container = KafkaContainer("confluentinc/cp-kafka:7.6.0")
    with container as broker:
        yield broker
def test_order_event_processing(kafka):
    """Round-trip an order event through a real broker: produce, consume, verify.

    Args:
        kafka: module-scoped KafkaContainer fixture.
    """
    bootstrap_servers = kafka.get_bootstrap_server()

    # Produce a test event
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    order_event = {
        "eventType": "OrderCreated",
        "orderId": "ORD-789",
        "timestamp": "2026-02-09T10:00:00Z",
        "payload": {"items": [{"sku": "WIDGET-A", "qty": 1}]}
    }
    producer.produce("orders", json.dumps(order_event).encode())
    producer.flush()

    # Consume and verify
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "test-group",
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.subscribe(["orders"])
        # Poll in a loop: early polls can return None while the group
        # coordinator assigns partitions, even though the message is
        # already in the topic.
        msg = None
        deadline = time.time() + 15
        while msg is None and time.time() < deadline:
            msg = consumer.poll(timeout=1.0)
        assert msg is not None
        assert msg.error() is None
        event = json.loads(msg.value())
        assert event["eventType"] == "OrderCreated"
        assert event["orderId"] == "ORD-789"
    finally:
        # Always leave the consumer group, even when an assertion fails;
        # otherwise the broker waits out the session timeout.
        consumer.close()
def test_consumer_handles_malformed_messages(kafka):
    """A non-JSON payload must surface as a decode error, not a crash.

    Uses a dedicated topic: reading "orders" from earliest would first
    return the valid JSON event produced by other tests in this module
    (the container — and its topics — are module-scoped), so this test
    would decode that message instead of the malformed one.
    """
    bootstrap_servers = kafka.get_bootstrap_server()
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce("orders-malformed", b"this is not json")
    producer.flush()
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "test-malformed-group",
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.subscribe(["orders-malformed"])
        # Loop: early polls may return None during partition assignment.
        msg = None
        deadline = time.time() + 15
        while msg is None and time.time() < deadline:
            msg = consumer.poll(timeout=1.0)
        assert msg is not None
        # pytest.raises survives `python -O`; a bare `assert False` would
        # be stripped. The application's consumer should log and skip here.
        with pytest.raises(json.JSONDecodeError):
            json.loads(msg.value())
    finally:
        consumer.close()
def test_multiple_partitions(kafka):
    """Keyed messages on a 3-partition topic are all produced and consumable.

    Args:
        kafka: module-scoped KafkaContainer fixture.
    """
    bootstrap_servers = kafka.get_bootstrap_server()

    # Create a topic with multiple partitions using the admin client.
    from confluent_kafka.admin import AdminClient, NewTopic
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    topic = NewTopic("multi-partition", num_partitions=3, replication_factor=1)
    # create_topics() is asynchronous and returns {topic_name: future}.
    # Block on the future so all 3 partitions exist before producing —
    # otherwise broker auto-creation could race us with a 1-partition topic.
    futures = admin.create_topics([topic])
    futures["multi-partition"].result(timeout=30)

    # Produce messages with different keys (keys determine partition).
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    for i in range(30):
        key = f"customer-{i % 3}"  # 3 different keys
        producer.produce(
            "multi-partition",
            key=key.encode(),
            value=json.dumps({"index": i}).encode(),
        )
    producer.flush()

    # Verify messages are consumable.
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "multi-partition-test",
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.subscribe(["multi-partition"])
        messages = []
        deadline = time.time() + 15
        while len(messages) < 30 and time.time() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg and not msg.error():
                messages.append(json.loads(msg.value()))
        assert len(messages) == 30
    finally:
        consumer.close()  # leave the group even if the assertion fails
PostgreSQL with Testcontainers
# tests/integration/test_database.py
import pytest
from testcontainers.postgres import PostgresContainer
import psycopg2
@pytest.fixture(scope="module")
def postgres():
    """Start a PostgreSQL container and apply the test schema once per module."""
    with PostgresContainer("postgres:16-alpine") as pg:
        # Apply migrations. `with conn:` commits the transaction on success
        # and rolls back if the DDL raises; the outer try/finally guarantees
        # the bootstrap connection is closed either way (the original leaked
        # it when execute() failed).
        conn = psycopg2.connect(pg.get_connection_url())
        try:
            with conn, conn.cursor() as cursor:
                cursor.execute("""
                    CREATE TABLE orders (
                        id SERIAL PRIMARY KEY,
                        order_id VARCHAR(50) UNIQUE NOT NULL,
                        customer_id VARCHAR(50) NOT NULL,
                        total DECIMAL(10,2) NOT NULL,
                        status VARCHAR(20) DEFAULT 'pending',
                        created_at TIMESTAMP DEFAULT NOW()
                    );
                    CREATE INDEX idx_orders_customer ON orders(customer_id);
                    CREATE INDEX idx_orders_status ON orders(status);
                """)
        finally:
            conn.close()
        yield pg
@pytest.fixture
def db_conn(postgres):
    """Yield a fresh connection per test; roll back on teardown for isolation.

    Tests never commit, so rolling back discards each test's writes and
    keeps the module-scoped database clean between tests.
    """
    conn = psycopg2.connect(postgres.get_connection_url())
    try:
        yield conn
    finally:
        # finally ensures the connection is closed even if rollback raises
        # (the original leaked the connection in that case).
        conn.rollback()
        conn.close()
def test_order_insertion(db_conn):
    """An inserted order can be read back with its stored total."""
    cur = db_conn.cursor()
    insert_sql = "INSERT INTO orders (order_id, customer_id, total) VALUES (%s, %s, %s)"
    cur.execute(insert_sql, ("ORD-001", "CUST-001", 99.99))
    cur.execute("SELECT total FROM orders WHERE order_id = %s", ("ORD-001",))
    row = cur.fetchone()
    assert float(row[0]) == 99.99
def test_unique_order_id_constraint(db_conn):
    """Inserting the same order_id twice violates the UNIQUE constraint."""
    insert_sql = "INSERT INTO orders (order_id, customer_id, total) VALUES (%s, %s, %s)"
    cur = db_conn.cursor()
    cur.execute(insert_sql, ("ORD-DUP", "CUST-001", 50.00))
    with pytest.raises(psycopg2.errors.UniqueViolation):
        cur.execute(insert_sql, ("ORD-DUP", "CUST-002", 75.00))
Redis with Testcontainers
# tests/integration/test_cache.py
import pytest
from testcontainers.redis import RedisContainer
import redis
@pytest.fixture(scope="module")
def redis_container():
    """Run one Redis container for the whole test module."""
    container = RedisContainer("redis:7-alpine")
    with container as running:
        yield running
@pytest.fixture
def redis_client(redis_container):
    """Yield a flushed Redis client bound to the containerized server."""
    client = redis.Redis(
        host=redis_container.get_container_host_ip(),
        port=redis_container.get_exposed_port(6379),
        decode_responses=True,
    )
    client.flushall()  # clean state for each test
    try:
        yield client
    finally:
        # Release the connection pool between tests; the original leaked
        # one pool per test against the module-scoped server.
        client.close()
def test_cache_set_and_get(redis_client):
    """A value written with a TTL is readable back unchanged."""
    payload = '{"name": "Alice"}'
    redis_client.set("user:123", payload, ex=300)
    assert redis_client.get("user:123") == payload
def test_cache_expiration(redis_client):
    """A key set with a 1-second TTL disappears after the TTL elapses."""
    import time

    redis_client.set("temp-key", "value", ex=1)
    assert redis_client.get("temp-key") == "value"
    time.sleep(2)  # wait past the TTL
    assert redis_client.get("temp-key") is None
LocalStack for AWS Services
# tests/integration/test_aws_services.py
import pytest
from testcontainers.localstack import LocalStackContainer
import boto3
@pytest.fixture(scope="module")
def localstack():
    """Run one LocalStack container (AWS service emulator) for the module."""
    container = LocalStackContainer("localstack/localstack:3.0")
    with container as running:
        yield running
@pytest.fixture
def dynamodb(localstack):
    """Yield a DynamoDB client with a fresh `orders` table per test.

    The LocalStack container is module-scoped while this fixture is
    function-scoped, so the table must be removed on teardown: without the
    delete, a second test using this fixture fails with
    ResourceInUseException when re-creating `orders`.
    """
    client = boto3.client(
        "dynamodb",
        endpoint_url=localstack.get_url(),
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    # Create the test table and wait until it is usable before handing
    # the client to the test.
    client.create_table(
        TableName="orders",
        KeySchema=[{"AttributeName": "orderId", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "orderId", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    client.get_waiter("table_exists").wait(TableName="orders")
    try:
        yield client
    finally:
        client.delete_table(TableName="orders")
def test_dynamodb_put_and_get(dynamodb):
    """An item written to the orders table is retrievable by its key."""
    item = {
        "orderId": {"S": "ORD-TC-001"},
        "status": {"S": "pending"},
        "total": {"N": "99.99"},
    }
    dynamodb.put_item(TableName="orders", Item=item)
    response = dynamodb.get_item(
        TableName="orders",
        Key={"orderId": {"S": "ORD-TC-001"}},
    )
    assert response["Item"]["status"]["S"] == "pending"
Best Practices
Use `scope="module"` -- Starting containers is slow (2-10 seconds each). Share containers across tests in a module, not per-test.

Use fixtures for cleanup -- Each test should get a clean state (truncated tables, flushed caches) via fixtures, not by starting new containers.
Pin image versions -- Use `postgres:16-alpine`, not `postgres:latest`. Tests must be reproducible.

Set wait strategies -- Some containers need custom health checks:
from testcontainers.core.waiting_utils import wait_for_logs

with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
    # wait_for_logs treats the predicate as a regular expression: the
    # parentheses must be escaped, or they are parsed as a regex group
    # and the literal log line "started (kafka.server.KafkaServer)"
    # never matches.
    wait_for_logs(kafka, r"started \(kafka.server.KafkaServer\)", timeout=60)
Handle port randomization -- Testcontainers assigns random host ports. Always use `container.get_exposed_port()` rather than hardcoding ports.

Run in CI with Docker-in-Docker or Docker socket -- CI environments need Docker access. Most CI providers support this natively.
# GitHub Actions: Docker is available by default
jobs:
  integration-tests:
    runs-on: ubuntu-latest  # Ubuntu runners ship with a running Docker daemon
    steps:
      - uses: actions/checkout@v4
      - run: pip install testcontainers pytest
      - run: pytest tests/integration/ -v
Testcontainers has become the standard for integration testing because it eliminates the "works locally, fails in CI" problem. Real services in Docker are more reliable than mocks and more portable than shared test environments.