Skip to main content

What are Modules?

In AveniECA, a module is an organizational unit that groups related digital twins together. The module_id serves as both an identifier and a namespace for storing and querying state data in the Episodic State Space (ESS).

Module Fundamentals

Module as Type Definition

Think of a module as a “type” or “class” of digital twin. Just as you might have multiple instances of a Python class, you can have multiple instances of a module:
# Module defines the type
module_id = "temperature"

# Multiple instances can exist
instance_1 = {"sub_topic": "bedroom_temp", "instance_id": 1}
instance_2 = {"sub_topic": "kitchen_temp", "instance_id": 2}
instance_3 = {"sub_topic": "garage_temp", "instance_id": 3}
All instances share:
  • The same module_id
  • The same state vector structure
  • The same ESS storage
  • Access to the same prediction models

Module ID Structure

A module_id is a string that appears throughout the AveniECA system:
from avenieca.config.broker import Broker

config = Broker(
    url=kafka_url,
    sub_topic="left_wheel",  # Instance-specific
    group="vehicles",
    pub_topic=""
)
# Signals sent here are associated with a module_id

Module Types

Individual Modules

Purpose: Represent a single, atomic component or measurement. Characteristics:
  • Simple state vectors (often 1-3 dimensions)
  • Independent state history
  • Can be components of aggregates
  • Focused prediction domain
Examples from Real Systems:
# Environmental sensors
modules = [
    {"module_id": "temperature", "state_dim": 1},      # [celsius]
    {"module_id": "humidity", "state_dim": 1},         # [percentage]
    {"module_id": "air_quality_index", "state_dim": 1}, # [aqi]
    {"module_id": "occupancy", "state_dim": 1},        # [people_count]
]

# Equipment components
modules = [
    {"module_id": "air_conditioner", "state_dim": 1},  # [power_level]
    {"module_id": "purifier", "state_dim": 1},         # [speed]
]

# Vehicle subsystems
modules = [
    {"module_id": "left_wheel", "state_dim": 3},       # [speed, temp, pressure]
    {"module_id": "right_wheel", "state_dim": 3},
]
When to use individual modules:
  • Component needs independent monitoring
  • State is simple and well-defined
  • May be part of multiple different aggregates
  • Need granular history for specific component

Aggregate Modules

Purpose: Combine multiple individual modules to learn multi-modal patterns and relationships. Characteristics:
  • Complex state vectors (combination of component states)
  • Rich metadata about components
  • Enable holistic predictions
  • Store relationships between components
Structure Example:
from avenieca.api.model import ESSInsert

# Aggregate combining 5 environmental modules
aggregate = ESSInsert(
    module_id="climate_aggregate",
    
    # Combined state vector
    state=[25.0, 65.0, 10.0, 2.0, 70.0],  # temp, humidity, occupancy, purifier, aqi
    
    # Aggregate-level metrics
    valence=-10.0,           # Overall assessment
    avg_ess_valence=-18.0,   # Average component valence
    total_ess_score=37,      # Sum of component scores
    avg_ess_score=7,         # Average component score
    score=0,
    
    # Component tracking
    aggregate_id=[2, 7, 3, 5, 6],                      # ESS IDs of components
    aggregate_module_id=[                               # Component types
        "air_conditioner",
        "occupancy", 
        "purifier",
        "temperature",
        "air_quality_index"
    ],
    aggregate_shape=[1, 1, 1, 1, 1],                   # Dimensions of each
    aggregate_valence=[90.0, -90.0, 90.0, -90.0, -90.0],  # Individual assessments
    aggregate_score=[18, 6, 28, 2, 1],                  # Individual scores
    aggregate_context=[None, None, None, None, None],
    aggregate_emb_inp=[None, None, None, None, None]
)
When to use aggregates:
  • Components interact or influence each other
  • Need to predict multi-variable outcomes
  • Want to understand system-level behavior
  • Need efficient queries for complete system state

Module Organization Patterns

Hierarchical Organization

Organize modules in a hierarchy from simple to complex:
Individual Sensors → Component Aggregates → System Aggregates
Example: Smart Building
# Level 1: Individual sensors
individual_modules = [
    "temperature",
    "humidity",
    "co2_level",
    "light_level",
    "occupancy"
]

# Level 2: Room aggregate
room_aggregate = {
    "module_id": "room_101_climate",
    "components": ["temperature", "humidity", "co2_level"]
}

# Level 3: Floor aggregate
floor_aggregate = {
    "module_id": "floor_1_climate",
    "components": [
        "room_101_climate",
        "room_102_climate",
        "room_103_climate"
    ]
}

# Level 4: Building aggregate
building_aggregate = {
    "module_id": "building_climate",
    "components": [
        "floor_1_climate",
        "floor_2_climate",
        "floor_3_climate"
    ]
}
Benefits:
  • Predictions at multiple levels of granularity
  • Efficient queries at appropriate scope
  • Clear organization and debugging
  • Scalable to large systems

Functional Organization

Group modules by function or domain:
# Climate control domain
climate_modules = [
    "temperature",
    "humidity",
    "hvac_unit",
    "climate_aggregate"
]

# Air quality domain
air_quality_modules = [
    "co2_sensor",
    "pm25_sensor",
    "voc_sensor",
    "air_purifier",
    "air_quality_aggregate"
]

# Occupancy domain
occupancy_modules = [
    "motion_sensor",
    "door_sensor",
    "camera_count",
    "occupancy_aggregate"
]

# Building-wide aggregate
building_aggregate = {
    "module_id": "building_systems",
    "components": [
        "climate_aggregate",
        "air_quality_aggregate",
        "occupancy_aggregate"
    ]
}
Benefits:
  • Clear separation of concerns
  • Domain experts can focus on relevant modules
  • Independent evolution of domains
  • Easier maintenance and debugging

Creating and Using Modules

Defining a New Module

1

Design the state vector

Determine what information the module needs to track:
# Simple: Single value
state = [temperature_celsius]

# Multi-dimensional: Related values
state = [speed_rpm, temperature_c, vibration_level]

# Complex: System state
state = [sensor1, sensor2, sensor3, actuator1, actuator2]
2

Choose module_id naming

Use descriptive, consistent naming:
# Good
module_id = "hvac_unit_1"
module_id = "temperature_sensor"
module_id = "climate_aggregate"

# Avoid
module_id = "temp"  # Too vague
module_id = "Module1"  # Not descriptive
module_id = "sensor-01"  # Use underscores
3

Implement data capture

Create a function that generates signals:
from avenieca.data import Signal

def capture_hvac_state():
    # Read from hardware/API
    power_level = get_hvac_power()
    fan_speed = get_fan_speed()
    
    # Assess the state
    valence = calculate_efficiency_score(power_level, fan_speed)
    
    return Signal(
        state=[power_level, fan_speed],
        valence=valence,
        score=1
    )
4

Configure streaming or API

Set up Kafka streaming or direct API calls:
from avenieca.producers import Stream
from avenieca.config.broker import Broker

config = Broker(
    url=os.environ["KAFKA_URL"],
    sub_topic="hvac/unit_1",  # Instance-specific topic
    group="building_systems",
    pub_topic=""
)

stream = Stream(config=config, sync_rate=5.0)  # Every 5 seconds
stream.publish(capture_hvac_state)

Creating Aggregates

Follow this pattern from the README to create properly structured aggregates:
from avenieca.api.model import ESSResponse, ESSInsert
from typing import List

def create_aggregate_from_ess(
    array_ess: List[ESSResponse], 
    aggregate_insert: ESSInsert
) -> ESSInsert:
    """
    Combine multiple ESS entries into an aggregate.
    
    Args:
        array_ess: List of component ESS entries
        aggregate_insert: Template aggregate to populate
        
    Returns:
        Populated aggregate ready to insert
    """
    total_ess_score = 0
    total_ess_valence = 0.0
    
    for ess in array_ess:
        # Combine state vectors
        aggregate_insert.state.extend(ess.state)
        
        # Track component metadata
        aggregate_insert.aggregate_module_id.append(ess.module_id)
        aggregate_insert.aggregate_id.append(ess.id)
        aggregate_insert.aggregate_context.append(ess.context)
        aggregate_insert.aggregate_valence.append(ess.valence)
        aggregate_insert.aggregate_score.append(ess.score)
        aggregate_insert.aggregate_emb_inp.append(ess.embedding_input)
        aggregate_insert.aggregate_shape.append(len(ess.state))
        
        # Accumulate metrics
        total_ess_score += ess.score
        total_ess_valence += ess.valence
    
    # Calculate aggregate metrics
    n = len(array_ess)
    aggregate_insert.total_ess_score = total_ess_score
    aggregate_insert.avg_ess_score = int(total_ess_score / n)
    aggregate_insert.avg_ess_valence = total_ess_valence / n
    aggregate_insert.valence = total_ess_valence
    
    return aggregate_insert

# Usage
components = [
    ess_temperature,
    ess_humidity,
    ess_air_quality,
    ess_occupancy
]

aggregate_template = ESSInsert(
    module_id="room_climate_aggregate",
    state=[],
    valence=0.0,
    avg_ess_valence=0.0,
    total_ess_score=0,
    avg_ess_score=0,
    score=0,
    aggregate_id=[],
    aggregate_valence=[],
    aggregate_score=[],
    aggregate_module_id=[],
    aggregate_shape=[],
    aggregate_context=[],
    aggregate_emb_inp=[]
)

aggregate = create_aggregate_from_ess(components, aggregate_template)
res, status = eca.ess.create(data=aggregate)

Querying Modules

Get All States for a Module

from avenieca.api.eca import ECA
from avenieca.api.model import Config

config = Config(
    uri="http://localhost:2580/v1",
    username=username,
    password=password
)
eca = ECA(config)

# Get all ESS entries for a module
states, status = eca.ess.get_all(module_id="temperature")

for state in states:
    print(f"State: {state.state}, Valence: {state.valence}")

Search for Similar States

from avenieca.api.model import Search

# Find states similar to a target state
search_results, status = eca.ess.search(data=Search(
    module_id="air_conditioner",
    state=[18.0],  # Target state
    limit=5        # Return top 5 matches
))

for result in search_results:
    print(f"Score: {result.score}, State: {result.ess.state}")

Get States from Sequences

# Get all ESS entries referenced by sequences
sequence_states, status = eca.ess.get_all_sequence(
    module_id="air_conditioner"
)

# Get ESS for a specific sequence
state, status = eca.ess.get_one_sequence(
    module_id="air_conditioner",
    sequence_id=3
)

Find Aggregates Containing a Module

# Find all aggregates that include a specific component
aggregates, status = eca.ess.get_all_aggregates(
    module_id="temperature",           # Component module
    aggregate_module_id="climate_aggregate",  # Aggregate type
    ess_id=8                           # Specific component instance
)

for agg in aggregates:
    print(f"Aggregate contains: {agg.aggregate_module_id}")

Module Predictions

Requesting Predictions

The Cortex API predicts future states for modules:
from avenieca.api.model import NextStateRequest

# Predict next states
request = NextStateRequest(
    module_id="climate_aggregate",
    recall=20,    # Look back 20 states
    range=20,     # Consider states within range of 20
    n=3,          # Return top 3 predictions
    status="e"    # Use sequences with status "e" (episode)
)

response, status = eca.cortex.predictions(data=request)

# Current state
print(f"Current: {response.current_state}")

# Predicted next states (sorted by likelihood)
for i, twin_list in enumerate(response.next_state):
    print(f"Prediction {i+1}:")
    for twin in twin_list.list:
        print(f"  {twin.module_id}: {twin.state}")

Raw Predictions

Get predictions as raw aggregate values:
# Get raw state vectors instead of mapped values
response_raw, status = eca.cortex.predictions_raw(data=request)

for prediction in response_raw.next_state:
    for twin in prediction.list:
        # Raw state vector
        print(f"Module: {twin.module_id}")
        print(f"State vector: {twin.state}")  # List[float]

Best Practices

All ESS entries for a module_id must have:
  • Same state vector dimensionality
  • Same component order
  • Same units and scaling
# Good: Consistent structure
ess1 = ESSInsert(module_id="sensor", state=[temp, humidity, pressure])
ess2 = ESSInsert(module_id="sensor", state=[temp, humidity, pressure])

# Bad: Inconsistent
ess1 = ESSInsert(module_id="sensor", state=[temp, humidity])
ess2 = ESSInsert(module_id="sensor", state=[temp, humidity, pressure])
Choose descriptive names that indicate:
  • What the module represents
  • Its level of abstraction
  • Its domain or context
# Clear and descriptive
"hvac_unit_bedroom"
"temperature_sensor"
"building_climate_aggregate"

# Vague or unclear
"module1"
"temp"
"aggregate"
When creating aggregates:
  • Include related, interacting components
  • Maintain component ordering consistency
  • Validate aggregate metadata accuracy
  • Use the helper function from README
The system validates aggregate structure, so ensure:
  • aggregate_shape matches actual component dimensions
  • aggregate_id references exist in ESS
  • Metric calculations are correct
Design modules at appropriate scope:
  • Too narrow: “temp_reading_1” (overly specific)
  • Good: “temperature” (right level)
  • Too broad: “all_sensors” (too general)
Each module should represent a coherent concept.

Common Patterns

Pattern: Sensor Array

Multiple identical sensors, different locations:
# Same module_id, different instances
for i, location in enumerate(["room_1", "room_2", "room_3"]):
    config = Broker(
        url=kafka_url,
        sub_topic=f"temperature/{location}",
        group="temperature_sensors",
        pub_topic=""
    )
    
    stream = Stream(config=config, sync_rate=10.0)
    stream.publish(lambda: read_temperature(location))

Pattern: Multi-Level Aggregates

Build system understanding hierarchically:
# Level 1: Components
components = [temp_ess, humidity_ess, co2_ess]
room_agg = create_aggregate_from_ess(components, room_template)

# Level 2: Room aggregate
eca.ess.create(data=room_agg)

# Level 3: Floor aggregate (aggregate of aggregates)
floor_components = [room_1_agg, room_2_agg, room_3_agg]
floor_agg = create_aggregate_from_ess(floor_components, floor_template)
eca.ess.create(data=floor_agg)

Pattern: Dynamic Module Creation

Create modules programmatically:
def create_module_for_sensor(sensor_config):
    """
    Dynamically create and configure a module for a sensor.
    """
    module_id = f"{sensor_config['type']}_{sensor_config['location']}"
    
    def capture_state():
        value = sensor_config['read_function']()
        return Signal(
            state=[value],
            valence=sensor_config['assess_function'](value)
        )
    
    config = Broker(
        url=os.environ["KAFKA_URL"],
        sub_topic=f"{sensor_config['type']}/{sensor_config['location']}",
        group=sensor_config['type'],
        pub_topic=""
    )
    
    stream = Stream(config=config, sync_rate=sensor_config['interval'])
    return stream, capture_state

# Use it
sensors = [
    {"type": "temperature", "location": "bedroom", "read_function": read_temp, "assess_function": assess_temp, "interval": 10},
    {"type": "humidity", "location": "bedroom", "read_function": read_humidity, "assess_function": assess_humidity, "interval": 10},
]

for sensor in sensors:
    stream, handler = create_module_for_sensor(sensor)
    stream.publish(handler)

Next Steps

Signals

Learn about the Signal data structure

Digital Twins

Understand digital twin concepts

ESS API

Complete ESS API reference

Cortex API

Prediction API reference