QA Engineer Skills 2026 > QA-2026 > Testcontainers for Infrastructure Testing

Testcontainers for Infrastructure Testing

What Testcontainers Solves

Integration tests have a dependency problem. Your code talks to PostgreSQL, Kafka, Redis, and Elasticsearch -- but where do those services come from in your test environment? The old answers were painful: shared test databases that accumulate stale data, Docker Compose files that fall out of sync, or mocking everything and hoping the mocks match reality.

Testcontainers solves this by spinning up real instances of infrastructure dependencies in Docker, managed by your test code. Each test run gets fresh, isolated instances (typically one container per test module) that are destroyed when the tests finish. No shared state between runs, no cleanup scripts, no "it works on my machine."


Core Concepts

| Concept | Description |
| --- | --- |
| Container per test suite | A Docker container launched by your test code, shared across tests in a module |
| Automatic cleanup | Containers are destroyed when the test suite finishes (or when the JVM/process exits) |
| Port mapping | Containers expose random ports, avoiding conflicts with other tests or local services |
| Wait strategies | Testcontainers waits for the container to be healthy before running tests |
| Network isolation | Each test suite gets its own container network |

Kafka with Testcontainers

Kafka is one of the most common use cases for Testcontainers. Running a real Kafka broker in your test suite eliminates the fragility of Kafka mocks.

# tests/integration/test_kafka_consumer.py
import pytest
from testcontainers.kafka import KafkaContainer
from confluent_kafka import Producer, Consumer
import json
import time

@pytest.fixture(scope="module")
def kafka():
    """Provide one running Kafka broker shared by every test in this module.

    The container is entered as a context manager, so it is torn down once
    the module-scoped fixture is finalized.
    """
    broker = KafkaContainer("confluentinc/cp-kafka:7.6.0")
    with broker as running_broker:
        yield running_broker

def test_order_event_processing(kafka):
    """Test that order events are correctly produced and consumed.

    Publishes one ``OrderCreated`` event to a topic used only by this test,
    reads it back with a fresh consumer group, and checks the decoded fields.
    """
    bootstrap_servers = kafka.get_bootstrap_server()

    # The broker is shared by the whole module. A new consumer group that
    # starts at "earliest" sees everything ever written to a topic, so this
    # test uses its own topic to stay independent of test ordering.
    topic = "orders-event-processing"

    # Produce a test event
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    order_event = {
        "eventType": "OrderCreated",
        "orderId": "ORD-789",
        "timestamp": "2026-02-09T10:00:00Z",
        "payload": {"items": [{"sku": "WIDGET-A", "qty": 1}]}
    }
    producer.produce(topic, json.dumps(order_event).encode())
    producer.flush()

    # Consume and verify
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "test-group",
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.subscribe([topic])

        msg = consumer.poll(timeout=10.0)
        assert msg is not None
        assert msg.error() is None
        event = json.loads(msg.value())
        assert event["eventType"] == "OrderCreated"
        assert event["orderId"] == "ORD-789"
    finally:
        # Close even when an assertion fails so the consumer does not leak
        # into the tests that run afterwards.
        consumer.close()

def test_consumer_handles_malformed_messages(kafka):
    """Test that a non-JSON message is delivered and fails JSON decoding.

    The payload is written to a topic used only by this test. On the shared
    module-scoped broker, a fresh consumer group reading a common topic from
    "earliest" would receive messages produced by other tests first.
    """
    bootstrap_servers = kafka.get_bootstrap_server()
    topic = "orders-malformed"

    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce(topic, b"this is not json")
    producer.flush()

    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "test-malformed-group",
        "auto.offset.reset": "earliest",
    })
    try:
        consumer.subscribe([topic])

        msg = consumer.poll(timeout=10.0)
        assert msg is not None
        assert msg.error() is None

        # The application's consumer should handle this without crashing:
        # decoding must raise (JSONDecodeError is a ValueError subclass) so
        # the consumer can log and skip the message.
        with pytest.raises(ValueError):
            json.loads(msg.value())
    finally:
        # Close even when an assertion fails so the consumer does not leak.
        consumer.close()

def test_multiple_partitions(kafka):
    """Test keyed messages on a three-partition topic.

    Creates the topic explicitly, produces 30 messages spread over three
    keys, then checks that every message is consumed and that all messages
    sharing a key were read from the same partition.
    """
    bootstrap_servers = kafka.get_bootstrap_server()

    # Create a topic with multiple partitions using admin client
    from confluent_kafka.admin import AdminClient, NewTopic
    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    topic = NewTopic("multi-partition", num_partitions=3, replication_factor=1)
    # create_topics() only schedules the request and returns one future per
    # topic. Wait for them so the producer cannot race topic creation.
    for creation in admin.create_topics([topic]).values():
        creation.result(timeout=30)

    # Produce messages with different keys (keys determine partition)
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    for i in range(30):
        key = f"customer-{i % 3}"  # 3 different keys
        producer.produce(
            "multi-partition",
            key=key.encode(),
            value=json.dumps({"index": i}).encode(),
        )
    producer.flush()

    # Verify messages are consumable
    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "multi-partition-test",
        "auto.offset.reset": "earliest",
    })
    messages = []
    partitions_by_key = {}
    try:
        consumer.subscribe(["multi-partition"])

        deadline = time.time() + 15
        while len(messages) < 30 and time.time() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg is None or msg.error():
                continue
            messages.append(json.loads(msg.value()))
            partitions_by_key.setdefault(msg.key(), set()).add(msg.partition())
    finally:
        # Close even when something above fails so the consumer does not leak.
        consumer.close()

    assert len(messages) == 30
    assert sorted(m["index"] for m in messages) == list(range(30))
    # Each of the three keys must map to exactly one partition.
    assert len(partitions_by_key) == 3
    assert all(len(parts) == 1 for parts in partitions_by_key.values())

PostgreSQL with Testcontainers

# tests/integration/test_database.py
import pytest
from testcontainers.postgres import PostgresContainer
import psycopg2

@pytest.fixture(scope="module")
def postgres():
    """Start a PostgreSQL container with test schema.

    Yields the running container after the ``orders`` table and its indexes
    have been created and committed.
    """
    with PostgresContainer("postgres:16-alpine") as pg:
        # The container's default URL is SQLAlchemy-style
        # ("postgresql+psycopg2://..."), which psycopg2/libpq cannot parse.
        # Strip the driver suffix; a plain "postgresql://" URL is unchanged.
        dsn = pg.get_connection_url().replace(
            "postgresql+psycopg2://", "postgresql://", 1
        )

        # Apply migrations
        conn = psycopg2.connect(dsn)
        try:
            cursor = conn.cursor()
            cursor.execute("""
            CREATE TABLE orders (
                id SERIAL PRIMARY KEY,
                order_id VARCHAR(50) UNIQUE NOT NULL,
                customer_id VARCHAR(50) NOT NULL,
                total DECIMAL(10,2) NOT NULL,
                status VARCHAR(20) DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT NOW()
            );
            CREATE INDEX idx_orders_customer ON orders(customer_id);
            CREATE INDEX idx_orders_status ON orders(status);
        """)
            conn.commit()
        finally:
            # Release the setup connection even if the DDL fails.
            conn.close()
        yield pg

@pytest.fixture
def db_conn(postgres):
    """Fresh connection for each test, with rollback for isolation.

    Nothing the test writes is committed: the transaction is rolled back and
    the connection closed during fixture teardown.
    """
    # The container's default URL is SQLAlchemy-style
    # ("postgresql+psycopg2://..."), which psycopg2/libpq cannot parse.
    # Strip the driver suffix; a plain "postgresql://" URL is unchanged.
    dsn = postgres.get_connection_url().replace(
        "postgresql+psycopg2://", "postgresql://", 1
    )
    conn = psycopg2.connect(dsn)
    yield conn
    conn.rollback()
    conn.close()

def test_order_insertion(db_conn):
    """Insert one order and read its total back on the same connection."""
    insert_sql = "INSERT INTO orders (order_id, customer_id, total) VALUES (%s, %s, %s)"
    select_sql = "SELECT total FROM orders WHERE order_id = %s"

    cur = db_conn.cursor()
    cur.execute(insert_sql, ("ORD-001", "CUST-001", 99.99))
    cur.execute(select_sql, ("ORD-001",))

    row = cur.fetchone()
    stored_total = float(row[0])
    assert stored_total == 99.99

def test_unique_order_id_constraint(db_conn):
    """A second insert reusing an existing order_id must raise UniqueViolation."""
    insert_sql = "INSERT INTO orders (order_id, customer_id, total) VALUES (%s, %s, %s)"
    first_order = ("ORD-DUP", "CUST-001", 50.00)
    duplicate_order = ("ORD-DUP", "CUST-002", 75.00)

    cur = db_conn.cursor()
    cur.execute(insert_sql, first_order)

    # Same order_id, different customer: the UNIQUE column must reject it.
    with pytest.raises(psycopg2.errors.UniqueViolation):
        cur.execute(insert_sql, duplicate_order)

Redis with Testcontainers

# tests/integration/test_cache.py
import pytest
from testcontainers.redis import RedisContainer
import redis

@pytest.fixture(scope="module")
def redis_container():
    """Provide one running Redis container shared by the tests in this module."""
    server = RedisContainer("redis:7-alpine")
    with server as running_server:
        yield running_server

@pytest.fixture
def redis_client(redis_container):
    """Yield a Redis client connected to the shared container.

    All keys are flushed before the test runs so each test starts from an
    empty server. The client is closed on teardown so connections do not
    accumulate across tests.
    """
    client = redis.Redis(
        host=redis_container.get_container_host_ip(),
        port=redis_container.get_exposed_port(6379),
        decode_responses=True,
    )
    client.flushall()  # Clean state for each test
    yield client
    client.close()

def test_cache_set_and_get(redis_client):
    """A value stored with a 300-second expiry is read back unchanged."""
    cache_key = "user:123"
    payload = '{"name": "Alice"}'

    redis_client.set(cache_key, payload, ex=300)

    assert redis_client.get(cache_key) == payload

def test_cache_expiration(redis_client):
    """A key set with a one-second expiry is readable at once and gone after two seconds."""
    import time

    cache_key = "temp-key"
    redis_client.set(cache_key, "value", ex=1)
    value_before_expiry = redis_client.get(cache_key)
    assert value_before_expiry == "value"

    # Sleep past the one-second expiry before reading again.
    time.sleep(2)
    value_after_expiry = redis_client.get(cache_key)
    assert value_after_expiry is None

LocalStack for AWS Services

# tests/integration/test_aws_services.py
import pytest
from testcontainers.localstack import LocalStackContainer
import boto3

@pytest.fixture(scope="module")
def localstack():
    """Provide one running LocalStack container shared by the tests in this module."""
    emulator = LocalStackContainer("localstack/localstack:3.0")
    with emulator as running_emulator:
        yield running_emulator

@pytest.fixture
def dynamodb(localstack):
    """Yield a DynamoDB client with a fresh ``orders`` table.

    The fixture is function-scoped while the LocalStack container is shared
    by the module, so the table is deleted on teardown. Otherwise the next
    test using this fixture would fail when creating a table that already
    exists.
    """
    client = boto3.client(
        "dynamodb",
        endpoint_url=localstack.get_url(),
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    # Create test table
    client.create_table(
        TableName="orders",
        KeySchema=[{"AttributeName": "orderId", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "orderId", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
    )
    yield client
    # Drop the table so every test starts from an empty one.
    client.delete_table(TableName="orders")

def test_dynamodb_put_and_get(dynamodb):
    """Write one order item and read it back by its hash key."""
    order_key = {"orderId": {"S": "ORD-TC-001"}}
    order_item = {
        **order_key,
        "status": {"S": "pending"},
        "total": {"N": "99.99"},
    }

    dynamodb.put_item(TableName="orders", Item=order_item)

    response = dynamodb.get_item(TableName="orders", Key=order_key)
    stored_item = response["Item"]
    assert stored_item["status"]["S"] == "pending"

Best Practices

  1. Use scope="module" -- Starting containers is slow (2-10 seconds each). Share containers across tests in a module, not per-test.

  2. Use fixtures for cleanup -- Each test should get a clean state (truncated tables, flushed caches) via fixtures, not by starting new containers.

  3. Pin image versions -- Use postgres:16-alpine, not postgres:latest. Tests must be reproducible.

  4. Set wait strategies -- Some containers need custom health checks:

from testcontainers.core.waiting_utils import wait_for_logs

# A string predicate is compiled as a regular expression, so the literal
# parentheses and dots of the broker's startup log line must be escaped;
# unescaped, the pattern would never match and the wait would time out.
with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
    wait_for_logs(kafka, r"started \(kafka\.server\.KafkaServer\)", timeout=60)
  5. Handle port randomization -- Testcontainers assigns random host ports. Always use container.get_exposed_port() rather than hardcoding ports.

  6. Run in CI with Docker-in-Docker or Docker socket -- CI environments need Docker access. Most CI providers support this natively.

# GitHub Actions: Docker is available by default
jobs:
  integration-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Install the client libraries the integration tests import
      # (confluent_kafka, psycopg2, redis, boto3), not just Testcontainers
      # and pytest.
      - run: pip install testcontainers pytest confluent-kafka psycopg2-binary redis boto3
      - run: pytest tests/integration/ -v

Testcontainers has become the standard for integration testing because it eliminates the "works locally, fails in CI" problem. Real services in Docker are more reliable than mocks and more portable than shared test environments.