Skip to content

Commit

Permalink
Merge pull request #20 from datacol-io/19_streaming_logs
Browse files Browse the repository at this point in the history
adding streaming logs for cli
  • Loading branch information
dinesh committed May 31, 2017
2 parents 98cbca2 + 5820c6b commit b21cc76
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 143 deletions.
253 changes: 140 additions & 113 deletions api/controller/services.pb.go

Large diffs are not rendered by default.

18 changes: 13 additions & 5 deletions api/controller/services.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/controller/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ service ProviderService {
};
}

rpc LogStream(LogStreamReq) returns (LogStreamResponse) {
rpc LogStream(LogStreamReq) returns (stream LogStreamResponse) {
option (google.api.http) = {
get: "/v1/logs/{name}"
};
Expand Down
2 changes: 1 addition & 1 deletion api/controller/services.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@
"operationId": "LogStream",
"responses": {
"200": {
"description": "",
"description": "(streaming responses)",
"schema": {
"$ref": "#/definitions/controllerLogStreamResponse"
}
Expand Down
29 changes: 21 additions & 8 deletions api/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"cloud.google.com/go/compute/metadata"
"fmt"
pbs "github.com/dinesh/datacol/api/controller"
Expand Down Expand Up @@ -298,21 +297,35 @@ func (s *Server) ResourceUnlink(ctx context.Context, req *pbs.AppResourceReq) (*
return ret, nil
}

func (s *Server) LogStream(ctx context.Context, req *pbs.LogStreamReq) (*pbs.LogStreamResponse, error) {
buf := new(bytes.Buffer)
func (s *Server) LogStream(req *pbs.LogStreamReq, stream pbs.ProviderService_LogStreamServer) error {
since, err := ptypes.Duration(req.Since)
if err != nil {
return nil, err
return err
}

if err := s.Provider.LogStream(req.Name, buf, pb.LogStreamOptions{
reader, close, err := s.Provider.LogStream(req.Name, pb.LogStreamOptions{
Follow: req.Follow,
Since: since,
}); err != nil {
return nil, err
})

if err != nil {
return err
}
defer close()

for {
line, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
return nil
}
return err
}

return &pbs.LogStreamResponse{Data: buf.Bytes()}, nil
if err := stream.Send(&pbs.LogStreamResponse{Data: line}); err != nil {
return err
}
}
}

func (s *Server) unaryInterceptor(
Expand Down
20 changes: 14 additions & 6 deletions client/apps.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package client

import (
"bytes"
pbs "github.com/dinesh/datacol/api/controller"
pb "github.com/dinesh/datacol/api/models"
"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -36,7 +35,7 @@ func (c *Client) RestartApp(name string) error {
}

func (c *Client) StreamAppLogs(name string, follow bool, since time.Duration, out io.Writer) error {
ret, err := c.ProviderServiceClient.LogStream(ctx, &pbs.LogStreamReq{
stream, err := c.ProviderServiceClient.LogStream(ctx, &pbs.LogStreamReq{
Name: name,
Since: ptypes.DurationProto(since),
Follow: follow,
Expand All @@ -45,11 +44,20 @@ func (c *Client) StreamAppLogs(name string, follow bool, since time.Duration, ou
return err
}

if _, err = io.Copy(out, bytes.NewBuffer(ret.Data)); err != nil {
return err
}
defer stream.CloseSend()

return nil
for {
ret, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
if _, err := out.Write(ret.Data); err != nil {
return err
}
}
}

func (c *Client) RunProcess(name string, args []string) (*pbs.CmdResponse, error) {
Expand Down
13 changes: 6 additions & 7 deletions cloud/google/provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package google

import (
"bufio"
"cloud.google.com/go/datastore"
"context"
"fmt"
Expand Down Expand Up @@ -98,16 +99,16 @@ func runningPods(ns, app string, c *kubernetes.Clientset) (string, error) {
return podNames[0], nil
}

func (g *GCPCloud) LogStream(app string, out io.Writer, opts pb.LogStreamOptions) error {
func (g *GCPCloud) LogStream(app string, opts pb.LogStreamOptions) (*bufio.Reader, func() error, error) {
ns := g.DeploymentName
c, err := getKubeClientset(ns)
if err != nil {
return err
return nil, nil, err
}

pod, err := runningPods(ns, app, c)
if err != nil {
return err
return nil, nil, err
}

log.Debugf("Getting logs from pod %s", pod)
Expand All @@ -127,12 +128,10 @@ func (g *GCPCloud) LogStream(app string, out io.Writer, opts pb.LogStreamOptions

rc, err := req.Stream()
if err != nil {
return err
return nil, nil, err
}

defer rc.Close()
_, err = io.Copy(out, rc)
return err
return bufio.NewReader(rc), rc.Close, nil
}

func (g *GCPCloud) storage() *storage.Service {
Expand Down
3 changes: 2 additions & 1 deletion cloud/provider.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloud

import (
"bufio"
pb "github.com/dinesh/datacol/api/models"
"io"
)
Expand All @@ -25,7 +26,7 @@ type Provider interface {
ReleaseList(string, int) (pb.Releases, error)
ReleaseDelete(string, string) error

LogStream(app string, w io.Writer, opts pb.LogStreamOptions) error
LogStream(app string, opts pb.LogStreamOptions) (*bufio.Reader, func() error, error)

ResourceList() (pb.Resources, error)
ResourceCreate(name, kind string, params map[string]string) (*pb.Resource, error)
Expand Down
3 changes: 2 additions & 1 deletion cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func init() {
Action: cmdAppLogStream,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "follow, f",
Name: "follow",
Aliases: []string{"f"},
Usage: "keep streaming new log output (default)",
},
&cli.DurationFlag{
Expand Down

0 comments on commit b21cc76

Please sign in to comment.