Spaces:
Runtime error
Runtime error
package tools | |
import ( | |
"bufio" | |
"bytes" | |
"io" | |
"log" | |
"strings" | |
) | |
const fieldSeparator = ":" | |
type SSEClient struct { | |
EventSource io.ReadCloser | |
logger *log.Logger | |
} | |
type Event struct { | |
ID string | |
Event string | |
Data string | |
Retry string | |
} | |
func NewSSEClient(eventSource io.ReadCloser) *SSEClient { | |
return &SSEClient{ | |
EventSource: eventSource, | |
logger: log.New(log.Writer(), "SSEClient: ", log.LstdFlags), | |
} | |
} | |
func (c *SSEClient) Read() <-chan Event { | |
events := make(chan Event) | |
go func() { | |
defer close(events) | |
reader := bufio.NewReaderSize(c.EventSource, 128*1024) | |
var data bytes.Buffer | |
for { | |
line, err := reader.ReadBytes('\n') | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
c.logger.Printf("Error reading from event source: %v", err) | |
break | |
} | |
data.Write(line) | |
if bytes.HasSuffix(data.Bytes(), []byte("\n\n")) || bytes.HasSuffix(data.Bytes(), []byte("\r\n\r\n")) { | |
event := c.parseEvent(data.String()) | |
if event.Data != "" { | |
events <- event | |
} | |
data.Reset() | |
} | |
} | |
}() | |
return events | |
} | |
func (c *SSEClient) parseEvent(data string) Event { | |
event := Event{ | |
ID: "", | |
Event: "message", | |
Data: "", | |
Retry: "", | |
} | |
lines := strings.Split(data, "\n") | |
for _, line := range lines { | |
if strings.TrimSpace(line) == "" || strings.HasPrefix(line, fieldSeparator) { | |
continue | |
} | |
parts := strings.SplitN(line, fieldSeparator, 2) | |
field := parts[0] | |
var value string | |
if len(parts) == 2 { | |
value = strings.TrimPrefix(parts[1], " ") | |
} | |
switch field { | |
case "id": | |
event.ID = value | |
case "event": | |
event.Event = value | |
case "data": | |
event.Data += value + "\n" | |
case "retry": | |
event.Retry = value | |
} | |
} | |
if strings.HasSuffix(event.Data, "\n") { | |
event.Data = strings.TrimSuffix(event.Data, "\n") | |
} | |
return event | |
} | |
func (c *SSEClient) Close() error { | |
err := c.EventSource.Close() | |
if err != nil { | |
return err | |
} | |
return nil | |
} | |