Streams and Tables
Streams and Tables
Streams and Tables are the foundational components of eKuiper's data processing engine. They define how data is ingested, structured, and queried before being processed by rules. eKuiper Manager provides a comprehensive interface to manage these resources, enhanced by AI-driven analysis and generation tools.
Streams
A Stream represents an unbounded flow of data from an external source (e.g., MQTT, HTTP, or a simulator). In eKuiper, every rule requires at least one stream or table as its data source.
Managing Streams via the UI
The Manager allows you to:
- List & Monitor: View all defined streams and their basic configurations.
- Visual Creation: Build stream definitions by specifying source types, data formats, and field schemas.
- SQL Definition: Directly input eKuiper SQL
CREATE STREAMstatements.
Supported Configurations
Based on the eKuiper engine, the manager supports the following configurations:
| Category | Supported Options |
| :--- | :--- |
| Source Types | mqtt, httppull, httppush, memory, neuron, edgex, file, redis, simulator |
| Data Formats | json, binary, protobuf, delimited |
| Field Types | bigint, float, string, boolean, datetime, bytea, array, struct |
AI-Powered Stream Generation
The Manager includes an AI Assistant that simplifies stream creation for technicians. You can provide a natural language prompt, and the AI will generate a valid stream definition.
Example Request: "Create a stream for an industrial MQTT temperature sensor on topic 'factory/sensor1' with fields for temperature and humidity."
AI Generated Output (Example):
{
"name": "temperature_sensor_stream",
"sourceType": "mqtt",
"datasource": "factory/sensor1",
"format": "json",
"fields": [
{ "name": "temperature", "type": "float" },
{ "name": "humidity", "type": "float" }
],
"description": "Ingests environmental data from factory floor MQTT broker."
}
Tables
Tables are used for stateful data or static reference datasets. Unlike streams, which process data as it arrives, tables allow rules to perform joins against look-up data (e.g., joining a live sensor stream with a static "Equipment Metadata" table).
- Schemaless Tables: Create tables without a strict schema for flexible data ingestion.
- State Tables: Maintain the latest state for specific keys (useful for "current status" monitoring).
AI Stream Analysis
For existing streams, the Manager provides an AI Analysis feature. This tool translates complex stream definitions into human-readable summaries, helping technicians understand data flow without parsing raw SQL or JSON.
API Usage:
POST /api/ai/stream-analyze
Request Body:
{
"streamData": { ...stream_definition_json... },
"modelName": "google/gemini-flash-1.5"
}
Functionality:
- Identifies the data source (e.g., "Ingesting via MQTT").
- Explains the key fields in plain English.
- Summarizes the industrial relevance of the data structure.
API Reference
The eKuiper Manager acts as a proxy to the underlying eKuiper instance. Below are the primary endpoints for managing streams and tables via the Manager API.
1. List All Streams
GET /api/connections/[id]/ekuiper/streams
- Returns: An array of stream names.
2. Create a Stream
POST /api/connections/[id]/ekuiper/streams
- Body:
{ "sql": "CREATE STREAM demo (temp FLOAT) WITH (DATASOURCE=\"test\", FORMAT=\"json\")" }
3. AI Stream Generation
POST /api/ai/stream-gen
- Input: Natural language
prompt. - Output: A JSON object containing
name,sourceType,datasource,format, andfields.
4. Delete a Resource
DELETE /api/connections/[id]/ekuiper/streams/[stream_name]
- Action: Removes the stream definition from the eKuiper engine.
Note: When using the Manager, ensure your Connection ID is correctly specified in the URL path to route requests to the intended eKuiper instance.