
ESP32 Packet Format & DataNet Protocol

From Catcliffe Development
Revision as of 15:51, 28 June 2025 by XenoEngineer

📡 ESP32 Packet Format & Protocol Details

🔗 Event Data Packet Structure

Field          Size     Description
HEADER         2 bytes  Magic: 0xABCD
PACKET_ID      2 bytes  Sequence number
ON_TIMESTAMP   4 bytes  Microseconds
OFF_TIMESTAMP  4 bytes  Microseconds
EVENT_COUNT    4 bytes  Binary counter
CHECKSUM       2 bytes  CRC16

Total packet size: 18 bytes | Block size: 10-100 packets (180-1800 bytes)
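
A minimal sketch of how a PC-side tool might decode one 18-byte event packet laid out as above. The page does not state the byte order, the CRC16 variant, or which bytes the checksum covers, so three things here are assumptions: little-endian order (the ESP32's native order), the CRC-16/XMODEM variant provided by `binascii.crc_hqx`, and a checksum over the 16 bytes that precede it. The name `parse_event_packet` is hypothetical.

```python
import binascii
import struct

# Assumed layout: little-endian, fields in the order listed in the table above.
EVENT_FORMAT = "<HHIIIH"  # header, packet_id, on_ts, off_ts, event_count, checksum
EVENT_SIZE = struct.calcsize(EVENT_FORMAT)  # 2 + 2 + 4 + 4 + 4 + 2 = 18 bytes
EVENT_MAGIC = 0xABCD


def parse_event_packet(data: bytes) -> dict:
    """Decode one event packet; raise ValueError on bad size, magic, or CRC."""
    if len(data) != EVENT_SIZE:
        raise ValueError(f"expected {EVENT_SIZE} bytes, got {len(data)}")
    header, packet_id, on_ts, off_ts, count, checksum = struct.unpack(EVENT_FORMAT, data)
    if header != EVENT_MAGIC:
        raise ValueError(f"bad magic 0x{header:04X}")
    # Assumption: CRC-16/XMODEM over the 16 bytes preceding the checksum field.
    if binascii.crc_hqx(data[:-2], 0) != checksum:
        raise ValueError("checksum mismatch")
    return {
        "packet_id": packet_id,
        "on_timestamp": on_ts,
        "off_timestamp": off_ts,
        "event_count": count,
        # Modulo arithmetic keeps the width correct across a 32-bit timer wrap.
        "pulse_width": (off_ts - on_ts) & 0xFFFFFFFF,
    }


# Build a sample packet and decode it again.
body = struct.pack("<HHIII", EVENT_MAGIC, 1, 1_000, 1_250, 42)
sample = body + struct.pack("<H", binascii.crc_hqx(body, 0))
event = parse_event_packet(sample)
print(event["pulse_width"])
```

If the firmware uses a different CRC16 polynomial or seed, only the `binascii.crc_hqx` call needs to change; the field layout is independent of it.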

🚀 ESP32 Firmware Event Capture

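// ESP32 Interrupt Handler for Binary Input
// BINARY_INPUT_PIN, BLOCK_SIZE, transmit_queue, read_binary_counter() and
// calculate_crc16() are assumed to be defined elsewhere in the firmware.

// Assumed definition, packed so the struct matches the 18-byte packet layout.
typedef struct __attribute__((packed)) {
    uint16_t header;
    uint16_t packet_id;
    uint32_t on_timestamp;
    uint32_t off_timestamp;
    uint32_t event_count;
    uint16_t checksum;
} EventPacket;

volatile uint32_t on_timestamp = 0;
volatile bool waiting_for_off = false;
volatile uint16_t sequence_counter = 0;  // was used but never declared
EventPacket event_buffer[BLOCK_SIZE];
volatile int buffer_index = 0;

void IRAM_ATTR binary_input_isr() {
    // esp_timer_get_time() returns int64_t microseconds; the packet keeps the
    // low 32 bits, which wrap roughly every 71.6 minutes.
    uint32_t current_time = (uint32_t)esp_timer_get_time();

    if (digitalRead(BINARY_INPUT_PIN) == HIGH) {
        // Rising edge - store ON timestamp
        on_timestamp = current_time;
        waiting_for_off = true;
    } else if (waiting_for_off) {
        // Falling edge - capture complete event
        EventPacket* packet = &event_buffer[buffer_index];
        packet->header = 0xABCD;
        packet->packet_id = ++sequence_counter;
        packet->on_timestamp = on_timestamp;
        packet->off_timestamp = current_time;
        packet->event_count = read_binary_counter();  // Sample counter NOW
        packet->checksum = calculate_crc16(packet);
        buffer_index++;
        waiting_for_off = false;

        // Check if block is full
        if (buffer_index >= BLOCK_SIZE) {
            xQueueSendFromISR(transmit_queue, &event_buffer, NULL);
            buffer_index = 0;  // Reset for next block
        }
    }
}
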
⏱️ Event Capture Timing Sequence

Binary Input HIGH → Store ON timestamp → Binary Input LOW → Capture OFF timestamp → Sample Counter → Store Event Packet → Block Full? Transmit
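
The sequence above can be modelled on the PC as a small state machine, which is handy for unit-testing the capture logic without hardware. This is a sketch only: the class `EdgeCapture`, its method names, and the block size of 2 used in the demo are hypothetical, and the counter value is passed in rather than read from hardware.

```python
class EdgeCapture:
    """Host-side model of the rising/falling edge capture sequence."""

    def __init__(self, block_size):
        self.block_size = block_size
        self.on_timestamp = 0
        self.waiting_for_off = False
        self.buffer = []       # events collected for the current block
        self.sent_blocks = []  # stands in for the transmit queue

    def on_edge(self, level_high, timestamp_us, counter_value):
        if level_high:
            # Rising edge: store the ON timestamp and arm the falling edge.
            self.on_timestamp = timestamp_us
            self.waiting_for_off = True
        elif self.waiting_for_off:
            # Falling edge: capture the OFF timestamp and sample the counter.
            self.buffer.append({
                "on_timestamp": self.on_timestamp,
                "off_timestamp": timestamp_us,
                "event_count": counter_value,
            })
            self.waiting_for_off = False
            if len(self.buffer) >= self.block_size:
                # Block full: hand it off and start a new one.
                self.sent_blocks.append(self.buffer)
                self.buffer = []


capture = EdgeCapture(block_size=2)
capture.on_edge(False, 5, 0)    # falling edge with no prior rising edge: ignored
capture.on_edge(True, 100, 0)
capture.on_edge(False, 150, 7)
capture.on_edge(True, 300, 7)
capture.on_edge(False, 380, 9)
print(len(capture.sent_blocks), capture.sent_blocks[0][1]["off_timestamp"])
```

The first call shows why the `waiting_for_off` flag matters: a falling edge that arrives before any rising edge produces no event.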

📤 Block Transmission Protocol (UDP)

Block Header Format:

Field           Size          Description        Example
Block Magic     4 bytes       Block identifier   0xDEADBEEF
Block ID        4 bytes       Sequential         0x00000001
Packet Count    2 bytes       Events in block    50
Timestamp Base  4 bytes       Block start time   System uptime
Event Packets   N * 18 bytes  Event data array   Variable
Block CRC       4 bytes       Integrity check    CRC32
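
A hedged sketch of framing and validating one block with the layout above. It assumes little-endian fields in table order (a 14-byte header), and that the trailing CRC32 is the standard zlib polynomial computed over the header plus event packets; the page specifies neither. `build_block` and `parse_block` are hypothetical helper names.

```python
import struct
import zlib

BLOCK_HEADER = "<IIHI"  # magic, block_id, packet_count, timestamp_base
BLOCK_HEADER_SIZE = struct.calcsize(BLOCK_HEADER)  # 14 bytes
BLOCK_MAGIC = 0xDEADBEEF
EVENT_SIZE = 18


def build_block(block_id, timestamp_base, packets):
    """Assemble header + event packets + CRC32 trailer (test helper)."""
    body = struct.pack(BLOCK_HEADER, BLOCK_MAGIC, block_id, len(packets), timestamp_base)
    body += b"".join(packets)
    return body + struct.pack("<I", zlib.crc32(body))


def parse_block(data):
    """Validate a block and return its header fields and raw event packets."""
    if len(data) < BLOCK_HEADER_SIZE + 4:
        raise ValueError("block too short")
    magic, block_id, count, timestamp_base = struct.unpack_from(BLOCK_HEADER, data)
    if magic != BLOCK_MAGIC:
        raise ValueError(f"bad block magic 0x{magic:08X}")
    if len(data) != BLOCK_HEADER_SIZE + count * EVENT_SIZE + 4:
        raise ValueError("length does not match packet count")
    (crc,) = struct.unpack_from("<I", data, len(data) - 4)
    # Assumption: CRC32 (zlib polynomial) over header and event packets.
    if zlib.crc32(data[:-4]) != crc:
        raise ValueError("block CRC mismatch")
    start = BLOCK_HEADER_SIZE
    packets = [data[start + i * EVENT_SIZE: start + (i + 1) * EVENT_SIZE]
               for i in range(count)]
    return {"block_id": block_id, "timestamp_base": timestamp_base, "packets": packets}


raw = build_block(1, 123_456, [bytes(EVENT_SIZE)] * 25)
block = parse_block(raw)
print(len(raw), len(block["packets"]))
```

Under these assumptions a 25-event block is 468 bytes on the wire, and a 100-event block is 1818 bytes. The latter exceeds a typical 1500-byte Ethernet MTU, so the largest block sizes would rely on IP fragmentation.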

⚙️ Control Register Map

0x1000 - CONTROL_MODE
Bit 0: Enable event capture
Bit 1: Reset counter
Bit 2: Trigger calibration
0x1004 - BLOCK_SIZE
Number of events per transmission block (10-100)
Default: 25 events
0x1008 - THRESHOLD_VALUE
Digital input threshold voltage (mV)
For add-on board signal conditioning
0x100C - COUNTER_PRESCALER
Binary counter clock divider
Controls counting resolution
0x1010 - STATUS_FLAGS
Bit 0: Buffer overflow
Bit 1: WiFi connected
Bit 2: Counter running
0x1014 - DEBUG_OUTPUT
Test signal generation
For add-on board verification
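
One way to mirror this register map on the PC side is with enums, so that addresses and bit positions are named once instead of scattered as literals. The addresses and bit meanings come from the map above; the class and member names are illustrative choices, not part of the firmware.

```python
from enum import IntEnum, IntFlag


class Register(IntEnum):
    CONTROL_MODE = 0x1000
    BLOCK_SIZE = 0x1004
    THRESHOLD_VALUE = 0x1008
    COUNTER_PRESCALER = 0x100C
    STATUS_FLAGS = 0x1010
    DEBUG_OUTPUT = 0x1014


class ControlMode(IntFlag):
    ENABLE_CAPTURE = 1 << 0
    RESET_COUNTER = 1 << 1
    TRIGGER_CALIBRATION = 1 << 2


class StatusFlags(IntFlag):
    BUFFER_OVERFLOW = 1 << 0
    WIFI_CONNECTED = 1 << 1
    COUNTER_RUNNING = 1 << 2


def validate_block_size(value):
    """Reject values outside the documented 10-100 range."""
    if not 10 <= value <= 100:
        raise ValueError(f"BLOCK_SIZE must be 10-100, got {value}")
    return value


# Decode a raw STATUS_FLAGS value and compose a CONTROL_MODE value.
status = StatusFlags(0b110)
control = ControlMode.ENABLE_CAPTURE | ControlMode.RESET_COUNTER
print(StatusFlags.WIFI_CONNECTED in status, StatusFlags.BUFFER_OVERFLOW in status)
print(hex(Register.CONTROL_MODE), int(control))
```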

🔄 PC ↔ ESP32 Communication Protocol

# Python Device Communication Service
import socket
import struct
import asyncio

class ESP32Protocol:
    def __init__(self, esp32_ip, port=8888):
        self.esp32_ip = esp32_ip
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def write_register(self, address, value):
        # Command format: [CMD][ADDR][VALUE]
        packet = struct.pack('[...]

    [...]
            if len(data) >= 18:  # Minimum block: 14-byte header + 4-byte CRC
                block = self.parse_event_block(data)
                await callback(block)

    def parse_event_block(self, data):
        # Parse block header
        magic, block_id, count, timestamp_base = struct.unpack('[...]
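
The field widths of the `[CMD][ADDR][VALUE]` command are not recoverable from the listing above, so the following is only an illustration of how such a command could be packed. The 1-byte command, 2-byte address, 4-byte value, little-endian order, and the opcode `0x01` are all assumptions made for this sketch, as are the function names.

```python
import socket
import struct

# Hypothetical layout: 1-byte command, 2-byte address, 4-byte value, little-endian.
COMMAND_FORMAT = "<BHI"
CMD_WRITE_REGISTER = 0x01  # hypothetical opcode


def build_write_command(address: int, value: int) -> bytes:
    """Pack a register write as [CMD][ADDR][VALUE]."""
    return struct.pack(COMMAND_FORMAT, CMD_WRITE_REGISTER, address, value)


def send_write(sock: socket.socket, esp32_ip: str, port: int,
               address: int, value: int) -> None:
    """Send one register write as a single UDP datagram."""
    sock.sendto(build_write_command(address, value), (esp32_ip, port))


# Set BLOCK_SIZE (0x1004) to 50 events; only the packing is exercised here.
command = build_write_command(0x1004, 50)
print(command.hex())
```

Keeping the packing in its own function lets the byte layout be tested without a device on the network.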

📊 Analysis Service Data Processing

# Event Data Service - Pulse Analysis
import numpy as np
from collections import deque

class PulseAnalyzer:
    def __init__(self, window_size=1000):
        self.pulse_widths = deque(maxlen=window_size)
        self.frequencies = deque(maxlen=window_size)
        self.counter_deltas = deque(maxlen=window_size)
        self.last_counter = None  # no counter sample seen yet

    def process_event_block(self, block):
        events = block['events']
        for event in events:
            # Calculate pulse characteristics
            pulse_width = event['pulse_width']  # microseconds
            self.pulse_widths.append(pulse_width)
            # The first event has no predecessor, so it yields no delta
            if self.last_counter is not None:
                self.counter_deltas.append(event['event_count'] - self.last_counter)
            self.last_counter = event['event_count']

        # Calculate frequency from recent events
        if len(events) > 1:
            time_span = events[-1]['off_timestamp'] - events[0]['on_timestamp']
            if time_span > 0:  # guard against division by zero
                frequency = (len(events) - 1) * 1e6 / time_span  # Hz
                self.frequencies.append(frequency)

        # Return analysis results for UI feedback
        return {
            'avg_pulse_width': np.mean(self.pulse_widths) if self.pulse_widths else 0,
            'frequency': np.mean(self.frequencies) if self.frequencies else 0,
            'counter_rate': np.mean(self.counter_deltas) if self.counter_deltas else 0,
            'pulse_jitter': np.std(self.pulse_widths) if len(self.pulse_widths) > 1 else 0
        }
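
A small worked example of the per-block frequency estimate on synthetic data. The first function follows the formula in the analyzer (first ON edge to last OFF edge); the second, measuring between ON edges only, is an alternative added here for comparison. Both function names are hypothetical, and the 32-bit masking is an addition that assumes the timestamps are the low 32 bits of a microsecond timer.

```python
def edge_to_edge_frequency(events):
    """Rate in Hz from the first ON edge to the last OFF edge of a block."""
    if len(events) < 2:
        return 0.0
    # Masking to 32 bits keeps the span valid if the microsecond timer wrapped.
    span_us = (events[-1]["off_timestamp"] - events[0]["on_timestamp"]) & 0xFFFFFFFF
    return (len(events) - 1) * 1e6 / span_us if span_us else 0.0


def onset_frequency(events):
    """Rate in Hz measured between the first and last ON edges only."""
    if len(events) < 2:
        return 0.0
    span_us = (events[-1]["on_timestamp"] - events[0]["on_timestamp"]) & 0xFFFFFFFF
    return (len(events) - 1) * 1e6 / span_us if span_us else 0.0


# Synthetic block: five 100 us pulses, one every 1000 us (a true 1 kHz signal).
events = [{"on_timestamp": t, "off_timestamp": t + 100} for t in range(0, 5000, 1000)]
print(round(edge_to_edge_frequency(events), 1), onset_frequency(events))
```

On this data the first estimate comes out near 975.6 Hz and the second at 1000 Hz, because the first span includes the final pulse width. Which one is appropriate depends on what the analysis is meant to report.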