// http.go (forked from deepch/RTSPtoWSMP4f)

package main
import (
	"bytes"
	"fmt"
	"io"
	"log"
	"net/http"
	"sort"
	"sync"
	"time"

	"github.com/deepch/vdk/av"
	"github.com/deepch/vdk/format/mp4f"
	"github.com/gin-gonic/gin"
)
// StreamConnectionRequest is the JSON body accepted by the connect and
// disconnect endpoints registered in serveHTTP.
type StreamConnectionRequest struct {
// URL is the receiver's base URL; streamRelay posts the stream data to
// URL + "/" + <stream uuid>.
URL string
}
// SteamConnection (sic) records which receiver URLs are currently attached to
// one stream. Values of this type are stored in streamConnections.
type SteamConnection struct {
// suuid is the stream identifier this entry belongs to.
suuid string
// connectionURLs lists the receiver base URLs registered for the stream,
// in the order they were added.
connectionURLs []string
}
// streamConnections maps a stream uuid to its registered receivers. It is
// allocated in serveHTTP and then read and modified by connectStream,
// disconnectStream and connectionExists.
var streamConnections map[string]SteamConnection
// serveHTTP builds the gin router for the relay API and serves it on
// Config.Server.HTTPPort. It blocks while the server is running and ends the
// process through log.Fatalln if the server stops with an error.
//
// Routes:
//
//	GET  api/streams                    sorted names of the configured streams
//	POST api/streams/:suuid/connect     relay stream suuid to the URL in the JSON body
//	POST api/streams/:suuid/disconnect  stop relaying stream suuid to that URL
//
// Both POST routes answer 404 for an unknown stream, 400 for a body that is
// not valid JSON or carries no URL, and otherwise echo the decoded request
// with 200.
func serveHTTP() {
	streamConnections = make(map[string]SteamConnection)
	router := gin.Default()
	gin.SetMode(gin.DebugMode)

	router.GET("api/streams", func(c *gin.Context) {
		streams := Config.getStreamNames()
		sort.Strings(streams)
		c.JSON(http.StatusOK, streams)
	})

	// readRequest performs the validation shared by connect and disconnect.
	// It returns the stream uuid and the decoded body; when ok is false the
	// error response has already been written and the handler must return.
	// action only feeds the log message ("connecting to", "disconnecting from").
	readRequest := func(c *gin.Context, action string) (string, StreamConnectionRequest, bool) {
		var req StreamConnectionRequest
		suuid := c.Param("suuid")
		if !Config.streamExists(suuid) {
			log.Printf("Stream %s does not exist", suuid)
			c.String(http.StatusNotFound, "Not found")
			return suuid, req, false
		}
		// ShouldBindJSON only reports the error. BindJSON aborts the request
		// with 400 on its own, so a status written after it never reaches the
		// client; a malformed body is a client error anyway.
		if err := c.ShouldBindJSON(&req); err != nil {
			log.Printf("Error %s stream %s: %v", action, suuid, err)
			c.String(http.StatusBadRequest, "Bad request")
			return suuid, req, false
		}
		// Without a URL there is nowhere to post the stream to.
		if req.URL == "" {
			log.Printf("Error %s stream %s: missing URL", action, suuid)
			c.String(http.StatusBadRequest, "Bad request")
			return suuid, req, false
		}
		return suuid, req, true
	}

	router.POST("api/streams/:suuid/connect", func(c *gin.Context) {
		suuid, req, ok := readRequest(c, "connecting to")
		if !ok {
			return
		}
		connectStream(suuid, req.URL)
		c.JSON(http.StatusOK, req)
	})

	router.POST("api/streams/:suuid/disconnect", func(c *gin.Context) {
		suuid, req, ok := readRequest(c, "disconnecting from")
		if !ok {
			return
		}
		disconnectStream(suuid, req.URL)
		c.JSON(http.StatusOK, req)
	})

	// Start the HTTP server
	if err := router.Run(Config.Server.HTTPPort); err != nil {
		log.Fatalln(err)
	}
}
// streamConnectionsMu guards streamConnections. The map is touched by the
// HTTP handlers (connectStream, disconnectStream) and by every streamRelay
// goroutine (connectionExists on each packet, disconnectStream on exit), so
// unsynchronized access would be a data race.
var streamConnectionsMu sync.Mutex

// indexOfURL returns the position of connectionURL in urls, or -1 when it is
// not present. The list is kept in insertion order, so a linear scan is used
// rather than a binary search.
func indexOfURL(urls []string, connectionURL string) int {
	for i, u := range urls {
		if u == connectionURL {
			return i
		}
	}
	return -1
}

// connectStream registers connectionURL as a receiver of stream suuid and
// starts a streamRelay goroutine for it. Registering a URL that is already
// attached to the stream is a no-op apart from a log line.
func connectStream(suuid string, connectionURL string) {
	streamConnectionsMu.Lock()
	defer streamConnectionsMu.Unlock()

	current, exists := streamConnections[suuid]
	if !exists {
		// First receiver for this stream: create the entry.
		streamConnections[suuid] = SteamConnection{
			suuid:          suuid,
			connectionURLs: []string{connectionURL},
		}
		log.Printf("Conection URL %s added to stream %s. Stream will be started.", connectionURL, suuid)
		go streamRelay(suuid, connectionURL)
		return
	}

	if indexOfURL(current.connectionURLs, connectionURL) >= 0 {
		log.Printf("Conection URL %s is already connected to stream %s", connectionURL, suuid)
		return
	}

	log.Printf("Conection URL %s added to stream %s", connectionURL, suuid)
	current.connectionURLs = append(current.connectionURLs, connectionURL)
	// The map stores values, so the updated copy has to be written back.
	streamConnections[suuid] = current
	go streamRelay(suuid, connectionURL)
}

// disconnectStream removes connectionURL from the receivers of stream suuid.
// When the last receiver is removed the stream's entry is deleted. Unknown
// streams and unknown URLs are ignored.
func disconnectStream(suuid string, connectionURL string) {
	streamConnectionsMu.Lock()
	defer streamConnectionsMu.Unlock()

	current, exists := streamConnections[suuid]
	if !exists {
		return
	}
	i := indexOfURL(current.connectionURLs, connectionURL)
	if i < 0 {
		return
	}

	current.connectionURLs = append(current.connectionURLs[:i], current.connectionURLs[i+1:]...)
	log.Printf("Conection URL %s disconnected from stream %s", connectionURL, suuid)

	if len(current.connectionURLs) == 0 {
		log.Printf("No active connections to stream %s. Stream will be stopped.", suuid)
		delete(streamConnections, suuid)
		return
	}
	// Persist the shortened list; the map holds a copy of the struct.
	streamConnections[suuid] = current
}

// connectionExists reports whether connectionURL is currently registered as a
// receiver of stream suuid.
func connectionExists(suuid string, connectionURL string) bool {
	streamConnectionsMu.Lock()
	defer streamConnectionsMu.Unlock()

	current, exists := streamConnections[suuid]
	return exists && indexOfURL(current.connectionURLs, connectionURL) >= 0
}
func streamRelay(suuid string, connectionURL string) {
if !Config.streamExists(suuid) {
log.Println("Stream Not Found")
return
}
// Disconnect the client when method out of scope
defer disconnectStream(suuid, connectionURL)
// Start RTSP Client if it isn't running already
Config.RunIFNotRun(suuid)
// Add a new client UUID for the stream. Return the client UUID and the data channel
cuuid, ch := Config.addClient(suuid)
fullUrl := connectionURL + "/" + suuid
// Remove the client when this method goes out of scope
defer Config.deleteClient(suuid, cuuid)
// Get the codecs of the stream
codecs := Config.coGe(suuid)
if codecs == nil {
log.Println("Codecs Error")
return
}
for i, codec := range codecs {
if codec.Type().IsAudio() && codec.Type() != av.AAC {
log.Println("Track", i, "Audio Codec Work Only AAC")
}
}
// Create a new fMP4 muxer
muxer := mp4f.NewMuxer(nil)
err := muxer.WriteHeader(codecs)
if err != nil {
log.Println("muxer.WriteHeader", err)
return
}
// Get initial data
meta, init := muxer.GetInit(codecs)
log.Println(fmt.Sprintf("[%s] Sending meta (%s) and init (%d bytes) to client", connectionURL, meta, len(init)))
// Send header
err = postData(fullUrl, append([]byte{9}, meta...))
if err != nil {
return
}
// Send init
err = postData(fullUrl, init)
if err != nil {
return
}
var start bool
// Check for no video every 10s
noVideo := time.NewTimer(10 * time.Second)
var timeLine = make(map[int8]time.Duration)
for {
select {
// No video after 10s
case <-noVideo.C:
log.Println("noVideo")
return
// Data from the RTSP client
case pck := <-ch:
// Check if connection is still valid
if !connectionExists(suuid, connectionURL) {
log.Println(fmt.Sprintf("[%s] client has disconnected. Stopping relay.", connectionURL))
return
}
// Check for keyframe
if pck.IsKeyFrame {
noVideo.Reset(10 * time.Second)
start = true
}
// Wait for a key frame
if !start {
continue
}
timeLine[pck.Idx] += pck.Duration
pck.Time = timeLine[pck.Idx]
// Transform the packet to fMP4
ready, buf, _ := muxer.WritePacket(pck, false)
if ready {
log.Println(fmt.Sprintf("[%s] Sending data buffer (%d bytes)", fullUrl, len(buf)))
err = postData(fullUrl, buf)
if err != nil {
return
}
}
}
}
}
// postData sends data to url in a single HTTP POST with content type
// application/octet-stream.
//
// It returns the transport error when the request could not be completed and
// nil otherwise; the response status code is not inspected. The response body
// is drained and closed so the underlying connection is released and can be
// reused by the next call.
//
// NOTE(review): http.Post uses http.DefaultClient, which has no timeout, so a
// receiver that never answers blocks the caller indefinitely.
func postData(url string, data []byte) error {
	resp, err := http.Post(url, "application/octet-stream", bytes.NewReader(data))
	if err != nil {
		log.Printf("[%s] Failed to send data: %v", url, err)
		return err
	}
	defer resp.Body.Close()

	// Best effort: the payload has already been delivered, and reading the
	// body to EOF only serves to let the transport keep the connection alive.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}