input: nats_jetstream: urls: [ nats://TODO:4222 ] queue: myqueue subject: traffic.light.events deliver: allbuffer: system_window: timestamp_mapping: root = this.created_at size: 1hpipeline: processors: - group_by_value: value: '${! json("traffic_light_id") }' - mapping: | root = if batch_index() == 0 { { "traffic_light_id": this.traffic_light_id, "created_at": @window_end_timestamp, "total_cars": json("registration_plate").from_all().unique().length(), "passengers": json("passengers").from_all().sum(), } } else { deleted() }output: http_client: url: https://example.com/traffic_data verb: POST max_in_flight: 64
First seen: 2025-05-10 01:17
Last seen: 2025-05-10 01:17